110 lines
2.5 KiB
Go
110 lines
2.5 KiB
Go
package routinepool
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"runtime/debug"
|
|
|
|
"git.rosy.net.cn/jx-callback/globals"
|
|
|
|
"git.rosy.net.cn/baseapi/utils"
|
|
)
|
|
|
|
const (
|
|
chanPoolSize = 8
|
|
)
|
|
|
|
type TaskParam struct {
|
|
handler func()
|
|
resultChan chan interface{}
|
|
}
|
|
|
|
type Pool struct {
|
|
minRoutineCount int
|
|
maxRoutineCount int // not used currently
|
|
curRoutineCount int
|
|
taskChans []chan *TaskParam
|
|
}
|
|
|
|
func New(minRoutineCount, maxRoutineCount int) *Pool {
|
|
if minRoutineCount < 0 || minRoutineCount > maxRoutineCount {
|
|
panic(fmt.Sprintf("wrong params, minRoutineCount:%d, maxRoutineCount:%d", minRoutineCount, maxRoutineCount))
|
|
}
|
|
retVal := &Pool{
|
|
minRoutineCount: minRoutineCount,
|
|
maxRoutineCount: maxRoutineCount,
|
|
}
|
|
|
|
retVal.curRoutineCount = minRoutineCount
|
|
if retVal.curRoutineCount > 0 {
|
|
retVal.taskChans = make([]chan *TaskParam, retVal.curRoutineCount, retVal.maxRoutineCount)
|
|
for i := 0; i < retVal.curRoutineCount; i++ {
|
|
retVal.taskChans[i] = make(chan *TaskParam, chanPoolSize)
|
|
go taskFun(retVal.taskChans[i], i)
|
|
}
|
|
}
|
|
return retVal
|
|
}
|
|
|
|
func callHandler(handler func()) (retVal interface{}) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
retVal = r
|
|
debug.PrintStack()
|
|
}
|
|
}()
|
|
handler()
|
|
return retVal
|
|
}
|
|
|
|
func taskFun(taskChan chan *TaskParam, index int) {
|
|
for {
|
|
taskParam := <-taskChan
|
|
// baseapi.SugarLogger.Debugf("routine:%d, handle task:%v", index, taskParam.handler)
|
|
result := callHandler(taskParam.handler)
|
|
if taskParam.resultChan != nil {
|
|
taskParam.resultChan <- result
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pool) callFun(func4Call func(), primaryID string, isAsync bool) (r interface{}) {
|
|
result := md5.Sum([]byte(primaryID))
|
|
resultInt64 := int64(binary.LittleEndian.Uint32(result[8:]))
|
|
chanIndex := int(resultInt64 % int64(p.curRoutineCount))
|
|
chanParam := &TaskParam{
|
|
handler: func4Call,
|
|
}
|
|
if !isAsync {
|
|
chanParam.resultChan = make(chan interface{})
|
|
}
|
|
p.taskChans[chanIndex] <- chanParam
|
|
if !isAsync {
|
|
r = <-chanParam.resultChan
|
|
close(chanParam.resultChan)
|
|
}
|
|
return r
|
|
}
|
|
|
|
func (p *Pool) CallFun(func4Call func(), primaryID string) {
|
|
globals.SugarLogger.Debugf("CallFun %v", p.curRoutineCount)
|
|
if p.curRoutineCount > 0 {
|
|
r := p.callFun(func4Call, primaryID, false)
|
|
if r != nil {
|
|
panic(r)
|
|
}
|
|
} else {
|
|
func4Call()
|
|
}
|
|
}
|
|
|
|
func (p *Pool) CallFunAsync(func4Call func(), primaryID string) {
|
|
if p.curRoutineCount == 0 {
|
|
panic("CallFunAsync can not run when p.curRoutineCount == 0")
|
|
}
|
|
utils.CallFuncAsync(func() {
|
|
p.callFun(func4Call, primaryID, true)
|
|
})
|
|
}
|