package event import ( "sync" "time" ) // 全局单例线程池(初始化500个worker) var Pool *WorkerPool func init() { Pool = NewWorkerPool(500, 1024) // 500个worker,任务队列长度1024 Pool.Start() } // Job 任务定义 type Job func() // Worker 工作协程 type Worker struct { id int jobChan chan Job quitChan chan struct{} closeOnce sync.Once } // WorkerPool 线程池 type WorkerPool struct { workers []*Worker jobChan chan Job wg sync.WaitGroup closeOnce sync.Once closed bool mu sync.Mutex } // ---------------------------------------------------- // 创建 Worker // ---------------------------------------------------- func NewWorker(id int, jobChan chan Job) *Worker { return &Worker{ id: id, jobChan: jobChan, quitChan: make(chan struct{}), } } // Start 启动Worker(自带panic恢复) func (w *Worker) Start() { go func() { defer func() { if r := recover(); r != nil { // 打印panic日志,防止整个worker崩溃 // log.Printf("worker %d panic recovered: %v", w.id, r) } }() for { select { case job, ok := <-w.jobChan: if !ok { return } job() case <-w.quitChan: return } } }() } // Stop 安全停止Worker func (w *Worker) Stop() { w.closeOnce.Do(func() { close(w.quitChan) }) } // ---------------------------------------------------- // 创建线程池 // ---------------------------------------------------- func NewWorkerPool(workerCount int, queueSize int) *WorkerPool { jobChan := make(chan Job, queueSize) // 带缓冲队列,高并发不阻塞 pool := &WorkerPool{ workers: make([]*Worker, workerCount), jobChan: jobChan, } for i := 0; i < workerCount; i++ { pool.workers[i] = NewWorker(i, jobChan) } return pool } // Start 启动所有worker func (p *WorkerPool) Start() { for _, w := range p.workers { w.Start() } } // AddJob 提交任务(非阻塞、安全、不panic) func (p *WorkerPool) AddJob(job Job) bool { p.mu.Lock() closed := p.closed p.mu.Unlock() if closed { return false } p.wg.Add(1) select { case p.jobChan <- func() { defer p.wg.Done() job() }: return true case <-time.After(10 * time.Millisecond): // 队列满了,不阻塞,快速失败 p.wg.Done() return false } } // Wait 等待所有任务执行完成 func (p *WorkerPool) Wait() { p.wg.Wait() } // Stop 优雅关闭线程池(安全不panic) func (p *WorkerPool) Stop() { p.closeOnce.Do(func() { p.mu.Lock() p.closed = true p.mu.Unlock() // 1. 关闭任务通道 close(p.jobChan) // 2. 停止所有worker for _, w := range p.workers { w.Stop() } // 3. 等待所有任务执行完 p.Wait() }) } // IsClosed 判断线程池是否关闭 func (p *WorkerPool) IsClosed() bool { p.mu.Lock() defer p.mu.Unlock() return p.closed }