diff --git a/business/jxcallback/scheduler/basesch/basesch_ext.go b/business/jxcallback/scheduler/basesch/basesch_ext.go index 228b51b41..ec3fbaf59 100644 --- a/business/jxcallback/scheduler/basesch/basesch_ext.go +++ b/business/jxcallback/scheduler/basesch/basesch_ext.go @@ -15,7 +15,7 @@ import ( func (c *BaseScheduler) CreateWaybillOnProviders(ctx *jxcontext.Context, order *model.GoodsOrder, courierVendorIDs, excludeCourierVendorIDs []int, policyHandler partner.CreateWaybillPolicyFunc, createOnlyOne bool) (bills []*model.Waybill, err error) { userName := ctx.GetUserName() - globals.SugarLogger.Infof("CreateWaybillOnProviders orderID:%s userName:%s", order.VendorOrderID, userName) + globals.SugarLogger.Infof("CreateWaybillOnProviders orderID:%s userName:%s, courierVendorIDs:%v, excludeCourierVendorIDs:%v", order.VendorOrderID, userName, courierVendorIDs, excludeCourierVendorIDs) storeCourierList, err := dao.GetStoreCourierList(dao.GetDB(), jxutils.GetSaleStoreIDFromOrder(order), model.StoreStatusOpened) if err != nil { return nil, err @@ -24,8 +24,8 @@ func (c *BaseScheduler) CreateWaybillOnProviders(ctx *jxcontext.Context, order * excludeCourierVendorIDMap := jxutils.IntList2Map(excludeCourierVendorIDs) var errList []string for _, storeCourier := range storeCourierList { - if (courierVendorIDMap == nil || courierVendorIDMap[storeCourier.VendorID] == 1) && - (excludeCourierVendorIDMap == nil || excludeCourierVendorIDMap[storeCourier.VendorID] == 0) { + if (courierVendorIDs == nil || courierVendorIDMap[storeCourier.VendorID] == 1) && + (excludeCourierVendorIDs == nil || excludeCourierVendorIDMap[storeCourier.VendorID] == 0) { if handler := partner.GetDeliveryPlatformFromVendorID(storeCourier.VendorID); handler != nil && handler.Use4CreateWaybill { courierVendorID := storeCourier.VendorID if order.VendorID != model.VendorIDWSC || courierVendorID != model.VendorIDDada { // 达达作为微商城的自有配送,不参与配送竞争 diff --git a/business/jxstore/cms/store.go b/business/jxstore/cms/store.go index 25757d4d7..dd5ec5cc1 100644 --- a/business/jxstore/cms/store.go +++ b/business/jxstore/cms/store.go @@ -636,12 +636,12 @@ func UpdateStore(ctx *jxcontext.Context, storeID int, payload map[string]interfa dao.Commit(db) globals.SugarLogger.Debugf("UpdateStore track:%s, before call SyncStore", ctx.GetTrackInfo()) _, err = CurVendorSync.SyncStore(ctx, db, -1, store.ID, false, userName) - if err2 := updateCourierStores(ctx, storeID); err2 != nil && err == nil { - err = err2 - } if valid["tel1"] != nil { BindMobile2Store(ctx, utils.Interface2String(valid["tel1"]), storeID) } + if syncStatus&model.SyncFlagStoreAddress != 0 && valid["tel1"] != nil || valid["payeeName"] != nil { + updateCourierStores(ctx, storeID) + } } } else { dao.Commit(db) diff --git a/business/jxstore/cms/store_sku.go b/business/jxstore/cms/store_sku.go index 460fc02e9..993565b89 100644 --- a/business/jxstore/cms/store_sku.go +++ b/business/jxstore/cms/store_sku.go @@ -60,9 +60,10 @@ type StoreSkuExt struct { JdSyncStatus int8 `orm:"default(2)" json:"jdSyncStatus"` - EbaiSyncStatus int8 `orm:"default(2)" json:"ebaiSyncStatus"` - MtwmSyncStatus int8 `orm:"default(2)" json:"mtwmSyncStatus"` - WscSyncStatus int8 `orm:"default(2)" json:"wscSyncStatus"` + EbaiSyncStatus int8 `orm:"default(2)" json:"ebaiSyncStatus"` + MtwmSyncStatus int8 `orm:"default(2)" json:"mtwmSyncStatus"` + WscSyncStatus int8 `orm:"default(2)" json:"wscSyncStatus"` + AutoSaleAt time.Time `orm:"type(datetime);null" json:"autoSaleAt"` ActPrice int `json:"actPrice"` EarningPrice int `json:"earningPrice"` @@ -385,7 +386,7 @@ func GetStoresSkusNew(ctx *jxcontext.Context, storeIDs, skuIDs []int, isFocus bo t4.created_at bind_created_at, t4.updated_at bind_updated_at, t4.last_operator bind_last_operator, t4.deleted_at bind_deleted_at, t4.sub_store_id, t4.price bind_price, IF(t4.unit_price IS NOT NULL, t4.unit_price, t1.price) unit_price, t4.status bind_status, t4.ebai_id, t4.mtwm_id, t4.wsc_id, t4.wsc_id2, - t4.jd_sync_status, t4.ebai_sync_status, t4.mtwm_sync_status, t4.wsc_sync_status + t4.jd_sync_status, t4.ebai_sync_status, t4.mtwm_sync_status, t4.wsc_sync_status, t4.auto_sale_at ` + sql var tmpList []*tGetStoresSkusInfo beginTime := time.Now() @@ -1241,6 +1242,7 @@ func updateStoresSkusWithoutSync(ctx *jxcontext.Context, db *dao.DaoDB, storeIDs // setStoreSkuBindStatus(skuBind, model.SyncFlagModifiedMask) dao.WrapUpdateULEntity(skuBind, userName) + skuBind.AutoSaleAt = utils.DefaultTimeValue if num, err = dao.UpdateEntity(db, skuBind /*, utils.Map2KeySlice(updateFieldMap)...*/); err != nil { dao.Rollback(db) return nil, err @@ -1318,10 +1320,10 @@ func updateStoreSkusSaleWithoutSync(ctx *jxcontext.Context, storeID int, skuBind model.FieldElmSyncStatus: skuBind.ElmSyncStatus | model.SyncFlagSaleMask, model.FieldWscSyncStatus: skuBind.WscSyncStatus | model.SyncFlagSaleMask, } - // if utils.IsTimeZero(autoSaleTime) || skuBind.Status == model.SkuStatusNormal { - // autoSaleTime = utils.DefaultTimeValue - // } - // kvs["AutoSaleAt"] = autoSaleTime + if utils.IsTimeZero(autoSaleTime) || skuBind.Status == model.SkuStatusNormal { + autoSaleTime = utils.DefaultTimeValue + } + kvs["AutoSaleAt"] = autoSaleTime if num, err = dao.UpdateEntityLogically(db, skuBind, kvs, userName, nil); err != nil { dao.Rollback(db) return nil, err @@ -2178,35 +2180,35 @@ func GetMissingStoreSkuFromOrder(ctx *jxcontext.Context, fromTime time.Time) (mi } func AutoSaleStoreSku(ctx *jxcontext.Context, storeIDs []int) (err error) { - // db := dao.GetDB() - // storeSkuList, err := dao.GetAutoSaleStoreSku(db, storeIDs) - // if err != nil { - // return err - // } - // storeSkuMap := make(map[int][]*model.StoreSkuBind) - // for _, v := range storeSkuList { - // storeSkuMap[v.StoreID] = append(storeSkuMap[v.StoreID], v) - // } - // now := time.Now() - // for storeID, storeSkuList := range storeSkuMap { - // var skuIDs []int - // for _, storeSku := range storeSkuList { - // if now.Sub(storeSku.AutoSaleAt) > 0 { - // storeSku.AutoSaleAt = utils.DefaultTimeValue - // if storeSku.Status != model.SkuStatusNormal { - // storeSku.Status = model.SkuStatusNormal - // skuIDs = append(skuIDs, storeSku.SkuID) - // } - // if _, err = dao.UpdateEntity(db, storeSku, "AutoSaleAt", model.FieldStatus); err != nil { - // return err - // } - // } - // } - // if len(skuIDs) > 0 { - // if _, err = CurVendorSync.SyncStoresSkus(ctx, db, nil, []int{storeID}, skuIDs, false, true, true); err != nil { - // return err - // } - // } - // } + db := dao.GetDB() + storeSkuList, err := dao.GetAutoSaleStoreSku(db, storeIDs) + if err != nil { + return err + } + storeSkuMap := make(map[int][]*model.StoreSkuBind) + for _, v := range storeSkuList { + storeSkuMap[v.StoreID] = append(storeSkuMap[v.StoreID], v) + } + now := time.Now() + for storeID, storeSkuList := range storeSkuMap { + var skuIDs []int + for _, storeSku := range storeSkuList { + if now.Sub(storeSku.AutoSaleAt) > 0 { + storeSku.AutoSaleAt = utils.DefaultTimeValue + if storeSku.Status != model.SkuStatusNormal { + storeSku.Status = model.SkuStatusNormal + skuIDs = append(skuIDs, storeSku.SkuID) + } + if _, err = dao.UpdateEntity(db, storeSku, "AutoSaleAt", model.FieldStatus); err != nil { + return err + } + } + } + if len(skuIDs) > 0 { + if _, err = CurVendorSync.SyncStoresSkus(ctx, db, nil, []int{storeID}, skuIDs, false, true, true); err != nil { + return err + } + } + } return err } diff --git a/business/jxstore/cms/store_sku_check_test.go b/business/jxstore/cms/store_sku_check_test.go index 033524de8..ef06962e8 100644 --- a/business/jxstore/cms/store_sku_check_test.go +++ b/business/jxstore/cms/store_sku_check_test.go @@ -6,28 +6,4 @@ import ( func TestCheckSkuDiffBetweenJxAndVendor(t *testing.T) { CheckSkuDiffBetweenJxAndVendor() -} - -func TestTestExcel(t *testing.T) { - data1 := []DiffData{ - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - } - data2 := []DiffData{ - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - } - data3 := []DiffData{ - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"}, - } - data := map[int][]DiffData{ - 0: data1, - 1: data2, - 3: data3, - } - WriteToExcel(data) -} +} \ No newline at end of file diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 809a56fc2..ee11c0c48 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "reflect" - "strings" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" @@ -421,19 +420,19 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) }, isContinueWhenError) if task != nil { - if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil { - platformList := make([]string, len(task.GetDetailErrList())) - for k, v := range task.GetDetailErrList() { - if vendorErr := partner.IsErrVendorError(v); vendorErr != nil { - platformList[k] = model.VendorChineseNames[vendorErr.VendorID()] - } else { - platformList[k] = "未知" - } - } - err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ",")) - } else { - err = makeSyncError(err) - } + // if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil { + // platformList := make([]string, len(task.GetDetailErrList())) + // for k, v := range task.GetDetailErrList() { + // if vendorErr := partner.IsErrVendorError(v); vendorErr != nil { + // platformList[k] = model.VendorChineseNames[vendorErr.VendorID()] + // } else { + // platformList[k] = "未知" + // } + // } + // err = fmt.Errorf("同步价格失败\n失败平台:%s", strings.Join(platformList, ",")) + // } else { + // } + err = makeSyncError(err) } return hint, err } diff --git a/business/jxstore/misc/misc.go b/business/jxstore/misc/misc.go index 4a74bb8ac..63c4be009 100644 --- a/business/jxstore/misc/misc.go +++ b/business/jxstore/misc/misc.go @@ -47,13 +47,26 @@ var ( autoEnableStoreSkuTimeList = []string{ "7:00:00", + "8:00:00", + "9:00:00", + "10:00:00", + "11:00:00", + "12:00:00", + "13:00:00", "14:00:00", + "15:00:00", + "16:00:00", + "17:00:00", + "18:00:00", + "19:00:00", + "20:00:00", + "21:00:00", "22:00:00", } ) func Init() { - if globals.ReallyCallPlatformAPI { + if globals.IsProductEnv() { ScheduleTimerFunc(doDailyWork, dailyWorkTimeList) ScheduleTimerFuncByInterval(func() { @@ -76,11 +89,10 @@ func Init() { ScheduleTimerFunc(func() { dao.UpdateActStatusByTime(dao.GetDB(), time.Now().Add(-48*time.Hour)) }, updateActStatusTimeList) - - // ScheduleTimerFunc(func() { - // cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil) - // }, autoEnableStoreSkuTimeList) } + ScheduleTimerFunc(func() { + cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil) + }, autoEnableStoreSkuTimeList) } func doDailyWork() { diff --git a/business/jxstore/misc/misc2.go b/business/jxstore/misc/misc2.go index a8d23957e..d7da6c7a8 100644 --- a/business/jxstore/misc/misc2.go +++ b/business/jxstore/misc/misc2.go @@ -10,6 +10,7 @@ import ( "git.rosy.net.cn/jx-callback/business/partner/putils" "git.rosy.net.cn/baseapi" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" + "git.rosy.net.cn/jx-callback/globals" ) const ( @@ -23,10 +24,16 @@ const ( ) var ( - startOpStoreTimeList = []string { + startOpStoreTimeListJXCS = []string { + "22:10:00", + } + endOpStoreTimeListJXCS = []string { + "06:10:00", + } + startOpStoreTimeListJXGY = []string { "22:00:00", } - endOpStoreTimeList = []string { + endOpStoreTimeListJXGY = []string { "06:00:00", } vendorList = map[int]bool { @@ -72,7 +79,7 @@ func FilterSkuNameList(storeSkuNameList []*partner.SkuNameInfo) (filterStoreSkuN return filterStoreSkuNameList } -func StartOrEndOpStore(isStart bool, startTime, endTime int16) { +func StartOrEndOpStore(isStart bool, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) { startProcessTime := time.Now().Unix() baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now()) ctx := jxcontext.AdminCtx @@ -97,7 +104,7 @@ func StartOrEndOpStore(isStart bool, startTime, endTime int16) { vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"]) baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID) singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) - storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, nil, storeID, vendorStoreID, nil) + storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil) if err != nil { baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID) } else { @@ -130,24 +137,40 @@ func StartOrEndOpStore(isStart bool, startTime, endTime int16) { } return retVal, err } - task := tasksch.NewParallelTask("StartOrEndOpStore", nil, ctx, taskFunc, storeInfo.Stores) + task := tasksch.NewParallelTask("StartOrEndOpStore", tasksch.NewParallelConfig().SetParallelCount(4), ctx, taskFunc, storeInfo.Stores) tasksch.HandleTask(task, nil, true).Run() - _, err = task.GetResult(0) - if err != nil { - baseapi.SugarLogger.Debugf("StartOrEndOpStore tasksch error:%v", err) + if isAsync { + retVal = task.ID + } else { + _, err = task.GetResult(0) + if err != nil { + baseapi.SugarLogger.Debugf("StartOrEndOpStore tasksch error:%v", err) + } + retVal = "1" } } endProcessTime := time.Now().Unix() diff := endProcessTime - startProcessTime baseapi.SugarLogger.Debugf("StartOrEndOpStore end time: %v", time.Now()) baseapi.SugarLogger.Debugf("StartOrEndOpStore cost time: %d sec", diff) + + return retVal, err } func InitEx() { - ScheduleTimerFunc(func() { - StartOrEndOpStore(true, 0, 0) - }, startOpStoreTimeList) - ScheduleTimerFunc(func() { - StartOrEndOpStore(false, 0, 0) - }, endOpStoreTimeList) + if globals.IsMainProductEnv() { + ScheduleTimerFunc(func() { + StartOrEndOpStore(true, 0, 0, false, true) + }, startOpStoreTimeListJXCS) + ScheduleTimerFunc(func() { + StartOrEndOpStore(false, 0, 0, false, true) + }, endOpStoreTimeListJXCS) + } else { + ScheduleTimerFunc(func() { + StartOrEndOpStore(true, 0, 0, false, true) + }, startOpStoreTimeListJXGY) + ScheduleTimerFunc(func() { + StartOrEndOpStore(false, 0, 0, false, true) + }, endOpStoreTimeListJXGY) + } } \ No newline at end of file diff --git a/business/jxutils/tasksch/parallel_task.go b/business/jxutils/tasksch/parallel_task.go index 626df2e3e..bc59b1a5e 100644 --- a/business/jxutils/tasksch/parallel_task.go +++ b/business/jxutils/tasksch/parallel_task.go @@ -2,7 +2,6 @@ package tasksch import ( "errors" - "time" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" @@ -24,13 +23,11 @@ type ParallelConfig struct { ParallelCount int BatchSize int IsContinueWhenError bool - ResultHandler ResultHandlerFunc } type ParallelTask struct { BaseTask - resultHandler ResultHandlerFunc worker WorkFunc jobList [][]interface{} taskChan chan []interface{} @@ -49,7 +46,6 @@ func NewParallelConfig() *ParallelConfig { IsContinueWhenError: false, ParallelCount: DefParallelCount, BatchSize: 1, - ResultHandler: nil, } } @@ -73,11 +69,6 @@ func (c *ParallelConfig) SetIsContinueWhenError(isContinueWhenError bool) *Paral // return c // } -func (c *ParallelConfig) SetResultHandler(resultHandler ResultHandlerFunc) *ParallelConfig { - c.ResultHandler = resultHandler - return c -} - func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { if config == nil { config = NewParallelConfig() @@ -97,7 +88,6 @@ func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Con task := &ParallelTask{ subFinishChan: make(chan interface{}, config.ParallelCount), taskChan: make(chan []interface{}, len(realItemList)), - resultHandler: config.ResultHandler, worker: worker, jobList: jobList, } @@ -135,7 +125,7 @@ func (task *ParallelTask) Run() { } else { globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) task.locker.Lock() - task.detailErrList = append(task.detailErrList, err) + task.batchErrList = append(task.batchErrList, err) task.locker.Unlock() if !task.IsContinueWhenError { // 出错 chanRetVal = err @@ -175,37 +165,14 @@ func (task *ParallelTask) Run() { taskResult = append(taskResult, resultList...) } } - task.locker.Lock() - if taskErr != nil { // 如果有错误,肯定就是失败了 - task.Status = TaskStatusFailed - } else { - if len(task.taskChan) > 0 { - taskErr = ErrTaskIsCanceled - task.Status = TaskStatusCanceled - } else { - task.Status = TaskStatusFinished - } - } - if taskErr != nil { - task.OriginalErr = taskErr - task.Err = NewTaskError(task.Name, taskErr) - } else { - if len(task.detailErrList) > 0 { - task.OriginalErr = task.detailErrList[0] - } - task.Err = task.buildTaskErrFromDetail() - } task.Result = taskResult - task.TerminatedAt = time.Now() - task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放 + task.mainErr = taskErr task.locker.Unlock() - globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err) - close(task.subFinishChan) - if task.resultHandler != nil { - task.resultHandler(task.Name, taskResult, task.Err) - } + close(task.subFinishChan) + task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放 + }) } diff --git a/business/jxutils/tasksch/sequence_task.go b/business/jxutils/tasksch/sequence_task.go index 5ec16d942..6c25b26d5 100644 --- a/business/jxutils/tasksch/sequence_task.go +++ b/business/jxutils/tasksch/sequence_task.go @@ -1,8 +1,6 @@ package tasksch import ( - "time" - "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/globals" @@ -45,7 +43,7 @@ func (task *SeqTask) Run() { task.finishedOneJob(1, err) if taskErr = err; taskErr != nil { task.locker.Lock() - task.detailErrList = append(task.detailErrList, err) + task.batchErrList = append(task.batchErrList, err) task.locker.Unlock() globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) if !task.IsContinueWhenError { @@ -57,29 +55,9 @@ func (task *SeqTask) Run() { } EndFor: task.locker.Lock() - if taskErr != nil { // 如果有错误,肯定就是失败了 - task.Status = TaskStatusFailed - } else { - if task.FinishedJobCount < task.TotalJobCount { - taskErr = ErrTaskIsCanceled - task.Status = TaskStatusCanceled - } else { - task.Status = TaskStatusFinished - } - } - if taskErr != nil { - task.OriginalErr = taskErr - task.Err = NewTaskError(task.Name, taskErr) - } else { - if len(task.detailErrList) > 0 { - task.OriginalErr = task.detailErrList[0] - } - task.Err = task.buildTaskErrFromDetail() - } task.Result = taskResult - task.TerminatedAt = time.Now() + task.mainErr = taskErr task.locker.Unlock() - globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, task.Err) }) } diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index d418d2df9..8363ab9d5 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -3,7 +3,6 @@ package tasksch import ( "encoding/json" "fmt" - "strings" "sync" "time" @@ -58,35 +57,37 @@ type ITask interface { AddChild(task ITask) ITask GetChildren() TaskList SetParent(parentTask ITask) - GetOriginalErr() error - GetDetailErrList() []error + // GetOriginalErr() error + GetErr() error + // GetDetailErrList() []error + GetLeafResult() (finishedItemCount, failedItemCount int) json.Marshaler } -type TaskError struct { - name string - errStr string -} +// type TaskError struct { +// name string +// errStr string +// } -func (t *TaskError) MarshalJSON() ([]byte, error) { - return json.Marshal(t.Error()) -} +// func (t *TaskError) MarshalJSON() ([]byte, error) { +// return json.Marshal(t.Error()) +// } -func (t *TaskError) Error() string { - return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) -} +// func (t *TaskError) Error() string { +// return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) +// } -func (t *TaskError) String() string { - return t.Error() -} +// func (t *TaskError) String() string { +// return t.Error() +// } -func NewTaskError(name string, err error) *TaskError { - return &TaskError{ - name: name, - errStr: err.Error(), - } -} +// func NewTaskError(name string, err error) *TaskError { +// return &TaskError{ +// name: name, +// errStr: err.Error(), +// } +// } type BaseTask struct { Name string `json:"name"` @@ -108,12 +109,12 @@ type BaseTask struct { NoticeMsg string `json:"noticeMsg"` - Result []interface{} `json:"-"` - Children TaskList `json:"children"` - Err error `json:"err"` - OriginalErr error `json:"-"` + Result []interface{} `json:"-"` + Children TaskList `json:"children"` + Err string `json:"err"` - detailErrList []error + mainErr error + batchErrList []error finishChan chan struct{} C <-chan struct{} `json:"-"` @@ -168,7 +169,7 @@ func (t *BaseTask) GetID() string { func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { if t.GetStatus() >= TaskStatusEndBegin { - return t.Result, t.OriginalErr + return t.getResult(), t.GetErr() } if duration == 0 { duration = time.Hour * 10000 // duration为0表示无限等待 @@ -178,7 +179,7 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err case <-t.finishChan: t.isGetResultCalled = true timer.Stop() - return t.Result, t.OriginalErr + return t.getResult(), t.GetErr() case <-timer.C: } return nil, ErrTaskNotFinished @@ -276,18 +277,75 @@ func (t *BaseTask) GetNoticeMsg() string { return t.NoticeMsg } -func (t *BaseTask) GetOriginalErr() error { +func (t *BaseTask) getResult() []interface{} { t.locker.RLock() defer t.locker.RUnlock() - return t.OriginalErr + return t.Result } -func (t *BaseTask) GetDetailErrList() []error { +// func (t *BaseTask) GetOriginalErr() error { +// t.locker.RLock() +// defer t.locker.RUnlock() +// return nil +// } + +func (t *BaseTask) GetErr() error { t.locker.RLock() defer t.locker.RUnlock() - return t.detailErrList + if t.mainErr == nil && len(t.batchErrList) == 0 { + return nil + } + return t } +func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) { + if len(t.Children) == 0 { + return t.FinishedItemCount, t.FailedItemCount + } + for _, v := range t.Children { + subFinishedItemCount, subFailedItemCount := v.GetLeafResult() + finishedItemCount += subFinishedItemCount + failedItemCount += subFailedItemCount + } + return finishedItemCount, failedItemCount +} + +func (t *BaseTask) Error() (errMsg string) { + if t.mainErr == nil && len(t.batchErrList) == 0 { + return "" + } + t.locker.RLock() + errMsg = t.Err + t.locker.RUnlock() + if errMsg != "" { + return errMsg + } + errMsg = "任务:" + t.Name + if t.parent == nil { + finishedItemCount, failedItemCount := t.GetLeafResult() + errMsg += fmt.Sprintf(", 全部总共:%d, 成功:%d, 失败:%d,\n", (finishedItemCount + failedItemCount), finishedItemCount, failedItemCount) + } + if t.mainErr != nil { + errMsg += "失败" + errMsg += "," + t.mainErr.Error() + } else { + errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount) + for _, v := range t.batchErrList { + errMsg += fmt.Sprintf("%s,\n", v.Error()) + } + } + t.locker.Lock() + t.Err = errMsg + t.locker.Unlock() + return errMsg +} + +// func (t *BaseTask) GetDetailErrList() []error { +// t.locker.RLock() +// defer t.locker.RUnlock() +// return t.batchErrList +// } + func AddChild(parentTask ITask, task ITask) ITask { if parentTask != nil { return parentTask.AddChild(task) @@ -322,11 +380,31 @@ func (t *BaseTask) run(taskHandler func()) { }() taskHandler() + + task := t + task.locker.Lock() + if task.mainErr != nil { // 如果有错误,肯定就是失败了 + task.Status = TaskStatusFailed + } else { + if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount { + task.mainErr = ErrTaskIsCanceled + task.Status = TaskStatusCanceled + } else { + task.Status = TaskStatusFinished + } + } + task.TerminatedAt = time.Now() + task.locker.Unlock() + task.Error() + + globals.SugarLogger.Debugf("Task:%s, result:%v, err:%v", task.Name, task.Result, task.mainErr) + select { case <-t.quitChan: default: close(t.quitChan) } + // todo 如下代码可能有对t.Children操作的并发问题 for _, subTask := range t.Children { if _, err := subTask.GetResult(0); err != nil { globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) @@ -340,18 +418,14 @@ func (t *BaseTask) run(taskHandler func()) { if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息 var content string taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt)) - if t.Err == nil { + if t.mainErr == nil { content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status]) noticeMsg := t.GetNoticeMsg() if noticeMsg != "" { content += ",通知消息:" + noticeMsg } } else { - if t.Status == TaskStatusFinished { - content = fmt.Sprintf("%s执行部分失败,%s", taskDesc, t.Err.Error()) - } else { - content = fmt.Sprintf("%s执行失败,%s", taskDesc, t.Err.Error()) - } + content = t.Error() } msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content) } @@ -381,16 +455,16 @@ func (t *BaseTask) setStatus(status int) { t.Status = status } -func (t *BaseTask) buildTaskErrFromDetail() (err error) { - if len(t.detailErrList) > 0 { - strList := make([]string, len(t.detailErrList)) - for k, v := range t.detailErrList { - strList[k] = v.Error() - } - return NewTaskError(t.Name, fmt.Errorf("总共:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) - } - return nil -} +// func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) { +// if len(t.batchErrList) > 0 { +// strList := make([]string, len(t.batchErrList)) +// for k, v := range t.batchErrList { +// strList[k] = v.Error() +// } +// return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) +// } +// return nil +// } func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) { defer func() { diff --git a/business/jxutils/tasksch/task_test.go b/business/jxutils/tasksch/task_test.go index 788716526..8b77e5b4b 100644 --- a/business/jxutils/tasksch/task_test.go +++ b/business/jxutils/tasksch/task_test.go @@ -1,15 +1,11 @@ package tasksch import ( - "errors" - "fmt" "testing" - - "git.rosy.net.cn/baseapi/utils" ) func TestTaskError(t *testing.T) { - err := NewTaskError("test", errors.New("hello")) - fmt.Println(utils.Format4Output(err, false)) - fmt.Println(err.Error()) + // err := NewTaskError("test", errors.New("hello")) + // fmt.Println(utils.Format4Output(err, false)) + // fmt.Println(err.Error()) } diff --git a/business/model/store_sku.go b/business/model/store_sku.go index 4e1cfa3fa..4b9ae58b3 100644 --- a/business/model/store_sku.go +++ b/business/model/store_sku.go @@ -1,5 +1,7 @@ package model +import "time" + const ( StoreSkuBindStatusNA = -2 StoreSkuBindStatusDeleted = -1 @@ -104,7 +106,7 @@ type StoreSkuBind struct { MtwmSyncStatus int8 `orm:"default(2)"` WscSyncStatus int8 `orm:"default(2)"` - // AutoSaleAt time.Time `orm:"type(datetime);null" json:"autoSaleAt"` + AutoSaleAt time.Time `orm:"type(datetime);null" json:"autoSaleAt"` } func (*StoreSkuBind) TableUnique() [][]string { @@ -116,6 +118,7 @@ func (*StoreSkuBind) TableUnique() [][]string { func (*StoreSkuBind) TableIndex() [][]string { return [][]string{ []string{"SkuID", "StoreID", "DeletedAt"}, + // []string{"AutoSaleAt", "DeletedAt", "StoreID"}, } } diff --git a/business/partner/purchase/jd/sku.go b/business/partner/purchase/jd/sku.go index 2e3365b8c..16f52a6ce 100644 --- a/business/partner/purchase/jd/sku.go +++ b/business/partner/purchase/jd/sku.go @@ -583,7 +583,7 @@ func (p *PurchaseHandler) GetVendorCategories(ctx *jxcontext.Context) (vendorCat func (p *PurchaseHandler) GetSkus(ctx *jxcontext.Context, skuID int, vendorSkuID, skuName string) (skuNameList []*partner.SkuNameInfo, err error) { param := &jdapi.QuerySkuParam{ - SkuID: utils.Str2Int64(vendorSkuID), + SkuID: utils.Str2Int64WithDefault(vendorSkuID, 0), SkuName: skuName, IsFilterDel: jdapi.IsFilterDelTrue, PageNo: 1, diff --git a/controllers/temp_op.go b/controllers/temp_op.go index bb1d463da..f92a2dcef 100644 --- a/controllers/temp_op.go +++ b/controllers/temp_op.go @@ -276,16 +276,18 @@ func (c *TempOpController) TestIt() { // @Title 开启或结束所有平台商店的额外时间 // @Description 开启或结束所有平台商店的额外时间,并且修改所有商品库存为0或99999(开启:0 , 结束:99999) -// @Param token header string true "认证token" -// @Param isStart query bool true "开启或结束门店" -// @Param startTime query int false "开始营业时间(格式:930代表早上9点30分)" -// @Param endTime query int false "结束营业时间" +// @Param token header string true "认证token" +// @Param startOrEndStore query bool true "开启或结束" +// @Param startTime query int false "开始营业时间(格式:930代表早上9点30分)" +// @Param endTime query int false "结束营业时间" +// @Param isAsync query bool false "是否异步操作" +// @Param isContinueWhenError query bool false "单个同步失败是否继续,缺省false" // @Success 200 {object} controllers.CallResult // @Failure 200 {object} controllers.CallResult // @router /TestStartOrEndOpStore [get] func (c *TempOpController) TestStartOrEndOpStore() { c.callTestStartOrEndOpStore(func(params *tTempopTestStartOrEndOpStoreParams) (retVal interface{}, errCode string, err error) { - misc.StartOrEndOpStore(params.IsStart, int16(params.StartTime), int16(params.EndTime)) + retVal, err = misc.StartOrEndOpStore(params.StartOrEndStore, int16(params.StartTime), int16(params.EndTime), params.IsAsync, params.IsContinueWhenError) return retVal, "", err }) } diff --git a/main.go b/main.go index 81aa12ff6..b0e563a2a 100644 --- a/main.go +++ b/main.go @@ -56,9 +56,9 @@ func Init() { if globals.IsProductEnv() { ebai.CurPurchaseHandler.StartRefreshComment() - misc.Init() misc.InitEx() } + misc.Init() } // 返回true表示非运行服务