From 14de70f9116270be06f81de27d0fb2a49b7de444 Mon Sep 17 00:00:00 2001 From: suyl <770236076@qq.com> Date: Fri, 30 Jul 2021 17:22:51 +0800 Subject: [PATCH] aa --- business/jxstore/event/event_tcp.go | 24 ++++----- business/jxstore/event/event_tcp_utils.go | 63 +++++++++++++++++++---- 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/business/jxstore/event/event_tcp.go b/business/jxstore/event/event_tcp.go index 43c74bef9..f63f061d5 100644 --- a/business/jxstore/event/event_tcp.go +++ b/business/jxstore/event/event_tcp.go @@ -25,9 +25,9 @@ func ListenTcp() { return } globals.SugarLogger.Debugf("begin listenTcp port 8000......") - go HandleTcpMessages() + go t.HandleTcpMessages() go t.HandleCheckTcpHeart() - go t.doPrint2(printMsgChan) + //go t.doPrint2() //go t.doPrint(printMsgChanFail) for { c, err := l.Accept() @@ -95,9 +95,11 @@ func (t *TcpClient) handleConn(c net.Conn) { //t.Unlock() t.addConn(c, printNo, status) t.buildCallBackMap(printNo) + t.buildMsgMap(printNo) + go t.doPrint2(printNo) changePrinterStatus(printNo, status) if status == printerStatusOnline { - printFail() + t.printFail() } //} else { //改变打印机状态 @@ -118,9 +120,7 @@ func (t *TcpClient) handleConn(c net.Conn) { } else if strings.Contains(data, printText) { globals.SugarLogger.Debugf("handleConn print callback: %v", data) _, printNo = getCallbackMsgInfo(data) - t.RLock() - t.CallBackMap[printNo] <- data - t.RUnlock() + t.addCallbackChan(printNo, data) //printMsgCallbackChan <- printMsgCallbackMap //changePrintMsg(data) } @@ -153,14 +153,14 @@ func changePrinterStatus(printNo string, status int) { } } -func printFail() (err error) { +func (t *TcpClient) printFail() (err error) { //新开机的打印失败和错误的 var ( db = dao.GetDB() ) prints, _ := dao.GetPrintMsgs(db, []int{printMsgFail, printMsgErr, printMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999) for _, printMsg := range prints { - printMsgChan <- printMsg + t.addMsgChan(printMsg) } return err } @@ -217,7 +217,7 @@ func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) ( return err } -func HandleTcpMessages() { +func (t *TcpClient) HandleTcpMessages() { var ( db = dao.GetDB() offset, pageSize = 0, 1 @@ -226,20 +226,20 @@ func HandleTcpMessages() { //一直读? prints, _ := dao.GetPrintMsgs(db, []int{printMsgWait}, time.Now().Add(-time.Hour*3), time.Now(), offset, pageSize) for _, printMsg := range prints { - printMsgChan <- printMsg + t.addMsgChan(printMsg) printMsg.Status = printMsgAlreadyLoad dao.UpdateEntity(db, printMsg, "Status") } } } -func (t *TcpClient) doPrint2(printMsgChan chan *model.PrintMsg) (err error) { +func (t *TcpClient) doPrint2(key string) (err error) { var ( db = dao.GetDB() ) for { select { - case printMsg := <-printMsgChan: + case printMsg := <-t.MsgMap[key]: var ( data []byte c net.Conn diff --git a/business/jxstore/event/event_tcp_utils.go b/business/jxstore/event/event_tcp_utils.go index ba7d6652a..fdc40651c 100644 --- a/business/jxstore/event/event_tcp_utils.go +++ b/business/jxstore/event/event_tcp_utils.go @@ -99,10 +99,10 @@ var ( regexpQrr = regexp.MustCompile(byteSignQrRight + "(.*?)" + byteSignQrRightE) regexpSound = regexp.MustCompile(byteSignSound + "(.*?)" + byteSignSoundE) - printMsgChan = make(chan *model.PrintMsg, 1024) + //printMsgChan = make(chan *model.PrintMsg, 1024) //printMsgCallbackMap = make(map[string]chan string, 1024) - printMsgChanFail = make(chan *model.PrintMsg, 1024) - timeoutChan = make(chan int, 10) + //printMsgChanFail = make(chan *model.PrintMsg, 1024) + timeoutChan = make(chan int, 10) ) type PrintInfo struct { @@ -113,8 +113,9 @@ type PrintInfo struct { //连接的客户端,吧每个客户端都放进来 type TcpClient struct { - Clients map[string]*PrintInfo //放tcp连接的,printNo 为key - CallBackMap map[string]chan string //放打印信息回调信息的,printNo为key + Clients map[string]*PrintInfo //放tcp连接的,printNo 为key + MsgMap map[string]chan *model.PrintMsg //放打印信息的,printNo为key + CallBackMap map[string]chan string //放打印信息回调信息的,printNo为key *sync.RWMutex } @@ -123,20 +124,20 @@ type GetPrintStatus struct { AppID int } +//从连接池删除,并关闭连接 func (t *TcpClient) delConn(key string) { t.Lock() defer t.Unlock() - if t.Clients[key].C != nil { t.Clients[key].C.Close() } delete(t.Clients, key) } +//添加到连接池中 func (t *TcpClient) addConn(c net.Conn, key string, status int) { t.Lock() defer t.Unlock() - t.Clients[key] = &PrintInfo{ C: c, Status: status, @@ -144,6 +145,13 @@ func (t *TcpClient) addConn(c net.Conn, key string, status int) { } } +func (t *TcpClient) buildMsgMap(key string) { + t.Lock() + defer t.Unlock() + dataChan := make(chan *model.PrintMsg, 1024) + t.MsgMap[key] = dataChan +} + func (t *TcpClient) buildCallBackMap(key string) { t.Lock() defer t.Unlock() @@ -171,10 +179,29 @@ func (t *TcpClient) getPrintConn(key string) net.Conn { } } +func (t *TcpClient) isExistMsg(key string) bool { + t.RLock() + defer t.RUnlock() + if t.MsgMap[key] == nil { + return false + } else { + return true + } +} + +func (t *TcpClient) isExistCallback(key string) bool { + t.RLock() + defer t.RUnlock() + if t.CallBackMap[key] == nil { + return false + } else { + return true + } +} + func (t *TcpClient) isExist(key string) bool { t.RLock() defer t.RUnlock() - if t.Clients[key] == nil { return false } else { @@ -185,17 +212,35 @@ func (t *TcpClient) isExist(key string) bool { func (t *TcpClient) setPrintStatus(key string, status int) { t.Lock() defer t.Unlock() - if t.isExist(key) { t.Clients[key].Status = status t.Clients[key].StatusTime = time.Now() } } +func (t *TcpClient) addMsgChan(printMsg *model.PrintMsg) { + t.RUnlock() + defer t.RUnlock() + if !t.isExistMsg(printMsg.PrintNo) { + t.buildMsgMap(printMsg.PrintNo) + } + t.MsgMap[printMsg.PrintNo] <- printMsg +} + +func (t *TcpClient) addCallbackChan(key, data string) { + t.RUnlock() + defer t.RUnlock() + if !t.isExistCallback(key) { + t.buildCallBackMap(key) + } + t.CallBackMap[key] <- data +} + func NewTcpClient() *TcpClient { t := &TcpClient{ Clients: make(map[string]*PrintInfo), CallBackMap: make(map[string]chan string), + MsgMap: make(map[string]chan *model.PrintMsg), } t.RWMutex = new(sync.RWMutex) return t