- SyncStoresCategory

- TaskError for json output.
This commit is contained in:
gazebo
2018-10-27 16:12:00 +08:00
parent 970e8dc7b2
commit eac24b9a5a
8 changed files with 163 additions and 26 deletions

View File

@@ -273,26 +273,47 @@ func (v *VendorSync) SyncSku(ctx *jxcontext.Context, db *dao.DaoDB, nameID, skuI
return "", err
}
func (v *VendorSync) SyncStoresCategory(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, isAsync bool) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresCategory")
return v.LoopStoresMap(ctx, db, "SyncStoresCategory", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
if handler := v.GetSingleStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("SyncStoresCategory loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.SyncStoreCategory(ctx, task, storeID, false)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
_, err = handler.SyncStoreCategory(ctx, t, loopMapInfo.StoreMapList[0].StoreID, false)
}
return nil, err
})
}
//
func (v *VendorSync) SyncStoresSkus(ctx *jxcontext.Context, db *dao.DaoDB, vendorIDs []int, storeIDs []int, skuIDs []int, isAsync bool, userName string) (hint string, err error) {
globals.SugarLogger.Debug("SyncStoresSkus")
hint, err = v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
return v.LoopStoresMap(ctx, db, "SyncStoresSkus", isAsync, vendorIDs, storeIDs, func(t *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (interface{}, error) {
loopMapInfo := batchItemList[0].(*LoopStoreMapInfo)
handler := v.GetStoreHandler(loopMapInfo.VendorID)
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false)
if handler := v.GetStoreHandler(loopMapInfo.VendorID); handler != nil {
if len(loopMapInfo.StoreMapList) > 1 {
loopStoreTask := tasksch.NewSeqTask("SyncStoresSkus loop stores", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
storeID := loopMapInfo.StoreMapList[step].StoreID
_, err = handler.SyncStoreSkus(ctx, task, storeID, skuIDs, false)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}, len(loopMapInfo.StoreMapList))
t.AddChild(loopStoreTask).Run()
_, err = loopStoreTask.GetResult(0)
return nil, err
}
_, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false)
}
_, err = handler.SyncStoreSkus(ctx, t, loopMapInfo.StoreMapList[0].StoreID, skuIDs, false)
return nil, err
})
return hint, err
}
func (v *VendorSync) LoopStoresMap(ctx *jxcontext.Context, db *dao.DaoDB, taskName string, isAsync bool, vendorIDs []int, storeIDs []int, handler tasksch.WorkFunc) (hint string, err error) {

View File

@@ -43,7 +43,7 @@ func NewParallelConfig() *ParallelConfig {
return &ParallelConfig{
ParallelCount: DefParallelCount,
BatchSize: 1,
IsContinueWhenError: false,
IsContinueWhenError: true,
ResultHandler: nil,
}
}
@@ -127,7 +127,7 @@ func (task *ParallelTask) Run() {
goto end
}
task.locker.Lock()
task.DetailErrList = append(task.DetailErrList, err)
task.DetailErrList = append(task.DetailErrList, NewTaskError(err))
task.locker.Unlock()
}
}
@@ -175,7 +175,7 @@ func (task *ParallelTask) Run() {
task.Status = TaskStatusFinished
}
}
task.Err = taskErr
task.Err = NewTaskError(taskErr)
task.Result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()

View File

@@ -46,7 +46,7 @@ func (task *SeqTask) Run() {
break
}
task.locker.Lock()
task.DetailErrList = append(task.DetailErrList, err)
task.DetailErrList = append(task.DetailErrList, NewTaskError(err))
task.locker.Unlock()
} else if result != nil {
taskResult = append(taskResult, utils.Interface2Slice(result)...)
@@ -64,7 +64,7 @@ func (task *SeqTask) Run() {
task.Status = TaskStatusFinished
}
}
task.Err = taskErr
task.Err = NewTaskError(taskErr)
task.Result = taskResult
task.TerminatedAt = time.Now()
task.locker.Unlock()

View File

