Files
jx-callback/business/jxutils/eventhub/eventhub.go
2019-10-30 16:19:26 +08:00

261 lines
6.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package eventhub
import (
"fmt"
"strings"
"sync"
"time"
"git.rosy.net.cn/jx-callback/globals"
"git.rosy.net.cn/baseapi/utils"
)
const (
cmdRegisterConsumer = 1
cmdUnregisterConsumer = 2
cmdNewEvent = 3
)
const (
maxGetOrderTimeDuration = 24 * time.Hour
minPollingDuration = 1 * time.Minute
defPollingDuration = 5 * time.Minute
maxPollingDuration = 10 * time.Minute
)
const (
cmdChanLen = 100
)
const (
allEventType = ""
)
type IEventProducer interface {
IsCriteriaMatch(eventInfo *EventInfo, criteria interface{}) bool
}
type EventInfo struct {
Type string
Data interface{}
}
type tRegisterInfo struct {
notifyChan chan *EventInfo
eventTypeList []string
criteria interface{}
}
type tUnregisterInfo struct {
notifyChan chan *EventInfo
closeChan chan struct{}
}
type tCmd struct {
cmdType int
data interface{}
// regisgerInfo *RegisterInfo // CmdRegisterConsumer
// unregisterInfo *UnregisterInfo // CmdUnregisterConsumer
// eventInfo *EventInfo // CmdNewEvent
}
type EventHub struct {
cmdChan chan *tCmd
chanMap map[chan *EventInfo]*tRegisterInfo
typeChanMap map[string]map[chan *EventInfo]bool
eventCatMap map[string]IEventProducer
locker sync.RWMutex
}
func eventRoutine(eventHub *EventHub) {
eventHub.eventRoutine()
}
func New() (eventHub *EventHub) {
eventHub = &EventHub{
cmdChan: make(chan *tCmd, cmdChanLen),
chanMap: make(map[chan *EventInfo]*tRegisterInfo),
typeChanMap: make(map[string]map[chan *EventInfo]bool),
eventCatMap: make(map[string]IEventProducer),
}
utils.CallFuncAsync(func() {
eventRoutine(eventHub)
})
return eventHub
}
func (e *EventHub) eventRoutine() {
for {
cmd, ok := <-e.cmdChan
if ok {
switch cmd.cmdType {
case cmdRegisterConsumer:
regisgerInfo := cmd.data.(*tRegisterInfo)
e.chanMap[regisgerInfo.notifyChan] = regisgerInfo
for _, eventType := range regisgerInfo.eventTypeList {
if e.typeChanMap[eventType] == nil {
e.typeChanMap[eventType] = make(map[chan *EventInfo]bool)
}
e.typeChanMap[eventType][regisgerInfo.notifyChan] = true
}
case cmdUnregisterConsumer:
unregisgerInfo := cmd.data.(*tUnregisterInfo)
e.removeChan(unregisgerInfo.notifyChan)
close(unregisgerInfo.closeChan)
case cmdNewEvent:
eventInfo := cmd.data.(*EventInfo)
typeList := []string{allEventType, eventInfo.Type}
tmpChanMap := make(map[chan *EventInfo]*tRegisterInfo)
for _, eventType := range typeList {
for notifyChan := range e.typeChanMap[eventType] {
tmpChanMap[notifyChan] = e.chanMap[notifyChan]
}
}
for notifyChan, registerInfo := range tmpChanMap {
eventCategory, _ := splitEventType(eventInfo.Type)
if eventProducer := e.getEventProducer(eventCategory); eventProducer != nil {
if eventProducer.IsCriteriaMatch(eventInfo, registerInfo.criteria) {
notifyChan <- eventInfo
e.removeChan(notifyChan)
}
} else {
globals.SugarLogger.Warnf("eventRoutine, eventCategory:%s producer is nil", eventCategory)
}
}
}
} else {
break
}
}
}
func (e *EventHub) removeChan(notifyChann chan *EventInfo) {
typeList := e.chanMap[notifyChann].eventTypeList
for _, eventType := range typeList {
delete(e.typeChanMap[eventType], notifyChann)
}
delete(e.chanMap, notifyChann)
}
func (e *EventHub) Close() {
close(e.cmdChan)
}
func (e *EventHub) RegisterProducer(eventCategory string, producer IEventProducer) (err error) {
e.locker.Lock()
if e.eventCatMap[eventCategory] != nil {
err = fmt.Errorf("eventCategory:%s已经被注册了", eventCategory)
} else {
e.eventCatMap[eventCategory] = producer
}
e.locker.Unlock()
return err
}
// 正常不应该是动态的经常调用注册与反注册Producer只是退出进程前礼貌的调用
func (e *EventHub) UnregisterProducer(eventCategory string) (err error) {
e.locker.Lock()
if e.eventCatMap[eventCategory] == nil {
err = fmt.Errorf("eventCategory:%s没有注册", eventCategory)
} else {
delete(e.eventCatMap, eventCategory)
}
e.locker.Unlock()
return err
}
func (e *EventHub) registerConsumer(eventCategory string, eventTypeList []string, criteria interface{}) (notifyChan chan *EventInfo) {
if len(eventTypeList) == 0 {
eventTypeList = []string{allEventType}
}
realEventTypeList := make([]string, len(eventTypeList))
for index, eventType := range eventTypeList {
realEventTypeList[index] = composeEventType(eventCategory, eventType)
}
info := &tRegisterInfo{
eventTypeList: realEventTypeList,
notifyChan: make(chan *EventInfo, 1),
criteria: criteria,
}
e.cmdChan <- &tCmd{
cmdType: cmdRegisterConsumer,
data: info,
}
return info.notifyChan
}
func (e *EventHub) unregisterConsumer(notifyChan chan *EventInfo) {
info := &tUnregisterInfo{
notifyChan: notifyChan,
closeChan: make(chan struct{}),
}
e.cmdChan <- &tCmd{
cmdType: cmdUnregisterConsumer,
data: info,
}
<-info.closeChan
}
func (e *EventHub) PostNewEvent(eventCategory string, event *EventInfo) {
utils.CallFuncAsync(func() {
newEvent := *event
newEvent.Type = composeEventType(eventCategory, event.Type)
e.cmdChan <- &tCmd{
cmdType: cmdNewEvent,
data: &newEvent,
}
})
}
func (e *EventHub) GetEvent(eventCategory string, eventTypeList []string, criteria interface{}, waitTime time.Duration) (event *EventInfo, err error) {
notifyChan := e.registerConsumer(eventCategory, eventTypeList, criteria)
pollingDuration := defPollingDuration
if waitTime != 0 {
pollingDuration = waitTime
if globals.IsProductEnv() {
if pollingDuration > maxPollingDuration {
pollingDuration = maxPollingDuration
} else if pollingDuration < minPollingDuration {
pollingDuration = minPollingDuration
}
}
}
globals.SugarLogger.Debugf("GetEvent pollingDuration:%d seconds", pollingDuration/time.Second)
timer := time.NewTimer(pollingDuration)
select {
case tmpEvent, ok := <-notifyChan:
timer.Stop()
if ok {
event = tmpEvent
}
case <-timer.C:
e.unregisterConsumer(notifyChan)
}
close(notifyChan)
return event, err
}
func (e *EventHub) getEventProducer(eventCategory string) IEventProducer {
e.locker.RLock()
defer e.locker.RUnlock()
return e.eventCatMap[eventCategory]
}
func composeEventType(eventCategory, eventType string) string {
return eventCategory + "/" + eventType
}
func splitEventType(eventType string) (eventCategory, pureEventType string) {
typeList := strings.Split(eventType, "/")
eventCategory = typeList[0]
if len(typeList) > 1 {
pureEventType = typeList[1]
} else {
globals.SugarLogger.Warnf("splitEventType eventType:%s", eventType)
}
return eventCategory, pureEventType
}