From d339ebb47be1bc9cb3b79a1a80ef6b941608929e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E5=AE=97=E6=A5=A0?= Date: Sat, 9 May 2026 16:02:04 +0800 Subject: [PATCH] 1 --- business/jxstore/event/event_print.go | 98 ++++++++++++++++ business/jxstore/event/event_tcp.go | 10 +- business/jxstore/event/timing_task.go | 161 +++++++++++++++++--------- 3 files changed, 210 insertions(+), 59 deletions(-) diff --git a/business/jxstore/event/event_print.go b/business/jxstore/event/event_print.go index cfcb2e289..312056c78 100644 --- a/business/jxstore/event/event_print.go +++ b/business/jxstore/event/event_print.go @@ -95,3 +95,101 @@ func (p *PrintIpAndAddrStruct) DelPrintIpAndAddr(printNo string) { defer p.RUnlock() delete(PrintIpAndAddr.PrintObject, printNo) } + +//package event +// +//import ( +// "fmt" +// "sync" +//) +// +//var ( +// PrintObject *PrintObjectStruct // 缓存的打印机对象 +// PrintAddrAndIp *PrintAddrAndIpStruct // 缓存打印机地址:[ip:printNo] event 文件包,connect只能获取到addr +// PrintIpAndAddr *PrintIpAndAddrStruct // 缓存打印机地址:[printNo:ip] api_controller 只能获取到printNo +//) +// +//func init() { +// fmt.Println("初始化打印机对象") +// PrintObject = &PrintObjectStruct{ +// PrintObject: make(map[string]*TcpClient), +// RWMutex: new(sync.RWMutex), +// } +// PrintAddrAndIp = &PrintAddrAndIpStruct{ +// PrintObject: make(map[string]string), +// RWMutex: new(sync.RWMutex), +// } +// PrintIpAndAddr = &PrintIpAndAddrStruct{ +// PrintObject: make(map[string]string), +// RWMutex: new(sync.RWMutex), +// } +//} +// +//type PrintObjectStruct struct { +// PrintObject map[string]*TcpClient +// *sync.RWMutex +//} +// +//func (p *PrintObjectStruct) GetPrintObj(printNo string) (*TcpClient, bool) { +// p.RLock() +// defer p.RUnlock() +// tcpObj, ok := PrintObject.PrintObject[printNo] +// return tcpObj, ok +//} +//func (p *PrintObjectStruct) SetPrintObj(printNo string, tcpObj *TcpClient) { +// p.RLock() +// defer p.RUnlock() +// PrintObject.PrintObject[printNo] = tcpObj +//} +// +//func (p *PrintObjectStruct) DelPrintObj(printNo string) { +// p.RLock() +// defer p.RUnlock() +// delete(PrintObject.PrintObject, printNo) +//} +// +//type PrintAddrAndIpStruct struct { +// PrintObject map[string]string +// *sync.RWMutex +//} +// +//func (p *PrintAddrAndIpStruct) GetPrintAddrAndIp(ip string) (string, bool) { +// p.RLock() +// defer p.RUnlock() +// printNo, ok := PrintAddrAndIp.PrintObject[ip] +// return printNo, ok +//} +//func (p *PrintAddrAndIpStruct) SetPrintAddrAndIp(ip string, printNo string) { +// p.RLock() +// defer p.RUnlock() +// PrintAddrAndIp.PrintObject[ip] = printNo +//} +// +//func (p *PrintAddrAndIpStruct) DelPrintAddrAndIp(ip string) { +// p.RLock() +// defer p.RUnlock() +// delete(PrintAddrAndIp.PrintObject, ip) +//} +// +//type PrintIpAndAddrStruct struct { +// PrintObject map[string]string +// *sync.RWMutex +//} +// +//func (p *PrintIpAndAddrStruct) GetPrintIpAndAddr(printNo string) (string, bool) { +// p.RLock() +// defer p.RUnlock() +// tcpObj, ok := PrintIpAndAddr.PrintObject[printNo] +// return tcpObj, ok +//} +//func (p *PrintIpAndAddrStruct) SetPrintIpAndAddr(printNo string, ip string) { +// p.RLock() +// defer p.RUnlock() +// PrintIpAndAddr.PrintObject[printNo] = ip +//} +// +//func (p *PrintIpAndAddrStruct) DelPrintIpAndAddr(printNo string) { +// p.RLock() +// defer p.RUnlock() +// delete(PrintIpAndAddr.PrintObject, printNo) +//} diff --git a/business/jxstore/event/event_tcp.go b/business/jxstore/event/event_tcp.go index 5b75c37b6..9c279fedb 100644 --- a/business/jxstore/event/event_tcp.go +++ b/business/jxstore/event/event_tcp.go @@ -48,12 +48,12 @@ func ListenTcp() { if err := handleConn(c); err != nil { c.Close() - Poll.Wait() - Poll.Stop() + Pool.Wait() + Pool.Stop() return } } - Poll.AddJob(fn) + Pool.AddJob(fn) } } @@ -200,7 +200,7 @@ func HandleTcpMessages(t *TcpClient, printNo string) { return } } - Poll.AddJob(fn) + Pool.AddJob(fn) } func (t *TcpClient) readTimeoutMap(key string) bool { @@ -331,7 +331,7 @@ func doPrint(t *TcpClient, key string) (err error) { } } } - Poll.AddJob(fn) + Pool.AddJob(fn) return err } diff --git a/business/jxstore/event/timing_task.go b/business/jxstore/event/timing_task.go index e3d12a300..4cef2f87d 100644 --- a/business/jxstore/event/timing_task.go +++ b/business/jxstore/event/timing_task.go @@ -2,103 +2,156 @@ package event import ( "sync" + "time" ) -var Poll *Pool +// 全局单例线程池(初始化500个worker) +var Pool *WorkerPool func init() { - Poll = NewPool(500) - Poll.Start() + Pool = NewWorkerPool(500, 1024) // 500个worker,任务队列长度1024 + Pool.Start() } +// Job 任务定义 type Job func() +// Worker 工作协程 type Worker struct { - id int - jobChannel chan Job - quit chan bool + id int + jobChan chan Job + quitChan chan struct{} + closeOnce sync.Once } -type Pool struct { - workers []*Worker - jobChannel chan Job - wg sync.WaitGroup +// WorkerPool 线程池 +type WorkerPool struct { + workers []*Worker + jobChan chan Job + wg sync.WaitGroup + closeOnce sync.Once + closed bool + mu sync.Mutex } -func NewWorker(id int, jobChannel chan Job) *Worker { +// ---------------------------------------------------- +// 创建 Worker +// ---------------------------------------------------- +func NewWorker(id int, jobChan chan Job) *Worker { return &Worker{ - id: id, - jobChannel: jobChannel, - quit: make(chan bool), + id: id, + jobChan: jobChan, + quitChan: make(chan struct{}), } } +// Start 启动Worker(自带panic恢复) func (w *Worker) Start() { go func() { + defer func() { + if r := recover(); r != nil { + // 打印panic日志,防止整个worker崩溃 + // log.Printf("worker %d panic recovered: %v", w.id, r) + } + }() + for { select { - case job := <-w.jobChannel: + case job, ok := <-w.jobChan: + if !ok { + return + } job() - case <-w.quit: + case <-w.quitChan: return } } }() } +// Stop 安全停止Worker func (w *Worker) Stop() { - w.quit <- true - close(w.quit) - //go func() { - // w.quit <- true - //}() + w.closeOnce.Do(func() { + close(w.quitChan) + }) } -func NewPool(numWorkers int) *Pool { - jobChannel := make(chan Job) - pool := &Pool{ - workers: make([]*Worker, numWorkers), - jobChannel: jobChannel, +// ---------------------------------------------------- +// 创建线程池 +// ---------------------------------------------------- +func NewWorkerPool(workerCount int, queueSize int) *WorkerPool { + jobChan := make(chan Job, queueSize) // 带缓冲队列,高并发不阻塞 + + pool := &WorkerPool{ + workers: make([]*Worker, workerCount), + jobChan: jobChan, } - for i := 0; i < numWorkers; i++ { - worker := NewWorker(i, jobChannel) - pool.workers[i] = worker + for i := 0; i < workerCount; i++ { + pool.workers[i] = NewWorker(i, jobChan) } - return pool } -func (p *Pool) Start() { - for _, worker := range p.workers { - worker.Start() +// Start 启动所有worker +func (p *WorkerPool) Start() { + for _, w := range p.workers { + w.Start() } } -func (p *Pool) Stop() { - for _, worker := range p.workers { - worker.Stop() +// AddJob 提交任务(非阻塞、安全、不panic) +func (p *WorkerPool) AddJob(job Job) bool { + p.mu.Lock() + closed := p.closed + p.mu.Unlock() + + if closed { + return false } -} -// -//func (p *Pool) AddJob(job Job) { -// p.wg.Add(1) -// p.jobChannel <- func() { -// job() -// p.wg.Done() -// } -//} - -func (p *Pool) AddJob(job Job) { p.wg.Add(1) - go func() { - p.jobChannel <- func() { - defer p.wg.Done() - job() - } - }() + select { + case p.jobChan <- func() { + defer p.wg.Done() + job() + }: + return true + case <-time.After(10 * time.Millisecond): + // 队列满了,不阻塞,快速失败 + p.wg.Done() + return false + } } -func (p *Pool) Wait() { + +// Wait 等待所有任务执行完成 +func (p *WorkerPool) Wait() { p.wg.Wait() } + +// Stop 优雅关闭线程池(安全不panic) +func (p *WorkerPool) Stop() { + p.closeOnce.Do(func() { + p.mu.Lock() + p.closed = true + p.mu.Unlock() + + // 1. 关闭任务通道 + close(p.jobChan) + + // 2. 停止所有worker + for _, w := range p.workers { + w.Stop() + } + + // 3. 等待所有任务执行完 + p.Wait() + }) +} + +// IsClosed 判断线程池是否关闭 +func (p *WorkerPool) IsClosed() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.closed +}