Files
baseapi/utils/routinepool/routinepool.go
2025-11-21 09:09:09 +08:00

106 lines
2.3 KiB
Go

package routinepool
import (
"crypto/md5"
"encoding/binary"
"fmt"
"runtime/debug"
"git.rosy.net.cn/baseapi/utils"
)
const (
chanPoolSize = 1000
)
type TaskParam struct {
handler func()
resultChan chan interface{}
}
type Pool struct {
minRoutineCount int
maxRoutineCount int // not used currently
curRoutineCount int
taskChans []chan *TaskParam
}
func New(minRoutineCount, maxRoutineCount int) *Pool {
if minRoutineCount < 0 || minRoutineCount > maxRoutineCount {
panic(fmt.Sprintf("wrong params, minRoutineCount:%d, maxRoutineCount:%d", minRoutineCount, maxRoutineCount))
}
retVal := &Pool{
minRoutineCount: minRoutineCount,
maxRoutineCount: maxRoutineCount,
}
retVal.curRoutineCount = minRoutineCount
if retVal.curRoutineCount > 0 {
retVal.taskChans = make([]chan *TaskParam, retVal.curRoutineCount, retVal.maxRoutineCount)
for i := 0; i < retVal.curRoutineCount; i++ {
retVal.taskChans[i] = make(chan *TaskParam, chanPoolSize)
go taskFun(retVal.taskChans[i], i)
}
}
return retVal
}
func callHandler(handler func()) (retVal interface{}) {
defer func() {
if r := recover(); r != nil {
retVal = r
debug.PrintStack()
}
}()
handler()
return retVal
}
func taskFun(taskChan chan *TaskParam, index int) {
for {
taskParam := <-taskChan
result := callHandler(taskParam.handler)
if taskParam.resultChan != nil {
taskParam.resultChan <- result
}
}
}
func (p *Pool) callFun(func4Call func(), primaryID string, isAsync bool) (r interface{}) {
result := md5.Sum([]byte(primaryID + utils.GetUUID()))
resultInt64 := int64(binary.LittleEndian.Uint32(result[8:]))
chanIndex := int(resultInt64 % int64(p.curRoutineCount))
chanParam := &TaskParam{
handler: func4Call,
}
if !isAsync {
chanParam.resultChan = make(chan interface{})
}
p.taskChans[chanIndex] <- chanParam
if !isAsync {
r = <-chanParam.resultChan
close(chanParam.resultChan)
}
return r
}
func (p *Pool) CallFun(func4Call func(), primaryID string) {
if p.curRoutineCount > 0 {
r := p.callFun(func4Call, primaryID, false)
if r != nil {
panic(r)
}
} else {
func4Call()
}
}
func (p *Pool) CallFunAsync(func4Call func(), primaryID string) {
if p.curRoutineCount == 0 {
panic("CallFunAsync can not run when p.curRoutineCount == 0")
}
utils.CallFuncAsync(func() {
p.callFun(func4Call, primaryID, true)
})
}