Files
jx-callback/business/jxstore/misc/misc2.go

397 lines
13 KiB
Go

package misc
import (
"strings"
"sync"
"time"
"git.rosy.net.cn/baseapi"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxstore/cms"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/partner"
"git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/jx-callback/globals"
)
const (
enableScheduleRefreshStore = true
taskParallelCount = 4
specialSkuNameKeyWord = "温馨提示"
startOpStoreStockNumber = 0
endOpStoreStockNumber = model.MaxStoreSkuStockQty
)
var (
isDaemonRunning = false
vendorList = map[int]bool{
model.VendorIDMTWM: true,
model.VendorIDEBAI: true,
}
storeListQueueData = map[bool]*StoreListQueueData{
true: &StoreListQueueData{},
false: &StoreListQueueData{},
}
// vendorStoreRefreshTimeList = map[int][]string {
// model.VendorIDMTWM: []string {
// //start and end time for JXGY
// "22:00:00",
// "00:00:00",
// //start and end time for JXCS
// "22:10:00",
// "00:10:00",
// },
// model.VendorIDEBAI: []string {
// "22:20:00",
// "06:00:00",
// "22:30:00",
// "06:10:00",
// },
// }
// vendorStartEndStoreTime = map[int][]int16 {
// model.VendorIDMTWM: []int16 {
// int16(2200),//start time
// int16(2355),//end time
// },
// model.VendorIDEBAI: []int16 {
// int16(5),
// int16(2355),
// },
// }
)
type StoreListQueueData struct {
waitQueue []*cms.StoreExt
processQueue []*cms.StoreExt
locker sync.RWMutex
}
func (s *StoreListQueueData) GetProcessQueue() (outQueue []*cms.StoreExt) {
s.locker.RLock()
defer s.locker.RUnlock()
outQueue = append(outQueue, s.processQueue...)
return outQueue
}
func (s *StoreListQueueData) InsertToWaitQueue(storeInfo *cms.StoreExt) {
s.locker.Lock()
defer s.locker.Unlock()
s.waitQueue = append(s.waitQueue, storeInfo)
}
func (s *StoreListQueueData) ClearProcessQueue() {
s.locker.Lock()
defer s.locker.Unlock()
if len(s.processQueue) > 0 {
s.processQueue = []*cms.StoreExt{}
}
}
func (s *StoreListQueueData) TransferWaitQueueToProcessQueue() {
s.locker.Lock()
defer s.locker.Unlock()
if len(s.processQueue) == 0 && len(s.waitQueue) > 0 {
for _, value := range s.waitQueue {
s.processQueue = append(s.processQueue, value)
}
s.waitQueue = []*cms.StoreExt{}
}
}
func AddOrDelExtraStoreOptime(ctx *jxcontext.Context, vendorID int, vendorOrgCode string, storeID int, vendorStoreID string, storeInfo *model.Store, startOpStoreTime, endOpStoreTime int16, needAddTime bool) bool {
opTimeList := storeInfo.GetOpTimeList()
if needAddTime {
opTimeList = []int16{startOpStoreTime, endOpStoreTime}
}
handler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IStoreHandler)
return handler.UpdateStoreOpTime(ctx, vendorOrgCode, storeID, vendorStoreID, opTimeList) == nil
}
func GetStockValue(isStart bool) int {
if isStart {
return startOpStoreStockNumber
} else {
return endOpStoreStockNumber
}
}
// func GetOpStoreTime(vendorID int) (startTime, endTime int16) {
// startTime = vendorStartEndStoreTime[vendorID][0]
// endTime = vendorStartEndStoreTime[vendorID][1]
// return startTime, endTime
// }
func IsSpecialSku(name string) bool {
return strings.Contains(name, specialSkuNameKeyWord)
}
func SetSkuStock(isStart bool, storeSkuNameList []*partner.SkuNameInfo) {
for _, skuNameInfo := range storeSkuNameList {
for _, skuInfo := range skuNameInfo.SkuList {
if IsSpecialSku(skuNameInfo.Name) || IsSpecialSku(skuInfo.SkuName) {
skuInfo.Stock = endOpStoreStockNumber
} else {
skuInfo.Stock = GetStockValue(isStart)
}
}
}
}
func SetSpecialSkuStatus(ctx *jxcontext.Context, vendorID int, vendorOrgCode string, storeID int, vendorStoreID string, storeSkuNameList []*partner.SkuNameInfo) {
singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
for _, skuNameInfo := range storeSkuNameList {
for _, skuInfo := range skuNameInfo.SkuList {
if IsSpecialSku(skuNameInfo.Name) || IsSpecialSku(skuInfo.SkuName) {
storeSkuList := []*partner.StoreSkuInfo{&skuInfo.StoreSkuInfo}
singleStoreHandler.UpdateStoreSkusStatus(ctx, vendorOrgCode, storeID, vendorStoreID, storeSkuList, model.SkuStatusNormal)
}
}
}
}
func GetFilterStoreSkuList(storeSkuList []*partner.StoreSkuInfo) (storeSkuListOut []*partner.StoreSkuInfo) {
for _, value := range storeSkuList {
if value.SkuID != 0 {
storeSkuListOut = append(storeSkuListOut, value)
}
}
return storeSkuListOut
}
func GetStoreList(ctx *jxcontext.Context) (storeList []*cms.StoreExt, err error) {
storeInfo, err := cms.GetStores(ctx, "", map[string]interface{}{}, 0, -1, utils.DefaultTimeValue, utils.DefaultTimeValue, 0, 0)
storeList = storeInfo.Stores
return storeList, err
}
func GetFilterStoreList(storeList []*cms.StoreExt, vendorMap, storeIDMap map[int]bool) (outStoreList []*cms.StoreExt) {
for _, storeInfo := range storeList {
storeID := storeInfo.ID
//filter for storeID
if len(storeIDMap) > 0 {
if _, ok := storeIDMap[storeID]; !ok {
continue
}
}
for _, vendorStoreInfo := range storeInfo.StoreMaps {
vendorID := vendorStoreInfo.VendorID
//filter for vendorID
if len(vendorMap) > 0 {
if _, ok := vendorMap[vendorID]; !ok {
continue
}
}
if _, ok := vendorList[vendorID]; !ok {
continue
}
temp := *storeInfo
newStoreInfo := &temp
newStoreInfo.StoreMaps = []*model.StoreMap{vendorStoreInfo}
outStoreList = append(outStoreList, newStoreInfo)
}
}
return outStoreList
}
func StartOrEndOpStore(ctx *jxcontext.Context, isStart bool, vendorIDList []int, storeIDList []int, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) {
storeList, err := GetStoreList(ctx)
vendorMap := make(map[int]bool)
for _, vendorID := range vendorIDList {
vendorMap[vendorID] = true
}
storeIDMap := make(map[int]bool)
for _, storeID := range storeIDList {
storeIDMap[storeID] = true
}
storeList = GetFilterStoreList(storeList, vendorMap, storeIDMap)
return StartOrEndOpStoreEx(ctx, isStart, startTime, endTime, isAsync, isContinueWhenError, storeList)
}
func StartOrEndOpStoreEx(ctx *jxcontext.Context, isStart bool, startTime, endTime int16, isAsync, isContinueWhenError bool, storeList []*cms.StoreExt) (retVal interface{}, err error) {
startProcessTime := time.Now().Unix()
baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now())
if len(storeList) > 0 {
taskFunc := func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
storeListValue := batchItemList[0].(*cms.StoreExt)
storeID := storeListValue.ID
for _, vendorListValue := range storeListValue.StoreMaps {
vendorID := vendorListValue.VendorID
startOpStoreTime := vendorListValue.FakeOpenStart
endOpStoreTime := vendorListValue.FakeOpenStop
//startOpStoreTime, endOpStoreTime := GetOpStoreTime(vendorID)
if startTime != 0 && endTime != 0 {
startOpStoreTime = startTime
endOpStoreTime = endTime
baseapi.SugarLogger.Debugf("StartOrEndOpStore SetStoreOptime:%d do:%d", startTime, endTime)
}
if isStart && (startOpStoreTime == 0 || endOpStoreTime == 0) {
continue
}
vendorStoreID := vendorListValue.VendorStoreID
baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID)
singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil)
if err != nil {
baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID)
} else {
SetSkuStock(isStart, storeSkuNameList)
SetSpecialSkuStatus(ctx, vendorID, vendorListValue.VendorOrgCode, storeID, vendorStoreID, storeSkuNameList)
storeSkuList := putils.StoreSkuFullList2Bare(storeSkuNameList)
if vendorID == model.VendorIDMTWM {
storeSkuList = GetFilterStoreSkuList(storeSkuList)
}
if len(storeSkuList) > 0 {
if !isStart {
AddOrDelExtraStoreOptime(ctx, vendorID, vendorListValue.VendorOrgCode, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false)
}
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
_, err = singleStoreHandler.UpdateStoreSkusStock(ctx, vendorListValue.VendorOrgCode, storeID, vendorStoreID, batchedStoreSkuList)
return nil, 0, err
}, ctx, task, storeSkuList, singleStoreHandler.GetStoreSkusBatchSize(partner.FuncUpdateStoreSkusStock), true)
if isStart {
AddOrDelExtraStoreOptime(ctx, vendorID, vendorListValue.VendorOrgCode, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, true)
}
}
}
}
return retVal, err
}
taskName := ""
if isStart {
taskName = "开启平台商店的额外营业时间"
} else {
taskName = "结束平台商店的额外营业时间"
}
task := tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetParallelCount(taskParallelCount), ctx, taskFunc, storeList)
tasksch.HandleTask(task, nil, true).Run()
if isAsync {
retVal = task.ID
} else {
_, err = task.GetResult(0)
if err != nil {
baseapi.SugarLogger.Debugf("StartOrEndOpStore tasksch error:%v", err)
}
retVal = "1"
}
}
endProcessTime := time.Now().Unix()
diff := endProcessTime - startProcessTime
baseapi.SugarLogger.Debugf("StartOrEndOpStore end time: %v", time.Now())
baseapi.SugarLogger.Debugf("StartOrEndOpStore cost time: %d sec", diff)
return retVal, err
}
func IsJXCS() bool {
return globals.IsMainProductEnv()
}
// func GetVendorStoreRefreshTime(vendorID int) (startTimeList, stopTimeList []string) {
// isJXCS := globals.IsMainProductEnv()
// refreshTimeList := vendorStoreRefreshTimeList[vendorID]
// if isJXCS {
// startTimeList = []string{refreshTimeList[2]}
// stopTimeList = []string{refreshTimeList[3]}
// } else {
// startTimeList = []string{refreshTimeList[0]}
// stopTimeList = []string{refreshTimeList[1]}
// }
// return startTimeList, stopTimeList
// }
// func RefreshStore(vendorID int) {
// ctx := jxcontext.AdminCtx
// startTimeList, stopTimeList := GetVendorStoreRefreshTime(vendorID)
// vendorIDList := []int{vendorID}
// storeIDList := []int{}
// ScheduleTimerFunc("StartOpStore", func() {
// if !IsImportantTaskRunning(TaskNameSyncStoreSku) {
// StartOrEndOpStore(ctx, true, vendorIDList, storeIDList, 0, 0, false, true)
// }
// }, startTimeList)
// ScheduleTimerFunc("EndOpStore", func() {
// if !IsImportantTaskRunning(TaskNameSyncStoreSku) {
// StartOrEndOpStore(ctx, false, vendorIDList, storeIDList, 0, 0, false, true)
// }
// }, stopTimeList)
// }
func InitEx() {
// for index, value := range vendorList {
// if value {
// RefreshStore(index)
// }
// }
if enableScheduleRefreshStore && IsJXCS() {
ctx := jxcontext.AdminCtx
storeList, err := GetStoreList(ctx)
storeList = GetFilterStoreList(storeList, map[int]bool{}, map[int]bool{})
if err == nil {
for _, storeInfo := range storeList {
for _, vendorStoreInfo := range storeInfo.StoreMaps {
startOpStoreTime := vendorStoreInfo.FakeOpenStart
endOpStoreTime := vendorStoreInfo.FakeOpenStop
if startOpStoreTime == 0 || endOpStoreTime == 0 {
continue
}
startTime := jxutils.OperationTime2StrWithSecond(startOpStoreTime)
ScheduleTimerFuncOnce("StartOpStore", func(param interface{}) {
if !IsImportantTaskRunning(TaskNameSyncStoreSku) {
storeInfo := param.(*cms.StoreExt)
storeListQueueData[true].InsertToWaitQueue(storeInfo)
}
}, startTime, storeInfo)
stopTime := jxutils.OperationTime2StrWithSecond(endOpStoreTime)
ScheduleTimerFuncOnce("EndOpStore", func(param interface{}) {
storeInfo := param.(*cms.StoreExt)
storeListQueueData[false].InsertToWaitQueue(storeInfo)
}, stopTime, storeInfo)
}
}
if !isDaemonRunning {
isDaemonRunning = true
taskSeqFunc := func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
switch step {
case 0:
periodlyFunc := func() {
for isStart, value := range storeListQueueData {
storeList := value.GetProcessQueue()
if len(storeList) > 0 {
StartOrEndOpStoreEx(ctx, isStart, 0, 0, false, true, storeList)
}
}
for _, value := range storeListQueueData {
value.ClearProcessQueue()
value.TransferWaitQueueToProcessQueue()
}
}
PeriodlyCall(60*time.Second, periodlyFunc)
}
return result, err
}
taskSeq := tasksch.NewSeqTask("启动监听-定时刷新平台商店的额外营业时间", ctx, taskSeqFunc, 1)
tasksch.HandleTask(taskSeq, nil, true).Run()
}
}
}
}
func PeriodlyCall(d time.Duration, handler func()) {
ticker := time.NewTicker(d)
for _ = range ticker.C {
handler()
}
}