diff --git a/business/jxstore/event/event_tcp.go b/business/jxstore/event/event_tcp.go index 71b918cca..085b75bc8 100644 --- a/business/jxstore/event/event_tcp.go +++ b/business/jxstore/event/event_tcp.go @@ -25,6 +25,7 @@ func ListenTcp() { } globals.SugarLogger.Debugf("begin listenTcp port 8000......") go HandleTcpMessages() + go t.HandleCallback() go t.HandleCheckTcpHeart() go t.doPrint2(printMsgChan) //go t.doPrint(printMsgChanFail) @@ -117,7 +118,9 @@ func (t *TcpClient) handleConn(c net.Conn) { } else if strings.Contains(data, printText) { globals.SugarLogger.Debugf("handleConn print callback: %v", data) _, printNo = getCallbackMsgInfo(data) - t.CallBackMap[printNo] <- data + if backChan, ok := t.CallBackMap.Load(printNo); ok { + backChan.(chan string) <- data + } //printMsgCallbackChan <- printMsgCallbackMap //changePrintMsg(data) } @@ -217,7 +220,7 @@ func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) ( func HandleTcpMessages() { var ( db = dao.GetDB() - offset, pageSize = 0, 10 + offset, pageSize = 0, 1 ) for { //一直读? @@ -275,7 +278,7 @@ func (t *TcpClient) doPrint2(printMsgChan chan *model.PrintMsg) (err error) { err = fmt.Errorf("未查询到此printMsg") } if err != nil { - fmt.Println("111111111111111111111111111111111111111111111", utils.Format4Output(t.Clients[printMsg.PrintNo], true)) + fmt.Println("111111111111111111111111111111111111111111111", utils.Format4Output(t.Clients[printMsg.PrintNo], true), err.Error()) //t.delConn(printMsg.PrintNo) printMsg.Status = printMsgErr printMsg.Comment = err.Error() @@ -293,18 +296,9 @@ func (t *TcpClient) doPrint2(printMsgChan chan *model.PrintMsg) (err error) { //c.Close() } else { globals.SugarLogger.Debugf("handleTcpMessages success, data: %v", hex.EncodeToString(data)) - printMsg.Status = printMsgAlreadySend - dao.UpdateEntity(db, printMsg, "Status", "Comment") + //printMsg.Status = printMsgAlreadySend + //dao.UpdateEntity(db, printMsg, "Status") //if t.CallBackMap[printMsg.PrintNo] != nil { - t.RLock() - select { - case dataStr := <-t.CallBackMap[printMsg.PrintNo]: - a, b := getCallbackMsgInfo(dataStr) - t.changePrintMsg(dataStr, a, b) - case <-timeoutChan: - } - t.RUnlock() - //} //dataStr := <-printMsgCallbackChan } @@ -315,6 +309,17 @@ func (t *TcpClient) doPrint2(printMsgChan chan *model.PrintMsg) (err error) { return err } +func (t *TcpClient) HandleCallback() { + for { + t.CallBackMap.Range(func(key, value interface{}) bool { + dataStr := <-value.(chan string) + a, b := getCallbackMsgInfo(dataStr) + t.changePrintMsg(dataStr, a, b) + return true + }) + } +} + func (t *TcpClient) HandleCheckTcpHeart() { for { keys := []string{} diff --git a/business/jxstore/event/event_tcp_utils.go b/business/jxstore/event/event_tcp_utils.go index c56eaa0b1..a840ff6b1 100644 --- a/business/jxstore/event/event_tcp_utils.go +++ b/business/jxstore/event/event_tcp_utils.go @@ -114,7 +114,7 @@ type PrintInfo struct { //连接的客户端,吧每个客户端都放进来 type TcpClient struct { Clients map[string]*PrintInfo - CallBackMap map[string]chan string + CallBackMap sync.Map *sync.RWMutex } @@ -145,11 +145,8 @@ func (t *TcpClient) addConn(c net.Conn, key string, status int) { } func (t *TcpClient) buildCallBackMap(key string) { - t.Lock() - defer t.Unlock() dataChan := make(chan string, 1024) - t.CallBackMap[key] = dataChan - timeoutChan <- 1 + t.CallBackMap.Store(key, dataChan) } func (t *TcpClient) getPrintStatus(key string) int { @@ -195,8 +192,8 @@ func (t *TcpClient) setPrintStatus(key string, status int) { func NewTcpClient() *TcpClient { t := &TcpClient{ - Clients: make(map[string]*PrintInfo), - CallBackMap: make(map[string]chan string, 1024), + Clients: make(map[string]*PrintInfo), + //CallBackMap: new(sync.Map), } t.RWMutex = new(sync.RWMutex) return t