package eventhub import ( "fmt" "strings" "sync" "time" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/baseapi/utils" ) const ( cmdRegisterConsumer = 1 cmdUnregisterConsumer = 2 cmdNewEvent = 3 ) const ( maxGetOrderTimeDuration = 24 * time.Hour minPollingDuration = 1 * time.Minute defPollingDuration = 5 * time.Minute maxPollingDuration = 10 * time.Minute ) const ( cmdChanLen = 100 ) const ( allEventType = "" ) type IEventProducer interface { GetEvent(eventTypeList []string, criteria interface{}) (event *EventInfo, err error) IsCriteriaMatch(eventInfo *EventInfo, criteria interface{}) bool } type EventInfo struct { Type string Data interface{} } type tRegisterInfo struct { notifyChan chan *EventInfo eventCategory string eventTypeList []string criteria interface{} } type tUnregisterInfo struct { notifyChan chan *EventInfo closeChan chan struct{} } type tCmd struct { cmdType int data interface{} // regisgerInfo *RegisterInfo // CmdRegisterConsumer // unregisterInfo *UnregisterInfo // CmdUnregisterConsumer // eventInfo *EventInfo // CmdNewEvent } type EventHub struct { cmdChan chan *tCmd chanMap map[chan *EventInfo]*tRegisterInfo typeChanMap map[string]map[chan *EventInfo]bool eventCatMap map[string]IEventProducer locker sync.RWMutex } func eventRoutine(eventHub *EventHub) { eventHub.eventRoutine() } func New() (eventHub *EventHub) { eventHub = &EventHub{ cmdChan: make(chan *tCmd, cmdChanLen), chanMap: make(map[chan *EventInfo]*tRegisterInfo), typeChanMap: make(map[string]map[chan *EventInfo]bool), eventCatMap: make(map[string]IEventProducer), } utils.CallFuncAsync(func() { eventRoutine(eventHub) }) return eventHub } func (e *EventHub) eventRoutine() { for { cmd, ok := <-e.cmdChan if ok { switch cmd.cmdType { case cmdRegisterConsumer: regisgerInfo := cmd.data.(*tRegisterInfo) e.chanMap[regisgerInfo.notifyChan] = regisgerInfo for _, eventType := range regisgerInfo.eventTypeList { if e.typeChanMap[eventType] == nil { e.typeChanMap[eventType] = make(map[chan *EventInfo]bool) } e.typeChanMap[eventType][regisgerInfo.notifyChan] = true } case cmdUnregisterConsumer: unregisgerInfo := cmd.data.(*tUnregisterInfo) e.removeChan(unregisgerInfo.notifyChan) close(unregisgerInfo.closeChan) case cmdNewEvent: eventInfo := cmd.data.(*EventInfo) typeList := []string{allEventType, eventInfo.Type} tmpChanMap := make(map[chan *EventInfo]*tRegisterInfo) for _, eventType := range typeList { for notifyChan := range e.typeChanMap[eventType] { tmpChanMap[notifyChan] = e.chanMap[notifyChan] } } for notifyChan, registerInfo := range tmpChanMap { eventCategory, _ := splitEventType(eventInfo.Type) if eventProducer := e.getEventProducer(eventCategory); eventProducer != nil { if eventProducer.IsCriteriaMatch(eventInfo, registerInfo.criteria) { notifyChan <- eventInfo e.removeChan(notifyChan) } } else { globals.SugarLogger.Warnf("eventRoutine, eventCategory:%s producer is nil") } } } } else { break } } } func (e *EventHub) removeChan(notifyChann chan *EventInfo) { typeList := e.chanMap[notifyChann].eventTypeList for _, eventType := range typeList { delete(e.typeChanMap[eventType], notifyChann) } delete(e.chanMap, notifyChann) } func (e *EventHub) Close() { close(e.cmdChan) } func (e *EventHub) RegisterProducer(eventCategory string, producer IEventProducer) (err error) { e.locker.Lock() if e.eventCatMap[eventCategory] != nil { err = fmt.Errorf("eventCategory:%s已经被注册了", eventCategory) } else { e.eventCatMap[eventCategory] = producer } e.locker.Unlock() return err } // 正常不应该是动态的经常调用注册与反注册Producer,只是退出进程前礼貌的调用 func (e *EventHub) UnregisterProducer(eventCategory string) (err error) { e.locker.Lock() if e.eventCatMap[eventCategory] == nil { err = fmt.Errorf("eventCategory:%s没有注册", eventCategory) } else { delete(e.eventCatMap, eventCategory) } e.locker.Unlock() return err } func (e *EventHub) registerConsumer(eventCategory string, eventTypeList []string, criteria interface{}) (notifyChan chan *EventInfo) { if len(eventTypeList) == 0 { eventTypeList = []string{allEventType} } realEventTypeList := make([]string, len(eventTypeList)) for index, eventType := range eventTypeList { realEventTypeList[index] = composeEventType(eventCategory, eventType) } info := &tRegisterInfo{ eventCategory: eventCategory, eventTypeList: eventTypeList, notifyChan: make(chan *EventInfo, 1), criteria: criteria, } e.cmdChan <- &tCmd{ cmdType: cmdRegisterConsumer, data: info, } return info.notifyChan } func (e *EventHub) unregisterConsumer(notifyChan chan *EventInfo) { info := &tUnregisterInfo{ notifyChan: notifyChan, closeChan: make(chan struct{}), } e.cmdChan <- &tCmd{ cmdType: cmdUnregisterConsumer, data: info, } <-info.closeChan } func (e *EventHub) PostNewEvent(eventCategory string, event *EventInfo) { newEvent := *event newEvent.Type = composeEventType(eventCategory, event.Type) e.cmdChan <- &tCmd{ cmdType: cmdNewEvent, data: event, } } func (e *EventHub) GetEvent(eventCategory string, eventTypeList []string, criteria interface{}, waitTime time.Duration) (event *EventInfo, err error) { eventProducer := e.getEventProducer(eventCategory) if eventProducer == nil { return nil, fmt.Errorf("eventCategory:%s没有注册", eventCategory) } event, err = eventProducer.GetEvent(eventTypeList, criteria) if err == nil && event == nil { notifyChan := e.registerConsumer(eventCategory, eventTypeList, criteria) pollingDuration := defPollingDuration if waitTime != 0 { pollingDuration = waitTime if globals.IsProductEnv() { if pollingDuration > maxPollingDuration { pollingDuration = maxPollingDuration } else if pollingDuration < minPollingDuration { pollingDuration = minPollingDuration } } } timer := time.NewTimer(pollingDuration) select { case tmpEvent, ok := <-notifyChan: timer.Stop() if ok { event = tmpEvent } case <-timer.C: e.unregisterConsumer(notifyChan) } close(notifyChan) } return event, err } func (e *EventHub) getEventProducer(eventCategory string) IEventProducer { e.locker.RLock() defer e.locker.RUnlock() return e.eventCatMap[eventCategory] } func composeEventType(eventCategory, eventType string) string { return eventCategory + "/" + eventType } func splitEventType(eventType string) (eventCategory, pureEventType string) { typeList := strings.Split(eventType, "/") eventCategory = typeList[0] pureEventType = typeList[1] return eventCategory, pureEventType }