From 81089fb85cb7b99e296d1e4bb7f4b18be54781f2 Mon Sep 17 00:00:00 2001 From: gazebo Date: Sat, 22 Sep 2018 10:26:31 +0800 Subject: [PATCH] - category sync almost ok. - refactor tasksch (isContinueWhenError) --- business/jxstore/cms/sku.go | 15 ++- business/jxstore/cms/sync.go | 127 +++++++++++++++------- business/jxutils/jxutils_cms.go | 16 ++- business/jxutils/tasksch/task.go | 105 +++++++++++------- business/jxutils/tasksch/task_man.go | 8 +- business/jxutils/tasksch/task_man_test.go | 20 ++-- business/jxutils/tasksch/task_test.go | 20 ++-- business/partner/partner.go | 5 + business/partner/purchase/jd/sku.go | 45 +++++++- business/partner/purchase/jd/store.go | 8 ++ business/partner/purchase/jd/store_sku.go | 4 +- controllers/cms_sku.go | 16 +++ routers/commentsRouter_controllers.go | 8 ++ 13 files changed, 283 insertions(+), 114 deletions(-) diff --git a/business/jxstore/cms/sku.go b/business/jxstore/cms/sku.go index bbd29db52..ddfb8f778 100644 --- a/business/jxstore/cms/sku.go +++ b/business/jxstore/cms/sku.go @@ -47,6 +47,16 @@ func AddCategory(cat *model.SkuCategory, userName string) (outCat *model.SkuCate dao.WrapAddIDCULDEntity(cat, userName) cat.JdSyncStatus = model.SyncFlagNewMask cat.JdID = genTmpID() + db := dao.GetDB() + if cat.Seq <= 0 { + var maxSeq struct { + MaxSeq int + } + if err = dao.GetRow(db, &maxSeq, "SELECT MAX(seq) max_seq FROM sku_category t1 WHERE level = ?", cat.Level); err != nil { + return nil, err + } + cat.Seq = maxSeq.MaxSeq + 1 + } if err = dao.CreateEntity(nil, cat); err == nil { outCat = cat err = CurVendorSync.SyncCategory(nil, cat.ID, false, userName) @@ -89,13 +99,12 @@ func ReorderCategories(parentID int, categoryIDs []int, userName string) (err er catsMap[cat.ID] = cat } for k, v := range categoryIDs { - catsMap[v].Seq = (k + 1) * 5 - // catsMap[v].JdSyncStatus = model.SyncFlagModifiedMask + catsMap[v].Seq = k + catsMap[v].JdSyncStatus = model.SyncFlagModifiedMask if _, err = dao.UpdateEntity(db, catsMap[v], "Seq"); err != nil { break } } - // todo 这里应该也需要先置标记 if err == nil { err = CurVendorSync.SyncReorderCategories(db, parentID, false, userName) } diff --git a/business/jxstore/cms/sync.go b/business/jxstore/cms/sync.go index 2ef392172..8ced229b3 100644 --- a/business/jxstore/cms/sync.go +++ b/business/jxstore/cms/sync.go @@ -47,23 +47,28 @@ func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isForce bool, u } err := dao.GetEntitiesByKV(db, &cats, cond, true) if err == nil { - tasksch.RunTask("", func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { - cat := batchItemList[0].(*model.SkuCategory) - updateFields := []string{model.FieldJdSyncStatus} - if (cat.JdSyncStatus & model.SyncFlagDeletedMask) != 0 { //删除 - err = multiStoresHandler.DeleteCategory(db, cat, userName) - } else if (cat.JdSyncStatus&model.SyncFlagNewMask) != 0 || isForce { // 新增 - err = multiStoresHandler.CreateCategory(db, cat, userName) - updateFields = append(updateFields, model.FieldJdID) - } else if (cat.JdSyncStatus & model.SyncFlagModifiedMask) != 0 { // 修改 - err = multiStoresHandler.UpdateCategory(db, cat, userName) - } - if err == nil { - cat.JdSyncStatus = 0 - _, err = dao.UpdateEntity(db, cat, model.FieldJdSyncStatus) + task := tasksch.RunTask("", false, nil, len(cats), 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + for _, v := range batchItemList { + cat := v.(*model.SkuCategory) + updateFields := []string{multiStoresHandler.GetFieldSyncStatusName()} + if (cat.JdSyncStatus & model.SyncFlagDeletedMask) != 0 { //删除 + err = multiStoresHandler.DeleteCategory(db, cat, userName) + } else if (cat.JdSyncStatus&model.SyncFlagNewMask) != 0 || isForce { // 新增 + err = multiStoresHandler.CreateCategory(db, cat, userName) + updateFields = append(updateFields, multiStoresHandler.GetFieldIDName()) + } else if (cat.JdSyncStatus & model.SyncFlagModifiedMask) != 0 { // 修改 + err = multiStoresHandler.UpdateCategory(db, cat, userName) + } + if err == nil { + cat.JdSyncStatus = 0 + _, err = dao.UpdateEntity(db, cat, updateFields...) + } else { + break + } } return nil, err - }, nil, len(cats), 1, "", cats) + }, cats) + _, err = task.GetResult(0) } return nil, err }) @@ -71,11 +76,17 @@ func (v *VendorSync) SyncCategory(db *dao.DaoDB, categoryID int, isForce bool, u } func (v *VendorSync) SyncReorderCategories(db *dao.DaoDB, categoryID int, isForce bool, userName string) (err error) { - // todo - if handler := GetPurchaseHandler(model.VendorIDJD); handler != nil { - err = handler.(partner.IMultipleStoresHandler).ReorderCategories(db, categoryID, userName) - } - return nil + err = v.LoopMultiStoresVendors(db, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + multiStoresHandler := batchItemList[0].(partner.IMultipleStoresHandler) + err2 := multiStoresHandler.ReorderCategories(db, categoryID, userName) + if err2 == nil { + cat := &model.SkuCategory{} + _, err2 = dao.UpdateEntityByKV(db, cat, utils.Params2Map(multiStoresHandler.GetFieldSyncStatusName(), 0), utils.Params2Map(model.FieldParentID, categoryID)) + return nil, err2 + } + return nil, err2 + }) + return err } func (v *VendorSync) SyncStore(db *dao.DaoDB, vendorID, storeID int, isForce bool, userName string) (err error) { @@ -109,26 +120,30 @@ func (v *VendorSync) SyncSku(db *dao.DaoDB, nameID, skuID int, isForce bool, use } err := dao.GetEntitiesByKV(db, &skuList, cond, true) if err == nil { - globals.SugarLogger.Debug(utils.Format4Output(skuList, false)) - task := tasksch.RunTask("SyncSku", func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { - sku := batchItemList[0].(*model.Sku) - if (skuID == -1 || skuID == sku.ID) && (isForce || sku.JdSyncStatus != 0) { - updateFields := []string{model.FieldJdSyncStatus} - if sku.JdSyncStatus&model.SyncFlagDeletedMask != 0 { // 删除 - err = multiStoresHandler.DeleteSku(db, sku, userName) - } else if sku.JdSyncStatus&model.SyncFlagNewMask != 0 { // 新增 - err = multiStoresHandler.CreateSku(db, sku, userName) - updateFields = append(updateFields, model.FieldJdID) - } else if sku.JdSyncStatus&model.SyncFlagModifiedMask != 0 { // 修改 - err = multiStoresHandler.UpdateSku(db, sku, userName) - } - if err == nil { - sku.JdSyncStatus = 0 - dao.UpdateEntity(db, sku, updateFields...) + // globals.SugarLogger.Debug(utils.Format4Output(skuList, false)) + task := tasksch.RunTask("SyncSku", false, nil, len(skuList), 1, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + for _, v := range batchItemList { + sku := v.(*model.Sku) + if (skuID == -1 || skuID == sku.ID) && (isForce || sku.JdSyncStatus != 0) { + updateFields := []string{multiStoresHandler.GetFieldSyncStatusName()} + if sku.JdSyncStatus&model.SyncFlagDeletedMask != 0 { // 删除 + err = multiStoresHandler.DeleteSku(db, sku, userName) + } else if sku.JdSyncStatus&model.SyncFlagNewMask != 0 { // 新增 + err = multiStoresHandler.CreateSku(db, sku, userName) + updateFields = append(updateFields, multiStoresHandler.GetFieldIDName()) + } else if sku.JdSyncStatus&model.SyncFlagModifiedMask != 0 { // 修改 + err = multiStoresHandler.UpdateSku(db, sku, userName) + } + if err == nil { + sku.JdSyncStatus = 0 + dao.UpdateEntity(db, sku, updateFields...) + } else { + break + } } } return nil, err - }, nil, len(skuList), 1, "", skuList) + }, skuList) _, err = task.GetResult(0) } return nil, err @@ -152,14 +167,14 @@ func (v *VendorSync) SyncStoreSku(db *dao.DaoDB, storeID int, skuIDs []int, isFo func (v *VendorSync) LoopStoreMap(db *dao.DaoDB, storeID int, handler tasksch.WorkFunc) (err error) { storeMaps, err := GetStoreVendorMaps(db, storeID, -1) if err == nil { - task := tasksch.RunTask("", handler, nil, len(storeMaps), 1, "", storeMaps) + task := tasksch.RunTask("LoopStoreMap", false, nil, len(storeMaps), 1, "", handler, storeMaps) _, err = task.GetResult(0) } return err } func (v *VendorSync) LoopMultiStoresVendors(db *dao.DaoDB, handler tasksch.WorkFunc) (err error) { - task := tasksch.RunTask("LoopMultiStoresVendors", handler, nil, len(MultiStoresVendorHandlers), 1, "", MultiStoresVendorHandlers) + task := tasksch.RunTask("LoopMultiStoresVendors", false, nil, len(MultiStoresVendorHandlers), 1, "", handler, MultiStoresVendorHandlers) _, err = task.GetResult(0) return err } @@ -177,7 +192,39 @@ func (v *VendorSync) LoopSingleStoreVendors(db *dao.DaoDB, taskName, userName st if parellelCount > 20 { parellelCount = 20 } - tasksch.RunManagedTask(taskName, handler, nil, parellelCount, 1, userName, storeMaps) + task := tasksch.RunManagedTask(taskName, false, nil, parellelCount, 1, userName, handler, storeMaps) + _, err = task.GetResult(0) + } + return err +} + +func (v *VendorSync) RefreshSkuIDs(nameID, skuID int, isForce bool, userName string) (err error) { + sql := ` + SELECT t1.id + FROM sku t1 + JOIN sku_name t2 ON t1.name_id = t2.id + WHERE 1 = 1 + ` + sqlParams := []interface{}{} + if nameID != -1 { + sql += " AND t1.name_id = ?" + sqlParams = append(sqlParams, nameID) + } + if skuID != -1 { + sql += " AND t1.id = ?" + sqlParams = append(sqlParams, skuID) + } + + var ids []int + db := dao.GetDB() + if err = dao.GetRows(db, &ids, sql, sqlParams); err == nil { + // globals.SugarLogger.Debug(utils.Format4Output(ids, false)) + err = v.LoopMultiStoresVendors(db, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + multiStoresHandler := batchItemList[0].(partner.IMultipleStoresHandler) + err := multiStoresHandler.SyncSkusIDMap(db, ids, userName) + globals.SugarLogger.Debug(err) + return nil, err + }) } return err } diff --git a/business/jxutils/jxutils_cms.go b/business/jxutils/jxutils_cms.go index cb7a239d2..b71d3f91a 100644 --- a/business/jxutils/jxutils_cms.go +++ b/business/jxutils/jxutils_cms.go @@ -3,6 +3,7 @@ package jxutils import ( "fmt" "math" + "reflect" "strings" "time" @@ -17,8 +18,13 @@ func MergeStoreStatus(status int, vendorStatus int) int { return vendorStatus } -func SplitSlice(list []interface{}, batchCount int) (listInList [][]interface{}) { - len := len(list) +func SplitSlice(list interface{}, batchCount int) (listInList [][]interface{}) { + typeInfo := reflect.TypeOf(list) + if typeInfo.Kind() != reflect.Slice { + panic("list must be slice") + } + valueInfo := reflect.ValueOf(list) + len := valueInfo.Len() if len > 0 { listInListLen := (len-1)/batchCount + 1 listInList = make([][]interface{}, listInListLen) @@ -32,7 +38,7 @@ func SplitSlice(list []interface{}, batchCount int) (listInList [][]interface{}) } listInList[index] = make([]interface{}, arrLen) } - listInList[index][i%batchCount] = list[i] + listInList[index][i%batchCount] = valueInfo.Index(i).Interface() } } return listInList @@ -107,3 +113,7 @@ func CaculateSkuPrice(unitPrice int, specQuality float32, specUnit string) int { } return int(math.Round(float64(float32(unitPrice) * specQuality / 500))) } + +func GetSliceLen(list interface{}) int { + return reflect.ValueOf(list).Len() +} diff --git a/business/jxutils/tasksch/task.go b/business/jxutils/tasksch/task.go index ce9d20d23..67cb15769 100644 --- a/business/jxutils/tasksch/task.go +++ b/business/jxutils/tasksch/task.go @@ -22,22 +22,29 @@ const ( TaskStatusEnd = 4 ) +const ( + MaxParallelCount = 10 +) + type WorkFunc func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) type ResultHandlerFunc func(taskName string, result []interface{}, err error) type Task struct { - ID string `json:"id"` - Name string `json:"name"` - CreatedBy string `json:"createdBy"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` - TerminatedAt time.Time `json:"terminatedAt"` - ParallelCount int `json:"parallelCount"` - TotalItemCount int `json:"totalItemCount"` - TotalJobCount int `json:"totalJobCount"` - FinishedItemCount int `json:"finishedItemCount"` - FinishedJobCount int `json:"finishedJobCount"` - Status int `json:"status"` + ID string `json:"id"` + Name string `json:"name"` + CreatedBy string `json:"createdBy"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + TerminatedAt time.Time `json:"terminatedAt"` + ParallelCount int `json:"parallelCount"` + TotalItemCount int `json:"totalItemCount"` + TotalJobCount int `json:"totalJobCount"` + FinishedItemCount int `json:"finishedItemCount"` + FinishedJobCount int `json:"finishedJobCount"` + FailedItemCount int `json:"failedItemCount"` + FailedJobCount int `json:"failedJobCount"` + IsContinueWhenError bool `json:"isContinueWhenError"` + Status int `json:"status"` C <-chan int `json:"-"` @@ -72,23 +79,31 @@ var ( ErrTaskIsCanceled = errors.New("任务被取消了") ) -func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task { +func RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { + if parallelCount > MaxParallelCount { + parallelCount = MaxParallelCount + } + listLen := jxutils.GetSliceLen(itemList) + if parallelCount > listLen { + parallelCount = listLen + } realItemList := utils.Interface2Slice(itemList) jobList := jxutils.SplitSlice(realItemList, batchSize) task := &Task{ - ID: utils.GetUUID(), - Name: taskName, - CreatedAt: time.Now(), - CreatedBy: userName, - UpdatedAt: time.Now(), - TotalJobCount: len(jobList), - TotalItemCount: len(realItemList), - ParallelCount: parallelCount, - taskChan: make(chan []interface{}, parallelCount*100), - quitChan: make(chan int, parallelCount), - subFinishChan: make(chan interface{}, parallelCount), - finishChan: make(chan int, 2), - Status: TaskStatusWorking, + ID: utils.GetUUID(), + Name: taskName, + CreatedAt: time.Now(), + CreatedBy: userName, + UpdatedAt: time.Now(), + TotalJobCount: len(jobList), + TotalItemCount: len(realItemList), + ParallelCount: parallelCount, + taskChan: make(chan []interface{}, len(realItemList)+parallelCount), // 确保能装下所有taskitem,加结束标记 + quitChan: make(chan int, parallelCount), + subFinishChan: make(chan interface{}, parallelCount), + finishChan: make(chan int, 2), + Status: TaskStatusWorking, + IsContinueWhenError: isContinueWhenError, } task.C = task.finishChan go func() { @@ -99,35 +114,34 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, retVal := make([]interface{}, 0) for { select { - case <-task.quitChan: + case <-task.quitChan: // 取消 goto end case job := <-task.taskChan: - if job == nil { + if job == nil { // 任务完成 chanRetVal = retVal goto end } else { result, err := worker(job, params...) globals.SugarLogger.Debugf("RunTask %s, after call worker result:%v, err:%v", taskName, result, err) + task.finishedOneJob(len(job), err) if err == nil { - task.finishedOneJob(len(job)) if result != nil { retVal = append(retVal, utils.Interface2Slice(result)...) } - } else { + } else if !isContinueWhenError { // 出错 chanRetVal = err - go func() { - task.Cancel() - }() goto end } } } } end: - // globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal) - if task.GetStatus() < TaskStatusEndBegin { + globals.SugarLogger.Debugf("RunTask %s, put to chann chanRetVal:%v", taskName, chanRetVal) + task.locker.RLock() + if task.Status < TaskStatusEndBegin { task.subFinishChan <- chanRetVal } + task.locker.RUnlock() }() } for _, job := range jobList { @@ -141,10 +155,12 @@ func RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, var taskErr error for i := 0; i < parallelCount; i++ { result := <-task.subFinishChan + // globals.SugarLogger.Debugf("RunTask %s, received from chann result:%v", taskName, result) if err2, ok := result.(error); ok { + task.Cancel() taskResult = nil taskErr = err2 - break + break // 出错情况下是否需要直接跳出? } else if result != nil { resultList := result.([]interface{}) taskResult = append(taskResult, resultList...) @@ -201,11 +217,13 @@ func (t *Task) GetResult(duration time.Duration) (retVal []interface{}, err erro } func (t *Task) Cancel() { - if t.GetStatus() < TaskStatusEndBegin { + t.locker.Lock() + defer t.locker.Unlock() + if t.Status < TaskStatusEndBegin && t.Status != TaskStatusCanceling { + t.Status = TaskStatusCanceling for i := 0; i < t.ParallelCount; i++ { t.quitChan <- 0 } - t.setStatus(TaskStatusCanceling) } } @@ -240,13 +258,18 @@ func (t *Task) GetStatus() int { ///////// -func (t *Task) finishedOneJob(itemCount int) { +func (t *Task) finishedOneJob(itemCount int, err error) { t.locker.Lock() defer t.locker.Unlock() t.UpdatedAt = time.Now() - t.FinishedItemCount += itemCount - t.FinishedJobCount++ + if err == nil { + t.FinishedItemCount += itemCount + t.FinishedJobCount++ + } else { + t.FailedItemCount += itemCount + t.FailedJobCount++ + } } func (t *Task) setStatus(status int) { diff --git a/business/jxutils/tasksch/task_man.go b/business/jxutils/tasksch/task_man.go index 41cefbf0e..43e355d65 100644 --- a/business/jxutils/tasksch/task_man.go +++ b/business/jxutils/tasksch/task_man.go @@ -17,8 +17,8 @@ func init() { defTaskMan.taskList = make(map[string]*Task) } -func (m *TaskMan) RunTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task { - task := RunTask(taskName, worker, resultHandler, parallelCount, batchSize, userName, itemList, params...) +func (m *TaskMan) RunTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { + task := RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...) m.taskList[task.ID] = task return task } @@ -34,8 +34,8 @@ func (m *TaskMan) GetTasks(taskID string, fromStatus, toStatus int, lastHours in return taskList } -func RunManagedTask(taskName string, worker WorkFunc, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, itemList interface{}, params ...interface{}) *Task { - return defTaskMan.RunTask(taskName, worker, resultHandler, parallelCount, batchSize, userName, itemList, params...) +func RunManagedTask(taskName string, isContinueWhenError bool, resultHandler ResultHandlerFunc, parallelCount, batchSize int, userName string, worker WorkFunc, itemList interface{}, params ...interface{}) *Task { + return defTaskMan.RunTask(taskName, isContinueWhenError, resultHandler, parallelCount, batchSize, userName, worker, itemList, params...) } func GetTasks(taskID string, fromStatus, toStatus int, lastHours int) (taskList []*Task) { diff --git a/business/jxutils/tasksch/task_man_test.go b/business/jxutils/tasksch/task_man_test.go index 7bc639371..59b19eebd 100644 --- a/business/jxutils/tasksch/task_man_test.go +++ b/business/jxutils/tasksch/task_man_test.go @@ -13,7 +13,10 @@ func TestTaskMan(t *testing.T) { for k := range itemList { itemList[k] = k } - task1 := RunManagedTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task1 := RunManagedTask("test", false, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -22,12 +25,12 @@ func TestTaskMan(t *testing.T) { retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) } return retSlice, nil - }, func(taskName string, result []interface{}, err error) { - // t.Log("finished here") - // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + }, itemList, "a", "b", 1, 2) - task2 := RunManagedTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task2 := RunManagedTask("test", false, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -36,10 +39,7 @@ func TestTaskMan(t *testing.T) { retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) } return retSlice, nil - }, func(taskName string, result []interface{}, err error) { - // t.Log("finished here") - // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + }, itemList, "a", "b", 1, 2) time.Sleep(2 * time.Second) task2.Cancel() if task1.GetStatus() == task2.GetStatus() { diff --git a/business/jxutils/tasksch/task_test.go b/business/jxutils/tasksch/task_test.go index 42077f624..f489f3493 100644 --- a/business/jxutils/tasksch/task_test.go +++ b/business/jxutils/tasksch/task_test.go @@ -13,7 +13,10 @@ func TestRunTask(t *testing.T) { for k := range itemList { itemList[k] = k } - task := RunTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := RunTask("test", false, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -22,10 +25,7 @@ func TestRunTask(t *testing.T) { retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) } return retSlice, nil - }, func(taskName string, result []interface{}, err error) { - // t.Log("finished here") - // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + }, itemList, "a", "b", 1, 2) result, err := task.GetResult(1 * time.Microsecond) if err == nil || task.GetStatus() != TaskStatusWorking { t.Fatal("task can not be done in 1 microsecond") @@ -46,7 +46,10 @@ func TestCancelTask(t *testing.T) { for k := range itemList { itemList[k] = k } - task := RunTask("test", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + task := RunTask("test", false, func(taskName string, result []interface{}, err error) { + // t.Log("finished here") + // t.Log(utils.Format4Output(result, false)) + }, 100, 7, "autotest", func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { sleepSecond := rand.Intn(5) t.Logf("sleep %d seconds", sleepSecond) time.Sleep(time.Duration(sleepSecond) * time.Second) @@ -55,10 +58,7 @@ func TestCancelTask(t *testing.T) { retSlice[k] = "hello:" + utils.Int2Str(batchItemList[k].(int)*2) } return retSlice, nil - }, func(taskName string, result []interface{}, err error) { - // t.Log("finished here") - // t.Log(utils.Format4Output(result, false)) - }, 100, 7, "autotest", itemList, "a", "b", 1, 2) + }, itemList, "a", "b", 1, 2) // time.Sleep(time.Second * 6) t.Logf("finishedItemCount:%d, finishedJobCount:%d", task.GetFinishedItemCount(), task.GetFinishedJobCount()) task.Cancel() diff --git a/business/partner/partner.go b/business/partner/partner.go index bef3ddbe3..d9989a8b0 100644 --- a/business/partner/partner.go +++ b/business/partner/partner.go @@ -78,6 +78,9 @@ type IPurchasePlatformHandler interface { // db *dao.DaoDB, type IMultipleStoresHandler interface { + GetFieldIDName() string + GetFieldSyncStatusName() string + ReadCategories() (cats []*model.SkuCategory, err error) CreateCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) @@ -90,6 +93,8 @@ type IMultipleStoresHandler interface { ReadSku(vendorSkuID string) (skuNameExt *model.SkuNameExt, err error) UpdateSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName string) (err error) + + SyncSkusIDMap(db *dao.DaoDB, skuIDs []int, userName string) (err error) } type ISingleStoreHandler interface { diff --git a/business/partner/purchase/jd/sku.go b/business/partner/purchase/jd/sku.go index a18d02760..aff14d932 100644 --- a/business/partner/purchase/jd/sku.go +++ b/business/partner/purchase/jd/sku.go @@ -3,8 +3,10 @@ package jd // 这里函数取得的信息,除了与自身实体相关的ID(比如PARENT ID),都已经转换成了本地ID了 import ( + "git.rosy.net.cn/baseapi/platformapi/jdapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils" + "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" @@ -137,7 +139,6 @@ func (p *PurchaseHandler) cuSku(db *dao.DaoDB, sku *model.Sku, handler func(skuE } if err == nil { skuName := jxutils.ComposeSkuName(skuInfoExt.Prefix, skuInfoExt.Name, skuInfoExt.Comment, skuInfoExt.Unit, sku.SpecQuality, sku.SpecUnit, 0) - globals.SugarLogger.Debug(utils.Format4Output(skuInfoExt, false), utils.Format4Output(sku, false)) skuPrice := jxutils.CaculateSkuPrice(skuInfoExt.Price, sku.SpecQuality, sku.SpecUnit) result, err2 := handler(&skuInfoExt, skuPrice, skuName, shopCategories, addParams) if err = err2; err == nil { @@ -252,6 +253,48 @@ func (p *PurchaseHandler) DeleteSku(db *dao.DaoDB, sku *model.Sku, userName stri return err } +func (p *PurchaseHandler) SyncSkusIDMap(db *dao.DaoDB, skuIDs []int, userName string) (err error) { + sql := ` + SELECT t1.id out_sku_id, t1.jd_id sku_id + FROM sku t1 + WHERE t1.jd_sync_status <> 0 + ` + // AND t1.jd_sync_status <> 0 + sqlParams := []interface{}{} + if skuIDs != nil && len(skuIDs) > 0 { + sql += " AND t1.id IN (" + dao.GenQuestionMarks(len(skuIDs)) + ")" + sqlParams = append(sqlParams, skuIDs) + } + var skuPairs []*jdapi.SkuIDPair + if err = dao.GetRows(db, &skuPairs, sql, sqlParams); err == nil { + // globals.SugarLogger.Debug(utils.Format4Output(skuPairs, false)) + globals.SugarLogger.Debug(len(skuPairs)) + task := tasksch.RunTask("SyncSkusIDMap", true, nil, 10, 1, userName, func(batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { + skuPairs := make([]*jdapi.SkuIDPair, len(batchItemList)) + for k, v := range batchItemList { + pair := v.(*jdapi.SkuIDPair) + skuPairs[k] = &jdapi.SkuIDPair{ + SkuId: pair.SkuId, + OutSkuId: pair.OutSkuId, //utils.GetUUID(), + } + } + if _, err = api.JdAPI.BatchUpdateOutSkuId(skuPairs); err == nil { + // for k, v := range batchItemList { + // pair := v.(*jdapi.SkuIDPair) + // skuPairs[k].OutSkuId = pair.OutSkuId + // } + // _, err = api.JdAPI.BatchUpdateOutSkuId(skuPairs) + } + if err != nil { + globals.SugarLogger.Debug(utils.Format4Output(skuPairs, false)) + } + return nil, err + }, skuPairs) + _, err = task.GetResult(0) + } + return err +} + func jdStatus2jxStatus(jdStatus int) (jxStatus int) { switch jdStatus { case 1: diff --git a/business/partner/purchase/jd/store.go b/business/partner/purchase/jd/store.go index e761c2629..42056ed0e 100644 --- a/business/partner/purchase/jd/store.go +++ b/business/partner/purchase/jd/store.go @@ -26,6 +26,14 @@ type tJdStoreInfo struct { SyncStatus int } +func (p *PurchaseHandler) GetFieldIDName() string { + return model.FieldJdID +} + +func (p *PurchaseHandler) GetFieldSyncStatusName() string { + return model.FieldJdSyncStatus +} + func (p *PurchaseHandler) ReadStore(vendorStoreID string) (*model.Store, error) { result, err := api.JdAPI.GetStoreInfoByStationNo(vendorStoreID) if err == nil { diff --git a/business/partner/purchase/jd/store_sku.go b/business/partner/purchase/jd/store_sku.go index 29d6239e7..c8cf02bdb 100644 --- a/business/partner/purchase/jd/store_sku.go +++ b/business/partner/purchase/jd/store_sku.go @@ -29,7 +29,7 @@ func (p *PurchaseHandler) SyncStoreSkus(db *dao.DaoDB, storeIDs []int, skuIDs [] globals.SugarLogger.Debug(sql, sqlParams) if err = dao.GetRows(db, &storeSkus, sql, sqlParams); err == nil { outStationNo := utils.Int2Str(storeID) - task := tasksch.RunTask("", func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { + task := tasksch.RunTask("", false, nil, 10, 50, userName, func(batchItemList []interface{}, params ...interface{}) (interface{}, error) { var skuPriceInfoList []*jdapi.SkuPriceInfo var skuVendibilityList []*jdapi.StockVendibility var skuStockList []*jdapi.SkuStock @@ -77,7 +77,7 @@ func (p *PurchaseHandler) SyncStoreSkus(db *dao.DaoDB, storeIDs []int, skuIDs [] } } return nil, err - }, nil, 10, 50, userName, storeSkus) + }, storeSkus) if _, err = task.GetResult(0); err == nil { sql := ` UPDATE store_sku_bind diff --git a/controllers/cms_sku.go b/controllers/cms_sku.go index 6e2d0a986..8dcfa5a99 100644 --- a/controllers/cms_sku.go +++ b/controllers/cms_sku.go @@ -299,3 +299,19 @@ func (c *SkuController) SyncSkus() { return retVal, "", err }) } + +// @Title 重新刷新商家ID +// @Description 重新刷新商家ID +// @Param token header string true "认证token" +// @Param nameID query int true "name ID, -1表示所有" +// @Param skuID query int true "sku ID, -1表示所有" +// @Param isForce query bool false "是否强制刷新,缺省为否" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /RefreshSkuIDs [put] +func (c *SkuController) RefreshSkuIDs() { + c.callRefreshSkuIDs(func(params *tSkuRefreshSkuIDsParams) (retVal interface{}, errCode string, err error) { + err = cms.CurVendorSync.RefreshSkuIDs(params.NameID, params.SkuID, false, GetUserNameFromToken(params.Token)) + return retVal, "", err + }) +} diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index e9b5d0e48..b8f01a4d1 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -263,6 +263,14 @@ func init() { MethodParams: param.Make(), Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"], + beego.ControllerComments{ + Method: "RefreshSkuIDs", + Router: `/RefreshSkuIDs`, + AllowHTTPMethods: []string{"put"}, + MethodParams: param.Make(), + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SkuController"], beego.ControllerComments{ Method: "ReorderCategories",