This commit is contained in:
邹宗楠
2023-05-30 09:06:09 +08:00
parent d626fcf12d
commit 4b9c783b9a
3 changed files with 345 additions and 142 deletions

View File

@@ -1,49 +1,85 @@
package event
//
//import (
// "fmt"
// "sync"
//)
//
//type SimplePool struct {
// wg sync.WaitGroup
// work chan func() //任务队列
//}
//
//func NewSimplePoll(workers int) *SimplePool {
// p := &SimplePool{
// wg: sync.WaitGroup{},
// work: make(chan func()),
// }
// p.wg.Add(workers)
// //根据指定的并发量去读取管道并执行
// for i := 0; i < workers; i++ {
// go func() {
// defer func() {
// // 捕获异常 防止waitGroup阻塞
// if err := recover(); err != nil {
// fmt.Println(err)
// p.wg.Done()
// }
// }()
// // 从workChannel中取出任务执行
// for fn := range p.work {
// fn()
// }
// p.wg.Done()
// }()
// }
// return p
//}
//
//// 添加任务
//func (p *SimplePool) Add(fn func()) {
// p.work <- fn
//}
//
//// 执行
//func (p *SimplePool) Run() {
// close(p.work)
// p.wg.Wait()
//}
import (
"sync"
)
type Job func()
type Worker struct {
id int
jobChannel chan Job
quit chan bool
}
type Pool struct {
workers []*Worker
jobChannel chan Job
wg sync.WaitGroup
}
func NewWorker(id int, jobChannel chan Job) *Worker {
return &Worker{
id: id,
jobChannel: jobChannel,
quit: make(chan bool),
}
}
func (w *Worker) Start() {
go func() {
for {
select {
case job := <-w.jobChannel:
job()
case <-w.quit:
return
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
func NewPool(numWorkers int) *Pool {
jobChannel := make(chan Job)
pool := &Pool{
workers: make([]*Worker, numWorkers),
jobChannel: jobChannel,
}
for i := 0; i < numWorkers; i++ {
worker := NewWorker(i, jobChannel)
pool.workers[i] = worker
}
return pool
}
func (p *Pool) Start() {
for _, worker := range p.workers {
worker.Start()
}
}
func (p *Pool) Stop() {
for _, worker := range p.workers {
worker.Stop()
}
}
func (p *Pool) AddJob(job Job) {
p.wg.Add(1)
p.jobChannel <- func() {
job()
p.wg.Done()
}
}
func (p *Pool) Wait() {
p.wg.Wait()
}