256 lines
6.4 KiB
Go
256 lines
6.4 KiB
Go
package eventhub
|
||
|
||
import (
|
||
"fmt"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"git.rosy.net.cn/jx-callback/globals"
|
||
|
||
"git.rosy.net.cn/baseapi/utils"
|
||
)
|
||
|
||
// Command kinds carried in tCmd.cmdType and handled by the dispatcher loop.
const (
	// cmdRegisterConsumer adds a consumer; the payload is a *tRegisterInfo.
	cmdRegisterConsumer = iota + 1
	// cmdUnregisterConsumer removes a consumer; the payload is a *tUnregisterInfo.
	cmdUnregisterConsumer
	// cmdNewEvent publishes an event; the payload is an *EventInfo.
	cmdNewEvent
)
|
||
|
||
// Timing limits used when waiting for events.
const (
	// maxGetOrderTimeDuration is not referenced in this file.
	// NOTE(review): looks like a leftover from another package — confirm
	// against the rest of the package before removing.
	maxGetOrderTimeDuration = time.Hour * 24

	// minPollingDuration is the lower bound GetEvent applies to a caller
	// supplied wait time in the production environment.
	minPollingDuration = time.Minute
	// defPollingDuration is the wait time GetEvent uses when the caller
	// passes zero.
	defPollingDuration = time.Minute * 5
	// maxPollingDuration is the upper bound GetEvent applies to a caller
	// supplied wait time in the production environment.
	maxPollingDuration = time.Minute * 10
)
|
||
|
||
// cmdChanLen is the buffer size of the hub's command channel.
const cmdChanLen = 100
|
||
|
||
// allEventType is the wildcard event type: a consumer that supplies no event
// type list is registered under it, and the dispatcher offers every event to
// the consumers registered under it.
const allEventType = ""
|
||
|
||
type IEventProducer interface {
|
||
IsCriteriaMatch(eventInfo *EventInfo, criteria interface{}) bool
|
||
}
|
||
|
||
// EventInfo is a single event travelling through the hub.
type EventInfo struct {
	// Type names the kind of event.
	Type string
	// Data is the event payload; the hub never inspects it.
	Data interface{}
}
|
||
|
||
type tRegisterInfo struct {
|
||
notifyChan chan *EventInfo
|
||
eventCategory string
|
||
eventTypeList []string
|
||
criteria interface{}
|
||
}
|
||
|
||
type tUnregisterInfo struct {
|
||
notifyChan chan *EventInfo
|
||
closeChan chan struct{}
|
||
}
|
||
|
||
// tCmd is one message on the hub's command channel.
type tCmd struct {
	// cmdType is one of cmdRegisterConsumer, cmdUnregisterConsumer or
	// cmdNewEvent.
	cmdType int
	// data is the payload, whose dynamic type depends on cmdType:
	//   cmdRegisterConsumer   -> *tRegisterInfo
	//   cmdUnregisterConsumer -> *tUnregisterInfo
	//   cmdNewEvent           -> *EventInfo
	data interface{}
}
|
||
|
||
type EventHub struct {
|
||
cmdChan chan *tCmd
|
||
|
||
chanMap map[chan *EventInfo]*tRegisterInfo
|
||
typeChanMap map[string]map[chan *EventInfo]bool
|
||
|
||
eventCatMap map[string]IEventProducer
|
||
locker sync.RWMutex
|
||
}
|
||
|
||
func eventRoutine(eventHub *EventHub) {
|
||
eventHub.eventRoutine()
|
||
}
|
||
|
||
func New() (eventHub *EventHub) {
|
||
eventHub = &EventHub{
|
||
cmdChan: make(chan *tCmd, cmdChanLen),
|
||
chanMap: make(map[chan *EventInfo]*tRegisterInfo),
|
||
typeChanMap: make(map[string]map[chan *EventInfo]bool),
|
||
eventCatMap: make(map[string]IEventProducer),
|
||
}
|
||
utils.CallFuncAsync(func() {
|
||
eventRoutine(eventHub)
|
||
})
|
||
return eventHub
|
||
}
|
||
|
||
func (e *EventHub) eventRoutine() {
|
||
for {
|
||
cmd, ok := <-e.cmdChan
|
||
if ok {
|
||
switch cmd.cmdType {
|
||
case cmdRegisterConsumer:
|
||
regisgerInfo := cmd.data.(*tRegisterInfo)
|
||
e.chanMap[regisgerInfo.notifyChan] = regisgerInfo
|
||
for _, eventType := range regisgerInfo.eventTypeList {
|
||
if e.typeChanMap[eventType] == nil {
|
||
e.typeChanMap[eventType] = make(map[chan *EventInfo]bool)
|
||
}
|
||
e.typeChanMap[eventType][regisgerInfo.notifyChan] = true
|
||
}
|
||
case cmdUnregisterConsumer:
|
||
unregisgerInfo := cmd.data.(*tUnregisterInfo)
|
||
e.removeChan(unregisgerInfo.notifyChan)
|
||
close(unregisgerInfo.closeChan)
|
||
case cmdNewEvent:
|
||
eventInfo := cmd.data.(*EventInfo)
|
||
typeList := []string{allEventType, eventInfo.Type}
|
||
tmpChanMap := make(map[chan *EventInfo]*tRegisterInfo)
|
||
for _, eventType := range typeList {
|
||
for notifyChan := range e.typeChanMap[eventType] {
|
||
tmpChanMap[notifyChan] = e.chanMap[notifyChan]
|
||
}
|
||
}
|
||
for notifyChan, registerInfo := range tmpChanMap {
|
||
eventCategory, _ := splitEventType(eventInfo.Type)
|
||
if eventProducer := e.getEventProducer(eventCategory); eventProducer != nil {
|
||
if eventProducer.IsCriteriaMatch(eventInfo, registerInfo.criteria) {
|
||
notifyChan <- eventInfo
|
||
e.removeChan(notifyChan)
|
||
}
|
||
} else {
|
||
globals.SugarLogger.Warnf("eventRoutine, eventCategory:%s producer is nil")
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
func (e *EventHub) removeChan(notifyChann chan *EventInfo) {
|
||
typeList := e.chanMap[notifyChann].eventTypeList
|
||
for _, eventType := range typeList {
|
||
delete(e.typeChanMap[eventType], notifyChann)
|
||
}
|
||
delete(e.chanMap, notifyChann)
|
||
}
|
||
|
||
func (e *EventHub) Close() {
|
||
close(e.cmdChan)
|
||
}
|
||
|
||
func (e *EventHub) RegisterProducer(eventCategory string, producer IEventProducer) (err error) {
|
||
e.locker.Lock()
|
||
if e.eventCatMap[eventCategory] != nil {
|
||
err = fmt.Errorf("eventCategory:%s已经被注册了", eventCategory)
|
||
} else {
|
||
e.eventCatMap[eventCategory] = producer
|
||
}
|
||
e.locker.Unlock()
|
||
return err
|
||
}
|
||
|
||
// 正常不应该是动态的经常调用注册与反注册Producer,只是退出进程前礼貌的调用
|
||
func (e *EventHub) UnregisterProducer(eventCategory string) (err error) {
|
||
e.locker.Lock()
|
||
if e.eventCatMap[eventCategory] == nil {
|
||
err = fmt.Errorf("eventCategory:%s没有注册", eventCategory)
|
||
} else {
|
||
delete(e.eventCatMap, eventCategory)
|
||
}
|
||
e.locker.Unlock()
|
||
return err
|
||
}
|
||
|
||
func (e *EventHub) registerConsumer(eventCategory string, eventTypeList []string, criteria interface{}) (notifyChan chan *EventInfo) {
|
||
if len(eventTypeList) == 0 {
|
||
eventTypeList = []string{allEventType}
|
||
}
|
||
realEventTypeList := make([]string, len(eventTypeList))
|
||
for index, eventType := range eventTypeList {
|
||
realEventTypeList[index] = composeEventType(eventCategory, eventType)
|
||
}
|
||
info := &tRegisterInfo{
|
||
eventCategory: eventCategory,
|
||
eventTypeList: eventTypeList,
|
||
notifyChan: make(chan *EventInfo, 1),
|
||
criteria: criteria,
|
||
}
|
||
e.cmdChan <- &tCmd{
|
||
cmdType: cmdRegisterConsumer,
|
||
data: info,
|
||
}
|
||
return info.notifyChan
|
||
}
|
||
|
||
func (e *EventHub) unregisterConsumer(notifyChan chan *EventInfo) {
|
||
info := &tUnregisterInfo{
|
||
notifyChan: notifyChan,
|
||
closeChan: make(chan struct{}),
|
||
}
|
||
e.cmdChan <- &tCmd{
|
||
cmdType: cmdUnregisterConsumer,
|
||
data: info,
|
||
}
|
||
<-info.closeChan
|
||
}
|
||
|
||
func (e *EventHub) PostNewEvent(eventCategory string, event *EventInfo) {
|
||
newEvent := *event
|
||
newEvent.Type = composeEventType(eventCategory, event.Type)
|
||
e.cmdChan <- &tCmd{
|
||
cmdType: cmdNewEvent,
|
||
data: event,
|
||
}
|
||
}
|
||
|
||
func (e *EventHub) GetEvent(eventCategory string, eventTypeList []string, criteria interface{}, waitTime time.Duration) (event *EventInfo, err error) {
|
||
notifyChan := e.registerConsumer(eventCategory, eventTypeList, criteria)
|
||
pollingDuration := defPollingDuration
|
||
if waitTime != 0 {
|
||
pollingDuration = waitTime
|
||
if globals.IsProductEnv() {
|
||
if pollingDuration > maxPollingDuration {
|
||
pollingDuration = maxPollingDuration
|
||
} else if pollingDuration < minPollingDuration {
|
||
pollingDuration = minPollingDuration
|
||
}
|
||
}
|
||
}
|
||
timer := time.NewTimer(pollingDuration)
|
||
select {
|
||
case tmpEvent, ok := <-notifyChan:
|
||
timer.Stop()
|
||
if ok {
|
||
event = tmpEvent
|
||
}
|
||
case <-timer.C:
|
||
e.unregisterConsumer(notifyChan)
|
||
}
|
||
close(notifyChan)
|
||
return event, err
|
||
}
|
||
|
||
func (e *EventHub) getEventProducer(eventCategory string) IEventProducer {
|
||
e.locker.RLock()
|
||
defer e.locker.RUnlock()
|
||
return e.eventCatMap[eventCategory]
|
||
}
|
||
|
||
// composeEventType builds the category-qualified event type
// "<eventCategory>/<eventType>".
func composeEventType(eventCategory, eventType string) string {
	const separator = "/"
	qualified := eventCategory + separator
	qualified += eventType
	return qualified
}
|
||
|
||
// splitEventType is the inverse of composeEventType: it splits a
// category-qualified event type at the first "/" into its category and the
// bare event type.
//
// When eventType contains no "/", the whole string is returned as the
// category and pureEventType is empty (previously this panicked with an
// index out of range). Only the first "/" separates the two parts, so a bare
// event type that itself contains "/" is returned intact.
func splitEventType(eventType string) (eventCategory, pureEventType string) {
	parts := strings.SplitN(eventType, "/", 2)
	eventCategory = parts[0]
	if len(parts) > 1 {
		pureEventType = parts[1]
	}
	return eventCategory, pureEventType
}
|