281 lines
10 KiB
Go
281 lines
10 KiB
Go
package jcq
|
||
|
||
import (
|
||
"crypto/md5"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"fmt"
|
||
"git.rosy.net.cn/baseapi"
|
||
"git.rosy.net.cn/baseapi/platformapi/jcqapi"
|
||
"git.rosy.net.cn/baseapi/platformapi/jdshopapi"
|
||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||
"git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop"
|
||
"sync"
|
||
)
|
||
|
||
const (
|
||
accessKey = "C0EB36912C652079DB111A922ACA406B" //京东第三方的AK
|
||
secretKey = "86B6330051ECC88391E2630D34C2CA13" //京东第三方的SK
|
||
consumerGroupID = "open_message_573819178445" //消费组ID
|
||
popOrderCreatTopic = "open_message_pop_order_create_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单创建主题名称
|
||
popOrderChangeTopic = "open_message_pop_order_change_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单修改主题名称
|
||
popOrderOutTopic = "open_message_pop_order_out_E1D746D42474D5F1F1A10CECE75D99F6" //pop订单出库主题名称
|
||
OrderOutTopic = "open_message_order_order_out_E1D746D42474D5F1F1A10CECE75D99F6" //订单出库主题名称
|
||
OrderCancelTopic = "open_message_order_order_cancel_E1D746D42474D5F1F1A10CECE75D99F6" //订单取消主题名称
|
||
OrderPayTopic = "open_message_order_order_pay_E1D746D42474D5F1F1A10CECE75D99F6" //订单支付主题名称
|
||
OrderCreatTopic = "open_message_order_order_create_E1D746D42474D5F1F1A10CECE75D99F6" //订单创建主题
|
||
DefaultSubscribeSize = 32 //默认拉取数量
|
||
popOrderCreatType = "popCreat" //存储到数据库pop订单创建类型
|
||
popOrderOutType = "popOut" //存储到数据库pop订单出库类型
|
||
popOrderChangeType = "popChange" //存储到数据库pop订单修改类型
|
||
OrderCreatType = "orderCreat" //订单创建类型
|
||
OrderPayType = "orderPay" //订单支付类型
|
||
OrderOutType = "orderOut" //订单出库主题
|
||
OrderCancelType = "orderCancel" //订单取消类型
|
||
|
||
)
|
||
|
||
var (
|
||
TopicList = []string{
|
||
popOrderCreatTopic, popOrderChangeTopic, popOrderOutTopic, OrderCreatTopic, OrderPayTopic, OrderOutTopic, OrderCancelTopic,
|
||
}
|
||
)
|
||
|
||
type JcqMessage struct { //model
|
||
ID int64 `orm:"column(id)" `
|
||
MessageID string `orm:"column(message_id)" `
|
||
MessageBody string `orm:"column(message_body)"`
|
||
BusinessID string `orm:"column(business_id)"`
|
||
PropertyRetryTimes string `orm:"column(property_retry_times)"`
|
||
Status string `orm:"column(status)"` //表示业务逻辑状态 n 表示业务为处理 y 表示已处理
|
||
MessageType string `orm:"column(message_type)" //消息类型 对应上面的枚举`
|
||
}
|
||
|
||
type JqcTask struct {
|
||
TopicId string
|
||
Wg *sync.WaitGroup
|
||
}
|
||
|
||
type PopCreatObject struct { //序列化属性首字母必须大写 否则会出现赋值问题 切记! 也可以考虑使用simplejson就需要这么多的struct
|
||
OrderCreateTime string `json:"orderCreateTime"`
|
||
OrderId int64 `json:"orderId"`
|
||
OrderPaymentType int `json:"orderPaymentType"`
|
||
OrderStatus string `json:"orderStatus"`
|
||
OrderType int `json:"orderType"`
|
||
VenderId int64 `json:"venderId"`
|
||
}
|
||
|
||
type PopChangeObject struct { //Pop修改
|
||
ErpOrderStatus int `json:"erpOrderStatus"`
|
||
Modified int `json:"modified"`
|
||
OrderCreateTime int `json:"orderCreateTime"`
|
||
OrderId int64 `json:"orderId"`
|
||
OrderPaymentType int `json:"orderPaymentType"`
|
||
OrderType int `json:"orderType"`
|
||
VenderId int `json:"venderId"`
|
||
Yn int `json:"yn"`
|
||
}
|
||
type PopCreatOrderMessage struct { //pop订单创建结构体
|
||
PopCreatObj PopCreatObject `json:"object"`
|
||
}
|
||
|
||
type CreatOrderMessage struct {
|
||
OrderCreatObj PopCreatObject `json:"object"`
|
||
}
|
||
|
||
type PayOrderMessage struct {
|
||
OrderPayObject OrderPayObject `json:"object"`
|
||
}
|
||
|
||
type OrderPayObject struct {
|
||
OrderId int64 `json:"orderId"`
|
||
OrderPaymentTime int `json:"orderPaymentTime"`
|
||
OrderPaymentType int `json:"orderPaymentType"`
|
||
OrderType int `json:"orderType"`
|
||
VenderId int `json:"venderId"`
|
||
OrderStatus int `json:"orderStatus"`
|
||
}
|
||
|
||
func (t *JqcTask) Do() {
|
||
handleTopic(t.TopicId)
|
||
t.Wg.Done()
|
||
}
|
||
|
||
var jcqApi *jcqapi.API
|
||
|
||
// JCQSchedule creat by Hang
|
||
// 用于jcq队列返回数据处理 以及业务逻辑 由于在consumeInfo2加了ack 所以不会存在重复拉取的可能性
|
||
// 使用ants 同时启动topic数量的携程 进行handle
|
||
func JCQSchedule() {
|
||
jcqApi = jcqapi.New(accessKey, secretKey)
|
||
currentTopic := OrderPayTopic
|
||
handleTopic(currentTopic)
|
||
|
||
/* p, _ := ants.NewPoolWithFunc(10, func(data interface{}) {
|
||
task := data.(*JqcTask)
|
||
task.Do()
|
||
})
|
||
defer p.Release()
|
||
var wg sync.WaitGroup
|
||
wg.Add(len(TopicList))
|
||
tasks := make([]*JqcTask, 0, len(TopicList))
|
||
for i := 0; i < len(TopicList); i++ {
|
||
task := &JqcTask{
|
||
TopicId: TopicList[i],
|
||
Wg: &wg,
|
||
}
|
||
tasks = append(tasks, task)
|
||
p.Invoke(task)
|
||
}
|
||
wg.Wait()*/
|
||
|
||
}
|
||
|
||
//处理topic 针对不同的topic进行存库操作
|
||
func handleTopic(topicId string) {
|
||
consumeResults, err := jcqApi.ConsumeInfo2(topicId, consumerGroupID, DefaultSubscribeSize)
|
||
if err == nil {
|
||
if len(consumeResults) != 0 {
|
||
entityList := make([]JcqMessage, 0)
|
||
//构建实体
|
||
for _, result := range consumeResults {
|
||
if topicId == popOrderCreatTopic {
|
||
var r PopCreatOrderMessage
|
||
//js, _ := simplejson.NewJson([]byte(result.MessageBody))
|
||
json.Unmarshal([]byte(result.MessageBody), &r)
|
||
fmt.Println(r)
|
||
}
|
||
jcq := JcqMessage{
|
||
MessageID: result.MessageID,
|
||
MessageBody: result.MessageBody,
|
||
BusinessID: result.Properties.BUSINESSID,
|
||
PropertyRetryTimes: result.Properties.PROPERTYRETRYTIMES,
|
||
Status: "n", //默认给定初始值 未做处理
|
||
MessageType: transType(topicId),
|
||
}
|
||
entityList = append(entityList, jcq)
|
||
}
|
||
db := dao.GetDB()
|
||
if err = dao.CreateMultiEntities(db, entityList); err != nil {
|
||
baseapi.SugarLogger.Warnf("saveMessage orderID:%s, save jqs_message failed with error:%v", entityList, err)
|
||
} else { //存库成功进行调用API操作
|
||
//定义菜市api
|
||
jdmcshopapi := jdshopapi.New("37d36b62c0d14bd4b872f948b335c95czinj", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608")
|
||
//定义果园api
|
||
jdgyshopapi := jdshopapi.New("f9c5ce9a5ce24218936924f7c4864cc9owe1", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608")
|
||
for _, result := range consumeResults {
|
||
//处理订单支付,订单取消和pop订单出库的topic
|
||
if topicId == OrderPayTopic || topicId == OrderCancelTopic || topicId == popOrderOutTopic {
|
||
var r PayOrderMessage
|
||
//js, _ := simplejson.NewJson([]byte(result.MessageBody))
|
||
json.Unmarshal([]byte(result.MessageBody), &r)
|
||
orderResult, err := jdmcshopapi.GetOrderById(r.OrderPayObject.OrderId, true)
|
||
isGy := false
|
||
if orderResult == nil && err == nil {
|
||
isGy = true
|
||
orderResult, err = jdgyshopapi.GetOrderById(r.OrderPayObject.OrderId, true) //查询不到之后说明不是这个账号的 使用果园再查一遍
|
||
}
|
||
if err == nil { //确定返回结果
|
||
// TO DO 处理数据转换为原先回调的结果
|
||
if orderResult == nil {
|
||
baseapi.SugarLogger.Debug("未查询到订单ID", r.OrderPayObject.OrderId)
|
||
} else {
|
||
callBackResult := TransFormOrderResultToCallBackResult(orderResult, topicId, isGy)
|
||
jdshop.OnCallbackMsg(callBackResult) //进行处理
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
}
|
||
baseapi.SugarLogger.Debug("handle Topic the end")
|
||
}
|
||
}
|
||
|
||
// 用于转换类型
|
||
func transType(topic string) string {
|
||
switch topic {
|
||
case popOrderCreatTopic:
|
||
return popOrderCreatType
|
||
case popOrderChangeTopic:
|
||
return popOrderChangeType
|
||
case popOrderOutTopic:
|
||
return popOrderOutType
|
||
case OrderCreatTopic:
|
||
return OrderCreatType
|
||
case OrderPayTopic:
|
||
return OrderPayType
|
||
case OrderOutTopic:
|
||
return OrderOutType
|
||
case OrderCancelTopic:
|
||
return OrderCancelType
|
||
default:
|
||
return ""
|
||
}
|
||
|
||
}
|
||
|
||
// TransFormOrderResultToCallBackResult 转换数据类型 使其可以进入原先回调方法
|
||
func TransFormOrderResultToCallBackResult(getOrderResult *jdshopapi.GetEnOrderResult, topicId string, isGy bool) *jdshopapi.CallBackResult {
|
||
return &jdshopapi.CallBackResult{
|
||
OrderStateRemark: getOrderResult.OrderStateRemark,
|
||
OrderRemark: getOrderResult.OrderRemark,
|
||
OrderSellerPrice: getOrderResult.OrderSellerPrice,
|
||
OrderState: getOrderResult.OrderState,
|
||
OrderType: getOrderResult.OrderType,
|
||
OrderPayment: getOrderResult.OrderPayment,
|
||
PayType: getOrderResult.PayType,
|
||
StoreID: getOrderResult.StoreID,
|
||
OrderTotalPrice: getOrderResult.OrderTotalPrice,
|
||
OrderExt: getOrderResult.OrderExt,
|
||
StoreOrder: getOrderResult.StoreOrder,
|
||
OrderStartTime: getOrderResult.OrderStartTime,
|
||
OrderID: getOrderResult.OrderID,
|
||
OrderSource: getOrderResult.OrderSource,
|
||
FreightPrice: getOrderResult.FreightPrice,
|
||
MsgType: TransMsgType(topicId),
|
||
Pin:/*splitPin(getOrderResult.Pin),*/ "jd_temp",
|
||
IDSopShipmenttype: getOrderResult.IDSopShipmenttype,
|
||
ScDT: getOrderResult.ScDT,
|
||
SellerDiscount: getOrderResult.SellerDiscount,
|
||
ConsigneeInfo: getOrderResult.ConsigneeInfo,
|
||
ItemInfoList: getOrderResult.ItemInfoList,
|
||
VendorOrgCode: getVendorOrgCode(isGy),
|
||
MenDianID: getOrderResult.MenDianID,
|
||
BalanceUsed: getOrderResult.BalanceUsed}
|
||
}
|
||
|
||
// TransMsgType 目前业务处理函数仅对一下三种topic做了业务处理
|
||
func TransMsgType(topicId string) string {
|
||
switch topicId {
|
||
case OrderPayTopic:
|
||
return jcqapi.TopicOrderPay
|
||
case OrderCancelTopic:
|
||
return jcqapi.TopicOrderCancel
|
||
case popOrderOutTopic:
|
||
return jcqapi.TopicOrderOut
|
||
default:
|
||
return ""
|
||
}
|
||
|
||
}
|
||
|
||
//由于返回的pin长度过长 进行md5操作 压缩长度
|
||
func splitPin(pin string) string {
|
||
h := md5.New()
|
||
h.Write([]byte(pin))
|
||
s := hex.EncodeToString(h.Sum(nil))
|
||
return s
|
||
|
||
}
|
||
|
||
// 根据标识判断加密标识
|
||
func getVendorOrgCode(isGy bool) string {
|
||
if isGy {
|
||
return "2"
|
||
} else {
|
||
return "1"
|
||
}
|
||
}
|