From 3b2a5d9b1c387a12355a341d4da234dfaaa1be52 Mon Sep 17 00:00:00 2001 From: gazebo Date: Fri, 26 Apr 2019 17:50:25 +0800 Subject: [PATCH] - eventhub - sys/GetWXToken --- business/jxutils/eventhub/eventhub.go | 263 ++++++++++++++++++ .../eventhub/syseventhub/syseventhub.go | 81 ++++++ business/jxutils/tasks/configrefresh.go | 66 ++++- conf/app.conf | 6 + controllers/sys.go | 31 +++ globals/globals.go | 6 + routers/commentsRouter_controllers.go | 9 + routers/router.go | 5 + 8 files changed, 460 insertions(+), 7 deletions(-) create mode 100644 business/jxutils/eventhub/eventhub.go create mode 100644 business/jxutils/eventhub/syseventhub/syseventhub.go create mode 100644 controllers/sys.go diff --git a/business/jxutils/eventhub/eventhub.go b/business/jxutils/eventhub/eventhub.go new file mode 100644 index 000000000..6beacfa71 --- /dev/null +++ b/business/jxutils/eventhub/eventhub.go @@ -0,0 +1,263 @@ +package eventhub + +import ( + "fmt" + "strings" + "sync" + "time" + + "git.rosy.net.cn/jx-callback/globals" + + "git.rosy.net.cn/baseapi/utils" +) + +const ( + cmdRegisterConsumer = 1 + cmdUnregisterConsumer = 2 + cmdNewEvent = 3 +) + +const ( + maxGetOrderTimeDuration = 24 * time.Hour + minPollingDuration = 1 * time.Minute + defPollingDuration = 5 * time.Minute + maxPollingDuration = 10 * time.Minute +) + +const ( + cmdChanLen = 100 +) + +const ( + allEventType = "" +) + +type IEventProducer interface { + GetEvent(eventTypeList []string, criteria interface{}) (event *EventInfo, err error) + IsCriteriaMatch(eventInfo *EventInfo, criteria interface{}) bool +} + +type EventInfo struct { + Type string + Data interface{} +} + +type tRegisterInfo struct { + notifyChan chan *EventInfo + eventCategory string + eventTypeList []string + criteria interface{} +} + +type tUnregisterInfo struct { + notifyChan chan *EventInfo + closeChan chan struct{} +} + +type tCmd struct { + cmdType int + data interface{} + // regisgerInfo *RegisterInfo // CmdRegisterConsumer + // unregisterInfo *UnregisterInfo // CmdUnregisterConsumer + // eventInfo *EventInfo // CmdNewEvent +} + +type EventHub struct { + cmdChan chan *tCmd + + chanMap map[chan *EventInfo]*tRegisterInfo + typeChanMap map[string]map[chan *EventInfo]bool + + eventCatMap map[string]IEventProducer + locker sync.RWMutex +} + +func eventRoutine(eventHub *EventHub) { + eventHub.eventRoutine() +} + +func New() (eventHub *EventHub) { + eventHub = &EventHub{ + cmdChan: make(chan *tCmd, cmdChanLen), + chanMap: make(map[chan *EventInfo]*tRegisterInfo), + typeChanMap: make(map[string]map[chan *EventInfo]bool), + eventCatMap: make(map[string]IEventProducer), + } + utils.CallFuncAsync(func() { + eventRoutine(eventHub) + }) + return eventHub +} + +func (e *EventHub) eventRoutine() { + for { + cmd, ok := <-e.cmdChan + if ok { + switch cmd.cmdType { + case cmdRegisterConsumer: + regisgerInfo := cmd.data.(*tRegisterInfo) + e.chanMap[regisgerInfo.notifyChan] = regisgerInfo + for _, eventType := range regisgerInfo.eventTypeList { + if e.typeChanMap[eventType] == nil { + e.typeChanMap[eventType] = make(map[chan *EventInfo]bool) + } + e.typeChanMap[eventType][regisgerInfo.notifyChan] = true + } + case cmdUnregisterConsumer: + unregisgerInfo := cmd.data.(*tUnregisterInfo) + e.removeChan(unregisgerInfo.notifyChan) + close(unregisgerInfo.closeChan) + case cmdNewEvent: + eventInfo := cmd.data.(*EventInfo) + typeList := []string{allEventType, eventInfo.Type} + tmpChanMap := make(map[chan *EventInfo]*tRegisterInfo) + for _, eventType := range typeList { + for notifyChan := range e.typeChanMap[eventType] { + tmpChanMap[notifyChan] = e.chanMap[notifyChan] + } + } + for notifyChan, registerInfo := range tmpChanMap { + eventCategory, _ := splitEventType(eventInfo.Type) + if eventProducer := e.getEventProducer(eventCategory); eventProducer != nil { + if eventProducer.IsCriteriaMatch(eventInfo, registerInfo.criteria) { + notifyChan <- eventInfo + e.removeChan(notifyChan) + } + } else { + globals.SugarLogger.Warnf("eventRoutine, eventCategory:%s producer is nil") + } + } + } + } else { + break + } + } +} + +func (e *EventHub) removeChan(notifyChann chan *EventInfo) { + typeList := e.chanMap[notifyChann].eventTypeList + for _, eventType := range typeList { + delete(e.typeChanMap[eventType], notifyChann) + } + delete(e.chanMap, notifyChann) +} + +func (e *EventHub) Close() { + close(e.cmdChan) +} + +func (e *EventHub) RegisterProducer(eventCategory string, producer IEventProducer) (err error) { + e.locker.Lock() + if e.eventCatMap[eventCategory] != nil { + err = fmt.Errorf("eventCategory:%s已经被注册了", eventCategory) + } else { + e.eventCatMap[eventCategory] = producer + } + e.locker.Unlock() + return err +} + +// 正常不应该是动态的经常调用注册与反注册Producer,只是退出进程前礼貌的调用 +func (e *EventHub) UnregisterProducer(eventCategory string) (err error) { + e.locker.Lock() + if e.eventCatMap[eventCategory] == nil { + err = fmt.Errorf("eventCategory:%s没有注册", eventCategory) + } else { + delete(e.eventCatMap, eventCategory) + } + e.locker.Unlock() + return err +} + +func (e *EventHub) registerConsumer(eventCategory string, eventTypeList []string, criteria interface{}) (notifyChan chan *EventInfo) { + if len(eventTypeList) == 0 { + eventTypeList = []string{allEventType} + } + realEventTypeList := make([]string, len(eventTypeList)) + for index, eventType := range eventTypeList { + realEventTypeList[index] = composeEventType(eventCategory, eventType) + } + info := &tRegisterInfo{ + eventCategory: eventCategory, + eventTypeList: eventTypeList, + notifyChan: make(chan *EventInfo, 1), + criteria: criteria, + } + e.cmdChan <- &tCmd{ + cmdType: cmdRegisterConsumer, + data: info, + } + return info.notifyChan +} + +func (e *EventHub) unregisterConsumer(notifyChan chan *EventInfo) { + info := &tUnregisterInfo{ + notifyChan: notifyChan, + closeChan: make(chan struct{}), + } + e.cmdChan <- &tCmd{ + cmdType: cmdUnregisterConsumer, + data: info, + } + <-info.closeChan +} + +func (e *EventHub) PostNewEvent(eventCategory string, event *EventInfo) { + newEvent := *event + newEvent.Type = composeEventType(eventCategory, event.Type) + e.cmdChan <- &tCmd{ + cmdType: cmdNewEvent, + data: event, + } +} + +func (e *EventHub) GetEvent(eventCategory string, eventTypeList []string, criteria interface{}, waitTime time.Duration) (event *EventInfo, err error) { + eventProducer := e.getEventProducer(eventCategory) + if eventProducer == nil { + return nil, fmt.Errorf("eventCategory:%s没有注册", eventCategory) + } + event, err = eventProducer.GetEvent(eventTypeList, criteria) + if err == nil && event == nil { + notifyChan := e.registerConsumer(eventCategory, eventTypeList, criteria) + pollingDuration := defPollingDuration + if waitTime != 0 { + pollingDuration = waitTime + if globals.IsProductEnv() { + if pollingDuration > maxPollingDuration { + pollingDuration = maxPollingDuration + } else if pollingDuration < minPollingDuration { + pollingDuration = minPollingDuration + } + } + } + timer := time.NewTimer(pollingDuration) + select { + case tmpEvent, ok := <-notifyChan: + timer.Stop() + if ok { + event = tmpEvent + } + case <-timer.C: + e.unregisterConsumer(notifyChan) + } + close(notifyChan) + } + return event, err +} + +func (e *EventHub) getEventProducer(eventCategory string) IEventProducer { + e.locker.RLock() + defer e.locker.RUnlock() + return e.eventCatMap[eventCategory] +} + +func composeEventType(eventCategory, eventType string) string { + return eventCategory + "/" + eventType +} + +func splitEventType(eventType string) (eventCategory, pureEventType string) { + typeList := strings.Split(eventType, "/") + eventCategory = typeList[0] + pureEventType = typeList[1] + return eventCategory, pureEventType +} diff --git a/business/jxutils/eventhub/syseventhub/syseventhub.go b/business/jxutils/eventhub/syseventhub/syseventhub.go new file mode 100644 index 000000000..784092f54 --- /dev/null +++ b/business/jxutils/eventhub/syseventhub/syseventhub.go @@ -0,0 +1,81 @@ +package syseventhub + +import ( + "time" + + "git.rosy.net.cn/jx-callback/business/jxutils/eventhub" + "git.rosy.net.cn/jx-callback/globals/api" +) + +const ( + EventCategory = "sys" + + EventTypeWXToken = "wxtoken" +) + +type Hub struct { + eventHub *eventhub.EventHub +} + +type Criteria struct { +} + +var ( + SysEventHub *Hub +) + +func init() { + SysEventHub = New() +} + +func New() (hub *Hub) { + hub = &Hub{ + eventHub: eventhub.New(), + } + hub.eventHub.RegisterProducer(EventCategory, hub) + return hub +} + +func (h *Hub) GetEvent(eventTypeList []string, criteria interface{}) (event *eventhub.EventInfo, err error) { + for _, eventType := range eventTypeList { + switch eventType { + case EventTypeWXToken: + return h.getWXToken(criteria) + } + } + return event, err +} + +func (h *Hub) getWXToken(criteria interface{}) (event *eventhub.EventInfo, err error) { + criteriaToken := "" + if criteria != nil { + criteriaToken = criteria.(string) + } + token := api.WeixinAPI.CBGetToken() + if token != criteriaToken { + return &eventhub.EventInfo{ + Type: EventTypeWXToken, + Data: token, + }, nil + } + return nil, nil +} + +func (h *Hub) IsCriteriaMatch(eventInfo *eventhub.EventInfo, criteria interface{}) bool { + return true +} + +func (h *Hub) OnNewWXToken(token string) { + h.eventHub.PostNewEvent(EventCategory, &eventhub.EventInfo{ + Type: EventTypeWXToken, + Data: token, + }) +} + +func (h *Hub) GetWXToken(oldToken string, waitTime time.Duration) (token string) { + eventInfo, err := h.eventHub.GetEvent(EventCategory, []string{EventTypeWXToken}, oldToken, waitTime) + if err == nil && eventInfo != nil { + token = eventInfo.Data.(string) + } + return token +} diff --git a/business/jxutils/tasks/configrefresh.go b/business/jxutils/tasks/configrefresh.go index aa4b9ba76..120a1316d 100644 --- a/business/jxutils/tasks/configrefresh.go +++ b/business/jxutils/tasks/configrefresh.go @@ -1,8 +1,15 @@ package tasks import ( + "fmt" + "io/ioutil" + "net/http" + "strings" "time" + "git.rosy.net.cn/baseapi/platformapi" + + "git.rosy.net.cn/jx-callback/business/jxutils/eventhub/syseventhub" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/model/legacymodel" @@ -36,6 +43,12 @@ type ElmTokenForCompatible struct { Success bool `json:"success"` } +type CallResult struct { + Code string `json:"code"` + Desc string `json:"desc"` + Data string `json:"data"` +} + func RefreshConfig(configKey string, expiresTime time.Duration, configGetter func() (string, string), configSetter func(value string)) error { sleepGap := expiresTime / 10 needRefreshGap := expiresTime * 8 / 10 @@ -108,18 +121,23 @@ func RefreshConfig(configKey string, expiresTime time.Duration, configGetter fun func RefreshWeixinToken() (err error) { if api.WeixinAPI != nil { - err = RefreshConfig("wechat", weixinTokenExpires, func() (string, string) { + err = RefreshConfig("wechat", weixinTokenExpires, func() (token string, expireTimeStr string) { globals.SugarLogger.Debugf("RefreshWeixinToken RunMode:%s", beego.BConfig.RunMode) - if beego.BConfig.RunMode == "prod" { - if tokenInfo, err := api.WeixinAPI.CBRetrieveToken(); err == nil { - globals.SugarLogger.Debugf("RefreshWeixinToken tokenInfo:%s", utils.Format4Output(tokenInfo, true)) - return tokenInfo.AccessToken, "" + if globals.IsProductEnv() { + if beego.BConfig.RunMode == "prod" { + if tokenInfo, err := api.WeixinAPI.CBRetrieveToken(); err == nil { + globals.SugarLogger.Debugf("RefreshWeixinToken tokenInfo:%s", utils.Format4Output(tokenInfo, true)) + token = tokenInfo.AccessToken + } else { + globals.SugarLogger.Errorf("RefreshWeixinToken RefreshToken failed with error:%v", err) + } } else { - globals.SugarLogger.Errorf("RefreshWeixinToken RefreshToken failed with error:%v", err) + token = getWXTokenFromProd(api.WeixinAPI.CBGetToken()) } } - return "", "" + return token, "" }, func(value string) { + syseventhub.SysEventHub.OnNewWXToken(value) api.WeixinAPI.CBSetToken(value) }) } @@ -223,3 +241,37 @@ func RefreshYilianyunToken() (err error) { } }) } + +func getWXTokenFromProd(oldToken string) (token string) { + token = oldToken + if globals.GetWeixinTokenKey != "" && globals.GetWeixinTokenURL != "" { + for { + waitSecond := 5 * 60 + response, err := http.Get(fmt.Sprintf("%s?accessKey=%s&oldToken=%s&waitSecond=%d", globals.GetWeixinTokenURL, globals.GetWeixinTokenKey, oldToken, waitSecond)) + if err == nil { + defer response.Body.Close() + if response.StatusCode == http.StatusOK { + data, err2 := ioutil.ReadAll(response.Body) + if err = err2; err == nil { + var result CallResult + if err = utils.UnmarshalUseNumber(data, &result); err == nil { + if result.Code == "0" { + token = strings.Replace(result.Data, "\"", "", -1) + break + } else { + err = fmt.Errorf("return code is:%s", result.Code) + } + } + } + } else { + err = platformapi.ErrHTTPCodeIsNot200 + } + } + if err != nil { + globals.SugarLogger.Infof("getWXTokenFromProd failed with error:%v", err) + time.Sleep(30 * time.Second) + } + } + } + return token +} diff --git a/conf/app.conf b/conf/app.conf index 19a69713c..f2a3decd9 100644 --- a/conf/app.conf +++ b/conf/app.conf @@ -81,6 +81,8 @@ yilianyunClientSecret = "4885d07c2997b661102e4b6099c0bf3b" zhongwuAppID = 8000192 zhongwuAppSecret = "29435497822f52f3cf659c65da548a79" +getWeixinTokenKey = "c928ed0d-87a3-441a-8517-f92f0167296f" + [dev] jdToken = "df97f334-f7d8-4b36-9664-5784d8ae0baf" jdAppKey = "06692746f7224695ad4788ce340bc854" @@ -107,6 +109,8 @@ weixinToken = "17_roSCZgkCxhRnyFVtei0KdfHwdGP8PmLzJFhCieka4_X4_d-lgfaTxF6oIS6FE5 dbConnectStr = "root:WebServer@1@tcp(127.0.0.1:3306)/jxd_dev_0?charset=utf8mb4&loc=Local&parseTime=true" +getWeixinTokenURL = "http://beta.jxc4.com/v2/sys/GetWXToken" + [prod] EnableDocs = false @@ -192,6 +196,8 @@ dingdingQRCodeSecret = "N9dyC9qB84sauQPs4_JYrILMsG5Krqm9-PSSVJ8t9hb87rrHiFUirISx dingdingCallbackURL = "http://callback-jxgy.jxc4.com/dingding/msg" +getWeixinTokenURL = "http://www.jxc4.com/v2/sys/GetWXToken" + [prod2] httpport = 8082 diff --git a/controllers/sys.go b/controllers/sys.go new file mode 100644 index 000000000..49ec6a78f --- /dev/null +++ b/controllers/sys.go @@ -0,0 +1,31 @@ +package controllers + +import ( + "time" + + "git.rosy.net.cn/jx-callback/globals" + + "git.rosy.net.cn/jx-callback/business/jxutils/eventhub/syseventhub" + "github.com/astaxie/beego" +) + +type SysController struct { + beego.Controller +} + +// @Title 得到京西门店信息 +// @Description 得到京西门店信息,如下条件之间是与的关系 +// @Param accessKey query string true "假token" +// @Param oldToken query string true "之前的token" +// @Param waitSecond query int false "等待秒数" +// @Success 200 {object} controllers.CallResult +// @Failure 200 {object} controllers.CallResult +// @router /GetWXToken [get] +func (c *SysController) GetWXToken() { + c.callGetWXToken(func(params *tSysGetWXTokenParams) (retVal interface{}, errCode string, err error) { + if params.AccessKey == globals.GetWeixinTokenKey { + retVal = syseventhub.SysEventHub.GetWXToken(params.OldToken, time.Duration(params.WaitSecond)*time.Second) + } + return retVal, "", err + }) +} diff --git a/globals/globals.go b/globals/globals.go index 22a71c863..eab441301 100644 --- a/globals/globals.go +++ b/globals/globals.go @@ -37,6 +37,9 @@ var ( BackstageHost string WxBackstageHost string + + GetWeixinTokenURL string + GetWeixinTokenKey string ) func init() { @@ -72,6 +75,9 @@ func Init() { BackstageHost = beego.AppConfig.DefaultString("backstageHost", "") WxBackstageHost = beego.AppConfig.DefaultString("wxBackstageHost", "") + + GetWeixinTokenURL = beego.AppConfig.DefaultString("getWeixinTokenURL", "") + GetWeixinTokenKey = beego.AppConfig.DefaultString("getWeixinTokenKey", "") } func IsCallbackAlwaysReturnSuccess() bool { diff --git a/routers/commentsRouter_controllers.go b/routers/commentsRouter_controllers.go index da6a6b99e..1261ef2b0 100644 --- a/routers/commentsRouter_controllers.go +++ b/routers/commentsRouter_controllers.go @@ -1267,6 +1267,15 @@ func init() { Filters: nil, Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SysController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:SysController"], + beego.ControllerComments{ + Method: "GetWXToken", + Router: `/GetWXToken`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"] = append(beego.GlobalControllerRouter["git.rosy.net.cn/jx-callback/controllers:TaskController"], beego.ControllerComments{ Method: "CancelTask", diff --git a/routers/router.go b/routers/router.go index 72a2a7e2d..d6a0285aa 100644 --- a/routers/router.go +++ b/routers/router.go @@ -101,6 +101,11 @@ func init() { &controllers.TempOpController{}, ), ), + beego.NSNamespace("/sys", + beego.NSInclude( + &controllers.SysController{}, + ), + ), ) beego.AddNamespace(ns)