161 lines
5.1 KiB
Go
161 lines
5.1 KiB
Go
package jcqapi
|
|
|
|
import (
|
|
"crypto/hmac"
|
|
"crypto/sha1"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.rosy.net.cn/baseapi/platformapi"
|
|
"git.rosy.net.cn/baseapi/utils"
|
|
)
|
|
|
|
// Package-wide settings for the JCQ HTTP interface.
const (
	// sigKey is the parameter name that signParam leaves out of the signed payload.
	sigKey = "signature"

	// httpURL is the base address used by ConsumeInfo (v1/messages).
	httpURL = "http://jcq-hb-yd-001-httpsrv-nlb-FI.jvessel-open-hb.jdcloud.com:8080"
	// httpURL2 is the base address used by ConsumeInfo2 (v2/messages).
	httpURL2 = "http://114.67.73.174:8080/msg"

	// ConsumerGroupID is the consumer group shared by every topic.
	ConsumerGroupID = "open_message_573819178445"
	// ConsumerGroupID2 is a second consumer group, likewise shared by every topic.
	// NOTE(review): presumably paired with httpURL2/TopicSuffix2 - confirm against callers.
	ConsumerGroupID2 = "open_message_413277234485"

	// TopicSuffix and TopicSuffix2 are suffix strings for topic names.
	// NOTE(review): nothing in this file appends them; presumably callers build
	// the full topic name as topic + suffix - confirm.
	TopicSuffix  = "_E1D746D42474D5F1F1A10CECE75D99F6"
	TopicSuffix2 = "_D5E8352BE0786ED16F77B4548F62F09A"

	// TopicRemkChg: POP order remark changed.
	TopicRemkChg = "open_message_pop_order_remk_chg"
	// TopicOrderOut: POP order shipped out of the warehouse.
	TopicOrderOut = "open_message_pop_order_out"
	// TopicOrderCreate: POP order created.
	TopicOrderCreate = "open_message_pop_order_create"
	// TopicOrderChange: POP order changed.
	TopicOrderChange = "open_message_pop_order_change"
	// TopicOrderPay: order paid.
	TopicOrderPay = "open_message_order_order_pay"
	// TopicOrderFinish: order finished.
	TopicOrderFinish = "open_message_order_order_finish"
	// TopicOrderCancel: order cancelled.
	TopicOrderCancel = "open_message_order_order_cancel"

	// Size is the default number of messages fetched per request (10).
	Size = 10
)
|
|
|
|
// TopicList holds the topic names that are currently enabled. Entries that
// are commented out are known topics that are deliberately left disabled.
var TopicList = []string{
	"open_message_order_order_pay", // order paid
	"open_message_pop_order_out",   // POP order shipped out of the warehouse
	// "open_message_pop_order_create",   // POP order created
	// "open_message_pop_order_change",   // POP order changed
	// "open_message_order_order_finish", // order finished
	"open_message_order_order_cancel", // order cancelled
	// "open_message_pop_order_remk_chg", // POP order remark changed
}
|
|
|
|
type API struct {
|
|
platformapi.APICookie
|
|
|
|
accessKey string
|
|
secretKey string
|
|
client *http.Client
|
|
config *platformapi.APIConfig
|
|
}
|
|
|
|
func New(accessKey, secretKey string, config ...*platformapi.APIConfig) *API {
|
|
curConfig := platformapi.DefAPIConfig
|
|
if len(config) > 0 {
|
|
curConfig = *config[0]
|
|
}
|
|
return &API{
|
|
accessKey: accessKey,
|
|
secretKey: secretKey,
|
|
client: &http.Client{Timeout: curConfig.ClientTimeout},
|
|
config: &curConfig,
|
|
}
|
|
}
|
|
|
|
func (a *API) signParam(params map[string]interface{}) (sig string) {
|
|
var valueList []string
|
|
for k, v := range params {
|
|
if k != sigKey {
|
|
if str := fmt.Sprint(v); str != "" {
|
|
valueList = append(valueList, fmt.Sprintf("%s=%s", k, str))
|
|
}
|
|
}
|
|
}
|
|
sort.Sort(sort.StringSlice(valueList))
|
|
sig = strings.Join(valueList, "&")
|
|
key := []byte(a.secretKey)
|
|
mac := hmac.New(sha1.New, key)
|
|
mac.Write([]byte(sig))
|
|
sEnc := base64.StdEncoding.EncodeToString(mac.Sum(nil))
|
|
return sEnc
|
|
}
|
|
|
|
func (a *API) AccessAPI(action string, url string, bizParams map[string]interface{}) (retVal map[string]interface{}, err error) {
|
|
params := make(map[string]interface{})
|
|
time := time.Now().UTC().Format(time.RFC3339)
|
|
params["accessKey"] = a.accessKey
|
|
params["dateTime"] = time
|
|
params = utils.MergeMaps(params, bizParams)
|
|
signStr := a.signParam(params)
|
|
delete(params, "accessKey")
|
|
delete(params, "dateTime")
|
|
err = platformapi.AccessPlatformAPIWithRetry(a.client,
|
|
func() *http.Request {
|
|
request, _ := http.NewRequest(http.MethodGet, utils.GenerateGetURL(url, action, params), nil)
|
|
request.Header.Set("accessKey", a.accessKey)
|
|
request.Header.Set("dateTime", time)
|
|
request.Header.Set("signature", signStr)
|
|
return request
|
|
},
|
|
a.config,
|
|
func(response *http.Response, bodyStr string, jsonResult1 map[string]interface{}) (errLevel string, err error) {
|
|
if jsonResult1 == nil {
|
|
return platformapi.ErrLevelRecoverableErr, fmt.Errorf("mapData is nil")
|
|
}
|
|
if err == nil {
|
|
if jsonResult1["error"] != nil {
|
|
errLevel = platformapi.ErrLevelGeneralFail
|
|
err = utils.NewErrorCode(jsonResult1["error"].(map[string]interface{})["message"].(string), jsonResult1["error"].(map[string]interface{})["code"].(string))
|
|
}
|
|
retVal = jsonResult1
|
|
}
|
|
return errLevel, err
|
|
})
|
|
return retVal, err
|
|
}
|
|
|
|
// ConsumeInfoResult is one element of the "messages" list decoded by
// ConsumeInfo and ConsumeInfo2.
type ConsumeInfoResult struct {
	// MessageID maps to the JSON key "messageId".
	MessageID string `json:"messageId"`
	// MessageBody maps to the JSON key "messageBody".
	MessageBody string `json:"messageBody"`
	// Properties maps to the JSON object "properties".
	Properties struct {
		// BUSINESSID maps to the JSON key "BUSINESS_ID".
		BUSINESSID string `json:"BUSINESS_ID"`
		// PROPERTYRETRYTIMES maps to the JSON key "PROPERTY_RETRY_TIMES".
		PROPERTYRETRYTIMES string `json:"PROPERTY_RETRY_TIMES"`
	} `json:"properties"`
}
|
|
|
|
//消费信息
|
|
//https://docs.jdcloud.com/cn/message-queue/consume-message
|
|
func (a *API) ConsumeInfo(topic, consumerGroupId string, size int) (consumeInfoResult []*ConsumeInfoResult, err error) {
|
|
result, err := a.AccessAPI("v1/messages", httpURL, map[string]interface{}{
|
|
"topic": topic,
|
|
"consumerGroupId": consumerGroupId,
|
|
"size": size,
|
|
// "ack": "true", //默认消费之后就确认
|
|
})
|
|
if err == nil {
|
|
utils.Map2StructByJson(result["result"].(map[string]interface{})["messages"], &consumeInfoResult, false)
|
|
}
|
|
return consumeInfoResult, err
|
|
}
|
|
|
|
func (a *API) ConsumeInfo2(topic, consumerGroupId string, size int) (consumeInfoResult []*ConsumeInfoResult, err error) {
|
|
result, err := a.AccessAPI("v2/messages", httpURL2, map[string]interface{}{
|
|
"topic": topic,
|
|
"consumerGroupId": consumerGroupId,
|
|
"size": size,
|
|
//"ack": "true", //默认消费之后就确认
|
|
})
|
|
if err == nil {
|
|
utils.Map2StructByJson(result["result"].(map[string]interface{})["messages"], &consumeInfoResult, false)
|
|
}
|
|
return consumeInfoResult, err
|
|
}
|