This commit is contained in:
邹宗楠
2026-05-09 16:02:04 +08:00
parent 637a878808
commit d339ebb47b
3 changed files with 210 additions and 59 deletions

View File

@@ -95,3 +95,101 @@ func (p *PrintIpAndAddrStruct) DelPrintIpAndAddr(printNo string) {
defer p.RUnlock() defer p.RUnlock()
delete(PrintIpAndAddr.PrintObject, printNo) delete(PrintIpAndAddr.PrintObject, printNo)
} }
//package event
//
//import (
// "fmt"
// "sync"
//)
//
//var (
// PrintObject *PrintObjectStruct // 缓存的打印机对象
// PrintAddrAndIp *PrintAddrAndIpStruct // 缓存打印机地址:[ip:printNo] event 文件包,connect只能获取到addr
// PrintIpAndAddr *PrintIpAndAddrStruct // 缓存打印机地址:[printNo:ip] api_controller 只能获取到printNo
//)
//
//func init() {
// fmt.Println("初始化打印机对象")
// PrintObject = &PrintObjectStruct{
// PrintObject: make(map[string]*TcpClient),
// RWMutex: new(sync.RWMutex),
// }
// PrintAddrAndIp = &PrintAddrAndIpStruct{
// PrintObject: make(map[string]string),
// RWMutex: new(sync.RWMutex),
// }
// PrintIpAndAddr = &PrintIpAndAddrStruct{
// PrintObject: make(map[string]string),
// RWMutex: new(sync.RWMutex),
// }
//}
//
//type PrintObjectStruct struct {
// PrintObject map[string]*TcpClient
// *sync.RWMutex
//}
//
//func (p *PrintObjectStruct) GetPrintObj(printNo string) (*TcpClient, bool) {
// p.RLock()
// defer p.RUnlock()
// tcpObj, ok := PrintObject.PrintObject[printNo]
// return tcpObj, ok
//}
//func (p *PrintObjectStruct) SetPrintObj(printNo string, tcpObj *TcpClient) {
// p.RLock()
// defer p.RUnlock()
// PrintObject.PrintObject[printNo] = tcpObj
//}
//
//func (p *PrintObjectStruct) DelPrintObj(printNo string) {
// p.RLock()
// defer p.RUnlock()
// delete(PrintObject.PrintObject, printNo)
//}
//
//type PrintAddrAndIpStruct struct {
// PrintObject map[string]string
// *sync.RWMutex
//}
//
//func (p *PrintAddrAndIpStruct) GetPrintAddrAndIp(ip string) (string, bool) {
// p.RLock()
// defer p.RUnlock()
// printNo, ok := PrintAddrAndIp.PrintObject[ip]
// return printNo, ok
//}
//func (p *PrintAddrAndIpStruct) SetPrintAddrAndIp(ip string, printNo string) {
// p.RLock()
// defer p.RUnlock()
// PrintAddrAndIp.PrintObject[ip] = printNo
//}
//
//func (p *PrintAddrAndIpStruct) DelPrintAddrAndIp(ip string) {
// p.RLock()
// defer p.RUnlock()
// delete(PrintAddrAndIp.PrintObject, ip)
//}
//
//type PrintIpAndAddrStruct struct {
// PrintObject map[string]string
// *sync.RWMutex
//}
//
//func (p *PrintIpAndAddrStruct) GetPrintIpAndAddr(printNo string) (string, bool) {
// p.RLock()
// defer p.RUnlock()
// tcpObj, ok := PrintIpAndAddr.PrintObject[printNo]
// return tcpObj, ok
//}
//func (p *PrintIpAndAddrStruct) SetPrintIpAndAddr(printNo string, ip string) {
// p.RLock()
// defer p.RUnlock()
// PrintIpAndAddr.PrintObject[printNo] = ip
//}
//
//func (p *PrintIpAndAddrStruct) DelPrintIpAndAddr(printNo string) {
// p.RLock()
// defer p.RUnlock()
// delete(PrintIpAndAddr.PrintObject, printNo)
//}

View File

@@ -48,12 +48,12 @@ func ListenTcp() {
if err := handleConn(c); err != nil { if err := handleConn(c); err != nil {
c.Close() c.Close()
Poll.Wait() Pool.Wait()
Poll.Stop() Pool.Stop()
return return
} }
} }
Poll.AddJob(fn) Pool.AddJob(fn)
} }
} }
@@ -200,7 +200,7 @@ func HandleTcpMessages(t *TcpClient, printNo string) {
return return
} }
} }
Poll.AddJob(fn) Pool.AddJob(fn)
} }
func (t *TcpClient) readTimeoutMap(key string) bool { func (t *TcpClient) readTimeoutMap(key string) bool {
@@ -331,7 +331,7 @@ func doPrint(t *TcpClient, key string) (err error) {
} }
} }
} }
Poll.AddJob(fn) Pool.AddJob(fn)
return err return err
} }

