- routine pool added.

This commit is contained in:
gazebo
2018-06-28 12:35:11 +08:00
parent 0ccaf81be8
commit 3b36c74a29
2 changed files with 129 additions and 0 deletions

View File

@@ -0,0 +1,71 @@
package routinepool
import (
"crypto/md5"
"encoding/binary"
"fmt"
"git.rosy.net.cn/baseapi"
)
const (
chanPoolSize = 8
)
type TaskParam struct {
handler func()
resultChan chan interface{}
}
type Pool struct {
minRoutineCount int
maxRoutineCount int // not used currently
curRoutineCount int
taskChans []chan *TaskParam
}
func New(minRoutineCount, maxRoutineCount int) *Pool {
if minRoutineCount < 0 || minRoutineCount > maxRoutineCount {
panic(fmt.Sprintf("wrong params, minRoutineCount:%d, maxRoutineCount:%d", minRoutineCount, maxRoutineCount))
}
retVal := &Pool{
minRoutineCount: minRoutineCount,
maxRoutineCount: maxRoutineCount,
}
retVal.curRoutineCount = minRoutineCount
if retVal.curRoutineCount > 0 {
retVal.taskChans = make([]chan *TaskParam, retVal.curRoutineCount, retVal.maxRoutineCount)
for i := 0; i < retVal.curRoutineCount; i++ {
retVal.taskChans[i] = make(chan *TaskParam, chanPoolSize)
go taskFun(retVal.taskChans[i], i)
}
}
return retVal
}
func taskFun(taskChan chan *TaskParam, index int) {
for {
taskParam := <-taskChan
baseapi.SugarLogger.Debugf("routine:%d, handle task:%v", index, taskParam.handler)
taskParam.handler()
taskParam.resultChan <- ""
}
}
func (p *Pool) CallFun(func4Call func(), primaryID string) {
if p.curRoutineCount > 0 {
result := md5.Sum([]byte(primaryID))
resultInt64 := int64(binary.LittleEndian.Uint32(result[:8]))
chanIndex := int(resultInt64 % int64(p.curRoutineCount))
chanParam := &TaskParam{
handler: func4Call,
resultChan: make(chan interface{}),
}
p.taskChans[chanIndex] <- chanParam
_ = <-chanParam.resultChan
} else {
func4Call()
}
}

View File

@@ -0,0 +1,58 @@
package routinepool
import (
"math/rand"
"testing"
"time"
"git.rosy.net.cn/baseapi"
"go.uber.org/zap"
"git.rosy.net.cn/baseapi/utils"
)
var (
sugarLogger *zap.SugaredLogger
)
func init() {
logger, _ := zap.NewDevelopment()
sugarLogger = logger.Sugar()
baseapi.Init(sugarLogger)
rand.Seed(time.Now().Unix())
}
func TestCallFun(t *testing.T) {
pool := New(10, 10)
for i := 0; i < 20; i++ {
pool.CallFun(func() {
x := i
sugarLogger.Debug(x)
}, utils.Int2Str(i))
}
pool.CallFun(func() {
sugarLogger.Debug(15)
}, utils.Int2Str(15))
pool.CallFun(func() {
sugarLogger.Debug(15)
}, utils.Int2Str(15))
// time.Sleep(2 * time.Second)
}
func getClosure(b int) func() {
a := b
return func() {
sugarLogger.Debug(a)
}
}
func TestClosure(t *testing.T) {
closures := make([]func(), 10)
for i := 0; i < 10; i++ {
closures[i] = getClosure(i)
}
for i := 0; i < 10; i++ {
handler := closures[rand.Intn(10)]
sugarLogger.Debug(handler)
handler()
}
}