This commit is contained in:
苏尹岚
2020-12-23 17:55:13 +08:00
parent a2313e4b19
commit 7eea91324d

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"math" "math"
"strings" "strings"
"sync"
"time" "time"
"git.rosy.net.cn/jx-callback/business/jxutils/ddmsg" "git.rosy.net.cn/jx-callback/business/jxutils/ddmsg"
@@ -38,14 +39,22 @@ const (
mtwmMemberPrice = 1100 mtwmMemberPrice = 1100
) )
var ( type JobTimer struct {
JobTimerMap map[int64]*time.Timer JobTimerMap map[int64]*time.Timer
JobAuditTimerMap map[int64]*time.Timer JobAuditTimerMap map[int64]*time.Timer
s *sync.RWMutex
}
var (
JobTimers = &JobTimer{}
) )
func init() { func init() {
JobTimerMap = make(map[int64]*time.Timer) JobTimerMap := make(map[int64]*time.Timer)
JobAuditTimerMap = make(map[int64]*time.Timer) JobAuditTimerMap := make(map[int64]*time.Timer)
JobTimers.JobTimerMap = JobTimerMap
JobTimers.JobAuditTimerMap = JobAuditTimerMap
JobTimers.s = new(sync.RWMutex)
} }
func PublishJob(ctx *jxcontext.Context, jobExt *model.JobExt) (errCode string, err error) { func PublishJob(ctx *jxcontext.Context, jobExt *model.JobExt) (errCode string, err error) {
@@ -389,7 +398,9 @@ func AcceptJob(ctx *jxcontext.Context, jobID, dropShippingDeliveryID, dropShippi
} else { } else {
//任务限时完成 //任务限时完成
timer := checkLimitJobOrders(db, job, jobOrder, model.JobTimerTypeAccept) timer := checkLimitJobOrders(db, job, jobOrder, model.JobTimerTypeAccept)
JobTimerMap[jobOrder.JobOrderID] = timer JobTimers.s.Lock()
JobTimers.JobTimerMap[jobOrder.JobOrderID] = timer
JobTimers.s.Unlock()
} }
dao.Commit(db) dao.Commit(db)
@@ -481,20 +492,22 @@ func checkLimitJobOrders(db *dao.DaoDB, job *model.Job, jobOrder *model.JobOrder
case <-timer.C: case <-timer.C:
switch jobTimerType { switch jobTimerType {
case model.JobTimerTypeAccept: case model.JobTimerTypeAccept:
UpdateLimitJobOrders(db, timer, job.ID, jobOrder, jobTimer) UpdateLimitJobOrders(db, timer, job.ID, jobOrder.JobOrderID, jobTimer)
case model.JobTimerTypeSubmit: case model.JobTimerTypeSubmit:
UpdateLimitAuditJobOrders(db, timer, job.ID, jobOrder, jobTimer) UpdateLimitAuditJobOrders(db, timer, job.ID, jobOrder.JobOrderID, jobTimer)
case model.JobTimerTypeDropShipping: case model.JobTimerTypeDropShipping:
UpdateDropShippingJobOrders(db, timer, job.ID, jobOrder.JobOrderID, jobTimer)
} }
} }
}) })
return timer return timer
} }
func UpdateLimitJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder *model.JobOrder, jobTimer *model.JobTimer) { func UpdateLimitJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrderID int64, jobTimer *model.JobTimer) {
globals.SugarLogger.Debugf("updateLimitJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrder.JobOrderID) globals.SugarLogger.Debugf("updateLimitJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrderID)
defer timer.Stop() defer timer.Stop()
jobOrder := &model.JobOrder{JobOrderID: jobOrderID}
if err := dao.GetEntity(db, jobOrder, "JobOrderID"); err == nil {
if jobOrder.Status > model.JobOrderStatusAccept { if jobOrder.Status > model.JobOrderStatusAccept {
return return
} }
@@ -504,20 +517,37 @@ func UpdateLimitJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder
dao.UpdateEntity(db, jobTimer, "Status") dao.UpdateEntity(db, jobTimer, "Status")
} }
} }
}
func UpdateLimitAuditJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrder *model.JobOrder, jobTimer *model.JobTimer) { func UpdateLimitAuditJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrderID int64, jobTimer *model.JobTimer) {
globals.SugarLogger.Debugf("checkLimitAuditJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrder.JobOrderID) globals.SugarLogger.Debugf("checkLimitAuditJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrderID)
defer timer.Stop() defer timer.Stop()
jobOrder := &model.JobOrder{JobOrderID: jobOrderID}
if err := dao.GetEntity(db, jobOrder, "JobOrderID"); err == nil {
if jobOrder.Status == model.JobOrderStatusWaitAudit { if jobOrder.Status == model.JobOrderStatusWaitAudit {
err := AuditJob(jxcontext.AdminCtx, int(jobOrder.JobOrderID), model.JobOrderStatusAuditPass, "超时系统通过") err := AuditJob(jxcontext.AdminCtx, int(jobOrderID), model.JobOrderStatusAuditPass, "超时系统通过")
if err != nil { if err != nil {
globals.SugarLogger.Debugf("checkLimitAuditJobOrders err: %v jobID: %v, jobOrderID: %v", err, jobID, jobOrder.JobOrderID) globals.SugarLogger.Debugf("checkLimitAuditJobOrders err: %v jobID: %v, jobOrderID: %v", err, jobID, jobOrderID)
} else { } else {
jobTimer.Status = model.JobTimerStatusFinish jobTimer.Status = model.JobTimerStatusFinish
dao.UpdateEntity(db, jobTimer, "Status") dao.UpdateEntity(db, jobTimer, "Status")
} }
} }
} }
}
func UpdateDropShippingJobOrders(db *dao.DaoDB, timer *time.Timer, jobID int, jobOrderID int64, jobTimer *model.JobTimer) {
globals.SugarLogger.Debugf("UpdateDropShippingJobOrders jobID: %v, jobOrderID: %v", jobID, jobOrderID)
defer timer.Stop()
jobOrder := &model.JobOrder{JobOrderID: jobOrderID}
job := &model.Job{}
job.ID = jobID
if err := dao.GetEntity(db, jobOrder, "JobOrderID"); err == nil {
if err := dao.GetEntity(db, job); err == nil {
}
}
}
func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) { func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
var ( var (
@@ -554,8 +584,9 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
jobOrder2.Status = model.JobOrderStatusWaitAudit jobOrder2.Status = model.JobOrderStatusWaitAudit
if _, err = dao.UpdateEntity(db, jobOrder2, "Imgs", "Content", "SubmitAuditAt", "Status", "UserActualPrice"); err == nil { if _, err = dao.UpdateEntity(db, jobOrder2, "Imgs", "Content", "SubmitAuditAt", "Status", "UserActualPrice"); err == nil {
//任务定时器停止 //任务定时器停止
if JobTimerMap[jobOrder2.JobOrderID] != nil { JobTimers.s.RLock()
JobTimerMap[jobOrder2.JobOrderID].Stop() if JobTimers.JobTimerMap[jobOrder2.JobOrderID] != nil {
JobTimers.JobTimerMap[jobOrder2.JobOrderID].Stop()
//任务定时表状态完成 //任务定时表状态完成
jobTimer := &model.JobTimer{ jobTimer := &model.JobTimer{
JobID: job.ID, JobID: job.ID,
@@ -567,6 +598,7 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
dao.UpdateEntity(db, jobTimer, "Status") dao.UpdateEntity(db, jobTimer, "Status")
} }
} }
JobTimers.s.RUnlock()
//一件代发 //一件代发
var timerType int var timerType int
if job.JobCategoryID == model.JobCategoryIDDropShipping { if job.JobCategoryID == model.JobCategoryIDDropShipping {
@@ -576,7 +608,9 @@ func SubmitJob(ctx *jxcontext.Context, jobOrder *model.JobOrder) (err error) {
} }
//审核定时开启 //审核定时开启
timer := checkLimitJobOrders(db, job, jobOrder2, timerType) timer := checkLimitJobOrders(db, job, jobOrder2, timerType)
JobAuditTimerMap[jobOrder2.JobOrderID] = timer JobTimers.s.Lock()
JobTimers.JobAuditTimerMap[jobOrder2.JobOrderID] = timer
JobTimers.s.Unlock()
} }
return err return err
} }
@@ -684,9 +718,11 @@ func AuditJob(ctx *jxcontext.Context, jobOrderID, status int, comment string) (e
} }
dao.Commit(db) dao.Commit(db)
//任务定时器停止 //任务定时器停止
if JobAuditTimerMap[int64(jobOrderID)] != nil { JobTimers.s.RLock()
JobAuditTimerMap[int64(jobOrderID)].Stop() if JobTimers.JobAuditTimerMap[int64(jobOrderID)] != nil {
JobTimers.JobAuditTimerMap[int64(jobOrderID)].Stop()
} }
JobTimers.s.RUnlock()
//任务定时表状态完成 //任务定时表状态完成
jobTimer := &model.JobTimer{ jobTimer := &model.JobTimer{
JobID: job.ID, JobID: job.ID,
@@ -1148,9 +1184,9 @@ func ResetJobTimers() {
case <-timer.C: case <-timer.C:
switch jobTimer.Type { switch jobTimer.Type {
case model.JobTimerTypeAccept: case model.JobTimerTypeAccept:
UpdateLimitJobOrders(db, timer, jobTimer.JobID, jobOrders[0], jobTimer) UpdateLimitJobOrders(db, timer, jobTimer.JobID, jobOrders[0].JobOrderID, jobTimer)
case model.JobTimerTypeSubmit: case model.JobTimerTypeSubmit:
UpdateLimitAuditJobOrders(db, timer, jobTimer.JobID, jobOrders[0], jobTimer) UpdateLimitAuditJobOrders(db, timer, jobTimer.JobID, jobOrders[0].JobOrderID, jobTimer)
} }
} }
}) })