package tempop import ( "fmt" "math" "regexp" "strings" "sync" "time" "git.rosy.net.cn/jx-callback/business/auth2/authprovider/weixin" "git.rosy.net.cn/jx-callback/business/partner/delivery" "git.rosy.net.cn/baseapi/platformapi/jdapi" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxcallback/orderman" "git.rosy.net.cn/jx-callback/business/jxstore/cms" "git.rosy.net.cn/jx-callback/business/jxutils" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/model/legacymodel" "git.rosy.net.cn/jx-callback/business/model/legacymodel2" "git.rosy.net.cn/jx-callback/business/partner" "git.rosy.net.cn/jx-callback/business/partner/purchase/ebai" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/jx-callback/globals/api" ) var innerDataPat *regexp.Regexp func init() { innerDataPat = regexp.MustCompile(`"result":(.*),"code":200`) } func Convert2JDSPU(ctx *jxcontext.Context, count int, isAsync, isContinueWhenError bool) (hint string, err error) { sql := ` SELECT t1.* FROM sku_name t1 LEFT JOIN sku_name t2 ON t2.link_id = t1.id AND t2.deleted_at = ? WHERE t1.deleted_at = ? AND t1.status <> ? AND t1.is_spu = 0/* AND t1.unit = '份'*/ AND t2.id IS NULL ORDER BY t1.id ` if count > 0 { sql += " LIMIT " + utils.Int2Str(count) } sqlParams := []interface{}{ utils.DefaultTimeValue, utils.DefaultTimeValue, model.SkuStatusDeleted, } db := dao.GetDB() var skuNameList []*model.SkuName if err = dao.GetRows(db, &skuNameList, sql, sqlParams...); err != nil { return "", err } for _, skuName := range skuNameList { sql = ` SELECT * FROM sku WHERE name_id = ? AND deleted_at = ? AND status = ?; ` sqlParams := []interface{}{ skuName.ID, utils.DefaultTimeValue, model.SkuStatusNormal, } var skuList []*model.Sku if err = dao.GetRows(db, &skuList, sql, sqlParams...); err != nil { return "", err } sql = ` SELECT t1.* FROM sku_name_place_bind t1 WHERE t1.name_id = ? ` sqlParams = []interface{}{ skuName.ID, } var skuNamePlaceBindList []*model.SkuNamePlaceBind if err = dao.GetRows(db, &skuNamePlaceBindList, sql, sqlParams...); err != nil { return "", err } globals.SugarLogger.Debugf("Convert2JDSPU, skuName:%s, skuCount:%d", skuName.Name, len(skuList)) dao.Begin(db) skuNameNew2 := *skuName skuNameNew := &skuNameNew2 dao.WrapAddIDCULEntity(skuNameNew, ctx.GetUserName()) skuNameNew.JdID = 0 skuNameNew.LinkID = skuName.ID skuNameNew.IsSpu = 1 skuNameNew.JdSyncStatus = model.SyncFlagNewMask // skuNameNew.Status = model.SkuStatusDontSale if err = dao.CreateEntity(db, skuNameNew); err != nil { dao.Rollback(db) return "", err } if len(skuList) > 0 { for _, sku := range skuList { skuNew2 := *sku skuNew := &skuNew2 dao.WrapAddIDCULEntity(skuNew, ctx.GetUserName()) skuNew.JdID = 0 skuNew.LinkID = sku.ID skuNew.NameID = skuNameNew.ID skuNew.JdSyncStatus = model.SyncFlagNewMask if skuNameNew.Status == model.SkuStatusDontSale { skuNew.Status = model.SkuStatusDontSale } globals.SugarLogger.Debugf("Convert2JDSPU, sku:%s", utils.Format4Output(skuNew, false)) if err = dao.CreateEntity(db, skuNew); err != nil { dao.Rollback(db) return "", err } } for _, placeBind := range skuNamePlaceBindList { dao.WrapAddIDCULEntity(placeBind, ctx.GetUserName()) placeBind.NameID = skuNameNew.ID globals.SugarLogger.Debugf("Convert2JDSPU, placeBind:%s", utils.Format4Output(placeBind, false)) if err = dao.CreateEntity(db, placeBind); err != nil { dao.Rollback(db) return "", err } } } dao.Commit(db) } sql = ` SELECT DISTINCT t1.* FROM sku_name t1 JOIN sku t2 ON t1.id = t2.name_id AND t2.jd_sync_status <> 0 AND t2.deleted_at = ? WHERE t1.link_id > 0; ` skuNameList = []*model.SkuName{} if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue); err != nil { return "", err } rootTask := tasksch.NewParallelTask("Convert2JDSPU", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { skuName := batchItemList[0].(*model.SkuName) _, err = cms.CurVendorSync.SyncSku(ctx, db, skuName.ID, -1, false, isContinueWhenError, ctx.GetUserName()) return nil, err }, skuNameList) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } func Change2JDSPU4Store(ctx *jxcontext.Context, storeIDs []int, step int, isAsync, isContinueWhenError bool) (hint string, err error) { db := dao.GetDB() if len(storeIDs) == 0 { if err = dao.GetRows(db, &storeIDs, ` SELECT t1.id FROM store t1 JOIN store_map t2 ON t2.store_id = t1.id AND t2.vendor_id = 0 AND t2.deleted_at = ? AND t2.status <> ? WHERE t1.deleted_at = ? AND t1.status <> ? /* AND t1.city_code IN (110100, 120100, 440100, 440300, 510100)*/ `, utils.DefaultTimeValue, model.StoreStatusDisabled, utils.DefaultTimeValue, model.StoreStatusDisabled); err != nil { return "", err } } var sql string var sqlParams []interface{} dao.Begin(db) defer dao.Rollback(db) if step == 1 { sql = ` DELETE t1 FROM store_sku_bind t1 JOIN sku t2 ON t2.id = t1.sku_id AND t2.link_id > 0 WHERE 1 = 1 ` sqlParams = []interface{}{} if len(storeIDs) > 0 { sql += " AND store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")" sqlParams = append(sqlParams, storeIDs) } if _, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil { return "", err } sql = ` INSERT INTO store_sku_bind(created_at, updated_at, last_operator, deleted_at, store_id, sku_id, price, unit_price, status, ebai_id, mtwm_id, jd_sync_status, ebai_sync_status, mtwm_sync_status) SELECT NOW(), NOW(), ?, ?, t1.store_id, t2.id, t1.price, t1.unit_price, t1.status , 0, 0, ?, ?, ? FROM store_sku_bind t1 JOIN sku t2 ON t2.link_id = t1.sku_id AND t2.deleted_at = ? JOIN store t3 ON t3.id = t1.store_id JOIN sku_name t4 ON t4.id = t2.name_id LEFT JOIN sku_name_place_bind t5 ON t5.place_code = t3.city_code AND t5.name_id = t4.id WHERE t1.deleted_at = ? AND (t4.is_global = 1 OR t5.id IS NOT NULL) AND t1.price > 0 ` sqlParams = []interface{}{ ctx.GetUserName(), utils.DefaultTimeValue, // model.SkuStatusDontSale, model.SyncFlagNewMask, 0, //model.SyncFlagNewMask, 0, //model.SyncFlagNewMask, utils.DefaultTimeValue, utils.DefaultTimeValue, } } else if step == 2 { sql = ` SELECT COUNT(*) ct FROM store_sku_bind t1 JOIN sku t2 ON t2.id = t1.sku_id AND t2.link_id > 0 WHERE 1 = 1 ` sqlParams = []interface{}{} if len(storeIDs) > 0 { sql += " AND store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")" sqlParams = append(sqlParams, storeIDs) } ct := 0 if err = dao.GetRow(db, &ct, sql, sqlParams...); err != nil { return "", err } if ct == 0 { return "", fmt.Errorf("%s看起来还没有执行《将转化的SPU在门店上架》", utils.Format4Output(storeIDs, true)) } sql = ` UPDATE store_sku_bind t1 JOIN sku t2 ON t2.link_id = t1.sku_id SET t1.status = 0, t1.jd_sync_status = ? WHERE t1.deleted_at = ? ` sqlParams = []interface{}{ model.SyncFlagSaleMask | model.SyncFlagModifiedMask, utils.DefaultTimeValue, } } else { return "", fmt.Errorf("非法的step") } if len(storeIDs) > 0 { sql += " AND t1.store_id IN (" + dao.GenQuestionMarks(len(storeIDs)) + ")" sqlParams = append(sqlParams, storeIDs) } globals.SugarLogger.Debug(sql) globals.SugarLogger.Debug(utils.Format4Output(sqlParams, false)) var num int64 if num, err = dao.ExecuteSQL(db, sql, sqlParams...); err != nil { return "", err } globals.SugarLogger.Debug(num) dao.Commit(db) var skuIDs []int if step == 1 { sql = ` SELECT id FROM sku t1 WHERE t1.link_id > 0 AND t1.deleted_at = ? ` sqlParams = []interface{}{ utils.DefaultTimeValue, } } else if step == 2 { sql = ` SELECT t1.link_id FROM sku t1 WHERE t1.link_id > 0 AND t1.deleted_at = ? ` sqlParams = []interface{}{ utils.DefaultTimeValue, } } if err = dao.GetRows(db, &skuIDs, sql, sqlParams...); err != nil { return "", err } hint, err = cms.CurVendorSync.SyncStoresSkus(ctx, db, []int{model.VendorIDJD}, storeIDs, skuIDs, false, isAsync, isContinueWhenError) return hint, err } func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { sqlBatchCount := 1000 rootTask := tasksch.NewSeqTask("TransferLegacyJdOrder", ctx, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { sql := ` SELECT t1.* FROM jdorder t1 LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.vendor_order_id WHERE t2.id IS NULL AND LENGTH(t1.data) > 10 AND t1.cityname = 'all' ORDER BY t1.orderstatustime LIMIT ? ` db := dao.GetDB() var jdOrderList []*legacymodel2.Jdorder if err = dao.GetRows(db, &jdOrderList, sql, sqlBatchCount); err != nil { return nil, err } if len(jdOrderList) > 0 { task := tasksch.NewParallelTask("TransferLegacyJdOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { var orderDetailList []*model.GoodsOrderOriginal for _, v := range batchItemList { jdOrder := v.(*legacymodel2.Jdorder) var detail map[string]interface{} if err = utils.UnmarshalUseNumber([]byte(strings.Replace(strings.Replace(jdOrder.Data, "\n", "", -1), "\r", "", -1)), &detail); err != nil { return nil, err } resultList := detail["result"].(map[string]interface{})["resultList"].([]interface{}) if len(resultList) > 0 { originalData := resultList[0].(map[string]interface{}) orgCode := originalData["orgCode"].(string) if orgCode == "320406" { orderPurchaseTime := utils.Interface2String(originalData["orderPurchaseTime"]) if orderPurchaseTime == "" { globals.SugarLogger.Debugf("TransferLegacyJdOrder abnormal order:%s", jdOrder.VendorOrderID) orderPurchaseTime = utils.Interface2String(originalData["orderStartTime"]) } if orderPurchaseTime != "" { orderDetail := &model.GoodsOrderOriginal{ VendorOrderID: jdOrder.VendorOrderID, VendorID: model.VendorIDJD, AccountNo: orgCode, OrderCreatedAt: utils.Str2Time(orderPurchaseTime), OriginalData: string(utils.MustMarshal(originalData)), } orderDetailList = append(orderDetailList, orderDetail) } else { globals.SugarLogger.Debugf("TransferLegacyJdOrder abnormal2 order:%s", jdOrder.VendorOrderID) } } } } if len(orderDetailList) > 0 { err = dao.CreateMultiEntities(db, orderDetailList) } return nil, err }, jdOrderList) // rootTask.AddChild(task).Run() task.Run() _, err = task.GetResult(0) } else { rootTask.Cancel() } return nil, err }, math.MaxInt32) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { // sqlBatchCount := 1000 // rootTask := tasksch.NewSeqTask("TransferLegacyElmOrder", ctx.GetUserName(), func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { // sql := ` // SELECT t1.* // FROM elemeorder t1 // LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.orderid // WHERE t2.id IS NULL AND LENGTH(t1.data) > 10 // ORDER BY t1.order_created_at // LIMIT ? // ` // db := dao.GetDB() // var elmOrderList []*legacymodel2.Elemeorder // if err = dao.GetRows(db, &elmOrderList, sql, sqlBatchCount); err != nil { // return "", err // } // if len(elmOrderList) > 0 { // task := tasksch.NewParallelTask("TransferLegacyElmOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { // var orderDetailList []*model.GoodsOrderOriginal // for _, v := range batchItemList { // elmOrder := v.(*legacymodel2.Elemeorder) // if len(elmOrder.Data) > 10 { // var detail map[string]interface{} // if err = utils.UnmarshalUseNumber([]byte(strings.Replace(strings.Replace(elmOrder.Data, "\n", "", -1), "\r", "", -1)), &detail); err != nil { // return nil, err // } // orderDetail := &model.GoodsOrderOriginal{ // VendorOrderID: elmOrder.Orderid, // VendorID: model.VendorIDELM, // AccountNo: "fakeelm", // OrderCreatedAt: utils.Str2Time(detail["activeAt"].(string)), // OriginalData: elmOrder.Data, // } // orderDetailList = append(orderDetailList, orderDetail) // } // } // if len(orderDetailList) > 0 { // err = dao.CreateMultiEntities(db, orderDetailList) // } // return nil, err // }, elmOrderList) // // rootTask.AddChild(task).Run() // task.Run() // _, err = task.GetResult(0) // } else { // rootTask.Cancel() // } // return nil, err // }, math.MaxInt32) // tasksch.ManageTask(rootTask).Run() // if !isAsync { // _, err = rootTask.GetResult(0) // } else { // hint = rootTask.ID // } return hint, err } func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}, jdStoreOrderList []map[string]interface{}) (err error) { var storeOrderList []*model.TempGoodsOrderMobile for _, v := range jdStoreOrderList { order := &model.TempGoodsOrderMobile{ VendorOrderID: v["orderId"].(string), VendorID: model.VendorIDJD, AccountNo: v["venderId"].(string), OrderCreatedAt: utils.Timestamp2Time(utils.MustInterface2Int64(v["orderPaidTime"]) / 1000), Mobile: v["mobile"].(string), } storeOrderList = append(storeOrderList, order) } var orderDetailList []*model.GoodsOrderOriginal for _, v := range jdOrderList { orderMap := v.(map[string]interface{}) orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"])) if existJdIDMap[orderID] == 0 { orgCode := orderMap["orgCode"].(string) orderDetail := &model.GoodsOrderOriginal{ VendorOrderID: orderID, VendorID: model.VendorIDJD, AccountNo: orgCode, OrderCreatedAt: utils.Str2Time(orderMap["orderPurchaseTime"].(string)), OriginalData: string(utils.MustMarshal(orderMap)), } orderDetailList = append(orderDetailList, orderDetail) } } if len(orderDetailList) > 0 || len(storeOrderList) > 0 { db := dao.GetDB() if len(orderDetailList) > 0 { // err = dao.CreateMultiEntities(db, orderDetailList) for _, v := range orderDetailList { dao.CreateEntity(db, v) } } if len(storeOrderList) > 0 { // err = dao.CreateMultiEntities(db, storeOrderList) for _, v := range storeOrderList { dao.CreateEntity(db, v) } } } return err } func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) { var existJdIDs []string db := dao.GetDB() if err = dao.GetRows(db, &existJdIDs, ` SELECT vendor_order_id FROM goods_order_original WHERE vendor_id = ?`, model.VendorIDJD); err != nil { return "", err } existJdIDMap := make(map[string]int) for _, v := range existJdIDs { existJdIDMap[v] = 1 } pageSize := 100 hourGap := 1 gapCount := int((toTime.Sub(fromTime)-time.Hour*time.Duration(hourGap))/(time.Hour*time.Duration(hourGap)) + 1) if gapCount <= 0 { gapCount = 1 } gapList := make([]int, gapCount) for k := range gapList { gapList[k] = k } rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { gapIndex := batchItemList[0].(int) hourIndex := gapIndex * hourGap subFromTime := fromTime.Add(time.Duration(hourIndex) * time.Hour) subToTime := fromTime.Add(time.Duration(hourIndex+hourGap) * time.Hour) if subToTime.Sub(toTime) > 0 { subToTime = toTime } if true { //gapIndex < gapCount-1 { subToTime = subToTime.Add(-1 * time.Second) // 减一秒 } commonParams := map[string]interface{}{ jdapi.KeyPageSize: pageSize, "orderPurchaseTime_begin": utils.Time2Str(subFromTime), "orderPurchaseTime_end": utils.Time2Str(subToTime), } // globals.SugarLogger.Debugf("PullJdOrder, commonParams=%s", utils.Format4Output(commonParams, false)) orderList, totalCount, err := api.JdAPI.OrderQuery(commonParams) if err != nil { return "", err } storeOrderList, err := api.JdAPI.GetStoreOrderInfoList(utils.Time2Str(subFromTime), utils.Time2Str(subToTime)) if err != nil { } if err = saveJdOrderList(existJdIDMap, orderList, storeOrderList); err != nil { return "", err } if false { globals.SugarLogger.Debugf("subFromTime:%s, subToTime:%s, totalCount:%d, len(orderList):%d", utils.Time2Str(subFromTime), utils.Time2Str(subToTime), totalCount, len(orderList)) } return nil, err }, gapList) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } func UpdateJdOrderRealMobile(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) { return hint, err } func DeleteWrongSpu(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { // sql := ` // SELECT t1.* // FROM sku_name t1 // JOIN sku_name t2 ON t2.link_id = t1.id AND t2.deleted_at = ? // WHERE t1.deleted_at = ?; // ` // db := dao.GetDB() // var skuNameList []*model.SkuName // if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue, utils.DefaultTimeValue); err != nil { // return "", err // } // rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { // _, err = cms.DeleteSkuName(ctx, skuNameList[step].ID, ctx.GetUserName()) // if err != nil { // globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuNameList[step].ID, skuNameList[step].Name, err) // err = nil // 强制忽略错误 // } // return nil, err // }, len(skuNameList)) sql := ` SELECT t1.* FROM sku_name t1 WHERE t1.deleted_at = ? AND t1.is_spu = 1 AND t1.jd_id > 0 ORDER BY t1.id ` db := dao.GetDB() var skuNameList []*model.SkuName if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue); err != nil { return "", err } rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { mapData := map[string]interface{}{ "name": skuNameList[step].Name, } _, err = cms.UpdateSkuName(ctx, skuNameList[step].ID, mapData, ctx.GetUserName()) if err != nil { globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuNameList[step].ID, skuNameList[step].Name, err) err = nil // 强制忽略错误 } return nil, err }, len(skuNameList)) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } type GoodsOrderOriginalEx struct { model.GoodsOrderOriginal OrderStatus int } func CreateOrderFromOriginal(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { sqlBatchCount := 5000 rootTask := tasksch.NewSeqTask("CreateOrderFromOriginal", ctx, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { sql := ` SELECT t1.*, t3.order_status FROM goods_order_original t1 LEFT JOIN goods_order t2 ON t2.vendor_order_id = t1.vendor_order_id LEFT JOIN jxorder t3 ON t3.order_id = t1.vendor_order_id WHERE t2.id IS NULL LIMIT ?; ` db := dao.GetDB() var orderList []*GoodsOrderOriginalEx if err = dao.GetRows(db, &orderList, sql, sqlBatchCount); err != nil { return nil, err } if len(orderList) > 0 { task := tasksch.NewParallelTask("CreateOrderFromOriginal", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(1), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { orderOriginal := batchItemList[0].(*GoodsOrderOriginalEx) globals.SugarLogger.Debugf("CreateOrderFromOriginal processing orderID:%s", orderOriginal.VendorOrderID) if handler := partner.GetPurchasePlatformFromVendorID(orderOriginal.VendorID); handler != nil { var detail map[string]interface{} if err = utils.UnmarshalUseNumber([]byte(strings.Replace(strings.Replace(orderOriginal.OriginalData, "\n", "", -1), "\r", "", -1)), &detail); err != nil { globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err) return nil, err } order := handler.Map2Order(detail) if order.Status < model.OrderStatusEndBegin { if orderOriginal.OrderStatus == 3 { order.Status = model.OrderStatusFinished } else if orderOriginal.OrderStatus == 7 { order.Status = model.OrderStatusCanceled } else { order2, err2 := handler.GetOrder(order.VendorOrderID) if err = err2; err == nil { order.Status = order2.Status } else { err = nil // ignore get status error } } } if err == nil { if _, err = orderman.FixedOrderManager.SaveOrder(order, false, dao.GetDB()); err != nil { globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err) } } else { globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s, error:%v", orderOriginal.VendorOrderID, err) } } else { globals.SugarLogger.Debugf("CreateOrderFromOriginal abnormal orderID:%s", orderOriginal.VendorOrderID) } return nil, err }, orderList) // rootTask.AddChild(task).Run() task.Run() _, err = task.GetResult(0) } else { rootTask.Cancel() } return nil, err }, math.MaxInt32) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } func TransformJdSpu2Sku(ctx *jxcontext.Context, skuNameIDs []int, count int, isAsync, isContinueWhenError bool) (hint string, err error) { sql := ` SELECT t1.* FROM sku_name t1 WHERE t1.deleted_at = ? AND t1.status <> ? AND t1.is_spu = 1 AND jd_id <> 0 ` sqlParams := []interface{}{ utils.DefaultTimeValue, model.SkuStatusDeleted, } if len(skuNameIDs) > 0 { sql += " AND t1.id IN (" + dao.GenQuestionMarks(len(skuNameIDs)) + ")" sqlParams = append(sqlParams, skuNameIDs) } sql += " ORDER BY t1.id" if count > 0 { sql += " LIMIT ?" sqlParams = append(sqlParams, count) } db := dao.GetDB() var skuNameList []*model.SkuName if err = dao.GetRows(db, &skuNameList, sql, sqlParams...); err != nil { return "", err } if len(skuNameList) == 0 { return "", fmt.Errorf("待转换的skuName为空") } batchSize := 40 rootTask := tasksch.NewSeqTask("TransformJdSpu2Sku", ctx, func(rootTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { var ( locker sync.Mutex skuIDs []int ) lastIndex := (step + 1) * batchSize if lastIndex > len(skuNameList) { lastIndex = len(skuNameList) } batchSkNameList := skuNameList[step*batchSize : lastIndex] subTask := tasksch.NewParallelTask(fmt.Sprintf("TransformJdSpu2Sku:%d", step), tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(subTask *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { skuName := batchItemList[0].(*model.SkuName) if !jxutils.IsEmptyID(skuName.JdID) { sql = ` SELECT * FROM sku WHERE name_id = ? AND deleted_at = ? AND status <> ?; ` sqlParams := []interface{}{ skuName.ID, utils.DefaultTimeValue, model.SkuStatusDeleted, } var skuList []*model.Sku if err = dao.GetRows(db, &skuList, sql, sqlParams...); err != nil { return "", err } globals.SugarLogger.Debugf("TransformJdSpu2Sku skuList:%s", utils.Format4Output(skuList, false)) if len(skuList) > 0 { for _, sku := range skuList { locker.Lock() skuIDs = append(skuIDs, sku.ID) locker.Unlock() if !jxutils.IsEmptyID(sku.JdID) { if globals.EnableJdStoreWrite { if err = api.JdAPI.UpdateSkuBaseInfo(utils.Int2Str(skuName.ID), utils.Int2Str(sku.ID), utils.Params2Map(jdapi.KeyFixedStatus, jdapi.SkuFixedStatusDeleted)); err != nil { if errExt, ok := err.(*utils.ErrorWithCode); ok && errExt.IntCode() == 11004 { err = nil } else { break } } } } } } if err == nil && globals.EnableJdStoreWrite { if err = api.JdAPI.UpdateSpu(utils.Int2Str(skuName.ID), utils.Params2Map(jdapi.KeyFixedStatus, jdapi.SkuFixedStatusOffline)); err == nil { err = api.JdAPI.UpdateSpu(utils.Int2Str(skuName.ID), utils.Params2Map(jdapi.KeyFixedStatus, jdapi.SkuFixedStatusDeleted)) } else if errExt, ok := err.(*utils.ErrorWithCode); ok && errExt.IntCode() == 11035 { err = nil } } if err == nil { skuName.IsSpu = 0 skuName.JdID = 0 if _, err = dao.UpdateEntity(db, skuName, "IsSpu", "JdID"); err == nil { sql := ` UPDATE sku t1 SET t1.jd_sync_status = ?, t1.jd_id = 0 WHERE t1.name_id = ? AND t1.deleted_at = ? AND t1.status <> ? ` sqlParams := []interface{}{ model.SyncFlagNewMask, skuName.ID, utils.DefaultTimeValue, model.SkuStatusDeleted, } if _, err = dao.ExecuteSQL(db, sql, sqlParams...); err == nil { _, err = cms.CurVendorSync.SyncSku(ctx, db, skuName.ID, -1, false, isContinueWhenError, ctx.GetUserName()) } } } } else { globals.SugarLogger.Debugf("TransformJdSpu2Sku skuName:%d is fake", skuName.ID) } return nil, err }, batchSkNameList) rootTask.AddChild(subTask).Run() if _, err = subTask.GetResult(0); err == nil { if len(skuIDs) > 0 { if _, err = dao.SetStoreSkuSyncStatus(db, model.VendorIDJD, nil, skuIDs, model.SyncFlagStoreSkuModifiedMask); err == nil { // time.Sleep(20 * time.Second) // _, err = cms.CurVendorSync.SyncStoresSkus(ctx, db, []int{model.VendorIDJD}, nil, skuIDs, false, isContinueWhenError) } } } return nil, err }, (len(skuNameList)-1)/batchSize+1) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } func ReProcessJdBadComment(ctx *jxcontext.Context, isForce, isAsync, isContinueWhenError bool) (hint string, err error) { sql := ` SELECT * FROM jx_bad_comments ` if !isForce { sql += " WHERE (createtime IS NULL OR createtime = '') OR (updatetime IS NULL OR updatetime = '')" } // sql += " LIMIT 1" db := dao.GetDB() var commentList []*legacymodel.JxBadComments if err = dao.GetRows(db, &commentList, sql); err == nil { if len(commentList) > 0 { rootTask := tasksch.NewParallelTask("ReProcessJdBadComment", nil, ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { badComment := batchItemList[0].(*legacymodel.JxBadComments) comment1, _ := unmarshalCommentText(badComment.Msg) comment2, _ := unmarshalCommentText(badComment.UpdatedMsg) if len(comment1) > 0 { badComment.Createtime = utils.Timestamp2Str(utils.MustInterface2Int64(comment1["createTime"].(map[string]interface{})["time"]) / 1000) badComment.Msg = string(utils.MustMarshal(comment1)) if len(comment2) > 0 { badComment.Updatetime = utils.Timestamp2Str(utils.MustInterface2Int64(comment2["createTime"].(map[string]interface{})["time"]) / 1000) badComment.UpdatedMsg = string(utils.MustMarshal(comment2)) } _, err = dao.UpdateEntity(db, badComment) } return nil, err }, commentList) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } } } return hint, err } func unmarshalCommentText(commentStr string) (retVal map[string]interface{}, isNeedUpdate bool) { var err error for { var retVal map[string]interface{} // 必须要用局部变量 if commentStr == "" { return nil, false } if err = jxutils.Strings2Objs(commentStr, &retVal); err == nil { if retVal["data"] != nil { commentStr = retVal["data"].(string) } else if retVal["result"] != nil { return retVal["result"].(map[string]interface{}), true } else { return retVal, false } } else { strList := innerDataPat.FindStringSubmatch(commentStr) if strList[1] != "" { commentStr = strList[1] } else { return nil, false } } } } func RefreshEbaiBadComment(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) { if utils.IsTimeZero(fromTime) { fromTime = utils.Str2Time("2018-05-03 00:00:00") } if utils.IsTimeZero(toTime) { toTime = time.Now().Add(-3 * time.Hour) } days := int(toTime.Sub(fromTime)/(24*time.Hour) + 1) globals.SugarLogger.Debugf("RefreshEbaiBadComment fromTime:%s, toTime:%s, days:%d", utils.Time2Str(fromTime), utils.Time2Str(toTime), days) rootTask := tasksch.NewSeqTask("RefreshEbaiBadComment", ctx, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { batchFromTime := fromTime.Add(time.Duration(step) * 24 * time.Hour) batchToTime := batchFromTime.Add(24*time.Hour - time.Second) if batchToTime.Sub(toTime) > 0 { batchToTime = toTime } err = ebai.CurPurchaseHandler.RefreshComment(batchFromTime, batchToTime) return nil, err }, days) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } return hint, err } func PrintMsg(ctx *jxcontext.Context, vendorID int, id1, id2, msgTitle, msgContent string) (printerStatus *partner.PrinterStatus, err error) { handler := partner.GetPrinterPlatformFromVendorID(vendorID) if handler == nil { return nil, fmt.Errorf("打印机厂商:%d当前不被支持,请检查vendorID", vendorID) } return handler.PrintMsg(ctx, id1, id2, msgTitle, msgContent) } func UpdateAllWeiXinRemark(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { var mobileList []string sql := ` SELECT tel FROM weixins WHERE openid <> '' AND tel <> ''` sqlParams := []interface{}{} if globals.EnableWXAuth2 { sql = ` SELECT t1.mobile FROM user t1 JOIN auth_bind t2 ON t2.user_id = t1.user_id AND t2.deleted_at = ? and t2.type = ? WHERE t1.deleted_at = ? AND t1.type & ? <> 0` sqlParams = []interface{}{ utils.DefaultTimeValue, weixin.AuthTypeMP, utils.DefaultTimeValue, model.UserTypeStoreBoss, } } if err = dao.GetRows(dao.GetDB(), &mobileList, sql, sqlParams...); err == nil { rootTask := tasksch.NewParallelTask("刷新微信备注", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { tel := batchItemList[0].(string) err = jxutils.HandleUserWXRemark(dao.GetDB(), tel) return nil, err }, mobileList) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } } return hint, err } // 从饿百得到执照信息 func RetrieveEbaiShopLicence(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { var ebaiStoreList []*model.StoreMap db := dao.GetDB() if err = dao.GetRows(db, &ebaiStoreList, ` SELECT * FROM store_map WHERE vendor_id = ? AND deleted_at = ? `, model.VendorIDEBAI, utils.DefaultTimeValue); err == nil { globals.SugarLogger.Debugf("RetrieveEbaiShopLicence, count:%d", len(ebaiStoreList)) rootTask := tasksch.NewParallelTask("XXXX", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { ebaiStore := batchItemList[0].(*model.StoreMap) xxList, err := api.EbaiAPI.ShopAptitudeGet("", utils.Str2Int64(ebaiStore.VendorStoreID)) if err == nil && ebaiStore != nil { if len(xxList) > 1 { ebaiStore2, err2 := api.EbaiAPI.ShopGet("", utils.Str2Int64(ebaiStore.VendorStoreID)) if err = err2; err == nil { shop := &legacymodel.EbaiShopLicence{ ShopName: utils.Interface2String(ebaiStore2["name"]), Licence: utils.Interface2String(xxList[1]["license_number"]), Address: utils.Interface2String(ebaiStore2["address"]), Owner: utils.Interface2String(xxList[1]["legal_representative_name"]), Tel: utils.Interface2String(ebaiStore2["service_phone"]), LicenceName: utils.Interface2String(xxList[1]["license_name"]), } err = dao.CreateEntity(db, shop) } } } return nil, err }, ebaiStoreList) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } } return hint, err } func RefreshMtpsWaybillFee(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) { var waybillList []*model.Waybill db := dao.GetDB() if err = dao.GetRows(db, &waybillList, ` SELECT * FROM waybill WHERE status_time > '2019-04-01' AND waybill_vendor_id = 102 AND desired_fee = 0 `); err == nil { globals.SugarLogger.Debugf("RefreshMtpsWaybillFee, count:%d", len(waybillList)) rootTask := tasksch.NewParallelTask("RefreshMtpsWaybillFee", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { bill := batchItemList[0].(*model.Waybill) bill.DesiredFee, _ = delivery.CalculateBillDeliveryFee(bill) _, err = dao.UpdateEntity(db, bill, "DesiredFee") return nil, err }, waybillList) tasksch.ManageTask(rootTask).Run() if !isAsync { _, err = rootTask.GetResult(0) } else { hint = rootTask.ID } } return hint, err }