同步错误返回
This commit is contained in:
@@ -3,9 +3,15 @@ package cms
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.rosy.net.cn/baseapi"
|
||||||
|
"git.rosy.net.cn/baseapi/platformapi/dingdingapi"
|
||||||
"git.rosy.net.cn/baseapi/utils"
|
"git.rosy.net.cn/baseapi/utils"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils"
|
"git.rosy.net.cn/jx-callback/business/jxutils"
|
||||||
|
"git.rosy.net.cn/jx-callback/business/jxutils/ddmsg"
|
||||||
|
"git.rosy.net.cn/jx-callback/business/jxutils/excel"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
||||||
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
||||||
"git.rosy.net.cn/jx-callback/business/model"
|
"git.rosy.net.cn/jx-callback/business/model"
|
||||||
@@ -15,6 +21,19 @@ import (
|
|||||||
"git.rosy.net.cn/jx-callback/globals/refutil"
|
"git.rosy.net.cn/jx-callback/globals/refutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type SyncErrResult struct {
|
||||||
|
SkuID int `json:"商品ID"`
|
||||||
|
VendorSkuID string `json:"平台商品ID"`
|
||||||
|
NameID int `json:"商品nameID"`
|
||||||
|
VendorPrice int64 `json:"平台价"`
|
||||||
|
ErrMsg string `json:"错误信息"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncErrResultLock struct {
|
||||||
|
syncErrResult []SyncErrResult
|
||||||
|
locker sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
type LoopStoreMapInfo struct {
|
type LoopStoreMapInfo struct {
|
||||||
VendorID int
|
VendorID int
|
||||||
StoreMapList []*model.StoreMap
|
StoreMapList []*model.StoreMap
|
||||||
@@ -45,6 +64,14 @@ var (
|
|||||||
var (
|
var (
|
||||||
ErrHaveNotImplementedYet = errors.New("还没有实现")
|
ErrHaveNotImplementedYet = errors.New("还没有实现")
|
||||||
ErrEntityNotExist = errors.New("找不到相应实体")
|
ErrEntityNotExist = errors.New("找不到相应实体")
|
||||||
|
SyncErrResultTitle = []string{
|
||||||
|
"商品ID",
|
||||||
|
"平台商品ID",
|
||||||
|
"商品nameID",
|
||||||
|
"平台价",
|
||||||
|
"错误信息",
|
||||||
|
}
|
||||||
|
syncErrResultLock SyncErrResultLock
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
func (p *MultiStoreHandlerWrapper) DeleteCategory(db *dao.DaoDB, cat *model.SkuCategory, userName string) (err error) {
|
||||||
@@ -540,9 +567,9 @@ func (v *VendorSync) LoopStoresMap2(ctx *jxcontext.Context, db *dao.DaoDB, taskN
|
|||||||
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
|
taskName = fmt.Sprintf("%s,处理平台%s", taskName, model.VendorChineseNames[loopInfoList[0].VendorID])
|
||||||
}
|
}
|
||||||
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
|
task = tasksch.NewParallelTask(taskName, tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, handler, loopInfoList)
|
||||||
task.SetFinishHook(task)
|
task.SetFinishHook(func(task tasksch.ITask, ctx *jxcontext.Context) {
|
||||||
failedList := task.GetErrMsg()
|
err = WirteToExcelBySyncFailed(task, ctx)
|
||||||
|
})
|
||||||
tasksch.HandleTask(task, nil, isManageIt).Run()
|
tasksch.HandleTask(task, nil, isManageIt).Run()
|
||||||
if !isAsync {
|
if !isAsync {
|
||||||
resultList, err2 := task.GetResult(0)
|
resultList, err2 := task.GetResult(0)
|
||||||
@@ -698,3 +725,55 @@ func GetTimeMixByInt(begin1, end1, begin2, end2 int16) (beginAt, endAt int16) {
|
|||||||
}
|
}
|
||||||
return beginAt, endAt
|
return beginAt, endAt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WirteToExcelBySyncFailed(task tasksch.ITask, ctx *jxcontext.Context) (err error) {
|
||||||
|
var (
|
||||||
|
sheetList1 []*excel.Obj2ExcelSheetConfig
|
||||||
|
downloadURL1, fileName1 string
|
||||||
|
)
|
||||||
|
syncErrResultLock.syncErrResult = syncErrResultLock.syncErrResult[0:0]
|
||||||
|
failedList := task.GetErrMsg()
|
||||||
|
if len(failedList) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, v := range failedList {
|
||||||
|
for _, vv := range v.([]*partner.StoreSkuInfoWithErr) {
|
||||||
|
result := SyncErrResult{
|
||||||
|
SkuID: vv.StoreSkuInfo.SkuID,
|
||||||
|
VendorSkuID: vv.StoreSkuInfo.VendorSkuID,
|
||||||
|
NameID: vv.StoreSkuInfo.NameID,
|
||||||
|
VendorPrice: vv.StoreSkuInfo.VendorPrice,
|
||||||
|
ErrMsg: vv.ErrMsg,
|
||||||
|
}
|
||||||
|
syncErrResultLock.AppendData(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
excelConf1 := &excel.Obj2ExcelSheetConfig{
|
||||||
|
Title: "同步错误",
|
||||||
|
Data: syncErrResultLock.syncErrResult,
|
||||||
|
CaptionList: SyncErrResultTitle,
|
||||||
|
}
|
||||||
|
sheetList1 = append(sheetList1, excelConf1)
|
||||||
|
if excelConf1 != nil {
|
||||||
|
downloadURL1, fileName1, err = jxutils.UploadExeclAndPushMsg(sheetList1, time.Now().Format("2006-01-02")+"同步错误返回")
|
||||||
|
baseapi.SugarLogger.Debug("WriteToExcel: download is [%v]", downloadURL1)
|
||||||
|
} else {
|
||||||
|
baseapi.SugarLogger.Debug("WriteToExcel: dataSuccess is nil!")
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
baseapi.SugarLogger.Errorf("WriteToExcel:upload %s , %s failed error:%v", fileName1, err)
|
||||||
|
} else {
|
||||||
|
if authInfo, err := ctx.GetV2AuthInfo(); err == nil {
|
||||||
|
noticeMsg := fmt.Sprintf("[详情点我]path1=%s\n", downloadURL1)
|
||||||
|
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "同步错误返回", noticeMsg)
|
||||||
|
baseapi.SugarLogger.Debugf("WriteToExcel:upload %s success, downloadURL1:%s", fileName1, downloadURL1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *SyncErrResultLock) AppendData(syncErrResult SyncErrResult) {
|
||||||
|
d.locker.Lock()
|
||||||
|
defer d.locker.Unlock()
|
||||||
|
d.syncErrResult = append(d.syncErrResult, syncErrResult)
|
||||||
|
}
|
||||||
|
|||||||
@@ -65,8 +65,7 @@ type ITask interface {
|
|||||||
AddBatchErr(err error)
|
AddBatchErr(err error)
|
||||||
AddErrMsg(failedList ...interface{})
|
AddErrMsg(failedList ...interface{})
|
||||||
GetErrMsg() (failedList []interface{})
|
GetErrMsg() (failedList []interface{})
|
||||||
SetFinishHook(task *ParallelTask)
|
SetFinishHook(func(task ITask, ctx *jxcontext.Context))
|
||||||
GetFinishHook() *ParallelTask
|
|
||||||
json.Marshaler
|
json.Marshaler
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,7 +131,7 @@ type BaseTask struct {
|
|||||||
ctx *jxcontext.Context
|
ctx *jxcontext.Context
|
||||||
isGetResultCalled bool
|
isGetResultCalled bool
|
||||||
FailedList []interface{}
|
FailedList []interface{}
|
||||||
finishHook *ParallelTask
|
finishHook func(task ITask, ctx *jxcontext.Context)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s TaskList) Len() int {
|
func (s TaskList) Len() int {
|
||||||
@@ -174,6 +173,9 @@ func (t *BaseTask) GetID() string {
|
|||||||
return t.ID
|
return t.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *BaseTask) Run() {
|
||||||
|
}
|
||||||
|
|
||||||
// 此函数成功返回结果后,结果在任务中会被删除(以免被管理的任务不必要的HOLD住对象)
|
// 此函数成功返回结果后,结果在任务中会被删除(以免被管理的任务不必要的HOLD住对象)
|
||||||
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
|
func (t *BaseTask) GetResult(duration time.Duration) (retVal []interface{}, err error) {
|
||||||
if t.GetStatus() >= TaskStatusEndBegin {
|
if t.GetStatus() >= TaskStatusEndBegin {
|
||||||
@@ -360,26 +362,19 @@ func (t *BaseTask) Error() (errMsg string) {
|
|||||||
return errMsg
|
return errMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTask) SetFinishHook(task *ParallelTask) {
|
func (t *BaseTask) SetFinishHook(hook func(task ITask, ctx *jxcontext.Context)) {
|
||||||
t.locker.RLock()
|
t.locker.RLock()
|
||||||
defer t.locker.RUnlock()
|
defer t.locker.RUnlock()
|
||||||
t.finishHook = task
|
t.finishHook = hook
|
||||||
}
|
|
||||||
|
|
||||||
func (t *BaseTask) GetFinishHook() *ParallelTask {
|
|
||||||
return t.finishHook
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BaseTask) GetErrMsg() (failedList []interface{}) {
|
func (t *BaseTask) GetErrMsg() (failedList []interface{}) {
|
||||||
t.locker.RLock()
|
t.locker.RLock()
|
||||||
defer t.locker.RUnlock()
|
failedList = append(failedList, t.FailedList...)
|
||||||
if len(t.FailedList) == 0 {
|
t.locker.RUnlock()
|
||||||
return nil
|
|
||||||
}
|
for _, v := range t.Children {
|
||||||
if t.parent != nil {
|
failedList = append(failedList, v.GetErrMsg()...)
|
||||||
for _, v := range t.FailedList {
|
|
||||||
failedList = append(failedList, v)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return failedList
|
return failedList
|
||||||
}
|
}
|
||||||
@@ -387,9 +382,7 @@ func (t *BaseTask) GetErrMsg() (failedList []interface{}) {
|
|||||||
func (t *BaseTask) AddErrMsg(failedList ...interface{}) {
|
func (t *BaseTask) AddErrMsg(failedList ...interface{}) {
|
||||||
t.locker.Lock()
|
t.locker.Lock()
|
||||||
defer t.locker.Unlock()
|
defer t.locker.Unlock()
|
||||||
for _, v := range failedList {
|
t.FailedList = append(t.FailedList, failedList...)
|
||||||
t.FailedList = append(t.FailedList, v)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (t *BaseTask) GetDetailErrList() []error {
|
// func (t *BaseTask) GetDetailErrList() []error {
|
||||||
@@ -427,8 +420,7 @@ func (t *BaseTask) run(taskHandler func()) {
|
|||||||
utils.CallFuncAsync(func() {
|
utils.CallFuncAsync(func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
// globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r)
|
globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, utils.Format4Output(t, false), r)
|
||||||
globals.SugarLogger.Errorf("panic in BaseTask.run task:%s, task detail:%s, r:%v", t.Name, "", r)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -462,12 +454,9 @@ func (t *BaseTask) run(taskHandler func()) {
|
|||||||
globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
|
globals.SugarLogger.Infof("BaseTask run, failed with error:%v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(t.finishChan)
|
close(t.finishChan)
|
||||||
time.Sleep(10 * time.Millisecond) // 等待GetResult中的isGetResultCalled赋值
|
time.Sleep(10 * time.Millisecond) // 等待GetResult中的isGetResultCalled赋值
|
||||||
globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled)
|
globals.SugarLogger.Debugf("BaseTask task ID:%s, name:%s finished, isGetResultCalled:%t", t.ID, t.Name, t.isGetResultCalled)
|
||||||
p := t.GetFinishHook()
|
|
||||||
if p != nil {
|
|
||||||
if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 {
|
if !t.isGetResultCalled && t.parent == nil && len(GetTasks(t.ID, TaskStatusBegin, TaskStatusEnd, 24, "")) > 0 {
|
||||||
if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息
|
if authInfo, err := t.ctx.GetV2AuthInfo(); err == nil { // 这里应该是不管登录类型,直接以可能的方式发消息
|
||||||
var content string
|
var content string
|
||||||
@@ -484,8 +473,8 @@ func (t *BaseTask) run(taskHandler func()) {
|
|||||||
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
|
ddmsg.SendUserMessage(dingdingapi.MsgTyeText, authInfo.UserID, "异步任务完成", content)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
if t.finishHook != nil {
|
||||||
|
t.finishHook(t,t.ctx)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user