// Package jcq pulls JD JCQ order messages, stores the raw messages through
// the dao package and forwards selected order events to the jdshop callback
// handler.
package jcq

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"sync"

	"git.rosy.net.cn/baseapi"
	"git.rosy.net.cn/baseapi/platformapi/jcqapi"
	"git.rosy.net.cn/baseapi/platformapi/jdshopapi"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/business/partner/purchase/jdshop"
)

const (
	// accessKey is the JD third-party access key (AK).
	// NOTE(review): credentials are hard-coded in source; consider moving
	// them to configuration.
	accessKey = "C0EB36912C652079DB111A922ACA406B"
	// secretKey is the JD third-party secret key (SK).
	secretKey = "86B6330051ECC88391E2630D34C2CA13"
	// consumerGroupID is the consumer group ID passed to ConsumeInfo2.
	consumerGroupID = "open_message_573819178445"

	// popOrderCreatTopic is the topic name for POP order creation.
	popOrderCreatTopic = "open_message_pop_order_create_E1D746D42474D5F1F1A10CECE75D99F6"
	// popOrderChangeTopic is the topic name for POP order modification.
	popOrderChangeTopic = "open_message_pop_order_change_E1D746D42474D5F1F1A10CECE75D99F6"
	// popOrderOutTopic is the topic name for POP order shipment (out of stock room).
	popOrderOutTopic = "open_message_pop_order_out_E1D746D42474D5F1F1A10CECE75D99F6"
	// OrderOutTopic is the topic name for order shipment.
	OrderOutTopic = "open_message_order_order_out_E1D746D42474D5F1F1A10CECE75D99F6"
	// OrderCancelTopic is the topic name for order cancellation.
	OrderCancelTopic = "open_message_order_order_cancel_E1D746D42474D5F1F1A10CECE75D99F6"
	// OrderPayTopic is the topic name for order payment.
	OrderPayTopic = "open_message_order_order_pay_E1D746D42474D5F1F1A10CECE75D99F6"
	// OrderCreatTopic is the topic name for order creation.
	OrderCreatTopic = "open_message_order_order_create_E1D746D42474D5F1F1A10CECE75D99F6"

	// DefaultSubscribeSize is the number of messages requested per pull.
	DefaultSubscribeSize = 32

	// popOrderCreatType is the stored message type for POP order creation.
	popOrderCreatType = "popCreat"
	// popOrderOutType is the stored message type for POP order shipment.
	popOrderOutType = "popOut"
	// popOrderChangeType is the stored message type for POP order modification.
	popOrderChangeType = "popChange"
	// OrderCreatType is the stored message type for order creation.
	OrderCreatType = "orderCreat"
	// OrderPayType is the stored message type for order payment.
	OrderPayType = "orderPay"
	// OrderOutType is the stored message type for order shipment.
	OrderOutType = "orderOut"
	// OrderCancelType is the stored message type for order cancellation.
	OrderCancelType = "orderCancel"
)

var (
	// TopicList lists every topic known to this package. It is only
	// referenced by the currently disabled fan-out code in JCQSchedule.
	TopicList = []string{
		popOrderCreatTopic,
		popOrderChangeTopic,
		popOrderOutTopic,
		OrderCreatTopic,
		OrderPayTopic,
		OrderOutTopic,
		OrderCancelTopic,
	}
)

// JcqMessage is the persistence model for one consumed JCQ message.
//
// Status is the business processing state: "n" means not yet processed and
// "y" means processed. MessageType holds one of the *Type constants declared
// above (see transType).
type JcqMessage struct {
	ID                 int64  `orm:"column(id)"`
	MessageID          string `orm:"column(message_id)"`
	MessageBody        string `orm:"column(message_body)"`
	BusinessID         string `orm:"column(business_id)"`
	PropertyRetryTimes string `orm:"column(property_retry_times)"`
	Status             string `orm:"column(status)"`
	MessageType        string `orm:"column(message_type)"`
}

