任务定时

This commit is contained in:
苏尹岚
2020-10-27 14:34:38 +08:00
parent 55dddcbb75
commit 5199ed7df4
4 changed files with 128 additions and 96 deletions

View File

@@ -18,6 +18,7 @@ import (
"git.rosy.net.cn/jx-callback/globals/api" "git.rosy.net.cn/jx-callback/globals/api"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/model/dao"
) )
@@ -33,7 +34,8 @@ var (
WeekTimeBegin time.Time WeekTimeBegin time.Time
WeekTimeEnd time.Time WeekTimeEnd time.Time
JobTimerMap map[int64]*time.Timer JobTimerMap map[int64]*time.Timer
JobAuditTimerMap map[int64]*time.Timer
) )
func init() { func init() {
@@ -42,6 +44,7 @@ func init() {
WeekTimeBegin, WeekTimeEnd = getWeekTime() WeekTimeBegin, WeekTimeEnd = getWeekTime()
JobTimerMap = make(map[int64]*time.Timer) JobTimerMap = make(map[int64]*time.Timer)
JobAuditTimerMap = make(map[int64]*time.Timer)
} }
func getWeekTime() (weekTimeBegin, weekTimeEnd time.Time) { func getWeekTime() (weekTimeBegin, weekTimeEnd time.Time) {
@@ -124,13 +127,7 @@ func PublishJob(ctx *jxcontext.Context, job *model.Job) (err error) {
} }
//发布任务要扣除任务总额的保证金,不够扣就要进行充值 //发布任务要扣除任务总额的保证金,不够扣就要进行充值
if err == nil && job.Status != model.JobStatusFailed { if err == nil && job.Status != model.JobStatusFailed {
//1、账户支出增加一条记录 if err = financial.AddExpendUpdateAccount(db, userBill, model.BillTypeDeposit, job.TotalPrice); err != nil {
if err = financial.AddBillExpend(db, userBill.BillID, model.BillTypeDeposit, job.TotalPrice); err != nil {
dao.Rollback(db)
}
//2、账户表余额减少相应值
userBill.AccountBalance -= job.TotalPrice
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
} }
@@ -160,13 +157,7 @@ func CancelPublishJob(ctx *jxcontext.Context, jobID int) (err error) {
panic(r) panic(r)
} }
}() }()
//1、根据任务剩余数量退钱到账户余额中 if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeJobCancelOverdue, job.SurplusCount*job.AvgPrice); err != nil {
userBill.AccountBalance += job.SurplusCount * job.AvgPrice
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db)
}
//2、账户收入增加一条记录
if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeJobCancelOverdue, job.SurplusCount*job.AvgPrice); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
//3、任务状态被取消 //3、任务状态被取消
@@ -256,7 +247,7 @@ func AcceptJob(ctx *jxcontext.Context, jobID int) (errCode string, err error) {
dao.Commit(db) dao.Commit(db)
//任务限时完成 //任务限时完成
defer func() { defer func() {
timer := checkLimitJobOrders(db, job, jobOrder) timer := checkLimitJobOrders(db, job, jobOrder, model.JobTimerTypeAccept)
JobTimerMap[jobOrder.JobOrderID] = timer JobTimerMap[jobOrder.JobOrderID] = timer
}() }()
//特殊任务,如美团会员,是直接要支付 //特殊任务,如美团会员,是直接要支付
@@ -275,14 +266,8 @@ func AcceptJob(ctx *jxcontext.Context, jobID int) (errCode string, err error) {
panic(r) panic(r)
} }
}() }()
//账户支出明细 //账户支出
//1、账户支出增加一条记录 if err = financial.AddExpendUpdateAccount(db, userBill, model.BillTypeSpJob, job.AvgPrice); err != nil {
if err = financial.AddBillExpend(db, userBill.BillID, model.BillTypeSpJob, job.AvgPrice); err != nil {
dao.Rollback(db)
}
//2、账户表余额减少相应值
userBill.AccountBalance -= job.AvgPrice
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
dao.Commit(db) dao.Commit(db)
@@ -319,13 +304,8 @@ func CancelAcceptJob(ctx *jxcontext.Context, jobID int, jobOrderID int64) (err e
return fmt.Errorf("未查询到该用户的账单!") return fmt.Errorf("未查询到该用户的账单!")
} }
//如果状态不正常(取消或者过期)就要把这一笔退回去 //如果状态不正常(取消或者过期)就要把这一笔退回去
//1、根据任务剩余数量退钱到任务保证金余额中 //2、账户收入
userBill.AccountBalance += job.AvgPrice if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeJobCancelOverdue, job.AvgPrice); err != nil {
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db)
}
//2、账户收入增加一条记录
if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeJobCancelOverdue, job.AvgPrice); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
//3、任务订单状态被取消 //3、任务订单状态被取消
@@ -343,57 +323,63 @@ func checkJobOrders(db *dao.DaoDB, jobID int, statusCompareStr, userID string, f
return len(jobOrders), err return len(jobOrders), err
} }
func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) *time.Timer { func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder, jobTimerType int) (timer *time.Timer) {
// timer := time.NewTimer(time.Hour * time.Duration(job.JobLimitAt))
timer := time.NewTimer(time.Minute * 3)
//插到定时任务表里,主要是重启项目后的重启定时器用 //插到定时任务表里,主要是重启项目后的重启定时器用
jobTimer := &model.JobTimer{ jobTimer := &model.JobTimer{
JobID: job.ID, JobID: job.ID,
JobOrderID: jobOrder.JobOrderID, JobOrderID: jobOrder.JobOrderID,
Type: model.JobTimerTypeAccept, Type: jobTimerType,
Status: model.JobTimerStatusWait, Status: model.JobTimerStatusWait,
StartAt: jobOrder.CreatedAt, StartAt: jobOrder.CreatedAt,
LimitAt: job.JobLimitAt, LimitAt: job.JobLimitAt,
} }
dao.WrapAddIDCULEntity(jobTimer, jxcontext.AdminCtx.GetUserName()) dao.WrapAddIDCULEntity(jobTimer, jxcontext.AdminCtx.GetUserName())
dao.CreateEntity(db, jobTimer) dao.CreateEntity(db, jobTimer)
// utils.AfterFuncWithRecover(time.Hour*time.Duration(job.JobLimitAt), func() { switch jobTimerType {
case model.JobTimerTypeAccept:
timer = time.NewTimer(time.Hour * time.Duration(job.JobLimitAt))
case model.JobTimerTypeSubmit:
timer = time.NewTimer(time.Hour * time.Duration(job.AuditLimitAt))
}
utils.CallFuncAsync(func() { utils.CallFuncAsync(func() {
select { select {
case <-timer.C: case <-timer.C:
globals.SugarLogger.Debugf("checkLimitJobOrders jobID: %v, jobOrderID: %v", job.ID, jobOrder.JobOrderID) switch jobTimerType {
defer timer.Stop() case model.JobTimerTypeAccept:
jobOrders, _ := dao.GetJobOrdersNoPage(db, job.ID, jobOrder.JobOrderID, jobOrder.UserID, "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil) UpdateLimitJobOrders(db, timer, job.ID, jobOrder, jobTimer)
if len(jobOrders) == 0 { case model.JobTimerTypeSubmit:
return UpdateLimitAuditJobOrders(db, timer, job.ID, jobOrder, jobTimer)
}
jobOrder := jobOrders[0]
if jobOrder.Status > model.JobOrderStatusAccept {
return
}
jobOrder.Status = model.JobOrderStatusCancel
if _, err := dao.UpdateEntity(db, jobOrder, "Status"); err == nil {
jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status")
} }
} }
}) })
return timer return timer
// })
} }
func checkLimitAuditJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) { func UpdateLimitJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder *model.JobOrder, jobTimer *model.JobTimer) {
utils.AfterFuncWithRecover(time.Hour*time.Duration(job.AuditLimitAt), func() { globals.SugarLogger.Debugf("updateLimitJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrder.JobOrderID)
utils.CallFuncAsync(func() { defer timer.Stop()
globals.SugarLogger.Debugf("checkLimitAuditJobOrders jobID: %v, jobOrderID: %v", job.ID, jobOrder.JobOrderID) if jobOrder.Status > model.JobOrderStatusAccept {
if jobOrder.Status == model.JobOrderStatusWaitAudit { return
err := AuditJob(jxcontext.AdminCtx, int(jobOrder.JobOrderID), model.JobOrderStatusAuditPass, "超时系统通过") }
if err != nil { jobOrder.Status = model.JobOrderStatusCancel
globals.SugarLogger.Debugf("checkLimitAuditJobOrders err: %v jobID: %v, jobOrderID: %v", err, job.ID, jobOrder.JobOrderID) if _, err := dao.UpdateEntity(db, jobOrder, "Status"); err == nil {
} jobTimer.Status = model.JobTimerStatusFinish
} dao.UpdateEntity(db, jobTimer, "Status")
}) }
}) }
func UpdateLimitAuditJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder *model.JobOrder, jobTimer *model.JobTimer) {
globals.SugarLogger.Debugf("checkLimitAuditJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrder.JobOrderID)
defer timer.Stop()
if jobOrder.Status == model.JobOrderStatusWaitAudit {
err := AuditJob(jxcontext.AdminCtx, int(jobOrder.JobOrderID), model.JobOrderStatusAuditPass, "超时系统通过")
if err != nil {
globals.SugarLogger.Debugf("checkLimitAuditJobOrders err: %v jobID: %v, jobOrderID: %v", err, jobID, jobOrder.JobOrderID)
} else {
jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status")
}
}
} }
func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) { func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
@@ -428,14 +414,16 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
//任务定时表状态完成 //任务定时表状态完成
jobTimer := &model.JobTimer{ jobTimer := &model.JobTimer{
JobID: job.ID, JobID: job.ID,
JobOrderID: jobOrder2.ID, JobOrderID: jobOrder2.JobOrderID,
Type: model.JobTimerTypeAccept,
} }
if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID"); err == nil { if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID", "Type"); err == nil {
jobTimer.Status = model.JobTimerStatusFinish jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status") dao.UpdateEntity(db, jobTimer, "Status")
} }
//审核定时开启 //审核定时开启
// checkLimitAuditJobOrders(db, job, jobOrder2) timer := checkLimitJobOrders(db, job, jobOrder2, model.JobTimerTypeSubmit)
JobAuditTimerMap[jobOrder2.JobOrderID] = timer
} }
return err return err
} }
@@ -470,26 +458,15 @@ func AuditJob(ctx *jxcontext.Context, jobOrderID, status int, comment string) (e
dao.Rollback(db) dao.Rollback(db)
} }
if status == model.JobOrderStatusAuditPass { if status == model.JobOrderStatusAuditPass {
//接收人账户增加任务单次 //接收人账户收入
userBillJobOrder, err := dao.GetUserBill(db, jobOrder.UserID, "") userBillJobOrder, err := dao.GetUserBill(db, jobOrder.UserID, "")
userBillJobOrder.AccountBalance += job.AvgPrice if err = financial.AddIncomeUpdateAccount(db, userBillJobOrder, model.BillTypeJob, job.AvgPrice); err != nil {
if _, err = dao.UpdateEntity(db, userBillJobOrder, "AccountBalance"); err != nil {
dao.Rollback(db)
}
//接收人账户收入增加一条
if err = financial.AddBillIncome(db, userBillJobOrder.BillID, model.BillTypeJob, job.AvgPrice); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
} else { } else {
if job.Status < 0 { if job.Status < 0 {
userBill, err := dao.GetUserBill(db, job.UserID, "") userBill, err := dao.GetUserBill(db, job.UserID, "")
//1、根据任务剩余数量退钱到任务保证金余额中 if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeJobAuditUnPassWithCancelOverdue, job.AvgPrice); err != nil {
userBill.AccountBalance += job.AvgPrice
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db)
}
//2、账户收入增加一条记录
if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeJobAuditUnPassWithCancelOverdue, job.AvgPrice); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
} else { } else {
@@ -500,6 +477,18 @@ func AuditJob(ctx *jxcontext.Context, jobOrderID, status int, comment string) (e
} }
} }
dao.Commit(db) dao.Commit(db)
//任务定时器停止
JobAuditTimerMap[int64(jobOrderID)].Stop()
//任务定时表状态完成
jobTimer := &model.JobTimer{
JobID: job.ID,
JobOrderID: jobOrder.JobOrderID,
Type: model.JobTimerTypeSubmit,
}
if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID", "Type"); err == nil {
jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status")
}
return err return err
} }
@@ -612,13 +601,7 @@ func SendJdDelivery(ctx *jxcontext.Context, dOrder *model.DeliveryOrder) (errCod
dao.Rollback(db) dao.Rollback(db)
} }
//账户支出明细 //账户支出明细
//1、账户支出增加一条记录 if err = financial.AddExpendUpdateAccount(db, userBill, model.BillTypeSpJob, dOrder.PayPrice); err != nil {
if err = financial.AddBillExpend(db, userBill.BillID, model.BillTypeSpJob, dOrder.PayPrice); err != nil {
dao.Rollback(db)
}
//2、账户表余额减少相应值
userBill.AccountBalance -= dOrder.PayPrice
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
dao.Commit(db) dao.Commit(db)
@@ -666,13 +649,7 @@ func CancelJdDelivery(ctx *jxcontext.Context, vendorWaybillID, reason string) (e
if _, err = dao.UpdateEntity(db, dOrder, "Status", "OrderFinishedAt", "Comment"); err != nil { if _, err = dao.UpdateEntity(db, dOrder, "Status", "OrderFinishedAt", "Comment"); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
//1、根据任务剩余数量退钱到任务保证金余额中 if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeSpJob, dOrder.PayPrice); err != nil {
userBill.AccountBalance += dOrder.PayPrice
if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil {
dao.Rollback(db)
}
//2、账户收入增加一条记录
if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeSpJob, dOrder.PayPrice); err != nil {
dao.Rollback(db) dao.Rollback(db)
} }
dao.Commit(db) dao.Commit(db)
@@ -691,5 +668,33 @@ func ResetJobTimers() {
if err != nil { if err != nil {
return return
} }
task := tasksch.NewParallelTask("ResetJobTimers", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(true), jxcontext.AdminCtx,
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
jobTimer := batchItemList[0].(*model.JobTimer)
switch jobTimer.Type {
case model.JobTimerTypeAccept:
timer := time.NewTimer(jobTimer.StartAt.Add(time.Duration(jobTimer.LimitAt) * time.Hour).Sub(time.Now()))
jobOrders, err := dao.GetJobOrdersNoPage(db, jobTimer.JobID, jobTimer.JobOrderID, "", "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil)
if err != nil {
return retVal, err
}
utils.CallFuncAsync(func() {
select {
case <-timer.C:
switch jobTimer.Type {
case model.JobTimerTypeAccept:
UpdateLimitJobOrders(db, timer, jobTimer.JobID, jobOrders[0], jobTimer)
case model.JobTimerTypeSubmit:
UpdateLimitAuditJobOrders(db, timer, jobTimer.JobID, jobOrders[0], jobTimer)
}
}
})
case model.JobTimerTypeSubmit:
default:
return retVal, err
}
return retVal, err
}, jobTimers)
tasksch.HandleTask(task, nil, true).Run()
task.GetID()
} }

