// Package initdata contains one-off data initialisation and migration jobs:
// moving legacy JD / Ele.me order rows into goods_order_original, pulling JD
// orders for a time range, and re-submitting SPU sku names.
package initdata

import (
	"fmt"
	"math"
	"runtime"
	"time"

	"git.rosy.net.cn/baseapi/platformapi/jdapi"
	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxstore/cms"
	"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
	"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
	"git.rosy.net.cn/jx-callback/business/model"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/business/model/legacymodel2"
	"git.rosy.net.cn/jx-callback/globals"
	"git.rosy.net.cn/jx-callback/globals/api"
)

// legacyJdOrgCode is the orgCode a legacy JD order must carry to be migrated
// by TransferLegacyJdOrder; orders with any other orgCode are skipped.
const legacyJdOrgCode = "320406"

// payloadStringField returns m[key] as a string. It returns an error (instead
// of panicking like a bare type assertion) when the key is absent or holds a
// non-string value.
func payloadStringField(m map[string]interface{}, key string) (string, error) {
	value, ok := m[key].(string)
	if !ok {
		return "", fmt.Errorf("field %q is missing or not a string (got %T)", key, m[key])
	}
	return value, nil
}

// convertLegacyJdOrder decodes the JSON payload of one legacy jdorder row and
// builds the goods_order_original record from result.resultList[0].
//
// It returns (nil, nil) when the row must be skipped: resultList is empty or
// the order's orgCode is not legacyJdOrgCode. A payload whose structure does
// not match the expected shape yields an error.
func convertLegacyJdOrder(jdOrder *legacymodel2.Jdorder) (*model.GoodsOrderOriginal, error) {
	var detail map[string]interface{}
	if err := utils.UnmarshalUseNumber([]byte(jdOrder.Data), &detail); err != nil {
		return nil, err
	}
	result, ok := detail["result"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("jdorder %v: field \"result\" is missing or not an object (got %T)", jdOrder.VendorOrderID, detail["result"])
	}
	resultList, ok := result["resultList"].([]interface{})
	if !ok {
		return nil, fmt.Errorf("jdorder %v: field \"resultList\" is missing or not an array (got %T)", jdOrder.VendorOrderID, result["resultList"])
	}
	if len(resultList) == 0 {
		return nil, nil
	}
	originalData, ok := resultList[0].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("jdorder %v: resultList[0] is not an object (got %T)", jdOrder.VendorOrderID, resultList[0])
	}
	orgCode, err := payloadStringField(originalData, "orgCode")
	if err != nil {
		return nil, fmt.Errorf("jdorder %v: %v", jdOrder.VendorOrderID, err)
	}
	if orgCode != legacyJdOrgCode {
		return nil, nil
	}
	purchaseTime, err := payloadStringField(originalData, "orderPurchaseTime")
	if err != nil {
		return nil, fmt.Errorf("jdorder %v: %v", jdOrder.VendorOrderID, err)
	}
	return &model.GoodsOrderOriginal{
		VendorOrderID:  jdOrder.VendorOrderID,
		VendorID:       model.VendorIDJD,
		AccountNo:      orgCode,
		OrderCreatedAt: utils.Str2Time(purchaseTime),
		OriginalData:   string(utils.MustMarshal(originalData)),
	}, nil
}

// TransferLegacyJdOrder copies rows of the legacy jdorder table that have no
// counterpart in goods_order_original into goods_order_original.
//
// The work runs as a sequential root task. Every step selects up to 1000
// not-yet-migrated rows and hands them to a parallel child task in batches of
// 40; the root task cancels itself once a step selects no rows.
//
// isContinueWhenError is forwarded to the parallel child task configuration.
// When isAsync is false the call waits for the root task and returns its
// error; otherwise it returns the root task ID as hint.
//
// NOTE(review): rows that are skipped (empty resultList or a foreign orgCode)
// are never inserted into goods_order_original, so they match the selection
// query again on the next step. If only such rows remain, the step loop keeps
// re-reading them until the step limit (math.MaxInt32) is reached.
func TransferLegacyJdOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	sqlBatchCount := 1000
	rootTask := tasksch.NewSeqTask("TransferLegacyJdOrder", ctx.GetUserName(), func(seqTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
		sql := `
		SELECT t1.*
		FROM jdorder t1
		LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.vendor_order_id
		WHERE t2.id IS NULL
		LIMIT ?
		`
		db := dao.GetDB()
		var jdOrderList []*legacymodel2.Jdorder
		if err = dao.GetRows(db, &jdOrderList, sql, sqlBatchCount); err != nil {
			return nil, err
		}
		if len(jdOrderList) == 0 {
			// Nothing left to migrate: stop the remaining steps.
			seqTask.Cancel()
			return nil, nil
		}
		task := tasksch.NewParallelTask("TransferLegacyJdOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			var orderDetailList []*model.GoodsOrderOriginal
			for _, v := range batchItemList {
				orderDetail, convErr := convertLegacyJdOrder(v.(*legacymodel2.Jdorder))
				if convErr != nil {
					return nil, convErr
				}
				if orderDetail != nil {
					orderDetailList = append(orderDetailList, orderDetail)
				}
			}
			if len(orderDetailList) > 0 {
				err = dao.CreateMultiEntities(db, orderDetailList)
			}
			return nil, err
		}, jdOrderList)
		seqTask.AddChild(task).Run()
		_, err = task.GetResult(0)
		// Explicit collection between steps, as in the original code.
		runtime.GC()
		return nil, err
	}, math.MaxInt32)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}

