158 lines
2.8 KiB
Go
158 lines
2.8 KiB
Go
package event
|
||
|
||
import (
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// 全局单例线程池(初始化500个worker)
|
||
var Pool *WorkerPool
|
||
|
||
func init() {
|
||
Pool = NewWorkerPool(500, 1024) // 500个worker,任务队列长度1024
|
||
Pool.Start()
|
||
}
|
||
|
||
// Job 任务定义
|
||
type Job func()
|
||
|
||
// Worker 工作协程
|
||
type Worker struct {
|
||
id int
|
||
jobChan chan Job
|
||
quitChan chan struct{}
|
||
closeOnce sync.Once
|
||
}
|
||
|
||
// WorkerPool 线程池
|
||
type WorkerPool struct {
|
||
workers []*Worker
|
||
jobChan chan Job
|
||
wg sync.WaitGroup
|
||
closeOnce sync.Once
|
||
closed bool
|
||
mu sync.Mutex
|
||
}
|
||
|
||
// ----------------------------------------------------
|
||
// 创建 Worker
|
||
// ----------------------------------------------------
|
||
func NewWorker(id int, jobChan chan Job) *Worker {
|
||
return &Worker{
|
||
id: id,
|
||
jobChan: jobChan,
|
||
quitChan: make(chan struct{}),
|
||
}
|
||
}
|
||
|
||
// Start 启动Worker(自带panic恢复)
|
||
func (w *Worker) Start() {
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// 打印panic日志,防止整个worker崩溃
|
||
// log.Printf("worker %d panic recovered: %v", w.id, r)
|
||
}
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case job, ok := <-w.jobChan:
|
||
if !ok {
|
||
return
|
||
}
|
||
job()
|
||
case <-w.quitChan:
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Stop 安全停止Worker
|
||
func (w *Worker) Stop() {
|
||
w.closeOnce.Do(func() {
|
||
close(w.quitChan)
|
||
})
|
||
}
|
||
|
||
// ----------------------------------------------------
|
||
// 创建线程池
|
||
// ----------------------------------------------------
|
||
func NewWorkerPool(workerCount int, queueSize int) *WorkerPool {
|
||
jobChan := make(chan Job, queueSize) // 带缓冲队列,高并发不阻塞
|
||
|
||
pool := &WorkerPool{
|
||
workers: make([]*Worker, workerCount),
|
||
jobChan: jobChan,
|
||
}
|
||
|
||
for i := 0; i < workerCount; i++ {
|
||
pool.workers[i] = NewWorker(i, jobChan)
|
||
}
|
||
return pool
|
||
}
|
||
|
||
// Start 启动所有worker
|
||
func (p *WorkerPool) Start() {
|
||
for _, w := range p.workers {
|
||
w.Start()
|
||
}
|
||
}
|
||
|
||
// AddJob 提交任务(非阻塞、安全、不panic)
|
||
func (p *WorkerPool) AddJob(job Job) bool {
|
||
p.mu.Lock()
|
||
closed := p.closed
|
||
p.mu.Unlock()
|
||
|
||
if closed {
|
||
return false
|
||
}
|
||
|
||
p.wg.Add(1)
|
||
select {
|
||
case p.jobChan <- func() {
|
||
defer p.wg.Done()
|
||
job()
|
||
}:
|
||
return true
|
||
case <-time.After(10 * time.Millisecond):
|
||
// 队列满了,不阻塞,快速失败
|
||
p.wg.Done()
|
||
return false
|
||
}
|
||
}
|
||
|
||
// Wait 等待所有任务执行完成
|
||
func (p *WorkerPool) Wait() {
|
||
p.wg.Wait()
|
||
}
|
||
|
||
// Stop 优雅关闭线程池(安全不panic)
|
||
func (p *WorkerPool) Stop() {
|
||
p.closeOnce.Do(func() {
|
||
p.mu.Lock()
|
||
p.closed = true
|
||
p.mu.Unlock()
|
||
|
||
// 1. 关闭任务通道
|
||
close(p.jobChan)
|
||
|
||
// 2. 停止所有worker
|
||
for _, w := range p.workers {
|
||
w.Stop()
|
||
}
|
||
|
||
// 3. 等待所有任务执行完
|
||
p.Wait()
|
||
})
|
||
}
|
||
|
||
// IsClosed 判断线程池是否关闭
|
||
func (p *WorkerPool) IsClosed() bool {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
return p.closed
|
||
}
|