diff --git a/business/jxstore/misc/misc.go b/business/jxstore/misc/misc.go index 4da56479e..81db2b920 100644 --- a/business/jxstore/misc/misc.go +++ b/business/jxstore/misc/misc.go @@ -102,7 +102,6 @@ func Init() { ScheduleTimerFunc("UpdateActStatusByTime", func() { dao.UpdateActStatusByTime(dao.GetDB(), time.Now().Add(-48*time.Hour)) }, updateActStatusTimeList) - InitEx() } ScheduleTimerFunc("AutoSaleStoreSku", func() { cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil, false) @@ -118,6 +117,7 @@ func doDailyWork() { SaveImportantTaskID(TaskNameSyncStoreSku, SpecialTaskID) taskID, _ := cms.CurVendorSync.SyncStoresSkus2(jxcontext.AdminCtx, dao.GetDB(), []int{model.VendorIDEBAI, model.VendorIDMTWM}, nil, nil, nil, model.SyncFlagSaleMask|model.SyncFlagPriceMask, true, true) SaveImportantTaskID(TaskNameSyncStoreSku, taskID) + InitEx() } func RefreshRealMobile(ctx *jxcontext.Context, vendorID int, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) { @@ -184,14 +184,14 @@ func ScheduleTimerFunc(name string, handler func(), timeList []string) { } // 按时间调度一次 -func ScheduleTimerFuncOnce(name string, handler func(), timeStr string) { +func ScheduleTimerFuncOnce(name string, handler func(interface{}), timeStr string, param interface{}) { now := time.Now() nextTime := jxutils.GetNextTimeFromList(now, []string{timeStr}) duration := nextTime.Sub(now) + 1*time.Second globals.SugarLogger.Debugf("ScheduleTimerFuncOnce, func:%s, duration:%v", name, duration) utils.AfterFuncWithRecover(duration, func() { globals.SugarLogger.Debugf("ScheduleTimerFuncOnce func:%s", name) - handler() + handler(param) }) } diff --git a/business/jxstore/misc/misc2.go b/business/jxstore/misc/misc2.go index 5c691e047..5fc8a9a67 100644 --- a/business/jxstore/misc/misc2.go +++ b/business/jxstore/misc/misc2.go @@ -1,8 +1,10 @@ package misc import ( + "git.rosy.net.cn/jx-callback/business/jxutils" "time" "strings" + "sync" "git.rosy.net.cn/baseapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxstore/cms" @@ -15,48 +17,90 @@ import ( ) const ( - taskParallelCount = 4 + enableScheduleRefreshStore = false + taskParallelCount = 2 specialSkuNameKeyWord = "温馨提示" startOpStoreStockNumber = 0 endOpStoreStockNumber = model.MaxStoreSkuStockQty ) var ( + isDaemonRunning = false vendorList = map[int]bool { model.VendorIDMTWM: true, model.VendorIDEBAI: true, } - - vendorStoreRefreshTimeList = map[int][]string { - model.VendorIDMTWM: []string { - //start and end time for JXGY - "22:00:00", - "00:00:00", - //start and end time for JXCS - "22:10:00", - "00:10:00", - }, - model.VendorIDEBAI: []string { - "22:20:00", - "06:00:00", - - "22:30:00", - "06:10:00", - }, + storeListQueueData = map[bool]*StoreListQueueData { + true : &StoreListQueueData{}, + false : &StoreListQueueData{}, } + // vendorStoreRefreshTimeList = map[int][]string { + // model.VendorIDMTWM: []string { + // //start and end time for JXGY + // "22:00:00", + // "00:00:00", + // //start and end time for JXCS + // "22:10:00", + // "00:10:00", + // }, + // model.VendorIDEBAI: []string { + // "22:20:00", + // "06:00:00", - vendorStartEndStoreTime = map[int][]int16 { - model.VendorIDMTWM: []int16 { - int16(2200),//start time - int16(2355),//end time - }, - model.VendorIDEBAI: []int16 { - int16(5), - int16(2355), - }, - } + // "22:30:00", + // "06:10:00", + // }, + // } + + // vendorStartEndStoreTime = map[int][]int16 { + // model.VendorIDMTWM: []int16 { + // int16(2200),//start time + // int16(2355),//end time + // }, + // model.VendorIDEBAI: []int16 { + // int16(5), + // int16(2355), + // }, + // } ) +type StoreListQueueData struct { + waitQueue []*cms.StoreExt + processQueue []*cms.StoreExt + locker sync.RWMutex +} + +func (s *StoreListQueueData) GetProcessQueue() []*cms.StoreExt { + s.locker.RLock() + defer s.locker.RUnlock() + return s.processQueue +} + +func (s *StoreListQueueData) InsertToWaitQueue(storeInfo *cms.StoreExt) { + s.locker.Lock() + defer s.locker.Unlock() + s.waitQueue = append(s.waitQueue, storeInfo) +} + +func (s *StoreListQueueData) ClearProcessQueue() { + s.locker.Lock() + defer s.locker.Unlock() + if len(s.processQueue) > 0 { + s.processQueue = []*cms.StoreExt{} + } +} + +func (s *StoreListQueueData) TransferWaitQueueToProcessQueue() { + s.locker.Lock() + defer s.locker.Unlock() + if len(s.processQueue) == 0 && len(s.waitQueue) > 0 { + for _, value := range s.waitQueue { + s.processQueue = append(s.processQueue, value) + } + s.waitQueue = []*cms.StoreExt{} + } +} + func AddOrDelExtraStoreOptime(ctx *jxcontext.Context, vendorID, storeID int, vendorStoreID string, storeInfo *model.Store, startOpStoreTime, endOpStoreTime int16, needAddTime bool) bool { opTimeList := storeInfo.GetOpTimeList() if needAddTime { @@ -74,11 +118,11 @@ func GetStockValue(isStart bool) int { } } -func GetOpStoreTime(vendorID int) (startTime, endTime int16) { - startTime = vendorStartEndStoreTime[vendorID][0] - endTime = vendorStartEndStoreTime[vendorID][1] - return startTime, endTime -} +// func GetOpStoreTime(vendorID int) (startTime, endTime int16) { +// startTime = vendorStartEndStoreTime[vendorID][0] +// endTime = vendorStartEndStoreTime[vendorID][1] +// return startTime, endTime +// } func IsSpecialSku(name string) bool { return strings.Contains(name, specialSkuNameKeyWord) @@ -118,8 +162,44 @@ func GetFilterStoreSkuList(storeSkuList []*partner.StoreSkuInfo) (storeSkuListOu return storeSkuListOut } +func GetStoreList(ctx *jxcontext.Context) (storeList []*cms.StoreExt, err error) { + storeInfo, err := cms.GetStores(ctx, "", map[string]interface{}{}, 0, -1, utils.DefaultTimeValue, utils.DefaultTimeValue, 0, 0) + storeList = storeInfo.Stores + return storeList, err +} + +func GetFilterStoreList(storeList []*cms.StoreExt, vendorMap, storeIDMap map[int]bool) (outStoreList []*cms.StoreExt) { + for _, storeInfo := range storeList { + storeID := storeInfo.ID + //filter for storeID + if len(storeIDMap) > 0 { + if _, ok := storeIDMap[storeID]; !ok { + continue + } + } + for _, vendorStoreInfo := range storeInfo.StoreMaps { + vendorID := int(utils.MustInterface2Int64(vendorStoreInfo["vendorID"])) + //filter for vendorID + if len(vendorMap) > 0 { + if _, ok := vendorMap[vendorID]; !ok { + continue + } + } + if _, ok := vendorList[vendorID]; !ok { + continue + } + temp := *storeInfo + newStoreInfo := &temp + newStoreInfo.StoreMaps = []map[string]interface{} {vendorStoreInfo} + outStoreList = append(outStoreList, newStoreInfo) + } + } + + return outStoreList +} + func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, storeIDList []int, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) { - startProcessTime := time.Now().Unix() + storeList, err := GetStoreList(ctx) vendorMap := make(map[int]bool) for _, vendorID := range vendorIDList { vendorMap[vendorID] = true @@ -128,75 +208,69 @@ func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, for _, storeID := range storeIDList { storeIDMap[storeID] = true } - baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now()) - storeInfo, err := cms.GetStores(ctx, "", map[string]interface{}{}, 0, -1, utils.DefaultTimeValue, utils.DefaultTimeValue, 0, 0) + storeList = GetFilterStoreList(storeList, vendorMap, storeIDMap) + return StartOrEndOpStoreEx(ctx, isStart, startTime, endTime, isAsync, isContinueWhenError, storeList) +} - if err != nil { - baseapi.SugarLogger.Errorf("StartOrEndOpStore cms.GetStores error:%v", err) - } else { +func StartOrEndOpStoreEx(ctx *jxcontext.Context, isStart bool, startTime, endTime int16, isAsync, isContinueWhenError bool, storeList []*cms.StoreExt) (retVal interface{}, err error) { + startProcessTime := time.Now().Unix() + baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now()) + + if len(storeList) > 0 { taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeListValue := batchItemList[0].(*cms.StoreExt) storeID := storeListValue.ID - //filter for storeID - if len(storeIDMap) > 0 { - if _, ok := storeIDMap[storeID]; !ok { - return retVal, err + for _, vendorListValue := range storeListValue.StoreMaps { + vendorID := int(utils.MustInterface2Int64(vendorListValue["vendorID"])) + startOpStoreTime := int16(utils.MustInterface2Int64(vendorListValue["fakeOpenStart"])) + endOpStoreTime := int16(utils.MustInterface2Int64(vendorListValue["fakeOpenStop"])) + //startOpStoreTime, endOpStoreTime := GetOpStoreTime(vendorID) + if startTime != 0 && endTime != 0 { + startOpStoreTime = startTime + endOpStoreTime = endTime + baseapi.SugarLogger.Debugf("StartOrEndOpStore SetStoreOptime:%d do:%d", startTime, endTime) } - } - if storeListValue.StoreMaps != nil { - for _, vendorListValue := range storeListValue.StoreMaps { - vendorID := int(utils.MustInterface2Int64(vendorListValue["vendorID"])) - //filter for vendorID - if len(vendorMap) > 0 { - if _, ok := vendorMap[vendorID]; !ok { - continue - } + if isStart && (startOpStoreTime == 0 || endOpStoreTime == 0) { + continue + } + vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"]) + baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID) + singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) + storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil) + if err != nil { + baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID) + } else { + SetSkuStock(isStart, storeSkuNameList) + SetSpecialSkuStatus(ctx, storeID, vendorID, vendorStoreID, storeSkuNameList) + storeSkuList := putils.StoreSkuFullList2Bare(storeSkuNameList) + if vendorID == model.VendorIDMTWM { + storeSkuList = GetFilterStoreSkuList(storeSkuList) } - if _, ok := vendorList[vendorID]; ok { - startOpStoreTime, endOpStoreTime := GetOpStoreTime(vendorID) - if startTime != 0 && endTime != 0 { - startOpStoreTime = startTime - endOpStoreTime = endTime - baseapi.SugarLogger.Debugf("StartOrEndOpStore SetStoreOptime:%d do:%d", startTime, endTime) + if len(storeSkuList) > 0 { + if !isStart { + AddOrDelExtraStoreOptime(ctx, vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false) } - vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"]) - baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID) - singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) - storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil) - if err != nil { - baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID) - } else { - SetSkuStock(isStart, storeSkuNameList) - SetSpecialSkuStatus(ctx, storeID, vendorID, vendorStoreID, storeSkuNameList) - storeSkuList := putils.StoreSkuFullList2Bare(storeSkuNameList) - if vendorID == model.VendorIDMTWM { - storeSkuList = GetFilterStoreSkuList(storeSkuList) - } - if len(storeSkuList) > 0 { - if !isStart { - AddOrDelExtraStoreOptime(ctx, vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false) - } - _, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) { - //var successList []*partner.StoreSkuInfo - if _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil { - //successList = batchedStoreSkuList - //baseapi.SugarLogger.Debugf("StartOrEndOpStore successList:%v error:%v", successList, err) - } - return nil, 0, err - }, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true) + _, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) { + _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList) + return nil, 0, err + }, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true) - if isStart { - AddOrDelExtraStoreOptime(ctx, vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, true) - } - } + if isStart { + AddOrDelExtraStoreOptime(ctx, vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, true) } } } } return retVal, err } - task := tasksch.NewParallelTask("StartOrEndOpStore", tasksch.NewParallelConfig().SetParallelCount(taskParallelCount), ctx, taskFunc, storeInfo.Stores) + taskName := "" + if isStart { + taskName = "开启平台商店的额外营业时间" + } else { + taskName = "结束平台商店的额外营业时间" + } + task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetParallelCount(taskParallelCount), ctx, taskFunc, storeList) tasksch.HandleTask(task, nil, true).Run() if isAsync { retVal = task.ID @@ -216,40 +290,104 @@ func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, return retVal, err } -func GetVendorStoreRefreshTime(vendorID int) (startTimeList, stopTimeList []string) { - isJXCS := globals.IsMainProductEnv() - refreshTimeList := vendorStoreRefreshTimeList[vendorID] - if isJXCS { - startTimeList = []string{refreshTimeList[2]} - stopTimeList = []string{refreshTimeList[3]} - } else { - startTimeList = []string{refreshTimeList[0]} - stopTimeList = []string{refreshTimeList[1]} - } - return startTimeList, stopTimeList +func IsJXCS() bool { + return globals.IsMainProductEnv() } +// func GetVendorStoreRefreshTime(vendorID int) (startTimeList, stopTimeList []string) { +// isJXCS := globals.IsMainProductEnv() +// refreshTimeList := vendorStoreRefreshTimeList[vendorID] +// if isJXCS { +// startTimeList = []string{refreshTimeList[2]} +// stopTimeList = []string{refreshTimeList[3]} +// } else { +// startTimeList = []string{refreshTimeList[0]} +// stopTimeList = []string{refreshTimeList[1]} +// } +// return startTimeList, stopTimeList +// } -func RefreshStore(vendorID int) { - ctx := jxcontext.AdminCtx - startTimeList, stopTimeList := GetVendorStoreRefreshTime(vendorID) - vendorIDList := []int{vendorID} - storeIDList := []int{} - ScheduleTimerFunc("StartOpStore", func() { - if !IsImportantTaskRunning(TaskNameSyncStoreSku) { - StartOrEndOpStore(ctx, true, vendorIDList, storeIDList, 0, 0, false, true) - } - }, startTimeList) - ScheduleTimerFunc("EndOpStore", func() { - if !IsImportantTaskRunning(TaskNameSyncStoreSku) { - StartOrEndOpStore(ctx, false, vendorIDList, storeIDList, 0, 0, false, true) - } - }, stopTimeList) -} +// func RefreshStore(vendorID int) { +// ctx := jxcontext.AdminCtx +// startTimeList, stopTimeList := GetVendorStoreRefreshTime(vendorID) +// vendorIDList := []int{vendorID} +// storeIDList := []int{} +// ScheduleTimerFunc("StartOpStore", func() { +// if !IsImportantTaskRunning(TaskNameSyncStoreSku) { +// StartOrEndOpStore(ctx, true, vendorIDList, storeIDList, 0, 0, false, true) +// } +// }, startTimeList) +// ScheduleTimerFunc("EndOpStore", func() { +// if !IsImportantTaskRunning(TaskNameSyncStoreSku) { +// StartOrEndOpStore(ctx, false, vendorIDList, storeIDList, 0, 0, false, true) +// } +// }, stopTimeList) +// } func InitEx() { - for index, value := range vendorList { - if value { - RefreshStore(index) + // for index, value := range vendorList { + // if value { + // RefreshStore(index) + // } + // } + if enableScheduleRefreshStore && IsJXCS() { + ctx := jxcontext.AdminCtx + storeList, err := GetStoreList(ctx) + storeList = GetFilterStoreList(storeList, map[int]bool{}, map[int]bool{}) + if err == nil { + for _, storeInfo := range storeList { + for _, vendorStoreInfo := range storeInfo.StoreMaps { + startOpStoreTime := int16(utils.MustInterface2Int64(vendorStoreInfo["fakeOpenStart"])) + endOpStoreTime := int16(utils.MustInterface2Int64(vendorStoreInfo["fakeOpenStop"])) + if startOpStoreTime == 0 || endOpStoreTime == 0 { + continue + } + + startTime := jxutils.OperationTime2StrWithSecond(startOpStoreTime) + ScheduleTimerFuncOnce("StartOpStore", func(param interface{}) { + if !IsImportantTaskRunning(TaskNameSyncStoreSku) { + storeInfo := param.(*cms.StoreExt) + storeListQueueData[true].InsertToWaitQueue(storeInfo) + } + }, startTime, storeInfo) + + stopTime := jxutils.OperationTime2StrWithSecond(endOpStoreTime) + ScheduleTimerFuncOnce("EndOpStore", func(param interface{}) { + storeInfo := param.(*cms.StoreExt) + storeListQueueData[false].InsertToWaitQueue(storeInfo) + }, stopTime, storeInfo) + } + } + if !isDaemonRunning { + isDaemonRunning = true + taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { + switch step { + case 0: + periodlyFunc := func() { + for isStart, value := range storeListQueueData { + storeList := value.GetProcessQueue() + if len(storeList) > 0 { + StartOrEndOpStoreEx(ctx, isStart, 0, 0, false, true, storeList) + } + } + for _, value := range storeListQueueData { + value.ClearProcessQueue() + value.TransferWaitQueueToProcessQueue() + } + } + PeriodlyCall(time.Second, periodlyFunc) + } + return result, err + } + taskSeq := tasksch.NewSeqTask("启动监听-定时刷新平台商店的额外营业时间", ctx, taskSeqFunc, 1) + tasksch.HandleTask(taskSeq, nil, true).Run() + } } } } + +func PeriodlyCall(d time.Duration, handler func()) { + ticker := time.NewTicker(d) + for _ = range ticker.C { + handler() + } +} \ No newline at end of file diff --git a/controllers/cms_sku.go b/controllers/cms_sku.go index b535e6c48..835f2b239 100644 --- a/controllers/cms_sku.go +++ b/controllers/cms_sku.go @@ -322,8 +322,8 @@ func (c *SkuController) SyncSku() { }) } -// @Title 同步商家SKU -// @Description 同步商家SKU +// @Title 获取敏感词列表 +// @Description 获取敏感词列表 // @Param token header string true "认证token" // @Success 200 {object} controllers.CallResult // @Failure 200 {object} controllers.CallResult