package jcqapi import ( "crypto/hmac" "crypto/sha1" "encoding/base64" "fmt" "net/http" "sort" "strings" "time" "git.rosy.net.cn/baseapi" "git.rosy.net.cn/baseapi/platformapi" "git.rosy.net.cn/baseapi/utils" ) const ( sigKey = "signature" httpURL = "http://jcq-hb-yd-001-httpsrv-nlb-FI.jvessel-open-hb.jdcloud.com:8080" httpURL2 = "http://114.67.73.174:8080/msg" ConsumerGroupID = "open_message_573819178445" //所有topic都是这个 ConsumerGroupID2 = "open_message_413277234485" //所有topic都是这个 TopicSuffix = "_E1D746D42474D5F1F1A10CECE75D99F6" TopicSuffix2 = "_D5E8352BE0786ED16F77B4548F62F09A" TopicRemkChg = "open_message_pop_order_remk_chg" //POP订单备注变更 TopicOrderOut = "open_message_pop_order_out" //POP订单出库 TopicOrderCreate = "open_message_pop_order_create" //POP订单创建 TopicOrderChange = "open_message_pop_order_change" //POP订单变更消息 TopicOrderPay = "open_message_order_order_pay" //订单付款 TopicOrderFinish = "open_message_order_order_finish" //订单完成 TopicOrderCancel = "open_message_order_order_cancel" //订单取消 Size = 10 //默认一次取10条 ) var ( TopicList = []string{ "open_message_order_order_pay", //订单付款 "open_message_pop_order_out", //POP订单出库 // "open_message_pop_order_create", //POP订单创建 // "open_message_pop_order_change", //POP订单变更消息 // "open_message_order_order_finish", //订单完成 "open_message_order_order_cancel", //订单取消 // "open_message_pop_order_remk_chg", //POP订单备注变更 } ) type API struct { platformapi.APICookie accessKey string secretKey string client *http.Client config *platformapi.APIConfig } func New(accessKey, secretKey string, config ...*platformapi.APIConfig) *API { curConfig := platformapi.DefAPIConfig if len(config) > 0 { curConfig = *config[0] } return &API{ accessKey: accessKey, secretKey: secretKey, client: &http.Client{Timeout: curConfig.ClientTimeout}, config: &curConfig, } } func (a *API) signParam(params map[string]interface{}) (sig string) { var valueList []string for k, v := range params { if k != sigKey { if str := fmt.Sprint(v); str != "" { valueList = append(valueList, fmt.Sprintf("%s=%s", k, str)) } } } sort.Sort(sort.StringSlice(valueList)) sig = strings.Join(valueList, "&") key := []byte(a.secretKey) mac := hmac.New(sha1.New, key) mac.Write([]byte(sig)) sEnc := base64.StdEncoding.EncodeToString(mac.Sum(nil)) return sEnc } func (a *API) AccessAPI(action string, url string, bizParams map[string]interface{}) (retVal map[string]interface{}, err error) { params := make(map[string]interface{}) time := time.Now().UTC().Format(time.RFC3339) params["accessKey"] = a.accessKey params["dateTime"] = time params = utils.MergeMaps(params, bizParams) signStr := a.signParam(params) delete(params, "accessKey") delete(params, "dateTime") err = platformapi.AccessPlatformAPIWithRetry(a.client, func() *http.Request { request, _ := http.NewRequest(http.MethodGet, utils.GenerateGetURL(url, action, params), nil) request.Header.Set("accessKey", a.accessKey) request.Header.Set("dateTime", time) request.Header.Set("signature", signStr) return request }, a.config, func(response *http.Response, bodyStr string, jsonResult1 map[string]interface{}) (errLevel string, err error) { if jsonResult1 == nil { return platformapi.ErrLevelRecoverableErr, fmt.Errorf("mapData is nil") } if err == nil { if jsonResult1["error"] != nil { errLevel = platformapi.ErrLevelGeneralFail err = utils.NewErrorCode(jsonResult1["error"].(map[string]interface{})["message"].(string), jsonResult1["error"].(map[string]interface{})["code"].(string)) baseapi.SugarLogger.Debugf("jdeclp AccessAPI failed, jsonResult1:%s", utils.Format4Output(jsonResult1, true)) } retVal = jsonResult1 } return errLevel, err }) return retVal, err } type ConsumeInfoResult struct { MessageID string `json:"messageId"` MessageBody string `json:"messageBody"` Properties struct { BUSINESSID string `json:"BUSINESS_ID"` PROPERTYRETRYTIMES string `json:"PROPERTY_RETRY_TIMES"` } `json:"properties"` } //消费信息 //https://docs.jdcloud.com/cn/message-queue/consume-message func (a *API) ConsumeInfo(topic, consumerGroupId string, size int) (consumeInfoResult []*ConsumeInfoResult, err error) { result, err := a.AccessAPI("v1/messages", httpURL, map[string]interface{}{ "topic": topic, "consumerGroupId": consumerGroupId, "size": size, // "ack": "true", //默认消费之后就确认 }) if err == nil { utils.Map2StructByJson(result["result"].(map[string]interface{})["messages"], &consumeInfoResult, false) } return consumeInfoResult, err } func (a *API) ConsumeInfo2(topic, consumerGroupId string, size int) (consumeInfoResult []*ConsumeInfoResult, err error) { result, err := a.AccessAPI("v2/messages", httpURL2, map[string]interface{}{ "topic": topic, "consumerGroupId": consumerGroupId, "size": size, //"ack": "true", //默认消费之后就确认 }) if err == nil { utils.Map2StructByJson(result["result"].(map[string]interface{})["messages"], &consumeInfoResult, false) } return consumeInfoResult, err }