View File

@@ -38,3 +38,27 @@ func AddUserBill(db *dao.DaoDB, billID int64, userID string) (err error) {
func GetUserBillDetail(ctx *jxcontext.Context, userID, fromTime, toTime string, pageSize, offset int) (pagedInfo *model.PagedInfo, err error) { func GetUserBillDetail(ctx *jxcontext.Context, userID, fromTime, toTime string, pageSize, offset int) (pagedInfo *model.PagedInfo, err error) {
return dao.GetUserBillDetail(dao.GetDB(), userID, utils.Str2Time(fromTime), utils.Str2Time(toTime), pageSize, offset) return dao.GetUserBillDetail(dao.GetDB(), userID, utils.Str2Time(fromTime), utils.Str2Time(toTime), pageSize, offset)
} }
func AddExpendUpdateAccount(db *dao.DaoDB, userBill *model.UserBill, billType, price int) (err error) {
//1、账户支出增加一条记录
err = AddBillExpend(db, userBill.BillID, billType, price)
if err != nil {
return err
}
//2、账户表余额减少相应值
userBill.AccountBalance -= price
_, err = dao.UpdateEntity(db, userBill, "AccountBalance")
return err
}
func AddIncomeUpdateAccount(db *dao.DaoDB, userBill *model.UserBill, billType, price int) (err error) {
//2、账户收入增加一条记录
err = AddBillIncome(db, userBill.BillID, billType, price)
if err != nil {
return err
}
//1、根据任务剩余数量退钱到账户余额中
userBill.AccountBalance += price
_, err = dao.UpdateEntity(db, userBill, "AccountBalance")
return err
}

View File

@@ -22,6 +22,7 @@ func Init() {
//任务 //任务
orm.RegisterModel(&model.Job{}, &model.JobCategory{}, &model.JobStep{}, &model.JobImg{}) orm.RegisterModel(&model.Job{}, &model.JobCategory{}, &model.JobStep{}, &model.JobImg{})
orm.RegisterModel(&model.JobOrder{}) orm.RegisterModel(&model.JobOrder{})
orm.RegisterModel(&model.JobTimer{})
//聊天 //聊天
orm.RegisterModel(&model.ImMessageRecord{}, &model.MessageGroup{}, &model.MessageGroupMember{}) orm.RegisterModel(&model.ImMessageRecord{}, &model.MessageGroup{}, &model.MessageGroupMember{})

View File

@@ -7,6 +7,8 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"git.rosy.net.cn/jx-callback/business/jxstore/cms"
"github.com/astaxie/beego" "github.com/astaxie/beego"
// 导入缺省订单调度器 // 导入缺省订单调度器
@@ -96,7 +98,7 @@ func main() {
// globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err) // globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err)
// return // return
// } // }
cms.ResetJobTimers()
if beego.BConfig.RunMode != "prod" { if beego.BConfig.RunMode != "prod" {
beego.BConfig.WebConfig.DirectoryIndex = true beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger" beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"