// convertLegacyElmOrder decodes the JSON payload of one legacy elemeorder row
// and builds the matching goods_order_original record. The creation time is
// taken from the payload's "activeAt" field; a payload without a string
// "activeAt" yields an error.
func convertLegacyElmOrder(elmOrder *legacymodel2.Elemeorder) (*model.GoodsOrderOriginal, error) {
	var detail map[string]interface{}
	if err := utils.UnmarshalUseNumber([]byte(elmOrder.Data), &detail); err != nil {
		return nil, err
	}
	activeAt, err := payloadStringField(detail, "activeAt")
	if err != nil {
		return nil, fmt.Errorf("elemeorder %v: %v", elmOrder.Orderid, err)
	}
	return &model.GoodsOrderOriginal{
		VendorOrderID:  elmOrder.Orderid,
		VendorID:       model.VendorIDELM,
		AccountNo:      "fakeelm",
		OrderCreatedAt: utils.Str2Time(activeAt),
		OriginalData:   elmOrder.Data,
	}, nil
}

// TransferLegacyElmOrder copies rows of the legacy elemeorder table that have
// no counterpart in goods_order_original into goods_order_original, storing
// the raw payload unchanged and using the fixed account number "fakeelm".
//
// Scheduling mirrors TransferLegacyJdOrder: a sequential root task selects up
// to 1000 rows per step, a parallel child task processes them in batches of
// 40, and the root task cancels itself once a step selects no rows.
//
// When isAsync is false the call waits for the root task and returns its
// error; otherwise it returns the root task ID as hint.
func TransferLegacyElmOrder(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	sqlBatchCount := 1000
	rootTask := tasksch.NewSeqTask("TransferLegacyElmOrder", ctx.GetUserName(), func(seqTask *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
		sql := `
		SELECT t1.*
		FROM elemeorder t1
		LEFT JOIN goods_order_original t2 ON t2.vendor_order_id = t1.orderid
		WHERE t2.id IS NULL
		LIMIT ?
		`
		db := dao.GetDB()
		var elmOrderList []*legacymodel2.Elemeorder
		if err = dao.GetRows(db, &elmOrderList, sql, sqlBatchCount); err != nil {
			return nil, err
		}
		if len(elmOrderList) == 0 {
			// Nothing left to migrate: stop the remaining steps.
			seqTask.Cancel()
			return nil, nil
		}
		task := tasksch.NewParallelTask("TransferLegacyElmOrder2", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetBatchSize(40), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			orderDetailList := make([]*model.GoodsOrderOriginal, 0, len(batchItemList))
			for _, v := range batchItemList {
				orderDetail, convErr := convertLegacyElmOrder(v.(*legacymodel2.Elemeorder))
				if convErr != nil {
					return nil, convErr
				}
				orderDetailList = append(orderDetailList, orderDetail)
			}
			if len(orderDetailList) > 0 {
				err = dao.CreateMultiEntities(db, orderDetailList)
			}
			return nil, err
		}, elmOrderList)
		seqTask.AddChild(task).Run()
		_, err = task.GetResult(0)
		// Explicit collection between steps, as in the original code.
		runtime.GC()
		return nil, err
	}, math.MaxInt32)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}

