重构-开启或结束所有平台商店的额外时间

This commit is contained in:
Rosy-zhudan
2019-08-25 10:05:37 +08:00
parent 881198d0f1
commit 949d4e7c8d
3 changed files with 261 additions and 123 deletions

View File

@@ -102,7 +102,6 @@ func Init() {
ScheduleTimerFunc("UpdateActStatusByTime", func() { ScheduleTimerFunc("UpdateActStatusByTime", func() {
dao.UpdateActStatusByTime(dao.GetDB(), time.Now().Add(-48*time.Hour)) dao.UpdateActStatusByTime(dao.GetDB(), time.Now().Add(-48*time.Hour))
}, updateActStatusTimeList) }, updateActStatusTimeList)
InitEx()
} }
ScheduleTimerFunc("AutoSaleStoreSku", func() { ScheduleTimerFunc("AutoSaleStoreSku", func() {
cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil, false) cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil, false)
@@ -118,6 +117,7 @@ func doDailyWork() {
SaveImportantTaskID(TaskNameSyncStoreSku, SpecialTaskID) SaveImportantTaskID(TaskNameSyncStoreSku, SpecialTaskID)
taskID, _ := cms.CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, dao.GetDB(), []int{model.VendorIDEBAI, model.VendorIDMTWM}, nil, nil, nil, model.SyncFlagSaleMask|model.SyncFlagPriceMask, true, true) taskID, _ := cms.CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, dao.GetDB(), []int{model.VendorIDEBAI, model.VendorIDMTWM}, nil, nil, nil, model.SyncFlagSaleMask|model.SyncFlagPriceMask, true, true)
SaveImportantTaskID(TaskNameSyncStoreSku, taskID) SaveImportantTaskID(TaskNameSyncStoreSku, taskID)
InitEx()
} }
func RefreshRealMobile(ctx *jxcontext.Context, vendorID int, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) { func RefreshRealMobile(ctx *jxcontext.Context, vendorID int, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
@@ -184,14 +184,14 @@ func ScheduleTimerFunc(name string, handler func(), timeList []string) {
} }
// 按时间调度一次 // 按时间调度一次
func ScheduleTimerFuncOnce(name string, handler func(), timeStr string) { func ScheduleTimerFuncOnce(name string, handler func(interface{}), timeStr string, param interface{}) {
now := time.Now() now := time.Now()
nextTime := jxutils.GetNextTimeFromList(now, []string{timeStr}) nextTime := jxutils.GetNextTimeFromList(now, []string{timeStr})
duration := nextTime.Sub(now) + 1*time.Second duration := nextTime.Sub(now) + 1*time.Second
globals.SugarLogger.Debugf("ScheduleTimerFuncOnce, func:%s, duration:%v", name, duration) globals.SugarLogger.Debugf("ScheduleTimerFuncOnce, func:%s, duration:%v", name, duration)
utils.AfterFuncWithRecover(duration, func() { utils.AfterFuncWithRecover(duration, func() {
globals.SugarLogger.Debugf("ScheduleTimerFuncOnce func:%s", name) globals.SugarLogger.Debugf("ScheduleTimerFuncOnce func:%s", name)
handler() handler(param)
}) })
} }

View File

@@ -1,8 +1,10 @@
package misc package misc
import ( import (
"git.rosy.net.cn/jx-callback/business/jxutils"
"time" "time"
"strings" "strings"
"sync"
"git.rosy.net.cn/baseapi" "git.rosy.net.cn/baseapi"
"git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxstore/cms" "git.rosy.net.cn/jx-callback/business/jxstore/cms"
@@ -15,48 +17,90 @@ import (
) )
const ( const (
taskParallelCount = 4 enableScheduleRefreshStore = false
taskParallelCount = 2
specialSkuNameKeyWord = "温馨提示" specialSkuNameKeyWord = "温馨提示"
startOpStoreStockNumber = 0 startOpStoreStockNumber = 0
endOpStoreStockNumber = model.MaxStoreSkuStockQty endOpStoreStockNumber = model.MaxStoreSkuStockQty
) )
var ( var (
isDaemonRunning = false
vendorList = map[int]bool { vendorList = map[int]bool {
model.VendorIDMTWM: true, model.VendorIDMTWM: true,
model.VendorIDEBAI: true, model.VendorIDEBAI: true,
} }
storeListQueueData = map[bool]*StoreListQueueData {
vendorStoreRefreshTimeList = map[int][]string { true : &StoreListQueueData{},
model.VendorIDMTWM: []string { false : &StoreListQueueData{},
//start and end time for JXGY
"22:00:00",
"00:00:00",
//start and end time for JXCS
"22:10:00",
"00:10:00",
},
model.VendorIDEBAI: []string {
"22:20:00",
"06:00:00",
"22:30:00",
"06:10:00",
},
} }
// vendorStoreRefreshTimeList = map[int][]string {
// model.VendorIDMTWM: []string {
// //start and end time for JXGY
// "22:00:00",
// "00:00:00",
// //start and end time for JXCS
// "22:10:00",
// "00:10:00",
// },
// model.VendorIDEBAI: []string {
// "22:20:00",
// "06:00:00",
vendorStartEndStoreTime = map[int][]int16 { // "22:30:00",
model.VendorIDMTWM: []int16 { // "06:10:00",
int16(2200),//start time // },
int16(2355),//end time // }
},
model.VendorIDEBAI: []int16 { // vendorStartEndStoreTime = map[int][]int16 {
int16(5), // model.VendorIDMTWM: []int16 {
int16(2355), // int16(2200),//start time
}, // int16(2355),//end time
} // },
// model.VendorIDEBAI: []int16 {
// int16(5),
// int16(2355),
// },
// }
) )
type StoreListQueueData struct {
waitQueue []*cms.StoreExt
processQueue []*cms.StoreExt
locker sync.RWMutex
}
func (s *StoreListQueueData) GetProcessQueue() []*cms.StoreExt {
s.locker.RLock()
defer s.locker.RUnlock()
return s.processQueue
}
func (s *StoreListQueueData) InsertToWaitQueue(storeInfo *cms.StoreExt) {
s.locker.Lock()
defer s.locker.Unlock()
s.waitQueue = append(s.waitQueue, storeInfo)
}
func (s *StoreListQueueData) ClearProcessQueue() {
s.locker.Lock()
defer s.locker.Unlock()
if len(s.processQueue) > 0 {
s.processQueue = []*cms.StoreExt{}
}
}
func (s *StoreListQueueData) TransferWaitQueueToProcessQueue() {
s.locker.Lock()
defer s.locker.Unlock()
if len(s.processQueue) == 0 && len(s.waitQueue) > 0 {
for _, value := range s.waitQueue {
s.processQueue = append(s.processQueue, value)
}
s.waitQueue = []*cms.StoreExt{}
}
}
func AddOrDelExtraStoreOptime(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, storeInfo *model.Store, startOpStoreTime, endOpStoreTime int16, needAddTime bool) bool { func AddOrDelExtraStoreOptime(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, storeInfo *model.Store, startOpStoreTime, endOpStoreTime int16, needAddTime bool) bool {
opTimeList := storeInfo.GetOpTimeList() opTimeList := storeInfo.GetOpTimeList()
if needAddTime { if needAddTime {
@@ -74,11 +118,11 @@ func GetStockValue(isStart bool) int {
} }
} }
func GetOpStoreTime(vendorID int) (startTime, endTime int16) { // func GetOpStoreTime(vendorID int) (startTime, endTime int16) {
startTime = vendorStartEndStoreTime[vendorID][0] // startTime = vendorStartEndStoreTime[vendorID][0]
endTime = vendorStartEndStoreTime[vendorID][1] // endTime = vendorStartEndStoreTime[vendorID][1]
return startTime, endTime // return startTime, endTime
} // }
func IsSpecialSku(name string) bool { func IsSpecialSku(name string) bool {
return strings.Contains(name, specialSkuNameKeyWord) return strings.Contains(name, specialSkuNameKeyWord)
@@ -118,8 +162,44 @@ func GetFilterStoreSkuList(storeSkuList []*partner.StoreSkuInfo) (storeSkuListOu
return storeSkuListOut return storeSkuListOut
} }
func GetStoreList(ctx *jxcontext.Context) (storeList []*cms.StoreExt, err error) {
storeInfo, err := cms.GetStores(ctx, "", map[string]interface{}{}, 0, -1, utils.DefaultTimeValue, utils.DefaultTimeValue, 0, 0)
storeList = storeInfo.Stores
return storeList, err
}
func GetFilterStoreList(storeList []*cms.StoreExt, vendorMap, storeIDMap map[int]bool) (outStoreList []*cms.StoreExt) {
for _, storeInfo := range storeList {
storeID := storeInfo.ID
//filter for storeID
if len(storeIDMap) > 0 {
if _, ok := storeIDMap[storeID]; !ok {
continue
}
}
for _, vendorStoreInfo := range storeInfo.StoreMaps {
vendorID := int(utils.MustInterface2Int64(vendorStoreInfo["vendorID"]))
//filter for vendorID
if len(vendorMap) > 0 {
if _, ok := vendorMap[vendorID]; !ok {
continue
}
}
if _, ok := vendorList[vendorID]; !ok {
continue
}
temp := *storeInfo
newStoreInfo := &temp
newStoreInfo.StoreMaps = []map[string]interface{} {vendorStoreInfo}
outStoreList = append(outStoreList, newStoreInfo)
}
}
return outStoreList
}
func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, storeIDList []int, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) { func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, storeIDList []int, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) {
startProcessTime := time.Now().Unix() storeList, err := GetStoreList(ctx)
vendorMap := make(map[int]bool) vendorMap := make(map[int]bool)
for _, vendorID := range vendorIDList { for _, vendorID := range vendorIDList {
vendorMap[vendorID] = true vendorMap[vendorID] = true
@@ -128,37 +208,31 @@ func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int,
for _, storeID := range storeIDList { for _, storeID := range storeIDList {
storeIDMap[storeID] = true storeIDMap[storeID] = true
} }
baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now()) storeList = GetFilterStoreList(storeList, vendorMap, storeIDMap)
storeInfo, err := cms.GetStores(ctx, "", map[string]interface{}{}, 0, -1, utils.DefaultTimeValue, utils.DefaultTimeValue, 0, 0) return StartOrEndOpStoreEx(ctx, isStart, startTime, endTime, isAsync, isContinueWhenError, storeList)
}
if err != nil { func StartOrEndOpStoreEx(ctx *jxcontext.Context, isStart bool, startTime, endTime int16, isAsync, isContinueWhenError bool, storeList []*cms.StoreExt) (retVal interface{}, err error) {
baseapi.SugarLogger.Errorf("StartOrEndOpStore cms.GetStores error:%v", err) startProcessTime := time.Now().Unix()
} else { baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now())
if len(storeList) > 0 {
taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeListValue := batchItemList[0].(*cms.StoreExt) storeListValue := batchItemList[0].(*cms.StoreExt)
storeID := storeListValue.ID storeID := storeListValue.ID
//filter for storeID
if len(storeIDMap) > 0 {
if _, ok := storeIDMap[storeID]; !ok {
return retVal, err
}
}
if storeListValue.StoreMaps != nil {
for _, vendorListValue := range storeListValue.StoreMaps { for _, vendorListValue := range storeListValue.StoreMaps {
vendorID := int(utils.MustInterface2Int64(vendorListValue["vendorID"])) vendorID := int(utils.MustInterface2Int64(vendorListValue["vendorID"]))
//filter for vendorID startOpStoreTime := int16(utils.MustInterface2Int64(vendorListValue["fakeOpenStart"]))
if len(vendorMap) > 0 { endOpStoreTime := int16(utils.MustInterface2Int64(vendorListValue["fakeOpenStop"]))
if _, ok := vendorMap[vendorID]; !ok { //startOpStoreTime, endOpStoreTime := GetOpStoreTime(vendorID)
continue
}
}
if _, ok := vendorList[vendorID]; ok {
startOpStoreTime, endOpStoreTime := GetOpStoreTime(vendorID)
if startTime != 0 && endTime != 0 { if startTime != 0 && endTime != 0 {
startOpStoreTime = startTime startOpStoreTime = startTime
endOpStoreTime = endTime endOpStoreTime = endTime
baseapi.SugarLogger.Debugf("StartOrEndOpStore SetStoreOptime:%d do:%d", startTime, endTime) baseapi.SugarLogger.Debugf("StartOrEndOpStore SetStoreOptime:%d do:%d", startTime, endTime)
} }
if isStart && (startOpStoreTime == 0 || endOpStoreTime == 0) {
continue
}
vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"]) vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"])
baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID) baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID)
singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
@@ -178,11 +252,7 @@ func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int,
} }
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) { _, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
//var successList []*partner.StoreSkuInfo _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList)
if _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
//successList = batchedStoreSkuList
//baseapi.SugarLogger.Debugf("StartOrEndOpStore successList:%v error:%v", successList, err)
}
return nil, 0, err return nil, 0, err
}, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true) }, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true)
@@ -192,11 +262,15 @@ func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int,
} }
} }
} }
}
}
return retVal, err return retVal, err
} }
task := tasksch.NewParallelTask("StartOrEndOpStore", tasksch.NewParallelConfig().SetParallelCount(taskParallelCount), ctx, taskFunc, storeInfo.Stores) taskName := ""
if isStart {
taskName = "开启平台商店的额外营业时间"
} else {
taskName = "结束平台商店的额外营业时间"
}
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetParallelCount(taskParallelCount), ctx, taskFunc, storeList)
tasksch.HandleTask(task, nil, true).Run() tasksch.HandleTask(task, nil, true).Run()
if isAsync { if isAsync {
retVal = task.ID retVal = task.ID
@@ -216,40 +290,104 @@ func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int,
return retVal, err return retVal, err
} }
func GetVendorStoreRefreshTime(vendorID int) (startTimeList, stopTimeList []string) { func IsJXCS() bool {
isJXCS := globals.IsMainProductEnv() return globals.IsMainProductEnv()
refreshTimeList := vendorStoreRefreshTimeList[vendorID]
if isJXCS {
startTimeList = []string{refreshTimeList[2]}
stopTimeList = []string{refreshTimeList[3]}
} else {
startTimeList = []string{refreshTimeList[0]}
stopTimeList = []string{refreshTimeList[1]}
}
return startTimeList, stopTimeList
} }
// func GetVendorStoreRefreshTime(vendorID int) (startTimeList, stopTimeList []string) {
// isJXCS := globals.IsMainProductEnv()
// refreshTimeList := vendorStoreRefreshTimeList[vendorID]
// if isJXCS {
// startTimeList = []string{refreshTimeList[2]}
// stopTimeList = []string{refreshTimeList[3]}
// } else {
// startTimeList = []string{refreshTimeList[0]}
// stopTimeList = []string{refreshTimeList[1]}
// }
// return startTimeList, stopTimeList
// }
func RefreshStore(vendorID int) { // func RefreshStore(vendorID int) {
ctx := jxcontext.AdminCtx // ctx := jxcontext.AdminCtx
startTimeList, stopTimeList := GetVendorStoreRefreshTime(vendorID) // startTimeList, stopTimeList := GetVendorStoreRefreshTime(vendorID)
vendorIDList := []int{vendorID} // vendorIDList := []int{vendorID}
storeIDList := []int{} // storeIDList := []int{}
ScheduleTimerFunc("StartOpStore", func() { // ScheduleTimerFunc("StartOpStore", func() {
if !IsImportantTaskRunning(TaskNameSyncStoreSku) { // if !IsImportantTaskRunning(TaskNameSyncStoreSku) {
StartOrEndOpStore(ctx, true, vendorIDList, storeIDList, 0, 0, false, true) // StartOrEndOpStore(ctx, true, vendorIDList, storeIDList, 0, 0, false, true)
} // }
}, startTimeList) // }, startTimeList)
ScheduleTimerFunc("EndOpStore", func() { // ScheduleTimerFunc("EndOpStore", func() {
if !IsImportantTaskRunning(TaskNameSyncStoreSku) { // if !IsImportantTaskRunning(TaskNameSyncStoreSku) {
StartOrEndOpStore(ctx, false, vendorIDList, storeIDList, 0, 0, false, true) // StartOrEndOpStore(ctx, false, vendorIDList, storeIDList, 0, 0, false, true)
} // }
}, stopTimeList) // }, stopTimeList)
} // }
func InitEx() { func InitEx() {
for index, value := range vendorList { // for index, value := range vendorList {
if value { // if value {
RefreshStore(index) // RefreshStore(index)
// }
// }
if enableScheduleRefreshStore && IsJXCS() {
ctx := jxcontext.AdminCtx
storeList, err := GetStoreList(ctx)
storeList = GetFilterStoreList(storeList, map[int]bool{}, map[int]bool{})
if err == nil {
for _, storeInfo := range storeList {
for _, vendorStoreInfo := range storeInfo.StoreMaps {
startOpStoreTime := int16(utils.MustInterface2Int64(vendorStoreInfo["fakeOpenStart"]))
endOpStoreTime := int16(utils.MustInterface2Int64(vendorStoreInfo["fakeOpenStop"]))
if startOpStoreTime == 0 || endOpStoreTime == 0 {
continue
}
startTime := jxutils.OperationTime2StrWithSecond(startOpStoreTime)
ScheduleTimerFuncOnce("StartOpStore", func(param interface{}) {
if !IsImportantTaskRunning(TaskNameSyncStoreSku) {
storeInfo := param.(*cms.StoreExt)
storeListQueueData[true].InsertToWaitQueue(storeInfo)
}
}, startTime, storeInfo)
stopTime := jxutils.OperationTime2StrWithSecond(endOpStoreTime)
ScheduleTimerFuncOnce("EndOpStore", func(param interface{}) {
storeInfo := param.(*cms.StoreExt)
storeListQueueData[false].InsertToWaitQueue(storeInfo)
}, stopTime, storeInfo)
}
}
if !isDaemonRunning {
isDaemonRunning = true
taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
periodlyFunc := func() {
for isStart, value := range storeListQueueData {
storeList := value.GetProcessQueue()
if len(storeList) > 0 {
StartOrEndOpStoreEx(ctx, isStart, 0, 0, false, true, storeList)
}
}
for _, value := range storeListQueueData {
value.ClearProcessQueue()
value.TransferWaitQueueToProcessQueue()
}
}
PeriodlyCall(time.Second, periodlyFunc)
}
return result, err
}
taskSeq := tasksch.NewSeqTask("启动监听-定时刷新平台商店的额外营业时间", ctx, taskSeqFunc, 1)
tasksch.HandleTask(taskSeq, nil, true).Run()
} }
} }
} }
}
func PeriodlyCall(d time.Duration, handler func()) {
ticker := time.NewTicker(d)
for _ = range ticker.C {
handler()
}
}

View File

@@ -322,8 +322,8 @@ func (c *SkuController) SyncSku() {
}) })
} }
// @Title 同步商家SKU // @Title 获取敏感词列表
// @Description 同步商家SKU // @Description 获取敏感词列表
// @Param token header string true "认证token" // @Param token header string true "认证token"
// @Success 200 {object} controllers.CallResult // @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult // @Failure 200 {object} controllers.CallResult