package misc import ( "strings" "sync" "time" "git.rosy.net.cn/baseapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxstore/cms" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/business/partner/putils" "git.rosy.net.cn/jx-callback/globals" ) const ( enableScheduleRefreshStore = true taskParallelCount = 4 specialSkuNameKeyWord = "温馨提示" startOpStoreStockNumber = 0 endOpStoreStockNumber = model.MaxStoreSkuStockQty ) var ( isDaemonRunning = false vendorList = map[int]bool{ model.VendorIDMTWM: true, model.VendorIDEBAI: true, } storeListQueueData = map[bool]*StoreListQueueData{ true: &StoreListQueueData{}, false: &StoreListQueueData{}, } // vendorStoreRefreshTimeList = map[int][]string { // model.VendorIDMTWM: []string { // //start and end time for JXGY // "22:00:00", // "00:00:00", // //start and end time for JXCS // "22:10:00", // "00:10:00", // }, // model.VendorIDEBAI: []string { // "22:20:00", // "06:00:00", // "22:30:00", // "06:10:00", // }, // } // vendorStartEndStoreTime = map[int][]int16 { // model.VendorIDMTWM: []int16 { // int16(2200),//start time // int16(2355),//end time // }, // model.VendorIDEBAI: []int16 { // int16(5), // int16(2355), // }, // } ) type StoreListQueueData struct { waitQueue []*cms.StoreExt processQueue []*cms.StoreExt locker sync.RWMutex } func (s *StoreListQueueData) GetProcessQueue() (outQueue []*cms.StoreExt) { s.locker.RLock() defer s.locker.RUnlock() outQueue = append(outQueue, s.processQueue...) return outQueue } func (s *StoreListQueueData) InsertToWaitQueue(storeInfo *cms.StoreExt) { s.locker.Lock() defer s.locker.Unlock() s.waitQueue = append(s.waitQueue, storeInfo) } func (s *StoreListQueueData) ClearProcessQueue() { s.locker.Lock() defer s.locker.Unlock() if len(s.processQueue) > 0 { s.processQueue = []*cms.StoreExt{} } } func (s *StoreListQueueData) TransferWaitQueueToProcessQueue() { s.locker.Lock() defer s.locker.Unlock() if len(s.processQueue) == 0 && len(s.waitQueue) > 0 { for _, value := range s.waitQueue { s.processQueue = append(s.processQueue, value) } s.waitQueue = []*cms.StoreExt{} } } func AddOrDelExtraStoreOptime(ctx *jxcontext.Context, vendorID int, vendorOrgCode string, storeID int, vendorStoreID string, storeInfo *model.Store, startOpStoreTime, endOpStoreTime int16, needAddTime bool) bool { opTimeList := storeInfo.GetOpTimeList() if needAddTime { opTimeList = []int16{startOpStoreTime, endOpStoreTime} } handler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IStoreHandler) return handler.UpdateStoreOpTime(ctx, vendorOrgCode, storeID, vendorStoreID, opTimeList) == nil } func GetStockValue(isStart bool) int { if isStart { return startOpStoreStockNumber } else { return endOpStoreStockNumber } } // func GetOpStoreTime(vendorID int) (startTime, endTime int16) { // startTime = vendorStartEndStoreTime[vendorID][0] // endTime = vendorStartEndStoreTime[vendorID][1] // return startTime, endTime // } func IsSpecialSku(name string) bool { return strings.Contains(name, specialSkuNameKeyWord) } func SetSkuStock(isStart bool, storeSkuNameList []*partner.SkuNameInfo) { for _, skuNameInfo := range storeSkuNameList { for _, skuInfo := range skuNameInfo.SkuList { if IsSpecialSku(skuNameInfo.Name) || IsSpecialSku(skuInfo.SkuName) { skuInfo.Stock = endOpStoreStockNumber } else { skuInfo.Stock = GetStockValue(isStart) } } } } func SetSpecialSkuStatus(ctx *jxcontext.Context, vendorID int, vendorOrgCode string, storeID int, vendorStoreID string, storeSkuNameList []*partner.SkuNameInfo) { singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) for _, skuNameInfo := range storeSkuNameList { for _, skuInfo := range skuNameInfo.SkuList { if IsSpecialSku(skuNameInfo.Name) || IsSpecialSku(skuInfo.SkuName) { storeSkuList := []*partner.StoreSkuInfo{&skuInfo.StoreSkuInfo} singleStoreHandler.UpdateStoreSkusStatus(ctx, vendorOrgCode, storeID, vendorStoreID, storeSkuList, model.SkuStatusNormal) } } } } func GetFilterStoreSkuList(storeSkuList []*partner.StoreSkuInfo) (storeSkuListOut []*partner.StoreSkuInfo) { for _, value := range storeSkuList { if value.SkuID != 0 { storeSkuListOut = append(storeSkuListOut, value) } } return storeSkuListOut } func GetStoreList(ctx *jxcontext.Context) (storeList []*cms.StoreExt, err error) { storeInfo, err := cms.GetStores(ctx, "", map[string]interface{}{}, 0, -1, utils.DefaultTimeValue, utils.DefaultTimeValue, 0, 0) storeList = storeInfo.Stores return storeList, err } func GetFilterStoreList(storeList []*cms.StoreExt, vendorMap, storeIDMap map[int]bool) (outStoreList []*cms.StoreExt) { for _, storeInfo := range storeList { storeID := storeInfo.ID //filter for storeID if len(storeIDMap) > 0 { if _, ok := storeIDMap[storeID]; !ok { continue } } for _, vendorStoreInfo := range storeInfo.StoreMaps { vendorID := vendorStoreInfo.VendorID //filter for vendorID if len(vendorMap) > 0 { if _, ok := vendorMap[vendorID]; !ok { continue } } if _, ok := vendorList[vendorID]; !ok { continue } temp := *storeInfo newStoreInfo := &temp newStoreInfo.StoreMaps = []*model.StoreMap{vendorStoreInfo} outStoreList = append(outStoreList, newStoreInfo) } } return outStoreList } func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, storeIDList []int, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) { storeList, err := GetStoreList(ctx) vendorMap := make(map[int]bool) for _, vendorID := range vendorIDList { vendorMap[vendorID] = true } storeIDMap := make(map[int]bool) for _, storeID := range storeIDList { storeIDMap[storeID] = true } storeList = GetFilterStoreList(storeList, vendorMap, storeIDMap) return StartOrEndOpStoreEx(ctx, isStart, startTime, endTime, isAsync, isContinueWhenError, storeList) } func StartOrEndOpStoreEx(ctx *jxcontext.Context, isStart bool, startTime, endTime int16, isAsync, isContinueWhenError bool, storeList []*cms.StoreExt) (retVal interface{}, err error) { startProcessTime := time.Now().Unix() baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now()) if len(storeList) > 0 { taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeListValue := batchItemList[0].(*cms.StoreExt) storeID := storeListValue.ID for _, vendorListValue := range storeListValue.StoreMaps { vendorID := vendorListValue.VendorID startOpStoreTime := vendorListValue.FakeOpenStart endOpStoreTime := vendorListValue.FakeOpenStop //startOpStoreTime, endOpStoreTime := GetOpStoreTime(vendorID) if startTime != 0 && endTime != 0 { startOpStoreTime = startTime endOpStoreTime = endTime baseapi.SugarLogger.Debugf("StartOrEndOpStore SetStoreOptime:%d do:%d", startTime, endTime) } if isStart && (startOpStoreTime == 0 || endOpStoreTime == 0) { continue } vendorStoreID := vendorListValue.VendorStoreID baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID) singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil) if err != nil { baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID) } else { SetSkuStock(isStart, storeSkuNameList) SetSpecialSkuStatus(ctx, vendorID, vendorListValue.VendorOrgCode, storeID, vendorStoreID, storeSkuNameList) storeSkuList := putils.StoreSkuFullList2Bare(storeSkuNameList) if vendorID == model.VendorIDMTWM { storeSkuList = GetFilterStoreSkuList(storeSkuList) } if len(storeSkuList) > 0 { if !isStart { AddOrDelExtraStoreOptime(ctx, vendorID, vendorListValue.VendorOrgCode, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false) } _, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) { _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, vendorListValue.VendorOrgCode, storeID, vendorStoreID, batchedStoreSkuList) return nil, 0, err }, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true) if isStart { AddOrDelExtraStoreOptime(ctx, vendorID, vendorListValue.VendorOrgCode, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, true) } } } } return retVal, err } taskName := "" if isStart { taskName = "开启平台商店的额外营业时间" } else { taskName = "结束平台商店的额外营业时间" } task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetParallelCount(taskParallelCount), ctx, taskFunc, storeList) tasksch.HandleTask(task, nil, true).Run() if isAsync { retVal = task.ID } else { _, err = task.GetResult(0) if err != nil { baseapi.SugarLogger.Debugf("StartOrEndOpStore tasksch error:%v", err) } retVal = "1" } } endProcessTime := time.Now().Unix() diff := endProcessTime - startProcessTime baseapi.SugarLogger.Debugf("StartOrEndOpStore end time: %v", time.Now()) baseapi.SugarLogger.Debugf("StartOrEndOpStore cost time: %d sec", diff) return retVal, err } func IsJXCS() bool { return globals.IsMainProductEnv() } // func GetVendorStoreRefreshTime(vendorID int) (startTimeList, stopTimeList []string) { // isJXCS := globals.IsMainProductEnv() // refreshTimeList := vendorStoreRefreshTimeList[vendorID] // if isJXCS { // startTimeList = []string{refreshTimeList[2]} // stopTimeList = []string{refreshTimeList[3]} // } else { // startTimeList = []string{refreshTimeList[0]} // stopTimeList = []string{refreshTimeList[1]} // } // return startTimeList, stopTimeList // } // func RefreshStore(vendorID int) { // ctx := jxcontext.AdminCtx // startTimeList, stopTimeList := GetVendorStoreRefreshTime(vendorID) // vendorIDList := []int{vendorID} // storeIDList := []int{} // ScheduleTimerFunc("StartOpStore", func() { // if !IsImportantTaskRunning(TaskNameSyncStoreSku) { // StartOrEndOpStore(ctx, true, vendorIDList, storeIDList, 0, 0, false, true) // } // }, startTimeList) // ScheduleTimerFunc("EndOpStore", func() { // if !IsImportantTaskRunning(TaskNameSyncStoreSku) { // StartOrEndOpStore(ctx, false, vendorIDList, storeIDList, 0, 0, false, true) // } // }, stopTimeList) // } func InitEx() { // for index, value := range vendorList { // if value { // RefreshStore(index) // } // } if enableScheduleRefreshStore && IsJXCS() { ctx := jxcontext.AdminCtx storeList, err := GetStoreList(ctx) storeList = GetFilterStoreList(storeList, map[int]bool{}, map[int]bool{}) if err == nil { for _, storeInfo := range storeList { for _, vendorStoreInfo := range storeInfo.StoreMaps { startOpStoreTime := vendorStoreInfo.FakeOpenStart endOpStoreTime := vendorStoreInfo.FakeOpenStop if startOpStoreTime == 0 || endOpStoreTime == 0 { continue } startTime := jxutils.OperationTime2StrWithSecond(startOpStoreTime) ScheduleTimerFuncOnce("StartOpStore", func(param interface{}) { if !IsImportantTaskRunning(TaskNameSyncStoreSku) { storeInfo := param.(*cms.StoreExt) storeListQueueData[true].InsertToWaitQueue(storeInfo) } }, startTime, storeInfo) stopTime := jxutils.OperationTime2StrWithSecond(endOpStoreTime) ScheduleTimerFuncOnce("EndOpStore", func(param interface{}) { storeInfo := param.(*cms.StoreExt) storeListQueueData[false].InsertToWaitQueue(storeInfo) }, stopTime, storeInfo) } } if !isDaemonRunning { isDaemonRunning = true taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { switch step { case 0: periodlyFunc := func() { for isStart, value := range storeListQueueData { storeList := value.GetProcessQueue() if len(storeList) > 0 { StartOrEndOpStoreEx(ctx, isStart, 0, 0, false, true, storeList) } } for _, value := range storeListQueueData { value.ClearProcessQueue() value.TransferWaitQueueToProcessQueue() } } PeriodlyCall(60*time.Second, periodlyFunc) } return result, err } taskSeq := tasksch.NewSeqTask("启动监听-定时刷新平台商店的额外营业时间", ctx, taskSeqFunc, 1) tasksch.HandleTask(taskSeq, nil, true).Run() } } } } func PeriodlyCall(d time.Duration, handler func()) { ticker := time.NewTicker(d) for _ = range ticker.C { handler() } }