@@ -43,6 +43,20 @@ type ITask interface {
json.Marshaler
}
type TaskError struct {
error
}
func (t *TaskError) MarshalJSON() ([]byte, error) {
return json.Marshal(t.Error())
}
func NewTaskError(err error) *TaskError {
return &TaskError{
err,
}
}
type BaseTask struct {
Name string `json:"name"`
ID string `json:"id"`
@@ -61,10 +75,11 @@ type BaseTask struct {
FailedJobCount int `json:"failedJobCount"`
Status int `json:"status"`
Result []interface{} `json:"result"`
Err error `json:"err"`
Children TaskList `json:"children"`
DetailErrList []error `json:"detailErrList"`
Result []interface{} `json:"result"`
Children TaskList `json:"children"`
Err error `json:"err"`
DetailErrList []error `json:"detailErrList"`
finishChan chan int
C <-chan int `json:"-"`

View File

@@ -0,0 +1,14 @@
package tasksch
import (
"errors"
"fmt"
"testing"
"git.rosy.net.cn/baseapi/utils"
)
func TestTaskError(t *testing.T) {
err := NewTaskError(errors.New("hello"))
fmt.Println(utils.Format4Output(err, false))
}

View File

@@ -49,6 +49,8 @@ type tStoreSkuFullInfo struct {
EbaiCat1ID int64 `orm:"column(ebai_cat1_id)"`
EbaiCat2ID int64 `orm:"column(ebai_cat2_id)"`
EbaiCat3ID int64 `orm:"column(ebai_cat3_id)"`
PricePercentage int
}
type tStoreCatInfo struct {
@@ -81,7 +83,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
return "", err
}
sql := `
SELECT t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status,
SELECT t8.price_percentage, t1.*, t2.spec_quality, t2.spec_unit, t2.weight, t2.status sku_status,
t3.prefix, t3.name, t2.comment, t3.is_global, t3.unit, t3.img,
t4.name cat_name,
t4.id cat_id, t4.level cat_level, t5.ebai_id cat_ebai_id,
@@ -96,6 +98,7 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
LEFT JOIN store_sku_category_map t5p ON t5p.store_id = t1.store_id AND t5p.category_id = t4p.id AND t5p.deleted_at = ?
LEFT JOIN sku_vendor_category cat1 ON t4.ebai_category_id = cat1.vendor_category_id AND cat1.vendor_id = ?
LEFT JOIN sku_vendor_category cat2 ON cat1.parent_id = cat2.vendor_category_id AND cat1.vendor_id = ?
JOIN store_map t8 ON t8.store_id = t1.store_id AND t8.vendor_id = ? AND t8.deleted_at = ?
WHERE t1.store_id = ? AND (t1.ebai_sync_status <> 0)
`
sqlParams := []interface{}{
@@ -103,6 +106,8 @@ func (p *PurchaseHandler) SyncStoreSkus(ctx *jxcontext.Context, parentTask tasks
utils.DefaultTimeValue,
model.VendorIDEBAI,
model.VendorIDEBAI,
model.VendorIDEBAI,
utils.DefaultTimeValue,
storeID,
}
if len(skuIDs) > 0 {
@@ -270,12 +275,13 @@ func (p *PurchaseHandler) RefreshStoresAllSkusID(ctx *jxcontext.Context, parentT
///////////
func genSkuParamsFromStoreSkuInfo(storeSku *tStoreSkuFullInfo) map[string]interface{} {
price := jxutils.CaculateSkuVendorPrice(storeSku.Price, storeSku.PricePercentage)
return map[string]interface{}{
"name": jxutils.ComposeSkuName(storeSku.Prefix, storeSku.Name, storeSku.Comment, storeSku.Unit, storeSku.SpecQuality, storeSku.SpecUnit, 0),
"status": jxSkuStatus2Ebai(jxutils.MergeSkuStatus(storeSku.SkuStatus, storeSku.Status)),
"left_num": ebaiapi.MaxLeftNum,
"sale_price": storeSku.Price,
"market_price": storeSku.Price,
"sale_price": price,
"market_price": price,
"cat1_id": getEbaiCat(storeSku.EbaiCat1ID, 1),
"cat2_id": getEbaiCat(storeSku.EbaiCat2ID, 2),
"cat3_id": getEbaiCat(storeSku.EbaiCat3ID, 3),
@@ -391,13 +397,13 @@ func (p *PurchaseHandler) SyncStoreCategory(ctx *jxcontext.Context, parentTask t
if catInfo.EbaiSyncStatus&model.SyncFlagDeletedMask != 0 { // 删除
err = api.EbaiAPI.ShopCategoryDelete(strStoreID, catInfo.EbaiID)
} else if catInfo.EbaiSyncStatus&model.SyncFlagNewMask != 0 { // 新增
ebaiID, err2 := api.EbaiAPI.ShopCategoryCreate(strStoreID, catInfo.ParentEbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID))
ebaiID, err2 := api.EbaiAPI.ShopCategoryCreate(strStoreID, catInfo.ParentEbaiID, formatName(catInfo.Name), jxCatSeq2Ebai(catInfo.Seq), utils.Int2Str(catInfo.CategoryID))
if err = err2; err == nil {
catInfo.EbaiID = ebaiID
updateFields = append(updateFields, model.FieldEbaiID)
}
} else if catInfo.EbaiSyncStatus&model.SyncFlagModifiedMask != 0 { // 修改
err = api.EbaiAPI.ShopCategoryUpdate(strStoreID, catInfo.EbaiID, formatName(catInfo.Name), int(catInfo.Seq+1), utils.Int2Str(catInfo.CategoryID))
err = api.EbaiAPI.ShopCategoryUpdate(strStoreID, catInfo.EbaiID, formatName(catInfo.Name), jxCatSeq2Ebai(catInfo.Seq), utils.Int2Str(catInfo.CategoryID))
}
}
if err == nil {
@@ -473,3 +479,8 @@ func (p *PurchaseHandler) updateLocalCatAsNew(db *dao.DaoDB, localCatMap map[str
func formatName(name string) string {
return strings.Trim(utils.FilterMb4(name), "\n\r\t ")
}
// 饿百的排序是从大到小
func jxCatSeq2Ebai(seq int) int {
return 10000 - seq
}

View File

@@ -3,6 +3,7 @@ package controllers
import (
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxstore/cms"
"git.rosy.net.cn/jx-callback/business/model/dao"
"github.com/astaxie/beego"
)
@@ -10,6 +11,65 @@ type SyncController struct {
beego.Controller
}
// @Title 同步商家商品信息
// @Description 同步商家商品信息
// @Param token header string true "认证token"
// @Param storeIDs formData string true "门店ID列表"
// @Param vendorIDs formData string true "厂商ID列表"
// @Param isAsync formData bool true "是否异步操作"
// @Param skuIDs formData string false "SKU ID列表缺省为全部"
// @Param isContinueWhenErr formData bool false "单个同步失败是否继续缺省false"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /SyncStoresSkus [put]
func (c *SyncController) SyncStoresSkus() {
c.callSyncStoresSkus(func(params *tSyncSyncStoresSkusParams) (retVal interface{}, errCode string, err error) {
db := dao.GetDB()
var storeIDs []int
var skuIDs []int
var vendorIDs []int
if err = utils.UnmarshalUseNumber([]byte(params.StoreIDs), &storeIDs); err != nil {
return retVal, "", err
}
if err = utils.UnmarshalUseNumber([]byte(params.VendorIDs), &vendorIDs); err != nil {
return retVal, "", err
}
if params.SkuIDs != "" {
if err = utils.UnmarshalUseNumber([]byte(params.SkuIDs), &skuIDs); err != nil {
return retVal, "", err
}
}
retVal, err = cms.CurVendorSync.SyncStoresSkus(params.Ctx, db, vendorIDs, storeIDs, skuIDs, params.IsAsync, params.Ctx.GetUserName())
return retVal, "", err
})
}
// @Title 同步商家商品信息
// @Description 同步商家商品信息
// @Param token header string true "认证token"
// @Param storeIDs formData string true "门店ID列表"
// @Param vendorIDs formData string true "厂商ID列表"
// @Param isAsync formData bool true "是否异步操作"
// @Param isContinueWhenErr formData bool false "单个同步失败是否继续缺省false"
// @Success 200 {object} controllers.CallResult
// @Failure 200 {object} controllers.CallResult
// @router /SyncStoresCategory [put]
func (c *SyncController) SyncStoresCategory() {
c.callSyncStoresCategory(func(params *tSyncSyncStoresCategoryParams) (retVal interface{}, errCode string, err error) {
db := dao.GetDB()
var storeIDs []int
var vendorIDs []int
if err = utils.UnmarshalUseNumber([]byte(params.StoreIDs), &storeIDs); err != nil {
return retVal, "", err
}
if err = utils.UnmarshalUseNumber([]byte(params.VendorIDs), &vendorIDs); err != nil {
return retVal, "", err
}
retVal, err = cms.CurVendorSync.SyncStoresCategory(params.Ctx, db, vendorIDs, storeIDs, params.IsAsync)
return retVal, "", err
})
}
// @Title 全部刷新所有门店商家ID
// @Description 全部刷新所有门店商家ID
// @Param token header string true "认证token"

View File

@@ -575,6 +575,22 @@ func init() {
MethodParams: param.Make(),
Params: nil})
beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"],
beego.ControllerComments{
Method: "SyncStoresCategory",
Router: `/SyncStoresCategory`,
AllowHTTPMethods: []string{"put"},
MethodParams: param.Make(),
Params: nil})
beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SyncController"],
beego.ControllerComments{
Method: "SyncStoresSkus",
Router: `/SyncStoresSkus`,
AllowHTTPMethods: []string{"put"},
MethodParams: param.Make(),
Params: nil})
beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"],
beego.ControllerComments{
Method: "CancelTask",