// Package routinepool runs functions on a fixed set of worker goroutines,
// picking the worker from a hash of a caller-supplied key so that calls
// sharing a key are executed one at a time by the same goroutine.
package routinepool

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
)

const (
	// chanPoolSize is the buffer capacity of each worker's task channel.
	chanPoolSize = 8
)

// TaskParam is a single unit of work handed to a worker goroutine.
type TaskParam struct {
	// handler is the function the worker executes.
	handler func()
	// resultChan receives exactly one value per task: nil when handler
	// returned normally, or the recovered panic value otherwise.
	resultChan chan interface{}
}

// Pool is a set of worker goroutines, each fed by its own task channel.
type Pool struct {
	minRoutineCount int
	// maxRoutineCount is validated in New and used as the capacity of
	// taskChans; the pool does not currently grow beyond minRoutineCount.
	maxRoutineCount int
	// curRoutineCount is the number of running workers (== len(taskChans)).
	curRoutineCount int
	taskChans       []chan *TaskParam
}

// New creates a Pool and starts minRoutineCount worker goroutines.
//
// It panics if minRoutineCount is negative or greater than maxRoutineCount.
// With minRoutineCount == 0 no workers are started and CallFun runs every
// function directly on the caller's goroutine.
func New(minRoutineCount, maxRoutineCount int) *Pool {
	if minRoutineCount < 0 || minRoutineCount > maxRoutineCount {
		panic(fmt.Sprintf("wrong params, minRoutineCount:%d, maxRoutineCount:%d", minRoutineCount, maxRoutineCount))
	}

	p := &Pool{
		minRoutineCount: minRoutineCount,
		maxRoutineCount: maxRoutineCount,
		curRoutineCount: minRoutineCount,
	}
	if p.curRoutineCount == 0 {
		return p
	}

	p.taskChans = make([]chan *TaskParam, p.curRoutineCount, p.maxRoutineCount)
	for i := range p.taskChans {
		p.taskChans[i] = make(chan *TaskParam, chanPoolSize)
		go taskFun(p.taskChans[i], i)
	}
	return p
}

// callHandler runs handler and converts a panic into a return value.
// It returns nil when handler returns normally and the recovered value when
// handler panics, so that a panicking task does not kill its worker.
func callHandler(handler func()) (retVal interface{}) {
	defer func() {
		if r := recover(); r != nil {
			retVal = r
		}
	}()
	handler()
	return retVal
}

// taskFun is the worker loop: it executes tasks from taskChan one at a time
// and reports each outcome on the task's resultChan. The loop ends when
// taskChan is closed. index identifies the worker (used only for debugging).
func taskFun(taskChan chan *TaskParam, index int) {
	for taskParam := range taskChan {
		// baseapi.SugarLogger.Debugf("routine:%d, handle task:%v", index, taskParam.handler)
		taskParam.resultChan <- callHandler(taskParam.handler)
	}
}

// CallFun runs func4Call and blocks until it has finished.
//
// When the pool has workers, the worker is selected from the MD5 digest of
// primaryID (bytes 8-11, little-endian, modulo the worker count), so calls
// with the same primaryID always go to the same worker and never run
// concurrently with each other. When the pool has no workers, func4Call is
// invoked directly on the calling goroutine.
//
// If func4Call panics on a worker, the panic value is re-raised on the
// caller's goroutine.
//
// A function running on a worker must not call CallFun with a primaryID that
// maps to that same worker: the nested call would wait for a result that the
// busy worker can never produce.
func (p *Pool) CallFun(func4Call func(), primaryID string) {
	if p.curRoutineCount <= 0 {
		func4Call()
		return
	}

	sum := md5.Sum([]byte(primaryID))
	hash := uint64(binary.LittleEndian.Uint32(sum[8:]))
	chanIndex := int(hash % uint64(p.curRoutineCount))

	task := &TaskParam{
		handler: func4Call,
		// Capacity 1: exactly one result is ever sent, so the worker can
		// hand it over and move on without waiting for this goroutine to
		// be scheduled.
		resultChan: make(chan interface{}, 1),
	}
	p.taskChans[chanIndex] <- task
	if r := <-task.resultChan; r != nil {
		panic(r)
	}
}