View File

@@ -2,103 +2,156 @@ package event
import ( import (
"sync" "sync"
"time"
) )
var Poll *Pool // 全局单例线程池初始化500个worker
var Pool *WorkerPool
func init() { func init() {
Poll = NewPool(500) Pool = NewWorkerPool(500, 1024) // 500个worker任务队列长度1024
Poll.Start() Pool.Start()
} }
// Job 任务定义
type Job func() type Job func()
// Worker 工作协程
type Worker struct { type Worker struct {
id int id int
jobChannel chan Job jobChan chan Job
quit chan bool quitChan chan struct{}
closeOnce sync.Once
} }
type Pool struct { // WorkerPool 线程池
type WorkerPool struct {
workers []*Worker workers []*Worker
jobChannel chan Job jobChan chan Job
wg sync.WaitGroup wg sync.WaitGroup
closeOnce sync.Once
closed bool
mu sync.Mutex
} }
func NewWorker(id int, jobChannel chan Job) *Worker { // ----------------------------------------------------
// 创建 Worker
// ----------------------------------------------------
func NewWorker(id int, jobChan chan Job) *Worker {
return &Worker{ return &Worker{
id: id, id: id,
jobChannel: jobChannel, jobChan: jobChan,
quit: make(chan bool), quitChan: make(chan struct{}),
} }
} }
// Start 启动Worker自带panic恢复
func (w *Worker) Start() { func (w *Worker) Start() {
go func() { go func() {
defer func() {
if r := recover(); r != nil {
// 打印panic日志防止整个worker崩溃
// log.Printf("worker %d panic recovered: %v", w.id, r)
}
}()
for { for {
select { select {
case job := <-w.jobChannel: case job, ok := <-w.jobChan:
if !ok {
return
}
job() job()
case <-w.quit: case <-w.quitChan:
return return
} }
} }
}() }()
} }
// Stop 安全停止Worker
func (w *Worker) Stop() { func (w *Worker) Stop() {
w.quit <- true w.closeOnce.Do(func() {
close(w.quit) close(w.quitChan)
//go func() { })
// w.quit <- true
//}()
} }
func NewPool(numWorkers int) *Pool { // ----------------------------------------------------
jobChannel := make(chan Job) // 创建线程池
pool := &Pool{ // ----------------------------------------------------
workers: make([]*Worker, numWorkers), func NewWorkerPool(workerCount int, queueSize int) *WorkerPool {
jobChannel: jobChannel, jobChan := make(chan Job, queueSize) // 带缓冲队列,高并发不阻塞
pool := &WorkerPool{
workers: make([]*Worker, workerCount),
jobChan: jobChan,
} }
for i := 0; i < numWorkers; i++ { for i := 0; i < workerCount; i++ {
worker := NewWorker(i, jobChannel) pool.workers[i] = NewWorker(i, jobChan)
pool.workers[i] = worker
} }
return pool return pool
} }
func (p *Pool) Start() { // Start 启动所有worker
for _, worker := range p.workers { func (p *WorkerPool) Start() {
worker.Start() for _, w := range p.workers {
w.Start()
} }
} }
func (p *Pool) Stop() { // AddJob 提交任务非阻塞、安全、不panic
for _, worker := range p.workers { func (p *WorkerPool) AddJob(job Job) bool {
worker.Stop() p.mu.Lock()
closed := p.closed
p.mu.Unlock()
if closed {
return false
} }
}
//
//func (p *Pool) AddJob(job Job) {
// p.wg.Add(1)
// p.jobChannel <- func() {
// job()
// p.wg.Done()
// }
//}
func (p *Pool) AddJob(job Job) {
p.wg.Add(1) p.wg.Add(1)
go func() { select {
p.jobChannel <- func() { case p.jobChan <- func() {
defer p.wg.Done() defer p.wg.Done()
job() job()
}:
return true
case <-time.After(10 * time.Millisecond):
// 队列满了,不阻塞,快速失败
p.wg.Done()
return false
} }
}()
} }
func (p *Pool) Wait() {
// Wait 等待所有任务执行完成
func (p *WorkerPool) Wait() {
p.wg.Wait() p.wg.Wait()
} }
// Stop 优雅关闭线程池安全不panic
func (p *WorkerPool) Stop() {
p.closeOnce.Do(func() {
p.mu.Lock()
p.closed = true
p.mu.Unlock()
// 1. 关闭任务通道
close(p.jobChan)
// 2. 停止所有worker
for _, w := range p.workers {
w.Stop()
}
// 3. 等待所有任务执行完
p.Wait()
})
}
// IsClosed 判断线程池是否关闭
func (p *WorkerPool) IsClosed() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.closed
}