diff --git a/utils/routinepool/routinepool.go b/utils/routinepool/routinepool.go new file mode 100644 index 00000000..018f0ce8 --- /dev/null +++ b/utils/routinepool/routinepool.go @@ -0,0 +1,71 @@ +package routinepool + +import ( + "crypto/md5" + "encoding/binary" + "fmt" + + "git.rosy.net.cn/baseapi" +) + +const ( + chanPoolSize = 8 +) + +type TaskParam struct { + handler func() + resultChan chan interface{} +} + +type Pool struct { + minRoutineCount int + maxRoutineCount int // not used currently + curRoutineCount int + taskChans []chan *TaskParam +} + +func New(minRoutineCount, maxRoutineCount int) *Pool { + if minRoutineCount < 0 || minRoutineCount > maxRoutineCount { + panic(fmt.Sprintf("wrong params, minRoutineCount:%d, maxRoutineCount:%d", minRoutineCount, maxRoutineCount)) + } + retVal := &Pool{ + minRoutineCount: minRoutineCount, + maxRoutineCount: maxRoutineCount, + } + + retVal.curRoutineCount = minRoutineCount + if retVal.curRoutineCount > 0 { + retVal.taskChans = make([]chan *TaskParam, retVal.curRoutineCount, retVal.maxRoutineCount) + for i := 0; i < retVal.curRoutineCount; i++ { + retVal.taskChans[i] = make(chan *TaskParam, chanPoolSize) + go taskFun(retVal.taskChans[i], i) + } + } + return retVal +} + +func taskFun(taskChan chan *TaskParam, index int) { + for { + taskParam := <-taskChan + baseapi.SugarLogger.Debugf("routine:%d, handle task:%v", index, taskParam.handler) + taskParam.handler() + taskParam.resultChan <- "" + } +} + +func (p *Pool) CallFun(func4Call func(), primaryID string) { + if p.curRoutineCount > 0 { + result := md5.Sum([]byte(primaryID)) + resultInt64 := int64(binary.LittleEndian.Uint32(result[:8])) + chanIndex := int(resultInt64 % int64(p.curRoutineCount)) + + chanParam := &TaskParam{ + handler: func4Call, + resultChan: make(chan interface{}), + } + p.taskChans[chanIndex] <- chanParam + _ = <-chanParam.resultChan + } else { + func4Call() + } +} diff --git a/utils/routinepool/routinepool_test.go b/utils/routinepool/routinepool_test.go new file mode 100644 index 00000000..3edbfa3a --- /dev/null +++ b/utils/routinepool/routinepool_test.go @@ -0,0 +1,58 @@ +package routinepool + +import ( + "math/rand" + "testing" + "time" + + "git.rosy.net.cn/baseapi" + "go.uber.org/zap" + + "git.rosy.net.cn/baseapi/utils" +) + +var ( + sugarLogger *zap.SugaredLogger +) + +func init() { + logger, _ := zap.NewDevelopment() + sugarLogger = logger.Sugar() + baseapi.Init(sugarLogger) + rand.Seed(time.Now().Unix()) +} + +func TestCallFun(t *testing.T) { + pool := New(10, 10) + for i := 0; i < 20; i++ { + pool.CallFun(func() { + x := i + sugarLogger.Debug(x) + }, utils.Int2Str(i)) + } + pool.CallFun(func() { + sugarLogger.Debug(15) + }, utils.Int2Str(15)) + pool.CallFun(func() { + sugarLogger.Debug(15) + }, utils.Int2Str(15)) + // time.Sleep(2 * time.Second) +} + +func getClosure(b int) func() { + a := b + return func() { + sugarLogger.Debug(a) + } +} +func TestClosure(t *testing.T) { + closures := make([]func(), 10) + for i := 0; i < 10; i++ { + closures[i] = getClosure(i) + } + for i := 0; i < 10; i++ { + handler := closures[rand.Intn(10)] + sugarLogger.Debug(handler) + handler() + } +}