From 5199ed7df48ec3d23003f14620f14b629039d4a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E5=B0=B9=E5=B2=9A?= <770236076@qq.com> Date: Tue, 27 Oct 2020 14:34:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=AE=9A=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- business/jxstore/cms/job.go | 195 +++++++++++++++-------------- business/jxstore/financial/bill.go | 24 ++++ globals/beegodb/beegodb.go | 1 + main.go | 4 +- 4 files changed, 128 insertions(+), 96 deletions(-) diff --git a/business/jxstore/cms/job.go b/business/jxstore/cms/job.go index 090a34c79..52e6ec5bc 100644 --- a/business/jxstore/cms/job.go +++ b/business/jxstore/cms/job.go @@ -18,6 +18,7 @@ import ( "git.rosy.net.cn/jx-callback/globals/api" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" ) @@ -33,7 +34,8 @@ var ( WeekTimeBegin time.Time WeekTimeEnd time.Time - JobTimerMap map[int64]*time.Timer + JobTimerMap map[int64]*time.Timer + JobAuditTimerMap map[int64]*time.Timer ) func init() { @@ -42,6 +44,7 @@ func init() { WeekTimeBegin, WeekTimeEnd = getWeekTime() JobTimerMap = make(map[int64]*time.Timer) + JobAuditTimerMap = make(map[int64]*time.Timer) } func getWeekTime() (weekTimeBegin, weekTimeEnd time.Time) { @@ -124,13 +127,7 @@ func PublishJob(ctx *jxcontext.Context, job *model.Job) (err error) { } //发布任务要扣除任务总额的保证金,不够扣就要进行充值 if err == nil && job.Status != model.JobStatusFailed { - //1、账户支出增加一条记录 - if err = financial.AddBillExpend(db, userBill.BillID, model.BillTypeDeposit, job.TotalPrice); err != nil { - dao.Rollback(db) - } - //2、账户表余额减少相应值 - userBill.AccountBalance -= job.TotalPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { + if err = financial.AddExpendUpdateAccount(db, userBill, model.BillTypeDeposit, job.TotalPrice); err != nil { dao.Rollback(db) } } @@ -160,13 +157,7 @@ func CancelPublishJob(ctx *jxcontext.Context, jobID int) (err error) { panic(r) } }() - //1、根据任务剩余数量退钱到账户余额中 - userBill.AccountBalance += job.SurplusCount * job.AvgPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { - dao.Rollback(db) - } - //2、账户收入增加一条记录 - if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeJobCancelOverdue, job.SurplusCount*job.AvgPrice); err != nil { + if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeJobCancelOverdue, job.SurplusCount*job.AvgPrice); err != nil { dao.Rollback(db) } //3、任务状态被取消 @@ -256,7 +247,7 @@ func AcceptJob(ctx *jxcontext.Context, jobID int) (errCode string, err error) { dao.Commit(db) //任务限时完成 defer func() { - timer := checkLimitJobOrders(db, job, jobOrder) + timer := checkLimitJobOrders(db, job, jobOrder, model.JobTimerTypeAccept) JobTimerMap[jobOrder.JobOrderID] = timer }() //特殊任务,如美团会员,是直接要支付 @@ -275,14 +266,8 @@ func AcceptJob(ctx *jxcontext.Context, jobID int) (errCode string, err error) { panic(r) } }() - //账户支出明细 - //1、账户支出增加一条记录 - if err = financial.AddBillExpend(db, userBill.BillID, model.BillTypeSpJob, job.AvgPrice); err != nil { - dao.Rollback(db) - } - //2、账户表余额减少相应值 - userBill.AccountBalance -= job.AvgPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { + //账户支出 + if err = financial.AddExpendUpdateAccount(db, userBill, model.BillTypeSpJob, job.AvgPrice); err != nil { dao.Rollback(db) } dao.Commit(db) @@ -319,13 +304,8 @@ func CancelAcceptJob(ctx *jxcontext.Context, jobID int, jobOrderID int64) (err e return fmt.Errorf("未查询到该用户的账单!") } //如果状态不正常(取消或者过期)就要把这一笔退回去 - //1、根据任务剩余数量退钱到任务保证金余额中 - userBill.AccountBalance += job.AvgPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { - dao.Rollback(db) - } - //2、账户收入增加一条记录 - if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeJobCancelOverdue, job.AvgPrice); err != nil { + //2、账户收入 + if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeJobCancelOverdue, job.AvgPrice); err != nil { dao.Rollback(db) } //3、任务订单状态被取消 @@ -343,57 +323,63 @@ func checkJobOrders(db *dao.DaoDB, jobID int, statusCompareStr, userID string, f return len(jobOrders), err } -func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) *time.Timer { - // timer := time.NewTimer(time.Hour * time.Duration(job.JobLimitAt)) - timer := time.NewTimer(time.Minute * 3) +func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder, jobTimerType int) (timer *time.Timer) { //插到定时任务表里,主要是重启项目后的重启定时器用 jobTimer := &model.JobTimer{ JobID: job.ID, JobOrderID: jobOrder.JobOrderID, - Type: model.JobTimerTypeAccept, + Type: jobTimerType, Status: model.JobTimerStatusWait, StartAt: jobOrder.CreatedAt, LimitAt: job.JobLimitAt, } dao.WrapAddIDCULEntity(jobTimer, jxcontext.AdminCtx.GetUserName()) dao.CreateEntity(db, jobTimer) - // utils.AfterFuncWithRecover(time.Hour*time.Duration(job.JobLimitAt), func() { + switch jobTimerType { + case model.JobTimerTypeAccept: + timer = time.NewTimer(time.Hour * time.Duration(job.JobLimitAt)) + case model.JobTimerTypeSubmit: + timer = time.NewTimer(time.Hour * time.Duration(job.AuditLimitAt)) + } utils.CallFuncAsync(func() { select { case <-timer.C: - globals.SugarLogger.Debugf("checkLimitJobOrders jobID: %v, jobOrderID: %v", job.ID, jobOrder.JobOrderID) - defer timer.Stop() - jobOrders, _ := dao.GetJobOrdersNoPage(db, job.ID, jobOrder.JobOrderID, jobOrder.UserID, "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil) - if len(jobOrders) == 0 { - return - } - jobOrder := jobOrders[0] - if jobOrder.Status > model.JobOrderStatusAccept { - return - } - jobOrder.Status = model.JobOrderStatusCancel - if _, err := dao.UpdateEntity(db, jobOrder, "Status"); err == nil { - jobTimer.Status = model.JobTimerStatusFinish - dao.UpdateEntity(db, jobTimer, "Status") + switch jobTimerType { + case model.JobTimerTypeAccept: + UpdateLimitJobOrders(db, timer, job.ID, jobOrder, jobTimer) + case model.JobTimerTypeSubmit: + UpdateLimitAuditJobOrders(db, timer, job.ID, jobOrder, jobTimer) } } }) return timer - // }) } -func checkLimitAuditJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder) { - utils.AfterFuncWithRecover(time.Hour*time.Duration(job.AuditLimitAt), func() { - utils.CallFuncAsync(func() { - globals.SugarLogger.Debugf("checkLimitAuditJobOrders jobID: %v, jobOrderID: %v", job.ID, jobOrder.JobOrderID) - if jobOrder.Status == model.JobOrderStatusWaitAudit { - err := AuditJob(jxcontext.AdminCtx, int(jobOrder.JobOrderID), model.JobOrderStatusAuditPass, "超时系统通过") - if err != nil { - globals.SugarLogger.Debugf("checkLimitAuditJobOrders err: %v jobID: %v, jobOrderID: %v", err, job.ID, jobOrder.JobOrderID) - } - } - }) - }) +func UpdateLimitJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder *model.JobOrder, jobTimer *model.JobTimer) { + globals.SugarLogger.Debugf("updateLimitJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrder.JobOrderID) + defer timer.Stop() + if jobOrder.Status > model.JobOrderStatusAccept { + return + } + jobOrder.Status = model.JobOrderStatusCancel + if _, err := dao.UpdateEntity(db, jobOrder, "Status"); err == nil { + jobTimer.Status = model.JobTimerStatusFinish + dao.UpdateEntity(db, jobTimer, "Status") + } +} + +func UpdateLimitAuditJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder *model.JobOrder, jobTimer *model.JobTimer) { + globals.SugarLogger.Debugf("checkLimitAuditJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrder.JobOrderID) + defer timer.Stop() + if jobOrder.Status == model.JobOrderStatusWaitAudit { + err := AuditJob(jxcontext.AdminCtx, int(jobOrder.JobOrderID), model.JobOrderStatusAuditPass, "超时系统通过") + if err != nil { + globals.SugarLogger.Debugf("checkLimitAuditJobOrders err: %v jobID: %v, jobOrderID: %v", err, jobID, jobOrder.JobOrderID) + } else { + jobTimer.Status = model.JobTimerStatusFinish + dao.UpdateEntity(db, jobTimer, "Status") + } + } } func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) { @@ -428,14 +414,16 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) { //任务定时表状态完成 jobTimer := &model.JobTimer{ JobID: job.ID, - JobOrderID: jobOrder2.ID, + JobOrderID: jobOrder2.JobOrderID, + Type: model.JobTimerTypeAccept, } - if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID"); err == nil { + if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID", "Type"); err == nil { jobTimer.Status = model.JobTimerStatusFinish dao.UpdateEntity(db, jobTimer, "Status") } //审核定时开启 - // checkLimitAuditJobOrders(db, job, jobOrder2) + timer := checkLimitJobOrders(db, job, jobOrder2, model.JobTimerTypeSubmit) + JobAuditTimerMap[jobOrder2.JobOrderID] = timer } return err } @@ -470,26 +458,15 @@ func AuditJob(ctx *jxcontext.Context, jobOrderID, status int, comment string) (e dao.Rollback(db) } if status == model.JobOrderStatusAuditPass { - //接收人账户增加任务单次 + //接收人账户收入 userBillJobOrder, err := dao.GetUserBill(db, jobOrder.UserID, "") - userBillJobOrder.AccountBalance += job.AvgPrice - if _, err = dao.UpdateEntity(db, userBillJobOrder, "AccountBalance"); err != nil { - dao.Rollback(db) - } - //接收人账户收入增加一条 - if err = financial.AddBillIncome(db, userBillJobOrder.BillID, model.BillTypeJob, job.AvgPrice); err != nil { + if err = financial.AddIncomeUpdateAccount(db, userBillJobOrder, model.BillTypeJob, job.AvgPrice); err != nil { dao.Rollback(db) } } else { if job.Status < 0 { userBill, err := dao.GetUserBill(db, job.UserID, "") - //1、根据任务剩余数量退钱到任务保证金余额中 - userBill.AccountBalance += job.AvgPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { - dao.Rollback(db) - } - //2、账户收入增加一条记录 - if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeJobAuditUnPassWithCancelOverdue, job.AvgPrice); err != nil { + if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeJobAuditUnPassWithCancelOverdue, job.AvgPrice); err != nil { dao.Rollback(db) } } else { @@ -500,6 +477,18 @@ func AuditJob(ctx *jxcontext.Context, jobOrderID, status int, comment string) (e } } dao.Commit(db) + //任务定时器停止 + JobAuditTimerMap[int64(jobOrderID)].Stop() + //任务定时表状态完成 + jobTimer := &model.JobTimer{ + JobID: job.ID, + JobOrderID: jobOrder.JobOrderID, + Type: model.JobTimerTypeSubmit, + } + if err = dao.GetEntity(db, jobTimer, "JobID", "JobOrderID", "Type"); err == nil { + jobTimer.Status = model.JobTimerStatusFinish + dao.UpdateEntity(db, jobTimer, "Status") + } return err } @@ -612,13 +601,7 @@ func SendJdDelivery(ctx *jxcontext.Context, dOrder *model.DeliveryOrder) (errCod dao.Rollback(db) } //账户支出明细 - //1、账户支出增加一条记录 - if err = financial.AddBillExpend(db, userBill.BillID, model.BillTypeSpJob, dOrder.PayPrice); err != nil { - dao.Rollback(db) - } - //2、账户表余额减少相应值 - userBill.AccountBalance -= dOrder.PayPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { + if err = financial.AddExpendUpdateAccount(db, userBill, model.BillTypeSpJob, dOrder.PayPrice); err != nil { dao.Rollback(db) } dao.Commit(db) @@ -666,13 +649,7 @@ func CancelJdDelivery(ctx *jxcontext.Context, vendorWaybillID, reason string) (e if _, err = dao.UpdateEntity(db, dOrder, "Status", "OrderFinishedAt", "Comment"); err != nil { dao.Rollback(db) } - //1、根据任务剩余数量退钱到任务保证金余额中 - userBill.AccountBalance += dOrder.PayPrice - if _, err = dao.UpdateEntity(db, userBill, "AccountBalance"); err != nil { - dao.Rollback(db) - } - //2、账户收入增加一条记录 - if err = financial.AddBillIncome(db, userBill.BillID, model.BillTypeSpJob, dOrder.PayPrice); err != nil { + if err = financial.AddIncomeUpdateAccount(db, userBill, model.BillTypeSpJob, dOrder.PayPrice); err != nil { dao.Rollback(db) } dao.Commit(db) @@ -691,5 +668,33 @@ func ResetJobTimers() { if err != nil { return } - + task := tasksch.NewParallelTask("ResetJobTimers", tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(true), jxcontext.AdminCtx, + func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + jobTimer := batchItemList[0].(*model.JobTimer) + switch jobTimer.Type { + case model.JobTimerTypeAccept: + timer := time.NewTimer(jobTimer.StartAt.Add(time.Duration(jobTimer.LimitAt) * time.Hour).Sub(time.Now())) + jobOrders, err := dao.GetJobOrdersNoPage(db, jobTimer.JobID, jobTimer.JobOrderID, "", "", utils.ZeroTimeValue, utils.ZeroTimeValue, nil) + if err != nil { + return retVal, err + } + utils.CallFuncAsync(func() { + select { + case <-timer.C: + switch jobTimer.Type { + case model.JobTimerTypeAccept: + UpdateLimitJobOrders(db, timer, jobTimer.JobID, jobOrders[0], jobTimer) + case model.JobTimerTypeSubmit: + UpdateLimitAuditJobOrders(db, timer, jobTimer.JobID, jobOrders[0], jobTimer) + } + } + }) + case model.JobTimerTypeSubmit: + default: + return retVal, err + } + return retVal, err + }, jobTimers) + tasksch.HandleTask(task, nil, true).Run() + task.GetID() } diff --git a/business/jxstore/financial/bill.go b/business/jxstore/financial/bill.go index 5ec5bd9f8..70ce3d060 100644 --- a/business/jxstore/financial/bill.go +++ b/business/jxstore/financial/bill.go @@ -38,3 +38,27 @@ func AddUserBill(db *dao.DaoDB, billID int64, userID string) (err error) { func GetUserBillDetail(ctx *jxcontext.Context, userID, fromTime, toTime string, pageSize, offset int) (pagedInfo *model.PagedInfo, err error) { return dao.GetUserBillDetail(dao.GetDB(), userID, utils.Str2Time(fromTime), utils.Str2Time(toTime), pageSize, offset) } + +func AddExpendUpdateAccount(db *dao.DaoDB, userBill *model.UserBill, billType, price int) (err error) { + //1、账户支出增加一条记录 + err = AddBillExpend(db, userBill.BillID, billType, price) + if err != nil { + return err + } + //2、账户表余额减少相应值 + userBill.AccountBalance -= price + _, err = dao.UpdateEntity(db, userBill, "AccountBalance") + return err +} + +func AddIncomeUpdateAccount(db *dao.DaoDB, userBill *model.UserBill, billType, price int) (err error) { + //2、账户收入增加一条记录 + err = AddBillIncome(db, userBill.BillID, billType, price) + if err != nil { + return err + } + //1、根据任务剩余数量退钱到账户余额中 + userBill.AccountBalance += price + _, err = dao.UpdateEntity(db, userBill, "AccountBalance") + return err +} diff --git a/globals/beegodb/beegodb.go b/globals/beegodb/beegodb.go index 398275a20..34879c5f5 100644 --- a/globals/beegodb/beegodb.go +++ b/globals/beegodb/beegodb.go @@ -22,6 +22,7 @@ func Init() { //任务 orm.RegisterModel(&model.Job{}, &model.JobCategory{}, &model.JobStep{}, &model.JobImg{}) orm.RegisterModel(&model.JobOrder{}) + orm.RegisterModel(&model.JobTimer{}) //聊天 orm.RegisterModel(&model.ImMessageRecord{}, &model.MessageGroup{}, &model.MessageGroupMember{}) diff --git a/main.go b/main.go index 56e8f9acc..980a1b877 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,8 @@ import ( _ "net/http/pprof" "os" + "git.rosy.net.cn/jx-callback/business/jxstore/cms" + "github.com/astaxie/beego" // 导入缺省订单调度器 @@ -96,7 +98,7 @@ func main() { // globals.SugarLogger.Errorf("RefreshPushToken failed with error:%s", err) // return // } - + cms.ResetJobTimers() if beego.BConfig.RunMode != "prod" { beego.BConfig.WebConfig.DirectoryIndex = true beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"