// saveJdOrderList persists one pulled window of JD data.
//
// jdStoreOrderList entries become TempGoodsOrderMobile rows (order id, vender
// id, paid time and mobile). jdOrderList entries become GoodsOrderOriginal
// rows unless their order id already has a non-zero entry in existJdIDMap.
//
// All records are built before anything is written, so a malformed entry
// (missing or wrongly typed field) returns an error without inserting any row
// of this window.
func saveJdOrderList(existJdIDMap map[string]int, jdOrderList []interface{}, jdStoreOrderList []map[string]interface{}) (err error) {
	storeOrderList := make([]*model.TempGoodsOrderMobile, 0, len(jdStoreOrderList))
	for _, v := range jdStoreOrderList {
		var orderID, venderID, mobile string
		if orderID, err = payloadStringField(v, "orderId"); err != nil {
			return err
		}
		if venderID, err = payloadStringField(v, "venderId"); err != nil {
			return err
		}
		if mobile, err = payloadStringField(v, "mobile"); err != nil {
			return err
		}
		storeOrderList = append(storeOrderList, &model.TempGoodsOrderMobile{
			VendorOrderID: orderID,
			VendorID:      model.VendorIDJD,
			AccountNo:     venderID,
			// orderPaidTime is divided by 1000 before conversion
			// (presumably milliseconds to seconds; confirm against the JD API).
			OrderCreatedAt: utils.Timestamp2Time(utils.MustInterface2Int64(v["orderPaidTime"]) / 1000),
			Mobile:         mobile,
		})
	}

	var orderDetailList []*model.GoodsOrderOriginal
	for _, v := range jdOrderList {
		orderMap, ok := v.(map[string]interface{})
		if !ok {
			return fmt.Errorf("JD order entry is not an object (got %T)", v)
		}
		orderID := utils.Int64ToStr(utils.MustInterface2Int64(orderMap["orderId"]))
		if existJdIDMap[orderID] != 0 {
			continue // already present in goods_order_original
		}
		var orgCode, purchaseTime string
		if orgCode, err = payloadStringField(orderMap, "orgCode"); err != nil {
			return fmt.Errorf("JD order %s: %v", orderID, err)
		}
		if purchaseTime, err = payloadStringField(orderMap, "orderPurchaseTime"); err != nil {
			return fmt.Errorf("JD order %s: %v", orderID, err)
		}
		orderDetailList = append(orderDetailList, &model.GoodsOrderOriginal{
			VendorOrderID:  orderID,
			VendorID:       model.VendorIDJD,
			AccountNo:      orgCode,
			OrderCreatedAt: utils.Str2Time(purchaseTime),
			OriginalData:   string(utils.MustMarshal(orderMap)),
		})
	}

	if len(orderDetailList) == 0 && len(storeOrderList) == 0 {
		return nil
	}
	db := dao.GetDB()
	// Rows are inserted one at a time and the outcome of each insert is not
	// inspected; the bulk variant (dao.CreateMultiEntities) was disabled in the
	// original code.
	// NOTE(review): presumably so that one rejected row does not fail the whole
	// batch; confirm before reporting these errors to the caller.
	for _, v := range orderDetailList {
		dao.CreateEntity(db, v)
	}
	for _, v := range storeOrderList {
		dao.CreateEntity(db, v)
	}
	return nil
}

// pullJdOrderWindowCount returns how many windows of length gap are needed to
// cover [fromTime, toTime), rounding a trailing partial window up. It never
// returns less than 1.
func pullJdOrderWindowCount(fromTime, toTime time.Time, gap time.Duration) int {
	span := toTime.Sub(fromTime)
	if span <= 0 || gap <= 0 {
		return 1
	}
	count := int(span / gap)
	if span%gap != 0 {
		count++
	}
	return count
}

