From 55dddcbb75e33f27b0ac0917bc03b1219fdd5eb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=B9=E5=B2=9A?= <770236076@qq.com> Date: Tue, 27 Oct 2020 11:53:46 +0800 Subject: [PATCH] jobtime --- business/jxstore/cms/job.go | 71 +++++++++++++++++++++++++++++------ business/model/dao/dao_job.go | 15 ++++++++ business/model/job.go | 24 ++++++++++++ main.go | 33 ++++++++-------- 4 files changed, 115 insertions(+), 28 deletions(-) diff --git a/business/jxstore/cms/job.go b/business/jxstore/cms/job.go index 1c0400689..090a34c79 100644 --- a/business/jxstore/cms/job.go +++ b/business/jxstore/cms/job.go @@ -32,12 +32,16 @@ var ( DayTimeEnd time.Time WeekTimeBegin time.Time WeekTimeEnd time.Time + + JobTimerMap map[int64]*time.Timer ) func init() { DayTimeBegin = utils.Str2Time(utils.Time2Str(utils.Time2Date(time.Now())) + " 00:00:00") DayTimeEnd = utils.Str2Time(utils.Time2Str(utils.Time2Date(time.Now())) + " 23:59:59") WeekTimeBegin, WeekTimeEnd = getWeekTime() + + JobTimerMap = make(map[int64]*time.Timer) } func getWeekTime() (weekTimeBegin, weekTimeEnd time.Time) { @@ -251,7 +255,10 @@ func AcceptJob(ctx *jxcontext.Context, jobID int) (errCode string, err error) { } dao.Commit(db) //任务限时完成 - defer checkLimitJobOrders(db, job, jobOrder) + defer func() { + timer := checkLimitJobOrders(db, job, jobOrder) + JobTimerMap[jobOrder.JobOrderID] = timer + }() //特殊任务,如美团会员,是直接要支付 if jobID == model.JobIDMtMembers { userBill, err := dao.GetUserBill(db, ctx.GetUserID(), "") @@ -336,23 +343,43 @@ func checkJobOrders(db *dao.DaoDB, jobID int, statusCompareStr, userID string, f return len(jobOrders), err } -func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) { - utils.AfterFuncWithRecover(time.Hour*time.Duration(job.JobLimitAt), func() { - utils.CallFuncAsync(func() { +func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) *time.Timer { + // timer := time.NewTimer(time.Hour * time.Duration(job.JobLimitAt)) + timer := time.NewTimer(time.Minute * 3) + //插到定时任务表里,主要是重启项目后的重启定时器用 + jobTimer := &model.JobTimer{ + JobID: job.ID, + JobOrderID: jobOrder.JobOrderID, + Type: model.JobTimerTypeAccept, + Status: model.JobTimerStatusWait, + StartAt: jobOrder.CreatedAt, + LimitAt: job.JobLimitAt, + } + dao.WrapAddIDCULEntity(jobTimer, jxcontext.AdminCtx.GetUserName()) + dao.CreateEntity(db, jobTimer) + // utils.AfterFuncWithRecover(time.Hour*time.Duration(job.JobLimitAt), func() { + utils.CallFuncAsync(func() { + select { + case <-timer.C: globals.SugarLogger.Debugf("checkLimitJobOrders jobID: %v, jobOrderID: %v", job.ID, jobOrder.JobOrderID) - jobOrders, _ := dao.GetJobOrdersNoPage(db, job.ID, jobOrder.JobOrderID, job.UserID, "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil) + defer timer.Stop() + jobOrders, _ := dao.GetJobOrdersNoPage(db, job.ID, jobOrder.JobOrderID, jobOrder.UserID, "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil) if len(jobOrders) == 0 { - return } jobOrder := jobOrders[0] - if jobOrder.Status == model.JobOrderStatusCancel { + if jobOrder.Status > model.JobOrderStatusAccept { return } jobOrder.Status = model.JobOrderStatusCancel - dao.UpdateEntity(db, jobOrder, "Status") - }) + if _, err := dao.UpdateEntity(db, jobOrder, "Status"); err == nil { + jobTimer.Status = model.JobTimerStatusFinish + dao.UpdateEntity(db, jobTimer, "Status") + } + } }) + return timer + // }) } func checkLimitAuditJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) { @@ -396,8 +423,19 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) { jobOrder2.SubmitAuditAt = time.Now() jobOrder2.Status = model.JobOrderStatusWaitAudit if _, err = dao.UpdateEntity(db, jobOrder2, "Img", "Content", "SubmitAuditAt", "Status"); err == nil { - //审核定时 - checkLimitAuditJobOrders(db, job, jobOrder2) + //任务定时器停止 + JobTimerMap[jobOrder2.JobOrderID].Stop() + //任务定时表状态完成 + jobTimer := &model.JobTimer{ + JobID: job.ID, + JobOrderID: jobOrder2.ID, + } + if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID"); err == nil { + jobTimer.Status = model.JobTimerStatusFinish + dao.UpdateEntity(db, jobTimer, "Status") + } + //审核定时开启 + // checkLimitAuditJobOrders(db, job, jobOrder2) } return err } @@ -644,3 +682,14 @@ func CancelJdDelivery(ctx *jxcontext.Context, vendorWaybillID, reason string) (e func GetJdDelivery(ctx *jxcontext.Context, status int, fromTime, toTime string, pageSize, offset int) (pagedInfo *model.PagedInfo, err error) { return dao.GetDeliveryOrders(dao.GetDB(), []string{ctx.GetUserID()}, []int{status}, utils.Str2Time(fromTime), utils.Str2Time(toTime), pageSize, offset) } + +func ResetJobTimers() { + var ( + db = dao.GetDB() + ) + jobTimers, err := dao.GetJobTimers(db, model.JobTimerStatusWait) + if err != nil { + return + } + +} diff --git a/business/model/dao/dao_job.go b/business/model/dao/dao_job.go index 2ac8551f7..f32b38954 100644 --- a/business/model/dao/dao_job.go +++ b/business/model/dao/dao_job.go @@ -274,3 +274,18 @@ func GetMtMember(db *DaoDB) (mtMember *model.MtMember, err error) { err = GetRow(db, &mtMember, sql, sqlParams) return mtMember, err } + +func GetJobTimers(db *DaoDB, status int) (jobTimers []*model.JobTimer, err error) { + sql := ` + SELECT * + FROM job_timer + WHERE 1 = 1 + ` + sqlParams := []interface{}{} + if status != -1 { + sql += ` AND status = ?` + sqlParams = append(sqlParams, status) + } + err = GetRows(db, &jobTimers, sql, sqlParams) + return jobTimers, err +} diff --git a/business/model/job.go b/business/model/job.go index bccbc6866..5e9adfda9 100644 --- a/business/model/job.go +++ b/business/model/job.go @@ -22,6 +22,12 @@ const ( JobOrderStatusCancel = 115 JobIDMtMembers = 1 + + JobTimerTypeAccept = 1 //接受任务 + JobTimerTypeSubmit = 2 //交任务 + + JobTimerStatusWait = 0 //正在进行 + JobTimerStatusFinish = 1 //定时任务已完成 ) type Job struct { @@ -122,6 +128,24 @@ func (v *JobOrder) TableIndex() [][]string { } } +type JobTimer struct { + ModelIDCUL + + JobID int `orm:"column(job_id)" json:"jobID"` //任务ID + JobOrderID int64 `orm:"column(job_order_id)" json:"jobOrderID"` //任务订单号 + Type int `json:"type"` //定时任务类型,1为接受任务,2为提交审核 + Status int `json:"status"` //定时任务的状态,0表示正在进行,1表示已经结束 + StartAt time.Time `json:"startAt"` //定时任务开始时间 + LimitAt int `json:"limitAt"` //定时任务时长(小时数) +} + +func (v *JobTimer) TableIndex() [][]string { + return [][]string{ + []string{"JobID"}, + []string{"JobOrderID"}, + } +} + type MtMember struct { ModelIDCULD diff --git a/main.go b/main.go index ba7083e63..56e8f9acc 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,6 @@ import ( _ "git.rosy.net.cn/jx-callback/business/auth2/authprovider/mobile" _ "git.rosy.net.cn/jx-callback/business/auth2/authprovider/password" _ "git.rosy.net.cn/jx-callback/business/auth2/authprovider/weixin" - "git.rosy.net.cn/jx-callback/business/jxutils/tasks" _ "git.rosy.net.cn/jx-callback/routers" ) @@ -81,22 +80,22 @@ func checkCmdFlags() bool { func main() { if !checkCmdFlags() { Init() - if err := tasks.RefreshWeixinToken(); err != nil { - globals.SugarLogger.Errorf("RefreshWeixinToken failed with error:%s", err) - return - } - if err := tasks.RefreshWeixin2Token(); err != nil { - globals.SugarLogger.Errorf("RefreshWeixin2Token failed with error:%s", err) - return - } - if err := tasks.RefreshWeixin3Token(); err != nil { - globals.SugarLogger.Errorf("RefreshWeixin3Token failed with error:%s", err) - return - } - if err := tasks.RefreshPushToken(); err != nil { - globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err) - return - } + // if err := tasks.RefreshWeixinToken(); err != nil { + // globals.SugarLogger.Errorf("RefreshWeixinToken failed with error:%s", err) + // return + // } + // if err := tasks.RefreshWeixin2Token(); err != nil { + // globals.SugarLogger.Errorf("RefreshWeixin2Token failed with error:%s", err) + // return + // } + // if err := tasks.RefreshWeixin3Token(); err != nil { + // globals.SugarLogger.Errorf("RefreshWeixin3Token failed with error:%s", err) + // return + // } + // if err := tasks.RefreshPushToken(); err != nil { + // globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err) + // return + // } if beego.BConfig.RunMode != "prod" { beego.BConfig.WebConfig.DirectoryIndex = true