解决冲突,合并提交
This commit is contained in:
@@ -530,7 +530,19 @@ func UpdateStore(ctx *jxcontext.Context, storeID int, payload map[string]interfa
|
|||||||
if err = dao.GetEntity(db, store); err != nil {
|
if err = dao.GetEntity(db, store); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
valid := dao.StrictMakeMapByStructObject(payload, store, userName)
|
valid := dao.StrictMakeMapByStructObject(payload, store, userName)
|
||||||
|
if payload["lng"] != nil || payload["lat"] != nil {
|
||||||
|
intLng := jxutils.StandardCoordinate2Int(utils.Interface2Float64WithDefault(payload["lng"], 0.0))
|
||||||
|
intLat := jxutils.StandardCoordinate2Int(utils.Interface2Float64WithDefault(payload["lat"], 0.0))
|
||||||
|
if intLng != 0 && intLng != store.Lng {
|
||||||
|
valid["lng"] = intLng
|
||||||
|
}
|
||||||
|
if intLat != 0 && intLat != store.Lng {
|
||||||
|
valid["lat"] = intLat
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if valid["originalName"] != nil {
|
if valid["originalName"] != nil {
|
||||||
delete(valid, "originalName")
|
delete(valid, "originalName")
|
||||||
}
|
}
|
||||||
@@ -589,18 +601,15 @@ func UpdateStore(ctx *jxcontext.Context, storeID int, payload map[string]interfa
|
|||||||
"address",
|
"address",
|
||||||
"deliveryRange",
|
"deliveryRange",
|
||||||
} {
|
} {
|
||||||
if payload[v] != nil {
|
if valid[v] != nil {
|
||||||
syncStatus |= model.SyncFlagStoreAddress
|
syncStatus |= model.SyncFlagStoreAddress
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var lng, lat float64
|
if valid["status"] != nil {
|
||||||
if payload["lng"] != nil || payload["lat"] != nil {
|
syncStatus |= model.SyncFlagStoreStatus
|
||||||
lng = utils.Interface2Float64WithDefault(payload["lng"], 0.0)
|
|
||||||
lat = utils.Interface2Float64WithDefault(payload["lat"], 0.0)
|
|
||||||
valid["lng"] = jxutils.StandardCoordinate2Int(lng)
|
|
||||||
valid["lat"] = jxutils.StandardCoordinate2Int(lat)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if valid["deliveryRange"] != nil {
|
if valid["deliveryRange"] != nil {
|
||||||
valid["deliveryRange"] = strings.Trim(valid["deliveryRange"].(string), ";")
|
valid["deliveryRange"] = strings.Trim(valid["deliveryRange"].(string), ";")
|
||||||
}
|
}
|
||||||
@@ -830,7 +839,11 @@ func UpdateStoreVendorMap(ctx *jxcontext.Context, db *dao.DaoDB, storeID, vendor
|
|||||||
if err = dao.GetEntity(db, storeMap, model.FieldStoreID, model.FieldVendorID, model.FieldDeletedAt); err != nil {
|
if err = dao.GetEntity(db, storeMap, model.FieldStoreID, model.FieldVendorID, model.FieldDeletedAt); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
syncStatus := model.SyncFlagModifiedMask
|
||||||
valid := dao.StrictMakeMapByStructObject(payload, storeMap, userName)
|
valid := dao.StrictMakeMapByStructObject(payload, storeMap, userName)
|
||||||
|
if valid["status"] != nil {
|
||||||
|
syncStatus |= model.SyncFlagStoreStatus
|
||||||
|
}
|
||||||
if valid["pricePercentagePack"] != nil {
|
if valid["pricePercentagePack"] != nil {
|
||||||
if pricePercentagePack := utils.Interface2String(valid["pricePercentagePack"]); pricePercentagePack != "" {
|
if pricePercentagePack := utils.Interface2String(valid["pricePercentagePack"]); pricePercentagePack != "" {
|
||||||
_, err2 := dao.QueryConfigs(db, pricePercentagePack, model.ConfigTypePricePack, "")
|
_, err2 := dao.QueryConfigs(db, pricePercentagePack, model.ConfigTypePricePack, "")
|
||||||
@@ -857,7 +870,7 @@ func UpdateStoreVendorMap(ctx *jxcontext.Context, db *dao.DaoDB, storeID, vendor
|
|||||||
num, err = dao.UpdateEntityLogicallyAndUpdateSyncStatus(db, storeMap, valid, userName, map[string]interface{}{
|
num, err = dao.UpdateEntityLogicallyAndUpdateSyncStatus(db, storeMap, valid, userName, map[string]interface{}{
|
||||||
model.FieldStoreID: storeID,
|
model.FieldStoreID: storeID,
|
||||||
model.FieldVendorID: vendorID,
|
model.FieldVendorID: vendorID,
|
||||||
}, model.FieldSyncStatus, model.SyncFlagModifiedMask)
|
}, model.FieldSyncStatus, syncStatus)
|
||||||
} else {
|
} else {
|
||||||
num, err = dao.UpdateEntityLogically(db, storeMap, valid, userName, map[string]interface{}{
|
num, err = dao.UpdateEntityLogically(db, storeMap, valid, userName, map[string]interface{}{
|
||||||
model.FieldStoreID: storeID,
|
model.FieldStoreID: storeID,
|
||||||
|
|||||||
@@ -349,7 +349,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
|
|||||||
switch step {
|
switch step {
|
||||||
case 0:
|
case 0:
|
||||||
if len(deleteList) > 0 {
|
if len(deleteList) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuInfo("删除门店商品", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
var successList []*partner.StoreSkuInfo
|
var successList []*partner.StoreSkuInfo
|
||||||
if successList, err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuNotExist(err) {
|
if successList, err = singleStoreHandler.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuNotExist(err) {
|
||||||
err = nil
|
err = nil
|
||||||
@@ -365,7 +365,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
|
|||||||
}
|
}
|
||||||
case 1:
|
case 1:
|
||||||
if len(createList) > 0 {
|
if len(createList) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuSyncInfo(func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuSyncInfo("创建门店商品", func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
|
||||||
var successList []*dao.StoreSkuSyncInfo
|
var successList []*dao.StoreSkuSyncInfo
|
||||||
if successList, err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuExist(err) {
|
if successList, err = singleStoreHandler.CreateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); singleStoreHandler.IsErrSkuExist(err) {
|
||||||
if skuNameList, err2 := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, []*partner.StoreSkuInfo{
|
if skuNameList, err2 := singleStoreHandler.GetStoreSkusFullInfo(ctx, task, storeID, vendorStoreID, []*partner.StoreSkuInfo{
|
||||||
@@ -392,7 +392,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
|
|||||||
}
|
}
|
||||||
case 2:
|
case 2:
|
||||||
if len(updateList) > 0 {
|
if len(updateList) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuSyncInfo(func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuSyncInfo("更新门店商品基础信息", func(task tasksch.ITask, batchedStoreSkuList []*dao.StoreSkuSyncInfo) (result interface{}, successCount int, err error) {
|
||||||
var successList []*dao.StoreSkuSyncInfo
|
var successList []*dao.StoreSkuSyncInfo
|
||||||
if successList, err = singleStoreHandler.UpdateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
if successList, err = singleStoreHandler.UpdateStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||||||
successList = batchedStoreSkuList
|
successList = batchedStoreSkuList
|
||||||
@@ -406,7 +406,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
|
|||||||
case 3:
|
case 3:
|
||||||
for k, list := range [][]*partner.StoreSkuInfo{stockList /*, onlineList*/} {
|
for k, list := range [][]*partner.StoreSkuInfo{stockList /*, onlineList*/} {
|
||||||
if len(list) > 0 {
|
if len(list) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
var successList []*partner.StoreSkuInfo
|
var successList []*partner.StoreSkuInfo
|
||||||
if successList, err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
if successList, err = storeSkuHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||||||
successList = batchedStoreSkuList
|
successList = batchedStoreSkuList
|
||||||
@@ -421,12 +421,14 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
|
|||||||
case 4, 5:
|
case 4, 5:
|
||||||
statusList := onlineList
|
statusList := onlineList
|
||||||
status := model.SkuStatusNormal
|
status := model.SkuStatusNormal
|
||||||
|
name := "可售门店商品"
|
||||||
if step == 5 {
|
if step == 5 {
|
||||||
statusList = offlineList
|
statusList = offlineList
|
||||||
status = model.SkuStatusDontSale
|
status = model.SkuStatusDontSale
|
||||||
|
name = "不可售门店商品"
|
||||||
}
|
}
|
||||||
if len(statusList) > 0 {
|
if len(statusList) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuInfo(name, func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
var successList []*partner.StoreSkuInfo
|
var successList []*partner.StoreSkuInfo
|
||||||
if successList, err = storeSkuHandler.UpdateStoreSkusStatus(ctx, storeID, vendorStoreID, batchedStoreSkuList, status); err == nil {
|
if successList, err = storeSkuHandler.UpdateStoreSkusStatus(ctx, storeID, vendorStoreID, batchedStoreSkuList, status); err == nil {
|
||||||
successList = batchedStoreSkuList
|
successList = batchedStoreSkuList
|
||||||
@@ -439,7 +441,7 @@ func syncStoreSkuNew(ctx *jxcontext.Context, parentTask tasksch.ITask, isFull bo
|
|||||||
}
|
}
|
||||||
case 6:
|
case 6:
|
||||||
if len(priceList) > 0 {
|
if len(priceList) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品价格", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
var successList []*partner.StoreSkuInfo
|
var successList []*partner.StoreSkuInfo
|
||||||
if successList, err = storeSkuHandler.UpdateStoreSkusPrice(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
if successList, err = storeSkuHandler.UpdateStoreSkusPrice(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||||||
successList = batchedStoreSkuList
|
successList = batchedStoreSkuList
|
||||||
@@ -473,7 +475,7 @@ func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, ven
|
|||||||
localSkuMap[v.SkuID] = v
|
localSkuMap[v.SkuID] = v
|
||||||
}
|
}
|
||||||
var sku2Delete []*partner.StoreSkuInfo
|
var sku2Delete []*partner.StoreSkuInfo
|
||||||
task := tasksch.NewSeqTask(fmt.Sprintf("PruneMissingStoreSkus平台:%s", model.VendorChineseNames[vendorID]), ctx,
|
task := tasksch.NewSeqTask(fmt.Sprintf("清除平台:%s上多余的门店商品", model.VendorChineseNames[vendorID]), ctx,
|
||||||
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
|
||||||
switch step {
|
switch step {
|
||||||
case 0:
|
case 0:
|
||||||
@@ -490,7 +492,7 @@ func PruneMissingStoreSkus(ctx *jxcontext.Context, parentTask tasksch.ITask, ven
|
|||||||
}
|
}
|
||||||
case 1:
|
case 1:
|
||||||
if len(sku2Delete) > 0 {
|
if len(sku2Delete) > 0 {
|
||||||
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuInfo("删除门店商品", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
_, err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete)
|
_, err = handler.DeleteStoreSkus(ctx, storeID, vendorStoreID, sku2Delete)
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
|
}, ctx, parentTask, sku2Delete, handler.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
|
||||||
|
|||||||
@@ -166,14 +166,14 @@ func StartOrEndOpStore(isStart bool, vendorIDList []int, storeIDList []int, star
|
|||||||
baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID)
|
baseapi.SugarLogger.Errorf("StartOrEndOpStore GetStoreSkusFullInfo error:%v storeID:%d vendorID:%d vendorStoreID:%s", err, storeID, vendorID, vendorStoreID)
|
||||||
} else {
|
} else {
|
||||||
SetSkuStock(isStart, storeSkuNameList)
|
SetSkuStock(isStart, storeSkuNameList)
|
||||||
SetSpecialSkuStatus(storeID, vendorID, vendorStoreID,storeSkuNameList)
|
SetSpecialSkuStatus(storeID, vendorID, vendorStoreID, storeSkuNameList)
|
||||||
storeSkuList := putils.StoreSkuFullList2Bare(storeSkuNameList)
|
storeSkuList := putils.StoreSkuFullList2Bare(storeSkuNameList)
|
||||||
if len(storeSkuList) > 0 {
|
if len(storeSkuList) > 0 {
|
||||||
if !isStart {
|
if !isStart {
|
||||||
AddOrDelExtraStoreOptime(vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false)
|
AddOrDelExtraStoreOptime(vendorID, storeID, vendorStoreID, &storeListValue.Store, startOpStoreTime, endOpStoreTime, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = putils.FreeBatchStoreSkuInfo("更新门店商品库存", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
//var successList []*partner.StoreSkuInfo
|
//var successList []*partner.StoreSkuInfo
|
||||||
if _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
if _, err = singleStoreHandler.UpdateStoreSkusStock(ctx, storeID, vendorStoreID, batchedStoreSkuList); err == nil {
|
||||||
//successList = batchedStoreSkuList
|
//successList = batchedStoreSkuList
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import (
|
|||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
||||||
@@ -28,11 +27,6 @@ var (
|
|||||||
skuNamePat *regexp.Regexp
|
skuNamePat *regexp.Regexp
|
||||||
)
|
)
|
||||||
|
|
||||||
type SyncMapWithTimeout struct {
|
|
||||||
sync.Map
|
|
||||||
timers sync.Map
|
|
||||||
}
|
|
||||||
|
|
||||||
type OrderSkuList []*model.OrderSku
|
type OrderSkuList []*model.OrderSku
|
||||||
|
|
||||||
func (l OrderSkuList) Len() int {
|
func (l OrderSkuList) Len() int {
|
||||||
@@ -61,22 +55,6 @@ func init() {
|
|||||||
skuNamePat = regexp.MustCompile(`([\((\[【][^\((\[【\))\]】]*[\))\]】])?(.*?)([((].*[))])?\s*约?([1-9][\d\.]*)(g|G|kg|kG|Kg|KG|l|L|ml|mL|Ml|ML|克)\s*([((].*[))])?\s*(?:\/|/|)\s*([^\s()()]{0,2})\s*([((].*[))])?$`)
|
skuNamePat = regexp.MustCompile(`([\((\[【][^\((\[【\))\]】]*[\))\]】])?(.*?)([((].*[))])?\s*约?([1-9][\d\.]*)(g|G|kg|kG|Kg|KG|l|L|ml|mL|Ml|ML|克)\s*([((].*[))])?\s*(?:\/|/|)\s*([^\s()()]{0,2})\s*([((].*[))])?$`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *SyncMapWithTimeout) StoreWithTimeout(key, value interface{}, timeout time.Duration) {
|
|
||||||
m.Map.Store(key, value)
|
|
||||||
m.timers.Store(key, utils.AfterFuncWithRecover(timeout, func() {
|
|
||||||
m.Delete(key)
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *SyncMapWithTimeout) Delete(key interface{}) {
|
|
||||||
m.Map.Delete(key)
|
|
||||||
if value, ok := m.timers.Load(key); ok {
|
|
||||||
timer := value.(*time.Timer)
|
|
||||||
timer.Stop()
|
|
||||||
}
|
|
||||||
m.timers.Delete(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getJxStoreIDFromOrder(order *model.GoodsOrder) (retVal int) {
|
func getJxStoreIDFromOrder(order *model.GoodsOrder) (retVal int) {
|
||||||
if order.JxStoreID != 0 {
|
if order.JxStoreID != 0 {
|
||||||
return order.JxStoreID
|
return order.JxStoreID
|
||||||
|
|||||||
29
business/jxutils/jxutils_sync_map.go
Normal file
29
business/jxutils/jxutils_sync_map.go
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
package jxutils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SyncMapWithTimeout struct {
|
||||||
|
sync.Map
|
||||||
|
timers sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SyncMapWithTimeout) StoreWithTimeout(key, value interface{}, timeout time.Duration) {
|
||||||
|
m.Map.Store(key, value)
|
||||||
|
m.timers.Store(key, utils.AfterFuncWithRecover(timeout, func() {
|
||||||
|
m.Delete(key)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SyncMapWithTimeout) Delete(key interface{}) {
|
||||||
|
m.Map.Delete(key)
|
||||||
|
if value, ok := m.timers.Load(key); ok {
|
||||||
|
timer := value.(*time.Timer)
|
||||||
|
timer.Stop()
|
||||||
|
}
|
||||||
|
m.timers.Delete(key)
|
||||||
|
}
|
||||||
@@ -130,9 +130,7 @@ func (task *ParallelTask) Run() {
|
|||||||
if err != nil { // 出错
|
if err != nil { // 出错
|
||||||
// globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
|
// globals.SugarLogger.Infof("ParallelTask.Run %s, subtask(job:%s, params:%s) result:%v, failed with error:%v", task.Name, utils.Format4Output(job, true), utils.Format4Output(task.params, true), result, err)
|
||||||
if task.IsContinueWhenError {
|
if task.IsContinueWhenError {
|
||||||
task.locker.Lock()
|
task.AddBatchErr(err)
|
||||||
task.batchErrList = append(task.batchErrList, err)
|
|
||||||
task.locker.Unlock()
|
|
||||||
} else {
|
} else {
|
||||||
chanRetVal = err
|
chanRetVal = err
|
||||||
goto end
|
goto end
|
||||||
|
|||||||
@@ -44,9 +44,7 @@ func (task *SeqTask) Run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err)
|
// globals.SugarLogger.Infof("SeqTask.Run %s step:%d failed with error:%v", task.Name, i, err)
|
||||||
if task.IsContinueWhenError {
|
if task.IsContinueWhenError {
|
||||||
task.locker.Lock()
|
task.AddBatchErr(err)
|
||||||
task.batchErrList = append(task.batchErrList, err)
|
|
||||||
task.locker.Unlock()
|
|
||||||
} else {
|
} else {
|
||||||
taskErr = err
|
taskErr = err
|
||||||
break
|
break
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package tasksch
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -283,6 +284,14 @@ func (t *BaseTask) getResult() []interface{} {
|
|||||||
return t.Result
|
return t.Result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *BaseTask) AddBatchErr(err error) {
|
||||||
|
if err != nil {
|
||||||
|
t.locker.Lock()
|
||||||
|
defer t.locker.Unlock()
|
||||||
|
t.batchErrList = append(t.batchErrList, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// func (t *BaseTask) GetOriginalErr() error {
|
// func (t *BaseTask) GetOriginalErr() error {
|
||||||
// t.locker.RLock()
|
// t.locker.RLock()
|
||||||
// defer t.locker.RUnlock()
|
// defer t.locker.RUnlock()
|
||||||
@@ -330,9 +339,11 @@ func (t *BaseTask) Error() (errMsg string) {
|
|||||||
errMsg += "," + t.mainErr.Error()
|
errMsg += "," + t.mainErr.Error()
|
||||||
} else {
|
} else {
|
||||||
errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount)
|
errMsg += fmt.Sprintf("部分失败, 总共:%d, 成功:%d, 失败:%d, 详情如下:\n", t.TotalItemCount, t.FinishedItemCount, t.FailedItemCount)
|
||||||
for _, v := range t.batchErrList {
|
strList := make([]string, len(t.batchErrList))
|
||||||
errMsg += fmt.Sprintf("%s,\n", v.Error())
|
for k, v := range t.batchErrList {
|
||||||
|
strList[k] = v.Error()
|
||||||
}
|
}
|
||||||
|
errMsg += strings.Join(strList, ",\n")
|
||||||
}
|
}
|
||||||
t.locker.Lock()
|
t.locker.Lock()
|
||||||
t.Err = errMsg
|
t.Err = errMsg
|
||||||
|
|||||||
@@ -2,45 +2,44 @@ package tasksch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
defTaskMan TaskMan
|
defTaskMan TaskMan
|
||||||
defLastHours = 24
|
defLastHours = 24
|
||||||
|
maxStoreTime = 48 * time.Hour // 最多存两天时间
|
||||||
)
|
)
|
||||||
|
|
||||||
type TaskMan struct {
|
type TaskMan struct {
|
||||||
taskList map[string]ITask
|
taskMap jxutils.SyncMapWithTimeout
|
||||||
locker sync.RWMutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
defTaskMan.taskList = make(map[string]ITask)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TaskMan) GetTasks(taskID string, fromStatus, toStatus int, lastHours int, createdBy string) (taskList TaskList) {
|
func (m *TaskMan) GetTasks(taskID string, fromStatus, toStatus int, lastHours int, createdBy string) (taskList TaskList) {
|
||||||
m.locker.RLock()
|
|
||||||
defer m.locker.RUnlock()
|
|
||||||
if lastHours == 0 {
|
if lastHours == 0 {
|
||||||
lastHours = defLastHours
|
lastHours = defLastHours
|
||||||
}
|
}
|
||||||
lastTime := time.Now().Add(time.Duration(-lastHours) * time.Hour).Unix()
|
lastTime := time.Now().Add(time.Duration(-lastHours) * time.Hour).Unix()
|
||||||
for k, v := range m.taskList {
|
m.taskMap.Range(func(key, value interface{}) bool {
|
||||||
|
k := key.(string)
|
||||||
|
v := value.(ITask)
|
||||||
status := v.GetStatus()
|
status := v.GetStatus()
|
||||||
if !((createdBy != "" && createdBy != v.GetCreatedBy()) || (taskID != "" && taskID != k) || status < fromStatus || status > toStatus || v.GetCreatedAt().Unix() < lastTime) {
|
if !((createdBy != "" && createdBy != v.GetCreatedBy()) || (taskID != "" && taskID != k) || status < fromStatus || status > toStatus || v.GetCreatedAt().Unix() < lastTime) {
|
||||||
taskList = append(taskList, v)
|
taskList = append(taskList, v)
|
||||||
}
|
}
|
||||||
}
|
return true
|
||||||
|
})
|
||||||
sort.Sort(TaskList(taskList))
|
sort.Sort(TaskList(taskList))
|
||||||
return taskList
|
return taskList
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TaskMan) ManageTask(task ITask) ITask {
|
func (m *TaskMan) ManageTask(task ITask) ITask {
|
||||||
m.locker.Lock()
|
m.taskMap.StoreWithTimeout(task.GetID(), task, maxStoreTime)
|
||||||
defer m.locker.Unlock()
|
|
||||||
m.taskList[task.GetID()] = task
|
|
||||||
return task
|
return task
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,7 +52,7 @@ func ManageTask(task ITask) ITask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func IsTaskRunning(taskID string) bool {
|
func IsTaskRunning(taskID string) bool {
|
||||||
if taskList := GetTasks(taskID, TaskStatusBegin, TaskStatusWorking, 36, ""); len(taskList) > 0 {
|
if taskList := GetTasks(taskID, TaskStatusBegin, TaskStatusWorking, 0, ""); len(taskList) > 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ func GetUsers(db *DaoDB, userType int, keyword string, userIDs []string, userID2
|
|||||||
sql := `
|
sql := `
|
||||||
SELECT *
|
SELECT *
|
||||||
FROM user t1
|
FROM user t1
|
||||||
WHERE t1.deleted_at = ? AND t1.type & ? <> 0`
|
WHERE t1.status = 1 AND t1.deleted_at = ? AND t1.type & ? <> 0`
|
||||||
sqlParams := []interface{}{
|
sqlParams := []interface{}{
|
||||||
utils.DefaultTimeValue,
|
utils.DefaultTimeValue,
|
||||||
userType,
|
userType,
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ const (
|
|||||||
|
|
||||||
SyncFlagStoreName = 8
|
SyncFlagStoreName = 8
|
||||||
SyncFlagStoreAddress = 16
|
SyncFlagStoreAddress = 16
|
||||||
|
SyncFlagStoreStatus = 32
|
||||||
)
|
)
|
||||||
|
|
||||||
func IsSyncStatusNew(syncStatus int8) bool {
|
func IsSyncStatusNew(syncStatus int8) bool {
|
||||||
|
|||||||
@@ -177,9 +177,11 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
mergeStatus := jxutils.MergeStoreStatus(store.Status, store.EbaiStoreStatus)
|
if store.SyncStatus&(model.SyncFlagNewMask|model.SyncFlagStoreStatus) != 0 {
|
||||||
if err = p.updateStoreStatus(userName, storeID, store.VendorStoreID, mergeStatus, store2.Status); err != nil {
|
mergeStatus := jxutils.MergeStoreStatus(store.Status, store.EbaiStoreStatus)
|
||||||
return err
|
if err = p.updateStoreStatus(userName, storeID, store.VendorStoreID, mergeStatus, store2.Status); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
params := genStoreMapFromStore(store)
|
params := genStoreMapFromStore(store)
|
||||||
if err = api.EbaiAPI.ShopUpdate(params); err == nil {
|
if err = api.EbaiAPI.ShopUpdate(params); err == nil {
|
||||||
|
|||||||
@@ -142,7 +142,9 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin
|
|||||||
params["deliveryRangeRadius"] = utils.Str2Int64WithDefault(store.DeliveryRange, 0)
|
params["deliveryRangeRadius"] = utils.Str2Int64WithDefault(store.DeliveryRange, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, params["closeStatus"] = JxStoreStatus2JdStatus(jxutils.MergeStoreStatus(store.Status, store.JdStoreStatus))
|
if store.SyncStatus&(model.SyncFlagNewMask|model.SyncFlagStoreStatus) != 0 {
|
||||||
|
_, params["closeStatus"] = JxStoreStatus2JdStatus(jxutils.MergeStoreStatus(store.Status, store.JdStoreStatus))
|
||||||
|
}
|
||||||
globals.SugarLogger.Debug(utils.Format4Output(params, false))
|
globals.SugarLogger.Debug(utils.Format4Output(params, false))
|
||||||
if globals.EnableJdStoreWrite {
|
if globals.EnableJdStoreWrite {
|
||||||
if err = api.JdAPI.UpdateStoreInfo4Open(store.VendorStoreID, store.RealLastOperator, params); err != nil {
|
if err = api.JdAPI.UpdateStoreInfo4Open(store.VendorStoreID, store.RealLastOperator, params); err != nil {
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ func (p *PurchaseHandler) getStoreSkusBareInfoLimitSize(ctx *jxcontext.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *PurchaseHandler) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
|
func (p *PurchaseHandler) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
|
||||||
result, err := putils.FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
result, err := putils.FreeBatchStoreSkuInfo("获取门店商品信息", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
list, err := p.getStoreSkusBareInfoLimitSize(ctx, task, storeID, vendorStoreID, batchedStoreSkuList)
|
list, err := p.getStoreSkusBareInfoLimitSize(ctx, task, storeID, vendorStoreID, batchedStoreSkuList)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result = list
|
result = list
|
||||||
|
|||||||
@@ -129,7 +129,9 @@ func (p *PurchaseHandler) UpdateStore(db *dao.DaoDB, storeID int, userName strin
|
|||||||
// if globals.EnableMtwmStoreWrite {
|
// if globals.EnableMtwmStoreWrite {
|
||||||
// err = api.MtwmAPI.PoiSave(storeDetail.VendorStoreID, params)
|
// err = api.MtwmAPI.PoiSave(storeDetail.VendorStoreID, params)
|
||||||
// }
|
// }
|
||||||
errList.AddErr(p.UpdateStoreStatus(jxcontext.AdminCtx, storeID, storeDetail.VendorStoreID, jxutils.MergeStoreStatus(storeDetail.Status, storeDetail.VendorStatus)))
|
if storeDetail.SyncStatus&(model.SyncFlagNewMask|model.SyncFlagStoreStatus) != 0 {
|
||||||
|
errList.AddErr(p.UpdateStoreStatus(jxcontext.AdminCtx, storeID, storeDetail.VendorStoreID, jxutils.MergeStoreStatus(storeDetail.Status, storeDetail.VendorStatus)))
|
||||||
|
}
|
||||||
errList.AddErr(p.UpdateStoreOpTime(jxcontext.AdminCtx, storeID, storeDetail.VendorStoreID, storeDetail.GetOpTimeList()))
|
errList.AddErr(p.UpdateStoreOpTime(jxcontext.AdminCtx, storeID, storeDetail.VendorStoreID, storeDetail.GetOpTimeList()))
|
||||||
return errList.GetErrListAsOne()
|
return errList.GetErrListAsOne()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -94,8 +94,10 @@ func (p *PurchaseHandler) CreateStoreCategory(ctx *jxcontext.Context, storeID in
|
|||||||
panic("catName is empty")
|
panic("catName is empty")
|
||||||
}
|
}
|
||||||
globals.SugarLogger.Debugf("mtwm CreateStoreCategory vendorStoreID:%s, originName:%s, catName:%s, subCatName:%s, seq:%d", vendorStoreID, originName, catName, subCatName, storeCat.Seq)
|
globals.SugarLogger.Debugf("mtwm CreateStoreCategory vendorStoreID:%s, originName:%s, catName:%s, subCatName:%s, seq:%d", vendorStoreID, originName, catName, subCatName, storeCat.Seq)
|
||||||
if globals.EnableMtwmStoreWrite {
|
if !(originName == catName && subCatName == "") {
|
||||||
err = api.MtwmAPI.RetailCatUpdate(vendorStoreID, originName, catName, subCatName, storeCat.Seq)
|
if globals.EnableMtwmStoreWrite {
|
||||||
|
err = api.MtwmAPI.RetailCatUpdate(vendorStoreID, originName, catName, subCatName, storeCat.Seq)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storeCat.VendorCatID = storeCat.Name
|
storeCat.VendorCatID = storeCat.Name
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ func (p *DefSingleStorePlatform) DeleteStoreAllSkus(ctx *jxcontext.Context, pare
|
|||||||
VendorSkuID: v.SkuList[0].VendorSkuID,
|
VendorSkuID: v.SkuList[0].VendorSkuID,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err = FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
_, err = FreeBatchStoreSkuInfo("删除门店商品", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
_, err = p.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList)
|
_, err = p.DeleteStoreSkus(ctx, storeID, vendorStoreID, batchedStoreSkuList)
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
|
}, ctx, parentTask, storeStoreList, p.GetStoreSkusBatchSize(partner.FuncDeleteStoreSkus), isContinueWhenError)
|
||||||
@@ -76,7 +76,7 @@ func flatCatList(catList []*partner.BareCategoryInfo) (flattedCatList []*partner
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *DefSingleStorePlatform) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
|
func (p *DefSingleStorePlatform) GetStoreSkusBareInfo(ctx *jxcontext.Context, parentTask tasksch.ITask, storeID int, vendorStoreID string, inStoreSkuList []*partner.StoreSkuInfo) (outStoreSkuList []*partner.StoreSkuInfo, err error) {
|
||||||
resultList, err := FreeBatchStoreSkuInfo(func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
resultList, err := FreeBatchStoreSkuInfo("获取门店商品信息", func(task tasksch.ITask, batchedStoreSkuList []*partner.StoreSkuInfo) (result interface{}, successCount int, err error) {
|
||||||
result, err = p.GetStoreSkusFullInfo(ctx, parentTask, storeID, vendorStoreID, batchedStoreSkuList)
|
result, err = p.GetStoreSkusFullInfo(ctx, parentTask, storeID, vendorStoreID, batchedStoreSkuList)
|
||||||
return result, successCount, err
|
return result, successCount, err
|
||||||
}, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo), true)
|
}, ctx, parentTask, inStoreSkuList, p.GetStoreSkusBatchSize(partner.FuncGetStoreSkusFullInfo), true)
|
||||||
@@ -134,9 +134,9 @@ func (p *DefSingleStorePlatform) GetStoreCategory(ctx *jxcontext.Context, storeI
|
|||||||
return cat, err
|
return cat, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func FreeBatchStoreSkuInfo(handler func(tasksch.ITask, []*partner.StoreSkuInfo) (interface{}, int, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
|
func FreeBatchStoreSkuInfo(name string, handler func(tasksch.ITask, []*partner.StoreSkuInfo) (interface{}, int, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*partner.StoreSkuInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
|
||||||
if true { //len(storeSkuList) > batchSize {
|
if true { //len(storeSkuList) > batchSize {
|
||||||
task := tasksch.NewParallelTask2("FreeBatchStoreSkuInfo", tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
|
task := tasksch.NewParallelTask2(fmt.Sprintf("FreeBatchStoreSkuInfo:%s", name), tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||||||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
|
||||||
batchStoreSkuList := make([]*partner.StoreSkuInfo, len(batchItemList))
|
batchStoreSkuList := make([]*partner.StoreSkuInfo, len(batchItemList))
|
||||||
for k, v := range batchItemList {
|
for k, v := range batchItemList {
|
||||||
@@ -159,9 +159,9 @@ func FreeBatchStoreSkuInfo(handler func(tasksch.ITask, []*partner.StoreSkuInfo)
|
|||||||
return resultList, err
|
return resultList, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func FreeBatchStoreSkuSyncInfo(handler func(tasksch.ITask, []*dao.StoreSkuSyncInfo) (interface{}, int, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*dao.StoreSkuSyncInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
|
func FreeBatchStoreSkuSyncInfo(name string, handler func(tasksch.ITask, []*dao.StoreSkuSyncInfo) (interface{}, int, error), ctx *jxcontext.Context, parentTask tasksch.ITask, storeSkuList []*dao.StoreSkuSyncInfo, batchSize int, isContinueWhenError bool) (resultList []interface{}, err error) {
|
||||||
if true { //len(storeSkuList) > batchSize {
|
if true { //len(storeSkuList) > batchSize {
|
||||||
task := tasksch.NewParallelTask2("FreeBatchStoreSkuSyncInfo", tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
|
task := tasksch.NewParallelTask2(fmt.Sprintf("FreeBatchStoreSkuSyncInfo:%s", name), tasksch.NewParallelConfig().SetParallelCount(1).SetBatchSize(batchSize).SetIsContinueWhenError(isContinueWhenError), ctx,
|
||||||
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, successCount int, err error) {
|
||||||
batchStoreSkuList := make([]*dao.StoreSkuSyncInfo, len(batchItemList))
|
batchStoreSkuList := make([]*dao.StoreSkuSyncInfo, len(batchItemList))
|
||||||
for k, v := range batchItemList {
|
for k, v := range batchItemList {
|
||||||
|
|||||||
Reference in New Issue
Block a user