From 39be1d9d9e804a19768778b232d695ad246bb470 Mon Sep 17 00:00:00 2001 From: "807875765@qq.com" <807875765@qq.com> Date: Wed, 6 Apr 2022 16:08:41 +0800 Subject: [PATCH] =?UTF-8?q?JCQ=202022/04/06=20=E6=9A=82=E6=97=B6=E6=B3=A8?= =?UTF-8?q?=E9=87=8A=20=E6=89=93=E5=BC=80=E5=90=8E=E4=BC=9A=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=AF=8F10s=E5=A4=84=E7=90=86=E4=B8=80=E6=AC=A1?= =?UTF-8?q?=E6=B6=88=E6=81=AF=20=20xuhang?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jxcallback/scheduler/jcq/jcqscheduler.go | 280 ++++++++++++++++++ business/jxstore/misc/misc.go | 9 + business/partner/purchase/jdshop/callback.go | 5 +- .../partner/purchase/jdshop/callback_test.go | 5 + business/partner/purchase/jdshop/key.go | 45 +-- globals/beegodb/beegodb.go | 2 + 6 files changed, 326 insertions(+), 20 deletions(-) create mode 100644 business/jxcallback/scheduler/jcq/jcqscheduler.go diff --git a/business/jxcallback/scheduler/jcq/jcqscheduler.go b/business/jxcallback/scheduler/jcq/jcqscheduler.go new file mode 100644 index 000000000..f9e8171b2 --- /dev/null +++ b/business/jxcallback/scheduler/jcq/jcqscheduler.go @@ -0,0 +1,280 @@ +package jcq + +import ( + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "git.rosy.net.cn/baseapi" + "git.rosy.net.cn/baseapi/platformapi/jcqapi" + "git.rosy.net.cn/baseapi/platformapi/jdshopapi" + "git.rosy.net.cn/jx-callback/business/model/dao" + "git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop" + "sync" +) + +const ( + accessKey = "C0EB36912C652079DB111A922ACA406B" //京东第三方的AK + secretKey = "86B6330051ECC88391E2630D34C2CA13" //京东第三方的SK + consumerGroupID = "open_message_573819178445" //消费组ID + popOrderCreatTopic = "open_message_pop_order_create_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单创建主题名称 + popOrderChangeTopic = "open_message_pop_order_change_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单修改主题名称 + popOrderOutTopic = "open_message_pop_order_out_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单出库主题名称 + OrderOutTopic = "open_message_order_order_out_E1D746D42474D5F1F1A10CECE75D99F6" //订单出库主题名称 + OrderCancelTopic = "open_message_order_order_cancel_E1D746D42474D5F1F1A10CECE75D99F6" //订单取消主题名称 + OrderPayTopic = "open_message_order_order_pay_E1D746D42474D5F1F1A10CECE75D99F6" //订单支付主题名称 + OrderCreatTopic = "open_message_order_order_create_E1D746D42474D5F1F1A10CECE75D99F6" //订单创建主题 + DefaultSubscribeSize = 32 //默认拉取数量 + popOrderCreatType = "popCreat" //存储到数据库pop订单创建类型 + popOrderOutType = "popOut" //存储到数据库pop订单出库类型 + popOrderChangeType = "popChange" //存储到数据库pop订单修改类型 + OrderCreatType = "orderCreat" //订单创建类型 + OrderPayType = "orderPay" //订单支付类型 + OrderOutType = "orderOut" //订单出库主题 + OrderCancelType = "orderCancel" //订单取消类型 + +) + +var ( + TopicList = []string{ + popOrderCreatTopic, popOrderChangeTopic, popOrderOutTopic, OrderCreatTopic, OrderPayTopic, OrderOutTopic, OrderCancelTopic, + } +) + +type JcqMessage struct { //model + ID int64 `orm:"column(id)" ` + MessageID string `orm:"column(message_id)" ` + MessageBody string `orm:"column(message_body)"` + BusinessID string `orm:"column(business_id)"` + PropertyRetryTimes string `orm:"column(property_retry_times)"` + Status string `orm:"column(status)"` //表示业务逻辑状态 n 表示业务为处理 y 表示已处理 + MessageType string `orm:"column(message_type)" //消息类型 对应上面的枚举` +} + +type JqcTask struct { + TopicId string + Wg *sync.WaitGroup +} + +type PopCreatObject struct { //序列化属性首字母必须大写 否则会出现赋值问题 切记! 也可以考虑使用simplejson就需要这么多的struct + OrderCreateTime string `json:"orderCreateTime"` + OrderId int64 `json:"orderId"` + OrderPaymentType int `json:"orderPaymentType"` + OrderStatus string `json:"orderStatus"` + OrderType int `json:"orderType"` + VenderId int64 `json:"venderId"` +} + +type PopChangeObject struct { //Pop修改 + ErpOrderStatus int `json:"erpOrderStatus"` + Modified int `json:"modified"` + OrderCreateTime int `json:"orderCreateTime"` + OrderId int64 `json:"orderId"` + OrderPaymentType int `json:"orderPaymentType"` + OrderType int `json:"orderType"` + VenderId int `json:"venderId"` + Yn int `json:"yn"` +} +type PopCreatOrderMessage struct { //pop订单创建结构体 + PopCreatObj PopCreatObject `json:"object"` +} + +type CreatOrderMessage struct { + OrderCreatObj PopCreatObject `json:"object"` +} + +type PayOrderMessage struct { + OrderPayObject OrderPayObject `json:"object"` +} + +type OrderPayObject struct { + OrderId int64 `json:"orderId"` + OrderPaymentTime int `json:"orderPaymentTime"` + OrderPaymentType int `json:"orderPaymentType"` + OrderType int `json:"orderType"` + VenderId int `json:"venderId"` + OrderStatus int `json:"orderStatus"` +} + +func (t *JqcTask) Do() { + handleTopic(t.TopicId) + t.Wg.Done() +} + +var jcqApi *jcqapi.API + +// JCQSchedule creat by Hang +// 用于jcq队列返回数据处理 以及业务逻辑 由于在consumeInfo2加了ack 所以不会存在重复拉取的可能性 +// 使用ants 同时启动topic数量的携程 进行handle +func JCQSchedule() { + jcqApi = jcqapi.New(accessKey, secretKey) + currentTopic := OrderPayTopic + handleTopic(currentTopic) + + /* p, _ := ants.NewPoolWithFunc(10, func(data interface{}) { + task := data.(*JqcTask) + task.Do() + }) + defer p.Release() + var wg sync.WaitGroup + wg.Add(len(TopicList)) + tasks := make([]*JqcTask, 0, len(TopicList)) + for i := 0; i < len(TopicList); i++ { + task := &JqcTask{ + TopicId: TopicList[i], + Wg: &wg, + } + tasks = append(tasks, task) + p.Invoke(task) + } + wg.Wait()*/ + +} + +//处理topic 针对不同的topic进行存库操作 +func handleTopic(topicId string) { + consumeResults, err := jcqApi.ConsumeInfo2(topicId, consumerGroupID, DefaultSubscribeSize) + if err == nil { + if len(consumeResults) != 0 { + entityList := make([]JcqMessage, 0) + //构建实体 + for _, result := range consumeResults { + if topicId == popOrderCreatTopic { + var r PopCreatOrderMessage + //js, _ := simplejson.NewJson([]byte(result.MessageBody)) + json.Unmarshal([]byte(result.MessageBody), &r) + fmt.Println(r) + } + jcq := JcqMessage{ + MessageID: result.MessageID, + MessageBody: result.MessageBody, + BusinessID: result.Properties.BUSINESSID, + PropertyRetryTimes: result.Properties.PROPERTYRETRYTIMES, + Status: "n", //默认给定初始值 未做处理 + MessageType: transType(topicId), + } + entityList = append(entityList, jcq) + } + db := dao.GetDB() + if err = dao.CreateMultiEntities(db, entityList); err != nil { + baseapi.SugarLogger.Warnf("saveMessage orderID:%s, save jqs_message failed with error:%v", entityList, err) + } else { //存库成功进行调用API操作 + //定义菜市api + jdmcshopapi := jdshopapi.New("37d36b62c0d14bd4b872f948b335c95czinj", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608") + //定义果园api + jdgyshopapi := jdshopapi.New("f9c5ce9a5ce24218936924f7c4864cc9owe1", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608") + for _, result := range consumeResults { + //处理订单支付,订单取消和pop订单出库的topic + if topicId == OrderPayTopic || topicId == OrderCancelTopic || topicId == popOrderOutTopic { + var r PayOrderMessage + //js, _ := simplejson.NewJson([]byte(result.MessageBody)) + json.Unmarshal([]byte(result.MessageBody), &r) + orderResult, err := jdmcshopapi.GetOrderById(r.OrderPayObject.OrderId, true) + isGy := false + if orderResult == nil && err == nil { + isGy = true + orderResult, err = jdgyshopapi.GetOrderById(r.OrderPayObject.OrderId, true) //查询不到之后说明不是这个账号的 使用果园再查一遍 + } + if err == nil { //确定返回结果 + // TO DO 处理数据转换为原先回调的结果 + if orderResult == nil { + baseapi.SugarLogger.Debug("未查询到订单ID", r.OrderPayObject.OrderId) + } else { + callBackResult := TransFormOrderResultToCallBackResult(orderResult, topicId, isGy) + jdshop.OnCallbackMsg(callBackResult) //进行处理 + } + } + } + } + } + + } + baseapi.SugarLogger.Debug("handle Topic the end") + } +} + +// 用于转换类型 +func transType(topic string) string { + switch topic { + case popOrderCreatTopic: + return popOrderCreatType + case popOrderChangeTopic: + return popOrderChangeType + case popOrderOutTopic: + return popOrderOutType + case OrderCreatTopic: + return OrderCreatType + case OrderPayTopic: + return OrderPayType + case OrderOutTopic: + return OrderOutType + case OrderCancelTopic: + return OrderCancelType + default: + return "" + } + +} + +// TransFormOrderResultToCallBackResult 转换数据类型 使其可以进入原先回调方法 +func TransFormOrderResultToCallBackResult(getOrderResult *jdshopapi.GetEnOrderResult, topicId string, isGy bool) *jdshopapi.CallBackResult { + return &jdshopapi.CallBackResult{ + OrderStateRemark: getOrderResult.OrderStateRemark, + OrderRemark: getOrderResult.OrderRemark, + OrderSellerPrice: getOrderResult.OrderSellerPrice, + OrderState: getOrderResult.OrderState, + OrderType: getOrderResult.OrderType, + OrderPayment: getOrderResult.OrderPayment, + PayType: getOrderResult.PayType, + StoreID: getOrderResult.StoreID, + OrderTotalPrice: getOrderResult.OrderTotalPrice, + OrderExt: getOrderResult.OrderExt, + StoreOrder: getOrderResult.StoreOrder, + OrderStartTime: getOrderResult.OrderStartTime, + OrderID: getOrderResult.OrderID, + OrderSource: getOrderResult.OrderSource, + FreightPrice: getOrderResult.FreightPrice, + MsgType: TransMsgType(topicId), + Pin:/*splitPin(getOrderResult.Pin),*/ "jd_temp", + IDSopShipmenttype: getOrderResult.IDSopShipmenttype, + ScDT: getOrderResult.ScDT, + SellerDiscount: getOrderResult.SellerDiscount, + ConsigneeInfo: getOrderResult.ConsigneeInfo, + ItemInfoList: getOrderResult.ItemInfoList, + VendorOrgCode: getVendorOrgCode(isGy), + MenDianID: getOrderResult.MenDianID, + BalanceUsed: getOrderResult.BalanceUsed} +} + +// TransMsgType 目前业务处理函数仅对一下三种topic做了业务处理 +func TransMsgType(topicId string) string { + switch topicId { + case OrderPayTopic: + return jcqapi.TopicOrderPay + case OrderCancelTopic: + return jcqapi.TopicOrderCancel + case popOrderOutTopic: + return jcqapi.TopicOrderOut + default: + return "" + } + +} + +//由于返回的pin长度过长 进行md5操作 压缩长度 +func splitPin(pin string) string { + h := md5.New() + h.Write([]byte(pin)) + s := hex.EncodeToString(h.Sum(nil)) + return s + +} + +// 根据标识判断加密标识 +func getVendorOrgCode(isGy bool) string { + if isGy { + return "2" + } else { + return "1" + } +} diff --git a/business/jxstore/misc/misc.go b/business/jxstore/misc/misc.go index 309560ac7..ffab0453e 100644 --- a/business/jxstore/misc/misc.go +++ b/business/jxstore/misc/misc.go @@ -341,6 +341,15 @@ func Init() { act.RrefreshEbaiVendorAct(jxcontext.AdminCtx) }, dailyWorkTimeList2) } + //update xuhang 2022/04/06用于同步京东数据以取代爬取订单 但由于京东的v2API已不支持手机号 因此以下代码暂时注释 后续接入虚拟号可再次open + /* ScheduleTimerFuncByInterval(func() { + jdshop.InitKey() + }, 10*time.Second, 8*time.Hour)*/ + //此处需要新增一个message 存储操作 服务器启动的30s后开始执行 每10s刷新一次 + //进行一个JCQ队列定时任务执行 + /* ScheduleTimerFuncByInterval(func() { + jcq.JCQSchedule() + }, 10*time.Second, 10*time.Second)*/ ScheduleTimerFunc("AutoSaleStoreSku", func() { cms.AutoSaleStoreSku(jxcontext.AdminCtx, nil, false) }, autoSaleStoreSkuTimeList) diff --git a/business/partner/purchase/jdshop/callback.go b/business/partner/purchase/jdshop/callback.go index a1b10b868..1b5a966ae 100644 --- a/business/partner/purchase/jdshop/callback.go +++ b/business/partner/purchase/jdshop/callback.go @@ -74,8 +74,11 @@ func OnCallbackMsg(msg *jdshopapi.CallBackResult) (err error) { } func SaveJdsOrders(msg *jdshopapi.CallBackResult) (err error) { + if msg.OrderState == "TRADE_CANCELED" { + return nil + } //清洗脏数据 部分数据按照 order, err := result2Orders(msg) - if err != nil || order == nil { + if err != nil && order == nil { return err } globals.SugarLogger.Debugf("SaveJdsOrders : %v", utils.Format4Output(order, false)) diff --git a/business/partner/purchase/jdshop/callback_test.go b/business/partner/purchase/jdshop/callback_test.go index 422da6326..fa14c09fa 100644 --- a/business/partner/purchase/jdshop/callback_test.go +++ b/business/partner/purchase/jdshop/callback_test.go @@ -27,3 +27,8 @@ func TestSaveJdsOrders(t *testing.T) { } fmt.Println("测试2") } + +func TestDecrypt(t *testing.T) { + InitKey() + fmt.Println(Decrypt("dGeoMeGNcXeT8iCHn3hTrCFYY8qfMnOptNcMFzAJA2/Dx/CPiZ526ec0NN0kWKs4+HwEGCLu9hAB9D0MIf8UB6q4G8IVgD3oXlOb89CFgGe0yO1HA9j51ESPFXh8=", "1")) +} diff --git a/business/partner/purchase/jdshop/key.go b/business/partner/purchase/jdshop/key.go index b192fcf79..ed5f60eaf 100644 --- a/business/partner/purchase/jdshop/key.go +++ b/business/partner/purchase/jdshop/key.go @@ -4,15 +4,14 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "git.rosy.net.cn/baseapi/platformapi/jdshopapi" "git.rosy.net.cn/jx-callback/globals" - - "git.rosy.net.cn/jx-callback/globals/api" ) var ( - KeyList []*Key - // KeyList2 []*Key + KeyList []*Key + KeyList2 []*Key ) type Key struct { @@ -27,7 +26,8 @@ type Key struct { } func InitKey() { - keyResult, err := api.JdShopAPI.KeyGet() + newapi := jdshopapi.New("37d36b62c0d14bd4b872f948b335c95czinj", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608") + keyResult, err := newapi.KeyGet() if err != nil { return } @@ -37,24 +37,31 @@ func InitKey() { err = json.Unmarshal(data, &vv) KeyList = append(KeyList, vv) } - // keyResult2, err := api.JdShop2API.KeyGet() - // if err != nil { - // return - // } - // for _, v := range keyResult2.Response.ServiceKeyList[0].Keys { - // data, _ := json.Marshal(v) - // vv := &Key{} - // err = json.Unmarshal(data, &vv) - // KeyList2 = append(KeyList2, vv) - // } + keyResult2, err := jdshopapi.New("f9c5ce9a5ce24218936924f7c4864cc9owe1", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608").KeyGet() + if err != nil { + return + } + for _, v := range keyResult2.Response.ServiceKeyList[0].Keys { + data, _ := json.Marshal(v) + vv := &Key{} + err = json.Unmarshal(data, &vv) + KeyList2 = append(KeyList2, vv) + } globals.SugarLogger.Debugf("jdshop key refreshed..") } func GetKey(keySign, vendorOrgCode string) (key string) { - // if vendorOrgCode == "1" { - for _, v := range KeyList { - data, _ := base64.StdEncoding.DecodeString(v.ID) - if keySign == hex.EncodeToString(data) { + if vendorOrgCode == "1" { + for _, v := range KeyList { + data, _ := base64.StdEncoding.DecodeString(v.ID) + if keySign == hex.EncodeToString(data) { + return v.KeyString + } + } + } else { + for _, v := range KeyList2 { + //data, _ := base64.StdEncoding.DecodeString(v.ID) + //if keySign == hex.EncodeToString(data) { return v.KeyString } } diff --git a/globals/beegodb/beegodb.go b/globals/beegodb/beegodb.go index 76d9ea039..c111d7f7d 100644 --- a/globals/beegodb/beegodb.go +++ b/globals/beegodb/beegodb.go @@ -1,6 +1,7 @@ package beegodb import ( + "git.rosy.net.cn/jx-callback/business/jxcallback/scheduler/jcq" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/legacymodel" "git.rosy.net.cn/jx-callback/globals" @@ -38,6 +39,7 @@ func Init() { orm.RegisterModel(new(model.Message)) orm.RegisterModel(new(model.MessageStatus)) orm.RegisterModel(new(model.ImMessageRecord)) + orm.RegisterModel(new(jcq.JcqMessage)) orm.RegisterModel(&model.Place{}) orm.RegisterModel(&model.Store{}, &model.StoreSub{}, &model.StoreMap{}, &model.StoreCourierMap{}, &model.StoreCategoryMap{})