// JqcTask is a unit of work that handles one topic and then signals Wg.
type JqcTask struct {
	TopicId string
	Wg      *sync.WaitGroup
}

// PopCreatObject is the "object" payload of an order-creation message.
//
// Fields must stay exported, otherwise encoding/json cannot populate them.
// The original author noted that simplejson could be used instead of
// declaring one struct per payload.
type PopCreatObject struct {
	OrderCreateTime  string `json:"orderCreateTime"`
	OrderId          int64  `json:"orderId"`
	OrderPaymentType int    `json:"orderPaymentType"`
	OrderStatus      string `json:"orderStatus"`
	OrderType        int    `json:"orderType"`
	VenderId         int64  `json:"venderId"`
}

// PopChangeObject is the "object" payload of a POP order-modification message.
type PopChangeObject struct {
	ErpOrderStatus   int   `json:"erpOrderStatus"`
	Modified         int   `json:"modified"`
	OrderCreateTime  int   `json:"orderCreateTime"`
	OrderId          int64 `json:"orderId"`
	OrderPaymentType int   `json:"orderPaymentType"`
	OrderType        int   `json:"orderType"`
	VenderId         int   `json:"venderId"`
	Yn               int   `json:"yn"`
}

// PopCreatOrderMessage is the envelope of a POP order-creation message.
type PopCreatOrderMessage struct {
	PopCreatObj PopCreatObject `json:"object"`
}

// CreatOrderMessage is the envelope of an order-creation message.
type CreatOrderMessage struct {
	OrderCreatObj PopCreatObject `json:"object"`
}

// PayOrderMessage is the envelope of an order-payment message. handleTopic
// also uses it to read the order ID from cancel and POP shipment messages.
type PayOrderMessage struct {
	OrderPayObject OrderPayObject `json:"object"`
}

// OrderPayObject is the "object" payload of an order-payment message.
type OrderPayObject struct {
	OrderId          int64 `json:"orderId"`
	OrderPaymentTime int   `json:"orderPaymentTime"`
	OrderPaymentType int   `json:"orderPaymentType"`
	OrderType        int   `json:"orderType"`
	VenderId         int   `json:"venderId"`
	OrderStatus      int   `json:"orderStatus"`
}

// Do handles the task's topic and marks the task as done on its WaitGroup.
// Done is deferred so the WaitGroup is released even if handling panics.
func (t *JqcTask) Do() {
	defer t.Wg.Done()
	handleTopic(t.TopicId)
}

// jcqApi is the shared JCQ client; it is (re)initialised by JCQSchedule.
var jcqApi *jcqapi.API

// JCQSchedule creates the JCQ client and processes one batch of messages.
//
// Only OrderPayTopic is handled at the moment; the fan-out over TopicList
// through an ants pool is disabled and kept below for reference. According
// to the original author, ConsumeInfo2 acknowledges messages, so the same
// message should not be pulled twice.
func JCQSchedule() {
	jcqApi = jcqapi.New(accessKey, secretKey)
	handleTopic(OrderPayTopic)
	// Disabled fan-out over every topic:
	//
	//	p, _ := ants.NewPoolWithFunc(10, func(data interface{}) {
	//		task := data.(*JqcTask)
	//		task.Do()
	//	})
	//	defer p.Release()
	//	var wg sync.WaitGroup
	//	wg.Add(len(TopicList))
	//	tasks := make([]*JqcTask, 0, len(TopicList))
	//	for i := 0; i < len(TopicList); i++ {
	//		task := &JqcTask{
	//			TopicId: TopicList[i],
	//			Wg:      &wg,
	//		}
	//		tasks = append(tasks, task)
	//		p.Invoke(task)
	//	}
	//	wg.Wait()
}

// decodeMessageBody unmarshals a JCQ message body into v and adds context to
// any decoding error.
func decodeMessageBody(body string, v interface{}) error {
	if err := json.Unmarshal([]byte(body), v); err != nil {
		return fmt.Errorf("decoding jcq message body %q: %v", body, err)
	}
	return nil
}

