JCQ 2022/04/06 暂时注释 打开后会执行每10s处理一次消息 xuhang

This commit is contained in:
807875765@qq.com
2022-04-06 16:08:41 +08:00
parent f8312c5656
commit 39be1d9d9e
6 changed files with 326 additions and 20 deletions

View File

@@ -0,0 +1,280 @@
package jcq
import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"git.rosy.net.cn/baseapi"
"git.rosy.net.cn/baseapi/platformapi/jcqapi"
"git.rosy.net.cn/baseapi/platformapi/jdshopapi"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop"
"sync"
)
const (
accessKey = "C0EB36912C652079DB111A922ACA406B" //京东第三方的AK
secretKey = "86B6330051ECC88391E2630D34C2CA13" //京东第三方的SK
consumerGroupID = "open_message_573819178445" //消费组ID
popOrderCreatTopic = "open_message_pop_order_create_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单创建主题名称
popOrderChangeTopic = "open_message_pop_order_change_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单修改主题名称
popOrderOutTopic = "open_message_pop_order_out_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单出库主题名称
OrderOutTopic = "open_message_order_order_out_E1D746D42474D5F1F1A10CECE75D99F6" //订单出库主题名称
OrderCancelTopic = "open_message_order_order_cancel_E1D746D42474D5F1F1A10CECE75D99F6" //订单取消主题名称
OrderPayTopic = "open_message_order_order_pay_E1D746D42474D5F1F1A10CECE75D99F6" //订单支付主题名称
OrderCreatTopic = "open_message_order_order_create_E1D746D42474D5F1F1A10CECE75D99F6" //订单创建主题
DefaultSubscribeSize = 32 //默认拉取数量
popOrderCreatType = "popCreat" //存储到数据库pop订单创建类型
popOrderOutType = "popOut" //存储到数据库pop订单出库类型
popOrderChangeType = "popChange" //存储到数据库pop订单修改类型
OrderCreatType = "orderCreat" //订单创建类型
OrderPayType = "orderPay" //订单支付类型
OrderOutType = "orderOut" //订单出库主题
OrderCancelType = "orderCancel" //订单取消类型
)
var (
TopicList = []string{
popOrderCreatTopic, popOrderChangeTopic, popOrderOutTopic, OrderCreatTopic, OrderPayTopic, OrderOutTopic, OrderCancelTopic,
}
)
type JcqMessage struct { //model
ID int64 `orm:"column(id)" `
MessageID string `orm:"column(message_id)" `
MessageBody string `orm:"column(message_body)"`
BusinessID string `orm:"column(business_id)"`
PropertyRetryTimes string `orm:"column(property_retry_times)"`
Status string `orm:"column(status)"` //表示业务逻辑状态 n 表示业务为处理 y 表示已处理
MessageType string `orm:"column(message_type)" //消息类型 对应上面的枚举`
}
type JqcTask struct {
TopicId string
Wg *sync.WaitGroup
}
type PopCreatObject struct { //序列化属性首字母必须大写 否则会出现赋值问题 切记! 也可以考虑使用simplejson就需要这么多的struct
OrderCreateTime string `json:"orderCreateTime"`
OrderId int64 `json:"orderId"`
OrderPaymentType int `json:"orderPaymentType"`
OrderStatus string `json:"orderStatus"`
OrderType int `json:"orderType"`
VenderId int64 `json:"venderId"`
}
type PopChangeObject struct { //Pop修改
ErpOrderStatus int `json:"erpOrderStatus"`
Modified int `json:"modified"`
OrderCreateTime int `json:"orderCreateTime"`
OrderId int64 `json:"orderId"`
OrderPaymentType int `json:"orderPaymentType"`
OrderType int `json:"orderType"`
VenderId int `json:"venderId"`
Yn int `json:"yn"`
}
type PopCreatOrderMessage struct { //pop订单创建结构体
PopCreatObj PopCreatObject `json:"object"`
}
type CreatOrderMessage struct {
OrderCreatObj PopCreatObject `json:"object"`
}
type PayOrderMessage struct {
OrderPayObject OrderPayObject `json:"object"`
}
type OrderPayObject struct {
OrderId int64 `json:"orderId"`
OrderPaymentTime int `json:"orderPaymentTime"`
OrderPaymentType int `json:"orderPaymentType"`
OrderType int `json:"orderType"`
VenderId int `json:"venderId"`
OrderStatus int `json:"orderStatus"`
}
func (t *JqcTask) Do() {
handleTopic(t.TopicId)
t.Wg.Done()
}
var jcqApi *jcqapi.API
// JCQSchedule creat by Hang
// 用于jcq队列返回数据处理 以及业务逻辑 由于在consumeInfo2加了ack 所以不会存在重复拉取的可能性
// 使用ants 同时启动topic数量的携程 进行handle
func JCQSchedule() {
jcqApi = jcqapi.New(accessKey, secretKey)
currentTopic := OrderPayTopic
handleTopic(currentTopic)
/* p, _ := ants.NewPoolWithFunc(10, func(data interface{}) {
task := data.(*JqcTask)
task.Do()
})
defer p.Release()
var wg sync.WaitGroup
wg.Add(len(TopicList))
tasks := make([]*JqcTask, 0, len(TopicList))
for i := 0; i < len(TopicList); i++ {
task := &JqcTask{
TopicId: TopicList[i],
Wg: &wg,
}
tasks = append(tasks, task)
p.Invoke(task)
}
wg.Wait()*/
}
//处理topic 针对不同的topic进行存库操作
func handleTopic(topicId string) {
consumeResults, err := jcqApi.ConsumeInfo2(topicId, consumerGroupID, DefaultSubscribeSize)
if err == nil {
if len(consumeResults) != 0 {
entityList := make([]JcqMessage, 0)
//构建实体
for _, result := range consumeResults {
if topicId == popOrderCreatTopic {
var r PopCreatOrderMessage
//js, _ := simplejson.NewJson([]byte(result.MessageBody))
json.Unmarshal([]byte(result.MessageBody), &r)
fmt.Println(r)
}
jcq := JcqMessage{
MessageID: result.MessageID,
MessageBody: result.MessageBody,
BusinessID: result.Properties.BUSINESSID,
PropertyRetryTimes: result.Properties.PROPERTYRETRYTIMES,
Status: "n", //默认给定初始值 未做处理
MessageType: transType(topicId),
}
entityList = append(entityList, jcq)
}
db := dao.GetDB()
if err = dao.CreateMultiEntities(db, entityList); err != nil {
baseapi.SugarLogger.Warnf("saveMessage orderID:%s, save jqs_message failed with error:%v", entityList, err)
} else { //存库成功进行调用API操作
//定义菜市api
jdmcshopapi := jdshopapi.New("37d36b62c0d14bd4b872f948b335c95czinj", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608")
//定义果园api
jdgyshopapi := jdshopapi.New("f9c5ce9a5ce24218936924f7c4864cc9owe1", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608")
for _, result := range consumeResults {
//处理订单支付订单取消和pop订单出库的topic
if topicId == OrderPayTopic || topicId == OrderCancelTopic || topicId == popOrderOutTopic {
var r PayOrderMessage
//js, _ := simplejson.NewJson([]byte(result.MessageBody))
json.Unmarshal([]byte(result.MessageBody), &r)
orderResult, err := jdmcshopapi.GetOrderById(r.OrderPayObject.OrderId, true)
isGy := false
if orderResult == nil && err == nil {
isGy = true
orderResult, err = jdgyshopapi.GetOrderById(r.OrderPayObject.OrderId, true) //查询不到之后说明不是这个账号的 使用果园再查一遍
}
if err == nil { //确定返回结果
// TO DO 处理数据转换为原先回调的结果
if orderResult == nil {
baseapi.SugarLogger.Debug("未查询到订单ID", r.OrderPayObject.OrderId)
} else {
callBackResult := TransFormOrderResultToCallBackResult(orderResult, topicId, isGy)
jdshop.OnCallbackMsg(callBackResult) //进行处理
}
}
}
}
}
}
baseapi.SugarLogger.Debug("handle Topic the end")
}
}
// 用于转换类型
func transType(topic string) string {
switch topic {
case popOrderCreatTopic:
return popOrderCreatType
case popOrderChangeTopic:
return popOrderChangeType
case popOrderOutTopic:
return popOrderOutType
case OrderCreatTopic:
return OrderCreatType
case OrderPayTopic:
return OrderPayType
case OrderOutTopic:
return OrderOutType
case OrderCancelTopic:
return OrderCancelType
default:
return ""
}
}
// TransFormOrderResultToCallBackResult 转换数据类型 使其可以进入原先回调方法
func TransFormOrderResultToCallBackResult(getOrderResult *jdshopapi.GetEnOrderResult, topicId string, isGy bool) *jdshopapi.CallBackResult {
return &jdshopapi.CallBackResult{
OrderStateRemark: getOrderResult.OrderStateRemark,
OrderRemark: getOrderResult.OrderRemark,
OrderSellerPrice: getOrderResult.OrderSellerPrice,
OrderState: getOrderResult.OrderState,
OrderType: getOrderResult.OrderType,
OrderPayment: getOrderResult.OrderPayment,
PayType: getOrderResult.PayType,
StoreID: getOrderResult.StoreID,
OrderTotalPrice: getOrderResult.OrderTotalPrice,
OrderExt: getOrderResult.OrderExt,
StoreOrder: getOrderResult.StoreOrder,
OrderStartTime: getOrderResult.OrderStartTime,
OrderID: getOrderResult.OrderID,
OrderSource: getOrderResult.OrderSource,
FreightPrice: getOrderResult.FreightPrice,
MsgType: TransMsgType(topicId),
Pin:/*splitPin(getOrderResult.Pin),*/ "jd_temp",
IDSopShipmenttype: getOrderResult.IDSopShipmenttype,
ScDT: getOrderResult.ScDT,
SellerDiscount: getOrderResult.SellerDiscount,
ConsigneeInfo: getOrderResult.ConsigneeInfo,
ItemInfoList: getOrderResult.ItemInfoList,
VendorOrgCode: getVendorOrgCode(isGy),
MenDianID: getOrderResult.MenDianID,
BalanceUsed: getOrderResult.BalanceUsed}
}
// TransMsgType 目前业务处理函数仅对一下三种topic做了业务处理
func TransMsgType(topicId string) string {
switch topicId {
case OrderPayTopic:
return jcqapi.TopicOrderPay
case OrderCancelTopic:
return jcqapi.TopicOrderCancel
case popOrderOutTopic:
return jcqapi.TopicOrderOut
default:
return ""
}
}
//由于返回的pin长度过长 进行md5操作 压缩长度
func splitPin(pin string) string {
h := md5.New()
h.Write([]byte(pin))
s := hex.EncodeToString(h.Sum(nil))
return s
}
// 根据标识判断加密标识
func getVendorOrgCode(isGy bool) string {
if isGy {
return "2"
} else {
return "1"
}
}