This commit is contained in:
苏尹岚
2020-10-27 11:53:46 +08:00
parent a7032ae943
commit 55dddcbb75
4 changed files with 115 additions and 28 deletions

View File

@@ -32,12 +32,16 @@ var (
DayTimeEnd time.Time
WeekTimeBegin time.Time
WeekTimeEnd time.Time
JobTimerMap map[int64]*time.Timer
)
func init() {
DayTimeBegin = utils.Str2Time(utils.Time2Str(utils.Time2Date(time.Now())) + " 00:00:00")
DayTimeEnd = utils.Str2Time(utils.Time2Str(utils.Time2Date(time.Now())) + " 23:59:59")
WeekTimeBegin, WeekTimeEnd = getWeekTime()
JobTimerMap = make(map[int64]*time.Timer)
}
func getWeekTime() (weekTimeBegin, weekTimeEnd time.Time) {
@@ -251,7 +255,10 @@ func AcceptJob(ctx *jxcontext.Context, jobID int) (errCode string, err error) {
}
dao.Commit(db)
//任务限时完成
defer checkLimitJobOrders(db, job, jobOrder)
defer func() {
timer := checkLimitJobOrders(db, job, jobOrder)
JobTimerMap[jobOrder.JobOrderID] = timer
}()
//特殊任务,如美团会员,是直接要支付
if jobID == model.JobIDMtMembers {
userBill, err := dao.GetUserBill(db, ctx.GetUserID(), "")
@@ -336,23 +343,43 @@ func checkJobOrders(db *dao.DaoDB, jobID int, statusCompareStr, userID string, f
return len(jobOrders), err
}
func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) {
utils.AfterFuncWithRecover(time.Hour*time.Duration(job.JobLimitAt), func() {
utils.CallFuncAsync(func() {
func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) *time.Timer {
// timer := time.NewTimer(time.Hour * time.Duration(job.JobLimitAt))
timer := time.NewTimer(time.Minute * 3)
//插到定时任务表里,主要是重启项目后的重启定时器用
jobTimer := &model.JobTimer{
JobID: job.ID,
JobOrderID: jobOrder.JobOrderID,
Type: model.JobTimerTypeAccept,
Status: model.JobTimerStatusWait,
StartAt: jobOrder.CreatedAt,
LimitAt: job.JobLimitAt,
}
dao.WrapAddIDCULEntity(jobTimer, jxcontext.AdminCtx.GetUserName())
dao.CreateEntity(db, jobTimer)
// utils.AfterFuncWithRecover(time.Hour*time.Duration(job.JobLimitAt), func() {
utils.CallFuncAsync(func() {
select {
case <-timer.C:
globals.SugarLogger.Debugf("checkLimitJobOrders jobID: %v, jobOrderID: %v", job.ID, jobOrder.JobOrderID)
jobOrders, _ := dao.GetJobOrdersNoPage(db, job.ID, jobOrder.JobOrderID, job.UserID, "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil)
defer timer.Stop()
jobOrders, _ := dao.GetJobOrdersNoPage(db, job.ID, jobOrder.JobOrderID, jobOrder.UserID, "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil)
if len(jobOrders) == 0 {
return
}
jobOrder := jobOrders[0]
if jobOrder.Status == model.JobOrderStatusCancel {
if jobOrder.Status > model.JobOrderStatusAccept {
return
}
jobOrder.Status = model.JobOrderStatusCancel
dao.UpdateEntity(db, jobOrder, "Status")
})
if _, err := dao.UpdateEntity(db, jobOrder, "Status"); err == nil {
jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status")
}
}
})
return timer
// })
}
func checkLimitAuditJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) {
@@ -396,8 +423,19 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
jobOrder2.SubmitAuditAt = time.Now()
jobOrder2.Status = model.JobOrderStatusWaitAudit
if _, err = dao.UpdateEntity(db, jobOrder2, "Img", "Content", "SubmitAuditAt", "Status"); err == nil {
//审核定时
checkLimitAuditJobOrders(db, job, jobOrder2)
//任务定时器停止
JobTimerMap[jobOrder2.JobOrderID].Stop()
//任务定时表状态完成
jobTimer := &model.JobTimer{
JobID: job.ID,
JobOrderID: jobOrder2.ID,
}
if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID"); err == nil {
jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status")
}
//审核定时开启
// checkLimitAuditJobOrders(db, job, jobOrder2)
}
return err
}
@@ -644,3 +682,14 @@ func CancelJdDelivery(ctx *jxcontext.Context, vendorWaybillID, reason string) (e
func GetJdDelivery(ctx *jxcontext.Context, status int, fromTime, toTime string, pageSize, offset int) (pagedInfo *model.PagedInfo, err error) {
return dao.GetDeliveryOrders(dao.GetDB(), []string{ctx.GetUserID()}, []int{status}, utils.Str2Time(fromTime), utils.Str2Time(toTime), pageSize, offset)
}
func ResetJobTimers() {
var (
db = dao.GetDB()
)
jobTimers, err := dao.GetJobTimers(db, model.JobTimerStatusWait)
if err != nil {
return
}
}

View File

@@ -274,3 +274,18 @@ func GetMtMember(db *DaoDB) (mtMember *model.MtMember, err error) {
err = GetRow(db, &mtMember, sql, sqlParams)
return mtMember, err
}
func GetJobTimers(db *DaoDB, status int) (jobTimers []*model.JobTimer, err error) {
sql := `
SELECT *
FROM job_timer
WHERE 1 = 1
`
sqlParams := []interface{}{}
if status != -1 {
sql += ` AND status = ?`
sqlParams = append(sqlParams, status)
}
err = GetRows(db, &jobTimers, sql, sqlParams)
return jobTimers, err
}

View File

@@ -22,6 +22,12 @@ const (
JobOrderStatusCancel = 115
JobIDMtMembers = 1
JobTimerTypeAccept = 1 //接受任务
JobTimerTypeSubmit = 2 //交任务
JobTimerStatusWait = 0 //正在进行
JobTimerStatusFinish = 1 //定时任务已完成
)
type Job struct {
@@ -122,6 +128,24 @@ func (v *JobOrder) TableIndex() [][]string {
}
}
type JobTimer struct {
ModelIDCUL
JobID int `orm:"column(job_id)" json:"jobID"` //任务ID
JobOrderID int64 `orm:"column(job_order_id)" json:"jobOrderID"` //任务订单号
Type int `json:"type"` //定时任务类型1为接受任务2为提交审核
Status int `json:"status"` //定时任务的状态0表示正在进行1表示已经结束
StartAt time.Time `json:"startAt"` //定时任务开始时间
LimitAt int `json:"limitAt"` //定时任务时长(小时数)
}
func (v *JobTimer) TableIndex() [][]string {
return [][]string{
[]string{"JobID"},
[]string{"JobOrderID"},
}
}
type MtMember struct {
ModelIDCULD

33
main.go
View File

@@ -19,7 +19,6 @@ import (
_ "git.rosy.net.cn/jx-callback/business/auth2/authprovider/mobile"
_ "git.rosy.net.cn/jx-callback/business/auth2/authprovider/password"
_ "git.rosy.net.cn/jx-callback/business/auth2/authprovider/weixin"
"git.rosy.net.cn/jx-callback/business/jxutils/tasks"
_ "git.rosy.net.cn/jx-callback/routers"
)
@@ -81,22 +80,22 @@ func checkCmdFlags() bool {
func main() {
if !checkCmdFlags() {
Init()
if err := tasks.RefreshWeixinToken(); err != nil {
globals.SugarLogger.Errorf("RefreshWeixinToken failed with error:%s", err)
return
}
if err := tasks.RefreshWeixin2Token(); err != nil {
globals.SugarLogger.Errorf("RefreshWeixin2Token failed with error:%s", err)
return
}
if err := tasks.RefreshWeixin3Token(); err != nil {
globals.SugarLogger.Errorf("RefreshWeixin3Token failed with error:%s", err)
return
}
if err := tasks.RefreshPushToken(); err != nil {
globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err)
return
}
// if err := tasks.RefreshWeixinToken(); err != nil {
// globals.SugarLogger.Errorf("RefreshWeixinToken failed with error:%s", err)
// return
// }
// if err := tasks.RefreshWeixin2Token(); err != nil {
// globals.SugarLogger.Errorf("RefreshWeixin2Token failed with error:%s", err)
// return
// }
// if err := tasks.RefreshWeixin3Token(); err != nil {
// globals.SugarLogger.Errorf("RefreshWeixin3Token failed with error:%s", err)
// return
// }
// if err := tasks.RefreshPushToken(); err != nil {
// globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err)
// return
// }
if beego.BConfig.RunMode != "prod" {
beego.BConfig.WebConfig.DirectoryIndex = true