// handleTopic pulls one batch of messages for topicId, stores them, and, for
// the pay, cancel and POP shipment topics, looks each order up and forwards
// it to jdshop.OnCallbackMsg.
//
// Failures are logged and never returned: a failed pull or a failed save
// ends the call, while a failure on a single message only skips that message.
func handleTopic(topicId string) {
	consumeResults, err := jcqApi.ConsumeInfo2(topicId, consumerGroupID, DefaultSubscribeSize)
	if err != nil {
		baseapi.SugarLogger.Warnf("handleTopic topic:%s, consume failed with error:%v", topicId, err)
		return
	}
	defer baseapi.SugarLogger.Debug("handle Topic the end")
	if len(consumeResults) == 0 {
		return
	}

	// Build one entity per consumed message.
	messageType := transType(topicId)
	entityList := make([]JcqMessage, 0, len(consumeResults))
	for _, result := range consumeResults {
		if topicId == popOrderCreatTopic {
			// Decoded for debugging only; the raw body is what gets stored.
			var created PopCreatOrderMessage
			if err := decodeMessageBody(result.MessageBody, &created); err != nil {
				baseapi.SugarLogger.Warnf("handleTopic topic:%s, messageID:%s, %v", topicId, result.MessageID, err)
			}
			baseapi.SugarLogger.Debug("pop order create message: ", created)
		}
		entityList = append(entityList, JcqMessage{
			MessageID:          result.MessageID,
			MessageBody:        result.MessageBody,
			BusinessID:         result.Properties.BUSINESSID,
			PropertyRetryTimes: result.Properties.PROPERTYRETRYTIMES,
			Status:             "n", // initial state: not processed yet
			MessageType:        messageType,
		})
	}

	// NOTE(review): if the messages were already acknowledged by
	// ConsumeInfo2, a failed save here loses them - confirm and consider a
	// retry.
	if err = dao.CreateMultiEntities(dao.GetDB(), entityList); err != nil {
		baseapi.SugarLogger.Warnf("saveMessage orderID:%v, save jqs_message failed with error:%v", entityList, err)
		return
	}

	// Only these three topics trigger the order callback.
	switch topicId {
	case OrderPayTopic, OrderCancelTopic, popOrderOutTopic:
	default:
		return
	}

	// NOTE(review): shop credentials are hard-coded; consider moving them to
	// configuration.
	// Client for the market ("菜市") shop account.
	jdmcshopapi := jdshopapi.New("37d36b62c0d14bd4b872f948b335c95czinj", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608")
	// Client for the orchard ("果园") shop account.
	jdgyshopapi := jdshopapi.New("f9c5ce9a5ce24218936924f7c4864cc9owe1", "E1D746D42474D5F1F1A10CECE75D99F6", "efa7e1d1a22640fa990e6cf164b28608")

	for _, result := range consumeResults {
		var msg PayOrderMessage
		if err := decodeMessageBody(result.MessageBody, &msg); err != nil {
			// encoding/json keeps decoding the remaining fields after a type
			// mismatch, so the order ID may still be usable: log and go on.
			baseapi.SugarLogger.Warnf("handleTopic topic:%s, messageID:%s, %v", topicId, result.MessageID, err)
		}
		orderID := msg.OrderPayObject.OrderId
		if orderID == 0 {
			baseapi.SugarLogger.Warnf("handleTopic topic:%s, messageID:%s, no order ID in message body", topicId, result.MessageID)
			continue
		}

		orderResult, err := jdmcshopapi.GetOrderById(orderID, true)
		isGy := false
		if orderResult == nil && err == nil {
			// Not found under the market account: retry with the orchard account.
			isGy = true
			orderResult, err = jdgyshopapi.GetOrderById(orderID, true)
		}
		if err != nil {
			baseapi.SugarLogger.Warnf("handleTopic topic:%s, orderID:%d, get order failed with error:%v", topicId, orderID, err)
			continue
		}
		if orderResult == nil {
			baseapi.SugarLogger.Debug("未查询到订单ID", orderID)
			continue
		}
		// Convert to the shape of the original callback and process it.
		jdshop.OnCallbackMsg(TransFormOrderResultToCallBackResult(orderResult, topicId, isGy))
	}
}

// transType maps a topic name to the message type stored with the message.
// It returns "" for an unknown topic.
func transType(topic string) string {
	switch topic {
	case popOrderCreatTopic:
		return popOrderCreatType
	case popOrderChangeTopic:
		return popOrderChangeType
	case popOrderOutTopic:
		return popOrderOutType
	case OrderCreatTopic:
		return OrderCreatType
	case OrderPayTopic:
		return OrderPayType
	case OrderOutTopic:
		return OrderOutType
	case OrderCancelTopic:
		return OrderCancelType
	default:
		return ""
	}
}

// TransFormOrderResultToCallBackResult converts an order query result into
// the callback structure accepted by the existing callback handler.
//
// topicId selects the MsgType (see TransMsgType) and isGy selects the
// VendorOrgCode (see getVendorOrgCode). It returns nil when getOrderResult
// is nil.
func TransFormOrderResultToCallBackResult(getOrderResult *jdshopapi.GetEnOrderResult, topicId string, isGy bool) *jdshopapi.CallBackResult {
	if getOrderResult == nil {
		return nil
	}
	return &jdshopapi.CallBackResult{
		OrderStateRemark:  getOrderResult.OrderStateRemark,
		OrderRemark:       getOrderResult.OrderRemark,
		OrderSellerPrice:  getOrderResult.OrderSellerPrice,
		OrderState:        getOrderResult.OrderState,
		OrderType:         getOrderResult.OrderType,
		OrderPayment:      getOrderResult.OrderPayment,
		PayType:           getOrderResult.PayType,
		StoreID:           getOrderResult.StoreID,
		OrderTotalPrice:   getOrderResult.OrderTotalPrice,
		OrderExt:          getOrderResult.OrderExt,
		StoreOrder:        getOrderResult.StoreOrder,
		OrderStartTime:    getOrderResult.OrderStartTime,
		OrderID:           getOrderResult.OrderID,
		OrderSource:       getOrderResult.OrderSource,
		FreightPrice:      getOrderResult.FreightPrice,
		MsgType:           TransMsgType(topicId),
		Pin:               "jd_temp", // fixed placeholder; splitPin(getOrderResult.Pin) is disabled
		IDSopShipmenttype: getOrderResult.IDSopShipmenttype,
		ScDT:              getOrderResult.ScDT,
		SellerDiscount:    getOrderResult.SellerDiscount,
		ConsigneeInfo:     getOrderResult.ConsigneeInfo,
		ItemInfoList:      getOrderResult.ItemInfoList,
		VendorOrgCode:     getVendorOrgCode(isGy),
		MenDianID:         getOrderResult.MenDianID,
		BalanceUsed:       getOrderResult.BalanceUsed,
	}
}

// TransMsgType maps a topic name to the jcqapi message type used by the
// callback handler. Only the pay, cancel and POP shipment topics are
// handled; any other topic yields "".
func TransMsgType(topicId string) string {
	switch topicId {
	case OrderPayTopic:
		return jcqapi.TopicOrderPay
	case OrderCancelTopic:
		return jcqapi.TopicOrderCancel
	case popOrderOutTopic:
		return jcqapi.TopicOrderOut
	default:
		return ""
	}
}

// splitPin returns the hex-encoded MD5 digest of pin. The original author
// added it to shorten over-long pin values; it is currently not called.
func splitPin(pin string) string {
	sum := md5.Sum([]byte(pin))
	return hex.EncodeToString(sum[:])
}

// getVendorOrgCode returns "2" when isGy is true and "1" otherwise.
// NOTE(review): the original comment calls this an encryption marker;
// confirm its meaning against the callback consumer.
func getVendorOrgCode(isGy bool) string {
	if isGy {
		return "2"
	}
	return "1"
}