同步修改

This commit is contained in:
Rosy-zhudan
2019-08-01 17:05:09 +08:00
15 changed files with 260 additions and 228 deletions

View File

@@ -15,7 +15,7 @@ import (
func (c *BaseScheduler) CreateWaybillOnProviders(ctx *jxcontext.Context, order *model.GoodsOrder, courierVendorIDs, excludeCourierVendorIDs []int, policyHandler partner.CreateWaybillPolicyFunc, createOnlyOne bool) (bills []*model.Waybill, err error) { func (c *BaseScheduler) CreateWaybillOnProviders(ctx *jxcontext.Context, order *model.GoodsOrder, courierVendorIDs, excludeCourierVendorIDs []int, policyHandler partner.CreateWaybillPolicyFunc, createOnlyOne bool) (bills []*model.Waybill, err error) {
userName := ctx.GetUserName() userName := ctx.GetUserName()
globals.SugarLogger.Infof("CreateWaybillOnProviders orderID:%s userName:%s", order.VendorOrderID, userName) globals.SugarLogger.Infof("CreateWaybillOnProviders orderID:%s userName:%s, courierVendorIDs:%v, excludeCourierVendorIDs:%v", order.VendorOrderID, userName, courierVendorIDs, excludeCourierVendorIDs)
storeCourierList, err := dao.GetStoreCourierList(dao.GetDB(), jxutils.GetSaleStoreIDFromOrder(order), model.StoreStatusOpened) storeCourierList, err := dao.GetStoreCourierList(dao.GetDB(), jxutils.GetSaleStoreIDFromOrder(order), model.StoreStatusOpened)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -24,8 +24,8 @@ func (c *BaseScheduler) CreateWaybillOnProviders(ctx *jxcontext.Context, order *
excludeCourierVendorIDMap := jxutils.IntList2Map(excludeCourierVendorIDs) excludeCourierVendorIDMap := jxutils.IntList2Map(excludeCourierVendorIDs)
var errList []string var errList []string
for _, storeCourier := range storeCourierList { for _, storeCourier := range storeCourierList {
if (courierVendorIDMap == nil || courierVendorIDMap[storeCourier.VendorID] == 1) && if (courierVendorIDs == nil || courierVendorIDMap[storeCourier.VendorID] == 1) &&
(excludeCourierVendorIDMap == nil || excludeCourierVendorIDMap[storeCourier.VendorID] == 0) { (excludeCourierVendorIDs == nil || excludeCourierVendorIDMap[storeCourier.VendorID] == 0) {
if handler := partner.GetDeliveryPlatformFromVendorID(storeCourier.VendorID); handler != nil && handler.Use4CreateWaybill { if handler := partner.GetDeliveryPlatformFromVendorID(storeCourier.VendorID); handler != nil && handler.Use4CreateWaybill {
courierVendorID := storeCourier.VendorID courierVendorID := storeCourier.VendorID
if order.VendorID != model.VendorIDWSC || courierVendorID != model.VendorIDDada { // 达达作为微商城的自有配送,不参与配送竞争 if order.VendorID != model.VendorIDWSC || courierVendorID != model.VendorIDDada { // 达达作为微商城的自有配送,不参与配送竞争

View File

@@ -636,12 +636,12 @@ func UpdateStore(ctx *jxcontext.Context, storeID int, payload map[string]interfa
dao.Commit(db) dao.Commit(db)
globals.SugarLogger.Debugf("UpdateStore track:%s, before call SyncStore", ctx.GetTrackInfo()) globals.SugarLogger.Debugf("UpdateStore track:%s, before call SyncStore", ctx.GetTrackInfo())
_, err = CurVendorSync.SyncStore(ctx, db, -1, store.ID, false, userName) _, err = CurVendorSync.SyncStore(ctx, db, -1, store.ID, false, userName)
if err2 := updateCourierStores(ctx, storeID); err2 != nil && err == nil {
err = err2
}
if valid["tel1"] != nil { if valid["tel1"] != nil {
BindMobile2Store(ctx, utils.Interface2String(valid["tel1"]), storeID) BindMobile2Store(ctx, utils.Interface2String(valid["tel1"]), storeID)
} }
if syncStatus&model.SyncFlagStoreAddress != 0 && valid["tel1"] != nil || valid["payeeName"] != nil {
updateCourierStores(ctx, storeID)
}
} }
} else { } else {
dao.Commit(db) dao.Commit(db)

View File

@@ -63,6 +63,7 @@ type StoreSkuExt struct {
EbaiSyncStatus int8 `orm:"default(2)" json:"ebaiSyncStatus"` EbaiSyncStatus int8 `orm:"default(2)" json:"ebaiSyncStatus"`
MtwmSyncStatus int8 `orm:"default(2)" json:"mtwmSyncStatus"` MtwmSyncStatus int8 `orm:"default(2)" json:"mtwmSyncStatus"`
WscSyncStatus int8 `orm:"default(2)" json:"wscSyncStatus"` WscSyncStatus int8 `orm:"default(2)" json:"wscSyncStatus"`
AutoSaleAt time.Time `orm:"type(datetime);null" json:"autoSaleAt"`
ActPrice int `json:"actPrice"` ActPrice int `json:"actPrice"`
EarningPrice int `json:"earningPrice"` EarningPrice int `json:"earningPrice"`
@@ -385,7 +386,7 @@ func GetStoresSkusNew(ctx *jxcontext.Context, storeIDs, skuIDs []int, isFocus bo
t4.created_at bind_created_at, t4.updated_at bind_updated_at, t4.last_operator bind_last_operator, t4.deleted_at bind_deleted_at, t4.created_at bind_created_at, t4.updated_at bind_updated_at, t4.last_operator bind_last_operator, t4.deleted_at bind_deleted_at,
t4.sub_store_id, t4.price bind_price, IF(t4.unit_price IS NOT NULL, t4.unit_price, t1.price) unit_price, t4.status bind_status, t4.sub_store_id, t4.price bind_price, IF(t4.unit_price IS NOT NULL, t4.unit_price, t1.price) unit_price, t4.status bind_status,
t4.ebai_id, t4.mtwm_id, t4.wsc_id, t4.wsc_id2, t4.ebai_id, t4.mtwm_id, t4.wsc_id, t4.wsc_id2,
t4.jd_sync_status, t4.ebai_sync_status, t4.mtwm_sync_status, t4.wsc_sync_status t4.jd_sync_status, t4.ebai_sync_status, t4.mtwm_sync_status, t4.wsc_sync_status, t4.auto_sale_at
` + sql ` + sql
var tmpList []*tGetStoresSkusInfo var tmpList []*tGetStoresSkusInfo
beginTime := time.Now() beginTime := time.Now()
@@ -1241,6 +1242,7 @@ func updateStoresSkusWithoutSync(ctx *jxcontext.Context, db *dao.DaoDB, storeIDs
// setStoreSkuBindStatus(skuBind, model.SyncFlagModifiedMask) // setStoreSkuBindStatus(skuBind, model.SyncFlagModifiedMask)
dao.WrapUpdateULEntity(skuBind, userName) dao.WrapUpdateULEntity(skuBind, userName)
skuBind.AutoSaleAt = utils.DefaultTimeValue
if num, err = dao.UpdateEntity(db, skuBind /*, utils.Map2KeySlice(updateFieldMap)...*/); err != nil { if num, err = dao.UpdateEntity(db, skuBind /*, utils.Map2KeySlice(updateFieldMap)...*/); err != nil {
dao.Rollback(db) dao.Rollback(db)
return nil, err return nil, err
@@ -1318,10 +1320,10 @@ func updateStoreSkusSaleWithoutSync(ctx *jxcontext.Context, storeID int, skuBind
model.FieldElmSyncStatus: skuBind.ElmSyncStatus | model.SyncFlagSaleMask, model.FieldElmSyncStatus: skuBind.ElmSyncStatus | model.SyncFlagSaleMask,
model.FieldWscSyncStatus: skuBind.WscSyncStatus | model.SyncFlagSaleMask, model.FieldWscSyncStatus: skuBind.WscSyncStatus | model.SyncFlagSaleMask,
} }
// if utils.IsTimeZero(autoSaleTime) || skuBind.Status == model.SkuStatusNormal { if utils.IsTimeZero(autoSaleTime) || skuBind.Status == model.SkuStatusNormal {
// autoSaleTime = utils.DefaultTimeValue autoSaleTime = utils.DefaultTimeValue
// } }
// kvs["AutoSaleAt"] = autoSaleTime kvs["AutoSaleAt"] = autoSaleTime
if num, err = dao.UpdateEntityLogically(db, skuBind, kvs, userName, nil); err != nil { if num, err = dao.UpdateEntityLogically(db, skuBind, kvs, userName, nil); err != nil {
dao.Rollback(db) dao.Rollback(db)
return nil, err return nil, err
@@ -2178,35 +2180,35 @@ func GetMissingStoreSkuFromOrder(ctx *jxcontext.Context, fromTime time.Time) (mi
} }
func AutoSaleStoreSku(ctx *jxcontext.Context, storeIDs []int) (err error) { func AutoSaleStoreSku(ctx *jxcontext.Context, storeIDs []int) (err error) {
// db := dao.GetDB() db := dao.GetDB()
// storeSkuList, err := dao.GetAutoSaleStoreSku(db, storeIDs) storeSkuList, err := dao.GetAutoSaleStoreSku(db, storeIDs)
// if err != nil { if err != nil {
// return err return err
// } }
// storeSkuMap := make(map[int][]*model.StoreSkuBind) storeSkuMap := make(map[int][]*model.StoreSkuBind)
// for _, v := range storeSkuList { for _, v := range storeSkuList {
// storeSkuMap[v.StoreID] = append(storeSkuMap[v.StoreID], v) storeSkuMap[v.StoreID] = append(storeSkuMap[v.StoreID], v)
// } }
// now := time.Now() now := time.Now()
// for storeID, storeSkuList := range storeSkuMap { for storeID, storeSkuList := range storeSkuMap {
// var skuIDs []int var skuIDs []int
// for _, storeSku := range storeSkuList { for _, storeSku := range storeSkuList {
// if now.Sub(storeSku.AutoSaleAt) > 0 { if now.Sub(storeSku.AutoSaleAt) > 0 {
// storeSku.AutoSaleAt = utils.DefaultTimeValue storeSku.AutoSaleAt = utils.DefaultTimeValue
// if storeSku.Status != model.SkuStatusNormal { if storeSku.Status != model.SkuStatusNormal {
// storeSku.Status = model.SkuStatusNormal storeSku.Status = model.SkuStatusNormal
// skuIDs = append(skuIDs, storeSku.SkuID) skuIDs = append(skuIDs, storeSku.SkuID)
// } }
// if _, err = dao.UpdateEntity(db, storeSku, "AutoSaleAt", model.FieldStatus); err != nil { if _, err = dao.UpdateEntity(db, storeSku, "AutoSaleAt", model.FieldStatus); err != nil {
// return err return err
// } }
// } }
// } }
// if len(skuIDs) > 0 { if len(skuIDs) > 0 {
// if _, err = CurVendorSync.SyncStoresSkus(ctx, db, nil, []int{storeID}, skuIDs, false, true, true); err != nil { if _, err = CurVendorSync.SyncStoresSkus(ctx, db, nil, []int{storeID}, skuIDs, false, true, true); err != nil {
// return err return err
// } }
// } }
// } }
return err return err
} }

View File

@@ -7,27 +7,3 @@ import (
func TestCheckSkuDiffBetweenJxAndVendor(t *testing.T) { func TestCheckSkuDiffBetweenJxAndVendor(t *testing.T) {
CheckSkuDiffBetweenJxAndVendor() CheckSkuDiffBetweenJxAndVendor()
} }
func TestTestExcel(t *testing.T) {
data1 := []DiffData{
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
}
data2 := []DiffData{
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
}
data3 := []DiffData{
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
DiffData{"1", "1211", "aaa", "apple", "banna", "1", "0"},
}
data := map[int][]DiffData{
0: data1,
1: data2,
3: data3,
}
WriteToExcel(data)
}

View File

@@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
"strings"
"git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils"
@@ -421,20 +420,20 @@ func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendo
return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID) return nil, partner.AddVendorInfo2Err(err, loopMapInfo.VendorID)
}, isContinueWhenError) }, isContinueWhenError)
if task != nil { if task != nil {
if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil { // if vendorErr := partner.IsErrChangePriceFailed(task.GetOriginalErr()); vendorErr != nil {
platformList := make([]string, len(task.GetDetailErrList())) // platformList := make([]string, len(task.GetDetailErrList()))
for k, v := range task.GetDetailErrList() { // for k, v := range task.GetDetailErrList() {
if vendorErr := partner.IsErrVendorError(v); vendorErr != nil { // if vendorErr := partner.IsErrVendorError(v); vendorErr != nil {
platformList[k] = model.VendorChineseNames[vendorErr.VendorID()] // platformList[k] = model.VendorChineseNames[vendorErr.VendorID()]
} else { // } else {
platformList[k] = "未知" // platformList[k] = "未知"
} // }
} // }
err = fmt.Errorf("同步价格失败\n失败平台%s", strings.Join(platformList, ",")) // err = fmt.Errorf("同步价格失败\n失败平台%s", strings.Join(platformList, ","))
} else { // } else {
// }
err = makeSyncError(err) err = makeSyncError(err)
} }
}
return hint, err return hint, err
} }

View File

@@ -47,13 +47,26 @@ var (
autoEnableStoreSkuTimeList = []string{ autoEnableStoreSkuTimeList = []string{
"7:00:00", "7:00:00",
"8:00:00",
"9:00:00",
"10:00:00",
"11:00:00",
"12:00:00",
"13:00:00",
"14:00:00", "14:00:00",
"15:00:00",
"16:00:00",
"17:00:00",
"18:00:00",
"19:00:00",
"20:00:00",
"21:00:00",
"22:00:00", "22:00:00",
} }
) )
func Init() { func Init() {
if globals.ReallyCallPlatformAPI { if globals.IsProductEnv() {
ScheduleTimerFunc(doDailyWork, dailyWorkTimeList) ScheduleTimerFunc(doDailyWork, dailyWorkTimeList)
ScheduleTimerFuncByInterval(func() { ScheduleTimerFuncByInterval(func() {
@@ -76,11 +89,10 @@ func Init() {
ScheduleTimerFunc(func() { ScheduleTimerFunc(func() {
dao.UpdateActStatusByTime(dao.GetDB(), time.Now().Add(-48*time.Hour)) dao.UpdateActStatusByTime(dao.GetDB(), time.Now().Add(-48*time.Hour))
}, updateActStatusTimeList) }, updateActStatusTimeList)
// ScheduleTimerFunc(func() {
// cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil)
// }, autoEnableStoreSkuTimeList)
} }
ScheduleTimerFunc(func() {
cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil)
}, autoEnableStoreSkuTimeList)
} }
func doDailyWork() { func doDailyWork() {

View File

@@ -10,6 +10,7 @@ import (
"git.rosy.net.cn/jx-callback/business/partner/putils" "git.rosy.net.cn/jx-callback/business/partner/putils"
"git.rosy.net.cn/baseapi" "git.rosy.net.cn/baseapi"
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
"git.rosy.net.cn/jx-callback/globals"
) )
const ( const (
@@ -23,10 +24,16 @@ const (
) )
var ( var (
startOpStoreTimeList = []string { startOpStoreTimeListJXCS = []string {
"22:10:00",
}
endOpStoreTimeListJXCS = []string {
"06:10:00",
}
startOpStoreTimeListJXGY = []string {
"22:00:00", "22:00:00",
} }
endOpStoreTimeList = []string { endOpStoreTimeListJXGY = []string {
"06:00:00", "06:00:00",
} }
vendorList = map[int]bool { vendorList = map[int]bool {
@@ -72,7 +79,7 @@ func FilterSkuNameList(storeSkuNameList []*partner.SkuNameInfo) (filterStoreSkuN
return filterStoreSkuNameList return filterStoreSkuNameList
} }
func StartOrEndOpStore(isStart bool, startTime, endTime int16) { func StartOrEndOpStore(isStart bool, startTime, endTime int16, isAsync, isContinueWhenError bool) (retVal interface{}, err error) {
startProcessTime := time.Now().Unix() startProcessTime := time.Now().Unix()
baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now()) baseapi.SugarLogger.Debugf("StartOrEndOpStore start time: %v", time.Now())
ctx := jxcontext.AdminCtx ctx := jxcontext.AdminCtx
@@ -97,7 +104,7 @@ func StartOrEndOpStore(isStart bool, startTime, endTime int16) {
vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"]) vendorStoreID := utils.Interface2String(vendorListValue["vendorStoreID"])
baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID) baseapi.SugarLogger.Debugf("StartOrEndOpStore storeID:%d vendorID:%d vendorStoreID:%s", storeID, vendorID, vendorStoreID)
singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler) singleStoreHandler := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.ISingleStoreStoreSkuHandler)
storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, nil, storeID, vendorStoreID, nil) storeSkuNameList, err := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, nil)
if err != nil { if err != nil {
baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID) baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID)
} else { } else {
@@ -130,24 +137,40 @@ func StartOrEndOpStore(isStart bool, startTime, endTime int16) {
} }
return retVal, err return retVal, err
} }
task := tasksch.NewParallelTask("StartOrEndOpStore", nil, ctx, taskFunc, storeInfo.Stores) task := tasksch.NewParallelTask("StartOrEndOpStore", tasksch.NewParallelConfig().SetParallelCount(4), ctx, taskFunc, storeInfo.Stores)
tasksch.HandleTask(task, nil, true).Run() tasksch.HandleTask(task, nil, true).Run()
if isAsync {
retVal = task.ID
} else {
_, err = task.GetResult(0) _, err = task.GetResult(0)
if err != nil { if err != nil {
baseapi.SugarLogger.Debugf("StartOrEndOpStore tasksch error:%v", err) baseapi.SugarLogger.Debugf("StartOrEndOpStore tasksch error:%v", err)
} }
retVal = "1"
}
} }
endProcessTime := time.Now().Unix() endProcessTime := time.Now().Unix()
diff := endProcessTime - startProcessTime diff := endProcessTime - startProcessTime
baseapi.SugarLogger.Debugf("StartOrEndOpStore end time: %v", time.Now()) baseapi.SugarLogger.Debugf("StartOrEndOpStore end time: %v", time.Now())
baseapi.SugarLogger.Debugf("StartOrEndOpStore cost time: %d sec", diff) baseapi.SugarLogger.Debugf("StartOrEndOpStore cost time: %d sec", diff)
return retVal, err
} }
func InitEx() { func InitEx() {
if globals.IsMainProductEnv() {
ScheduleTimerFunc(func() { ScheduleTimerFunc(func() {
StartOrEndOpStore(true, 0, 0) StartOrEndOpStore(true, 0, 0, false, true)
}, startOpStoreTimeList) }, startOpStoreTimeListJXCS)
ScheduleTimerFunc(func() { ScheduleTimerFunc(func() {
StartOrEndOpStore(false, 0, 0) StartOrEndOpStore(false, 0, 0, false, true)
}, endOpStoreTimeList) }, endOpStoreTimeListJXCS)
} else {
ScheduleTimerFunc(func() {
StartOrEndOpStore(true, 0, 0, false, true)
}, startOpStoreTimeListJXGY)
ScheduleTimerFunc(func() {
StartOrEndOpStore(false, 0, 0, false, true)
}, endOpStoreTimeListJXGY)
}
} }

View File

@@ -2,7 +2,6 @@ package tasksch
import ( import (
"errors" "errors"
"time"
"git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils"
@@ -24,13 +23,11 @@ type ParallelConfig struct {
ParallelCount int ParallelCount int
BatchSize int BatchSize int
IsContinueWhenError bool IsContinueWhenError bool
ResultHandler ResultHandlerFunc
} }
type ParallelTask struct { type ParallelTask struct {
BaseTask BaseTask
resultHandler ResultHandlerFunc
worker WorkFunc worker WorkFunc
jobList [][]interface{} jobList [][]interface{}
taskChan chan []interface{} taskChan chan []interface{}
@@ -49,7 +46,6 @@ func NewParallelConfig() *ParallelConfig {
IsContinueWhenError: false, IsContinueWhenError: false,
ParallelCount: DefParallelCount, ParallelCount: DefParallelCount,
BatchSize: 1, BatchSize: 1,
ResultHandler: nil,
} }
} }
@@ -73,11 +69,6 @@ func (c *ParallelConfig) SetIsContinueWhenError(isContinueWhenError bool) *Paral
// return c // return c
// } // }
func (c *ParallelConfig) SetResultHandler(resultHandler ResultHandlerFunc) *ParallelConfig {
c.ResultHandler = resultHandler
return c
}
func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask { func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Context, worker WorkFunc, itemList interface{}, params ...interface{}) *ParallelTask {
if config == nil { if config == nil {
config = NewParallelConfig() config = NewParallelConfig()
@@ -97,7 +88,6 @@ func NewParallelTask(taskName string, config *ParallelConfig, ctx *jxcontext.Con
task := &ParallelTask{ task := &ParallelTask{
subFinishChan: make(chan interface{}, config.ParallelCount), subFinishChan: make(chan interface{}, config.ParallelCount),
taskChan: make(chan []interface{}, len(realItemList)), taskChan: make(chan []interface{}, len(realItemList)),
resultHandler: config.ResultHandler,
worker: worker, worker: worker,
jobList: jobList, jobList: jobList,
} }
@@ -135,7 +125,7 @@ func (task *ParallelTask) Run() {
} else { } else {
globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err) globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
task.locker.Lock() task.locker.Lock()
task.detailErrList = append(task.detailErrList, err) task.batchErrList = append(task.batchErrList, err)
task.locker.Unlock() task.locker.Unlock()
if !task.IsContinueWhenError { // 出错 if !task.IsContinueWhenError { // 出错
chanRetVal = err chanRetVal = err
@@ -175,37 +165,14 @@ func (task *ParallelTask) Run() {
taskResult = append(taskResult, resultList...) taskResult = append(taskResult, resultList...)
} }
} }
task.locker.Lock() task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if len(task.taskChan) > 0 {
taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
if taskErr != nil {
task.OriginalErr = taskErr
task.Err = NewTaskError(task.Name, taskErr)
} else {
if len(task.detailErrList) > 0 {
task.OriginalErr = task.detailErrList[0]
}
task.Err = task.buildTaskErrFromDetail()
}
task.Result = taskResult task.Result = taskResult
task.TerminatedAt = time.Now() task.mainErr = taskErr
task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
task.locker.Unlock() task.locker.Unlock()
globals.SugarLogger.Debugf("ParallelTask.Run %s, err:%v", task.Name, task.Err)
close(task.subFinishChan)
if task.resultHandler != nil { close(task.subFinishChan)
task.resultHandler(task.Name, taskResult, task.Err) task.jobList = nil // 如果不释放,任务被管理的话,会导致内存不能释放
}
}) })
} }

View File

@@ -1,8 +1,6 @@
package tasksch package tasksch
import ( import (
"time"
"git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
"git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/globals"
@@ -45,7 +43,7 @@ func (task *SeqTask) Run() {
task.finishedOneJob(1, err) task.finishedOneJob(1, err)
if taskErr = err; taskErr != nil { if taskErr = err; taskErr != nil {
task.locker.Lock() task.locker.Lock()
task.detailErrList = append(task.detailErrList, err) task.batchErrList = append(task.batchErrList, err)
task.locker.Unlock() task.locker.Unlock()
globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err) globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err)
if !task.IsContinueWhenError { if !task.IsContinueWhenError {
@@ -57,29 +55,9 @@ func (task *SeqTask) Run() {
} }
EndFor: EndFor:
task.locker.Lock() task.locker.Lock()
if taskErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if task.FinishedJobCount < task.TotalJobCount {
taskErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
if taskErr != nil {
task.OriginalErr = taskErr
task.Err = NewTaskError(task.Name, taskErr)
} else {
if len(task.detailErrList) > 0 {
task.OriginalErr = task.detailErrList[0]
}
task.Err = task.buildTaskErrFromDetail()
}
task.Result = taskResult task.Result = taskResult
task.TerminatedAt = time.Now() task.mainErr = taskErr
task.locker.Unlock() task.locker.Unlock()
globals.SugarLogger.Debugf("SeqTask.Run %s, result:%v, err:%v", task.Name, taskResult, task.Err)
}) })
} }

View File

@@ -3,7 +3,6 @@ package tasksch
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
@@ -58,35 +57,37 @@ type ITask interface {
AddChild(task ITask) ITask AddChild(task ITask) ITask
GetChildren() TaskList GetChildren() TaskList
SetParent(parentTask ITask) SetParent(parentTask ITask)
GetOriginalErr() error // GetOriginalErr() error
GetDetailErrList() []error GetErr() error
// GetDetailErrList() []error
GetLeafResult() (finishedItemCount, failedItemCount int)
json.Marshaler json.Marshaler
} }
type TaskError struct { // type TaskError struct {
name string // name string
errStr string // errStr string
} // }
func (t *TaskError) MarshalJSON() ([]byte, error) { // func (t *TaskError) MarshalJSON() ([]byte, error) {
return json.Marshal(t.Error()) // return json.Marshal(t.Error())
} // }
func (t *TaskError) Error() string { // func (t *TaskError) Error() string {
return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr) // return fmt.Sprintf("[%s], 错误:%s", t.name, t.errStr)
} // }
func (t *TaskError) String() string { // func (t *TaskError) String() string {
return t.Error() // return t.Error()
} // }
func NewTaskError(name string, err error) *TaskError { // func NewTaskError(name string, err error) *TaskError {
return &TaskError{ // return &TaskError{
name: name, // name: name,
errStr: err.Error(), // errStr: err.Error(),
} // }
} // }
type BaseTask struct { type BaseTask struct {
Name string `json:"name"` Name string `json:"name"`
@@ -110,10 +111,10 @@ type BaseTask struct {
Result []interface{} `json:"-"` Result []interface{} `json:"-"`
Children TaskList `json:"children"` Children TaskList `json:"children"`
Err error `json:"err"` Err string `json:"err"`
OriginalErr error `json:"-"`
detailErrList []error mainErr error
batchErrList []error
finishChan chan struct{} finishChan chan struct{}
C <-chan struct{} `json:"-"` C <-chan struct{} `json:"-"`
@@ -168,7 +169,7 @@ func (t *BaseTask) GetID() string {
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) { func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
if t.GetStatus() >= TaskStatusEndBegin { if t.GetStatus() >= TaskStatusEndBegin {
return t.Result, t.OriginalErr return t.getResult(), t.GetErr()
} }
if duration == 0 { if duration == 0 {
duration = time.Hour * 10000 // duration为0表示无限等待 duration = time.Hour * 10000 // duration为0表示无限等待
@@ -178,7 +179,7 @@ func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err
case <-t.finishChan: case <-t.finishChan:
t.isGetResultCalled = true t.isGetResultCalled = true
timer.Stop() timer.Stop()
return t.Result, t.OriginalErr return t.getResult(), t.GetErr()
case <-timer.C: case <-timer.C:
} }
return nil, ErrTaskNotFinished return nil, ErrTaskNotFinished
@@ -276,18 +277,75 @@ func (t *BaseTask) GetNoticeMsg() string {
return t.NoticeMsg return t.NoticeMsg
} }
func (t *BaseTask) GetOriginalErr() error { func (t *BaseTask) getResult() []interface{} {
t.locker.RLock() t.locker.RLock()
defer t.locker.RUnlock() defer t.locker.RUnlock()
return t.OriginalErr return t.Result
} }
func (t *BaseTask) GetDetailErrList() []error { // func (t *BaseTask) GetOriginalErr() error {
// t.locker.RLock()
// defer t.locker.RUnlock()
// return nil
// }
func (t *BaseTask) GetErr() error {
t.locker.RLock() t.locker.RLock()
defer t.locker.RUnlock() defer t.locker.RUnlock()
return t.detailErrList if t.mainErr == nil && len(t.batchErrList) == 0 {
return nil
}
return t
} }
func (t *BaseTask) GetLeafResult() (finishedItemCount, failedItemCount int) {
if len(t.Children) == 0 {
return t.FinishedItemCount, t.FailedItemCount
}
for _, v := range t.Children {
subFinishedItemCount, subFailedItemCount := v.GetLeafResult()
finishedItemCount += subFinishedItemCount
failedItemCount += subFailedItemCount
}
return finishedItemCount, failedItemCount
}
func (t *BaseTask) Error() (errMsg string) {
if t.mainErr == nil && len(t.batchErrList) == 0 {
return ""
}
t.locker.RLock()
errMsg = t.Err
t.locker.RUnlock()
if errMsg != "" {
return errMsg
}
errMsg = "任务:" + t.Name
if t.parent == nil {
finishedItemCount, failedItemCount := t.GetLeafResult()
errMsg += fmt.Sprintf(", 全部总共:%d, 成功:%d, 失败:%d,\n", (finishedItemCount + failedItemCount), finishedItemCount, failedItemCount)
}
if t.mainErr != nil {
errMsg += "失败"
errMsg += "," + t.mainErr.Error()
} else {
errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount)
for _, v := range t.batchErrList {
errMsg += fmt.Sprintf("%s,\n", v.Error())
}
}
t.locker.Lock()
t.Err = errMsg
t.locker.Unlock()
return errMsg
}
// func (t *BaseTask) GetDetailErrList() []error {
// t.locker.RLock()
// defer t.locker.RUnlock()
// return t.batchErrList
// }
func AddChild(parentTask ITask, task ITask) ITask { func AddChild(parentTask ITask, task ITask) ITask {
if parentTask != nil { if parentTask != nil {
return parentTask.AddChild(task) return parentTask.AddChild(task)
@@ -322,11 +380,31 @@ func (t *BaseTask) run(taskHandler func()) {
}() }()
taskHandler() taskHandler()
task := t
task.locker.Lock()
if task.mainErr != nil { // 如果有错误,肯定就是失败了
task.Status = TaskStatusFailed
} else {
if task.FinishedJobCount+task.FailedJobCount < task.TotalJobCount {
task.mainErr = ErrTaskIsCanceled
task.Status = TaskStatusCanceled
} else {
task.Status = TaskStatusFinished
}
}
task.TerminatedAt = time.Now()
task.locker.Unlock()
task.Error()
globals.SugarLogger.Debugf("Task:%s, result:%v, err:%v", task.Name, task.Result, task.mainErr)
select { select {
case <-t.quitChan: case <-t.quitChan:
default: default:
close(t.quitChan) close(t.quitChan)
} }
// todo 如下代码可能有对t.Children操作的并发问题
for _, subTask := range t.Children { for _, subTask := range t.Children {
if _, err := subTask.GetResult(0); err != nil { if _, err := subTask.GetResult(0); err != nil {
globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err) globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
@@ -340,18 +418,14 @@ func (t *BaseTask) run(taskHandler func()) {
if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息 if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息
var content string var content string
taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt)) taskDesc := fmt.Sprintf("你的异步任务[%s],ID[%s],开始于:%s,结束于:%s,", t.Name, t.ID, utils.Time2Str(t.CreatedAt), utils.Time2Str(t.TerminatedAt))
if t.Err == nil { if t.mainErr == nil {
content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status]) content = fmt.Sprintf("%s执行%s", taskDesc, TaskStatusName[t.Status])
noticeMsg := t.GetNoticeMsg() noticeMsg := t.GetNoticeMsg()
if noticeMsg != "" { if noticeMsg != "" {
content += ",通知消息:" + noticeMsg content += ",通知消息:" + noticeMsg
} }
} else { } else {
if t.Status == TaskStatusFinished { content = t.Error()
content = fmt.Sprintf("%s执行部分失败,%s", taskDesc, t.Err.Error())
} else {
content = fmt.Sprintf("%s执行失败,%s", taskDesc, t.Err.Error())
}
} }
msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content) msg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
} }
@@ -381,16 +455,16 @@ func (t *BaseTask) setStatus(status int) {
t.Status = status t.Status = status
} }
func (t *BaseTask) buildTaskErrFromDetail() (err error) { // func (t *BaseTask) buildTaskErrFromBatchErrList() (err error) {
if len(t.detailErrList) > 0 { // if len(t.batchErrList) > 0 {
strList := make([]string, len(t.detailErrList)) // strList := make([]string, len(t.batchErrList))
for k, v := range t.detailErrList { // for k, v := range t.batchErrList {
strList[k] = v.Error() // strList[k] = v.Error()
} // }
return NewTaskError(t.Name, fmt.Errorf("总共:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FailedItemCount, strings.Join(strList, "\n"))) // return NewTaskError(t.Name, fmt.Errorf("总共:%d, 成功:%d, 失败:%d, 详情:\n%s", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount, strings.Join(strList, "\n")))
} // }
return nil // return nil
} // }
func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) { func (task *BaseTask) callWorker(worker func() (retVal interface{}, err error)) (retVal interface{}, err error) {
defer func() { defer func() {

View File

@@ -1,15 +1,11 @@
package tasksch package tasksch
import ( import (
"errors"
"fmt"
"testing" "testing"
"git.rosy.net.cn/baseapi/utils"
) )
func TestTaskError(t *testing.T) { func TestTaskError(t *testing.T) {
err := NewTaskError("test", errors.New("hello")) // err := NewTaskError("test", errors.New("hello"))
fmt.Println(utils.Format4Output(err, false)) // fmt.Println(utils.Format4Output(err, false))
fmt.Println(err.Error()) // fmt.Println(err.Error())
} }

View File

@@ -1,5 +1,7 @@
package model package model
import "time"
const ( const (
StoreSkuBindStatusNA = -2 StoreSkuBindStatusNA = -2
StoreSkuBindStatusDeleted = -1 StoreSkuBindStatusDeleted = -1
@@ -104,7 +106,7 @@ type StoreSkuBind struct {
MtwmSyncStatus int8 `orm:"default(2)"` MtwmSyncStatus int8 `orm:"default(2)"`
WscSyncStatus int8 `orm:"default(2)"` WscSyncStatus int8 `orm:"default(2)"`
// AutoSaleAt time.Time `orm:"type(datetime);null" json:"autoSaleAt"` AutoSaleAt time.Time `orm:"type(datetime);null" json:"autoSaleAt"`
} }
func (*StoreSkuBind) TableUnique() [][]string { func (*StoreSkuBind) TableUnique() [][]string {
@@ -116,6 +118,7 @@ func (*StoreSkuBind) TableUnique() [][]string {
func (*StoreSkuBind) TableIndex() [][]string { func (*StoreSkuBind) TableIndex() [][]string {
return [][]string{ return [][]string{
[]string{"SkuID", "StoreID", "DeletedAt"}, []string{"SkuID", "StoreID", "DeletedAt"},
// []string{"AutoSaleAt", "DeletedAt", "StoreID"},
} }
} }

View File

@@ -583,7 +583,7 @@ func (p *PurchaseHandler) GetVendorCategories(ctx *jxcontext.Context) (vendorCat
func (p *PurchaseHandler) GetSkus(ctx *jxcontext.Context, skuID int, vendorSkuID, skuName string) (skuNameList []*partner.SkuNameInfo, err error) { func (p *PurchaseHandler) GetSkus(ctx *jxcontext.Context, skuID int, vendorSkuID, skuName string) (skuNameList []*partner.SkuNameInfo, err error) {
param := &jdapi.QuerySkuParam{ param := &jdapi.QuerySkuParam{
SkuID: utils.Str2Int64(vendorSkuID), SkuID: utils.Str2Int64WithDefault(vendorSkuID, 0),
SkuName: skuName, SkuName: skuName,
IsFilterDel: jdapi.IsFilterDelTrue, IsFilterDel: jdapi.IsFilterDelTrue,
PageNo: 1, PageNo: 1,

View File

@@ -277,15 +277,17 @@ func (c *TempOpController) TestIt() {
// @Title 开启或结束所有平台商店的额外时间 // @Title 开启或结束所有平台商店的额外时间
// @Description 开启或结束所有平台商店的额外时间并且修改所有商品库存为0或99999(开启0 结束99999) // @Description 开启或结束所有平台商店的额外时间并且修改所有商品库存为0或99999(开启0 结束99999)
// @Param token header string true "认证token" // @Param token header string true "认证token"
// @Param isStart query bool true "开启或结束门店" // @Param startOrEndStore query bool true "开启或结束"
// @Param startTime query int false "开始营业时间(格式930代表早上9点30分)" // @Param startTime query int false "开始营业时间(格式930代表早上9点30分)"
// @Param endTime query int false "结束营业时间" // @Param endTime query int false "结束营业时间"
// @Param isAsync query bool false "是否异步操作"
// @Param isContinueWhenError query bool false "单个同步失败是否继续缺省false"
// @Success 200 {object} controllers.CallResult // @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult // @Failure 200 {object} controllers.CallResult
// @router /TestStartOrEndOpStore [get] // @router /TestStartOrEndOpStore [get]
func (c *TempOpController) TestStartOrEndOpStore() { func (c *TempOpController) TestStartOrEndOpStore() {
c.callTestStartOrEndOpStore(func(params *tTempopTestStartOrEndOpStoreParams) (retVal interface{}, errCode string, err error) { c.callTestStartOrEndOpStore(func(params *tTempopTestStartOrEndOpStoreParams) (retVal interface{}, errCode string, err error) {
misc.StartOrEndOpStore(params.IsStart, int16(params.StartTime), int16(params.EndTime)) retVal, err = misc.StartOrEndOpStore(params.StartOrEndStore, int16(params.StartTime), int16(params.EndTime), params.IsAsync, params.IsContinueWhenError)
return retVal, "", err return retVal, "", err
}) })
} }

View File

@@ -56,9 +56,9 @@ func Init() {
if globals.IsProductEnv() { if globals.IsProductEnv() {
ebai.CurPurchaseHandler.StartRefreshComment() ebai.CurPurchaseHandler.StartRefreshComment()
misc.Init()
misc.InitEx() misc.InitEx()
} }
misc.Init()
} }
// 返回true表示非运行服务 // 返回true表示非运行服务