// PullJdOrder pulls JD orders purchased between fromTime and toTime and saves
// them through saveJdOrderList.
//
// The range is cut into one-hour windows that are processed by a parallel task
// (parallel count 20). For each window it queries api.JdAPI.OrderQuery with a
// page size of 100 and api.JdAPI.GetStoreOrderInfoList. Orders whose id is
// already stored in goods_order_original for the JD vendor are not saved again.
//
// A GetStoreOrderInfoList failure is logged and otherwise ignored; the window
// is still saved with whatever store order list was returned.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise it returns the task ID as hint.
func PullJdOrder(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
	var existJdIDs []string
	db := dao.GetDB()
	if err = dao.GetRows(db, &existJdIDs, `
		SELECT vendor_order_id
		FROM goods_order_original
		WHERE vendor_id = ?`, model.VendorIDJD); err != nil {
		return "", err
	}
	// Built once here and only read by the workers below.
	existJdIDMap := make(map[string]int, len(existJdIDs))
	for _, v := range existJdIDs {
		existJdIDMap[v] = 1
	}

	pageSize := 100
	hourGap := 1
	gap := time.Hour * time.Duration(hourGap)
	// Ceiling division: a trailing partial hour gets its own window, which the
	// worker clamps to toTime.
	gapCount := pullJdOrderWindowCount(fromTime, toTime, gap)
	gapList := make([]int, gapCount)
	for k := range gapList {
		gapList[k] = k
	}

	rootTask := tasksch.NewParallelTask("PullJdOrder", tasksch.NewParallelConfig().SetIsContinueWhenError(isContinueWhenError).SetParallelCount(20), ctx.GetUserName(), func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
		gapIndex := batchItemList[0].(int)
		subFromTime := fromTime.Add(time.Duration(gapIndex) * gap)
		subToTime := subFromTime.Add(gap)
		if subToTime.After(toTime) {
			subToTime = toTime
		}
		// Subtract one second so that consecutive windows do not share their
		// boundary second.
		subToTime = subToTime.Add(-1 * time.Second)

		beginStr, endStr := utils.Time2Str(subFromTime), utils.Time2Str(subToTime)
		commonParams := map[string]interface{}{
			jdapi.KeyPageSize:         pageSize,
			"orderPurchaseTime_begin": beginStr,
			"orderPurchaseTime_end":   endStr,
		}
		// NOTE(review): the total count returned by OrderQuery is discarded and
		// no further page is requested here; confirm whether a window can hold
		// more than pageSize orders.
		orderList, _, err := api.JdAPI.OrderQuery(commonParams)
		if err != nil {
			return nil, err
		}
		storeOrderList, storeErr := api.JdAPI.GetStoreOrderInfoList(beginStr, endStr)
		if storeErr != nil {
			// Best effort: keep saving the order list even without mobile data.
			globals.SugarLogger.Debugf("PullJdOrder GetStoreOrderInfoList failed, subFromTime:%s, subToTime:%s, with error:%v", beginStr, endStr, storeErr)
		}
		if err = saveJdOrderList(existJdIDMap, orderList, storeOrderList); err != nil {
			return nil, err
		}
		return nil, nil
	}, gapList)
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}

// UpdateJdOrderRealMobile is currently a stub: it performs no work and returns
// an empty hint and a nil error.
func UpdateJdOrderRealMobile(ctx *jxcontext.Context, fromTime, toTime time.Time, isAsync, isContinueWhenError bool) (hint string, err error) {
	return hint, err
}

// DeleteWrongSpu walks every sku_name row with deleted_at equal to
// utils.DefaultTimeValue, is_spu = 1 and jd_id > 0 (ordered by id) and calls
// cms.UpdateSkuName for it with the row's current name.
//
// Despite its name, the active implementation deletes nothing; the earlier
// delete-based variant is kept below as commented-out code. A failure on a
// single row is logged and ignored so the remaining rows are still processed.
//
// When isAsync is false the call waits for the task and returns its error;
// otherwise it returns the task ID as hint.
func DeleteWrongSpu(ctx *jxcontext.Context, isAsync, isContinueWhenError bool) (hint string, err error) {
	// sql := `
	// SELECT t1.*
	// FROM sku_name t1
	// JOIN sku_name t2 ON t2.link_id = t1.id AND t2.deleted_at = ?
	// WHERE t1.deleted_at = ?;
	// `
	// db := dao.GetDB()
	// var skuNameList []*model.SkuName
	// if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue, utils.DefaultTimeValue); err != nil {
	// 	return "", err
	// }
	// rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
	// 	_, err = cms.DeleteSkuName(ctx, skuNameList[step].ID, ctx.GetUserName())
	// 	if err != nil {
	// 		globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuNameList[step].ID, skuNameList[step].Name, err)
	// 		err = nil // deliberately ignore the error
	// 	}
	// 	return nil, err
	// }, len(skuNameList))
	sql := `
	SELECT t1.*
	FROM sku_name t1
	WHERE t1.deleted_at = ? AND t1.is_spu = 1 AND t1.jd_id > 0
	ORDER BY t1.id
	`
	db := dao.GetDB()
	var skuNameList []*model.SkuName
	if err = dao.GetRows(db, &skuNameList, sql, utils.DefaultTimeValue); err != nil {
		return "", err
	}
	rootTask := tasksch.NewSeqTask("DeleteWrongSpu", ctx.GetUserName(), func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) {
		skuName := skuNameList[step]
		mapData := map[string]interface{}{
			"name": skuName.Name,
		}
		if _, err = cms.UpdateSkuName(ctx, skuName.ID, mapData, ctx.GetUserName()); err != nil {
			globals.SugarLogger.Debugf("DeleteWrongSpu failed nameid:%d, name:%s, with error:%v", skuName.ID, skuName.Name, err)
			err = nil // deliberately ignore the error so later rows still run
		}
		return nil, err
	}, len(skuNameList))
	tasksch.ManageTask(rootTask).Run()
	if !isAsync {
		_, err = rootTask.GetResult(0)
	} else {
		hint = rootTask.ID
	}
	return hint, err
}