package event

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"strings"
	"time"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/globals"
)

// ListenTcp is the entry point of the printer TCP service. It listens on
// port 8000, starts the heartbeat watchdog and then serves every accepted
// connection on its own goroutine. It returns when listening fails or when
// Accept reports an error.
func ListenTcp() {
	t := NewTcpClient()
	l, err := net.Listen("tcp", ":8000")
	if err != nil {
		fmt.Println("listen error:", err)
		return
	}
	// Release the port once the accept loop gives up.
	defer l.Close()

	globals.SugarLogger.Debugf("begin listenTcp port 8000......")
	go t.HandleCheckTcpHeart()
	for {
		c, err := l.Accept()
		if err != nil {
			fmt.Println("accept error:", err)
			return
		}
		go t.handleConn(c)
	}
}

// handleConn serves a single TCP connection until a read fails.
//
// Each packet read from c is interpreted as one of:
//   - a JSON status query (decodes into GetPrintStatus): the printer status
//     is written back and the connection is closed;
//   - a heartbeat (hex form contains heartText): the printer is registered
//     on first sight and its status is refreshed;
//   - a print callback (hex form contains printText): the packet is handed
//     to the callback channel of the printer it belongs to.
//
// Anything else is ignored.
func (t *TcpClient) handleConn(c net.Conn) {
	if c == nil {
		globals.SugarLogger.Debugf("conn is nil")
		return
	}
	defer c.Close()

	var printNo string // printer number
	buffer := make([]byte, 1024)
	for {
		n, err := c.Read(buffer)
		if err != nil {
			if err == io.EOF {
				fmt.Println("connection close")
			} else {
				fmt.Println("ReadString err", err)
			}
			return
		}
		payload := buffer[:n]

		// The packet may be a status query rather than printer traffic.
		var printStatus *GetPrintStatus
		if err = json.Unmarshal(payload, &printStatus); err == nil {
			fmt.Println("handleConn msg: ", string(payload))
			if printStatus != nil {
				status := t.getPrintStatus(printStatus.PrintNo)
				if _, err = c.Write([]byte(utils.Int2Str(status))); err != nil {
					globals.SugarLogger.Debugf("handleConn write status err: %v", err)
				}
				// A status query is one-shot: the deferred Close ends the
				// connection, so there is nothing left to read.
				return
			}
		}

		// Decide between heartbeat and print callback on the hex form.
		data := hex.EncodeToString(payload)
		switch {
		case strings.Contains(data, heartText):
			// Heartbeat packet.
			globals.SugarLogger.Debugf("handleConn heart: %v", data)
			// The slicing below needs heartText plus an 8-character trailer;
			// a shorter packet would panic and take the whole process down.
			if len(data) < len(heartText)+8 {
				globals.SugarLogger.Debugf("handleConn heart packet too short: %v", data)
				continue
			}
			printNoData, decodeErr := hex.DecodeString(data[len(heartText) : len(data)-8])
			if decodeErr != nil {
				// Keep the bytes decoded before the error, as before; only
				// make the problem visible.
				globals.SugarLogger.Debugf("handleConn decode printNo err: %v", decodeErr)
			}
			printNo = string(printNoData)
			globals.SugarLogger.Debugf("handleConn printno :[%v]", printNo)
			status := printStatus2JxStatus(data[len(data)-8 : len(data)-6])

			// t.Clients is mutated under t.Lock by HandleCheckTcpHeart, so
			// read it under the read lock.
			t.RLock()
			registered := t.Clients[printNo] != nil
			t.RUnlock()
			if !registered {
				// NOTE(review): this send blocks until something receives
				// from timeoutChan unless the channel is buffered - confirm.
				timeoutChan <- 1
				t.addConn(c, printNo, status)
				t.buildCallBackMap(printNo)
				t.buildMsgMap(printNo)
				t.HandleTcpMessages(printNo)
				t.doPrint(printNo)
				if status == printerStatusOnline {
					t.printFail()
				}
			}
			changePrinterStatus(printNo, status)
			if t.getPrintStatus(printNo) != status {
				t.setPrintStatus(printNo, status)
			}

			t.RLock()
			client := t.Clients[printNo]
			t.RUnlock()
			globals.SugarLogger.Debugf("handleConn print model %v", utils.Format4Output(client, true))
		case strings.Contains(data, printText):
			// Print result reported by the printer.
			globals.SugarLogger.Debugf("handleConn print callback: %v", data)
			_, printNo = getCallbackMsgInfo(data)
			t.addCallbackChan(printNo, data)
		}
	}
}

// printFail re-queues the messages of the last three hours whose status is
// printMsgFail, printMsgErr, printMsgAlreadyLoad or printMsgAlreadySend. It
// is called when a printer is first seen online. The query error, if any, is
// logged and returned; messages returned alongside it are still queued.
func (t *TcpClient) printFail() (err error) {
	db := dao.GetDB()
	now := time.Now()
	prints, err := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, printMsgAlreadyLoad, printMsgAlreadySend}, now.Add(-time.Hour*3), now, 0, 999)
	if err != nil {
		globals.SugarLogger.Debugf("printFail GetPrintMsgs err :[%v]", err)
	}
	for _, printMsg := range prints {
		t.addMsgChan(printMsg)
	}
	return err
}

// changePrintMsg stores the outcome of a print job. data is the hex form of
// the printer callback; orderNo and printNo select the print messages to
// update. A callback containing printSuccessText marks the messages
// printMsgSuccess; any other callback marks them printMsgFail with the
// comment looked up in printErrMap. Only the lookup error is returned.
func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) {
	var (
		db      = dao.GetDB()
		comment string
		status  int
	)
	if strings.Contains(data, printSuccessText) {
		status = printMsgSuccess
	} else {
		// Failure: record the status together with the failure reason.
		status = printMsgFail
		// The error code sits at data[12:14]; a shorter callback would
		// panic, so leave the comment empty in that case.
		if len(data) >= 14 {
			comment = printErrMap[data[12:14]]
		}
	}

	printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo)
	if err != nil {
		globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err)
		return err
	}
	if len(printMsgs) == 0 {
		globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo)
		return nil
	}
	for _, v := range printMsgs {
		v.Comment = comment
		v.Status = status
		// NOTE(review): the result of UpdateEntity is not checked.
		dao.UpdateEntity(db, v, "Comment", "Status")
	}
	return nil
}

// HandleTcpMessages starts the background loader for printNo. The loader
// repeatedly fetches one printMsgWait message of the last three hours,
// queues it with addMsgChan and marks it printMsgAlreadyLoad. Nothing is
// started when isExistMsg reports false for printNo.
func (t *TcpClient) HandleTcpMessages(printNo string) {
	// Pause between polls that found nothing, so an idle printer does not
	// query the database in a tight loop.
	const idlePollInterval = 200 * time.Millisecond

	var (
		db               = dao.GetDB()
		offset, pageSize = 0, 1
	)
	if !t.isExistMsg(printNo) {
		return
	}
	globals.SugarLogger.Debugf("build HandleTcpMessages printNo: %s", printNo)
	// NOTE(review): this goroutine has no stop condition and runs for the
	// life of the process.
	go func() {
		for {
			now := time.Now()
			prints, err := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, now.Add(-time.Hour*3), now, offset, pageSize)
			if err != nil {
				globals.SugarLogger.Debugf("HandleTcpMessages GetPrintMsgs printNo: %s, err :[%v]", printNo, err)
			}
			for _, printMsg := range prints {
				// NOTE(review): printMsg is still modified after being
				// queued; if addMsgChan hands the pointer to the doPrint
				// goroutine, the two status updates can race - confirm.
				t.addMsgChan(printMsg)
				printMsg.Status = printMsgAlreadyLoad
				dao.UpdateEntity(db, printMsg, "Status")
			}
			if len(prints) == 0 {
				time.Sleep(idlePollInterval)
			}
		}
	}()
}

// doPrint starts the background sender for the printer identified by key.
// The sender takes messages from t.MsgMap[key], validates them with
// checkPrintMsg, writes the built payload to the printer connection, waits
// for the printer callback on t.CallBackMap and records the result through
// changePrintMsg. A message that cannot be sent because validation, the
// printer status or buildMsg failed is marked printMsgErr with the reason
// as comment. doPrint itself always returns nil.
func (t *TcpClient) doPrint(key string) (err error) {
	db := dao.GetDB()
	if !t.isExistMsg(key) {
		return nil
	}
	globals.SugarLogger.Debugf("doPrint printNo: %s", key)
	go func() {
		for {
			select {
			case printMsg, ok := <-t.MsgMap[key]:
				if !ok {
					// The queue was closed; no further message can arrive.
					return
				}
				if printMsg == nil {
					// Nothing to update for a nil message; touching its
					// fields would panic.
					globals.SugarLogger.Debugf("doPrint printNo: %s, err: %s", key, "未查询到此printMsg")
					continue
				}

				var (
					data    []byte
					c       net.Conn
					sendErr error
				)
				if sendErr = checkPrintMsg(db, printMsg); sendErr == nil {
					switch t.getPrintStatus(printMsg.PrintNo) {
					case printerStatusOnline:
						if c = t.getPrintConn(printMsg.PrintNo); c != nil {
							data, sendErr = buildMsg(printMsg)
						}
					case printerStatusOffline:
						sendErr = fmt.Errorf("打印机离线!")
					case printerStatusOnlineWithoutPaper:
						sendErr = fmt.Errorf("打印机缺纸!")
					default:
						sendErr = fmt.Errorf("打印机状态未知!")
					}
				}

				if sendErr != nil {
					t.RLock()
					client := t.Clients[printMsg.PrintNo]
					t.RUnlock()
					globals.SugarLogger.Debugf("doPrint2 err printNo:%s, msgID:%s, printInfo: %v, err: %s", printMsg.PrintNo, printMsg.MsgID, utils.Format4Output(client, true), sendErr.Error())
					printMsg.Status = printMsgErr
					printMsg.Comment = sendErr.Error()
					dao.UpdateEntity(db, printMsg, "Status", "Comment")
					continue
				}
				if c == nil {
					// Online but without a connection: the message is left
					// untouched, as before.
					continue
				}
				if _, sendErr = c.Write(data); sendErr != nil {
					globals.SugarLogger.Debugf("handleTcpMessages err [%v]", sendErr)
					t.delConn(printMsg.PrintNo)
					continue
				}
				globals.SugarLogger.Debugf("handleTcpMessages success, data: %v", hex.EncodeToString(data))
				// NOTE(review): this receive has no timeout; a printer that
				// never answers stalls the sender for this key.
				dataStr := <-t.CallBackMap[printMsg.PrintNo]
				orderNo, printNo := getCallbackMsgInfo(dataStr)
				t.changePrintMsg(dataStr, orderNo, printNo)
			case <-timeoutChan:
				// Only drains the signal sent by handleConn.
			}
		}
	}()
	return nil
}

// HandleCheckTcpHeart is the heartbeat watchdog. Once per second it marks
// every client whose StatusTime is older than one minute and one second as
// printerStatusOffline, reports that through changePrinterStatus and removes
// the client from t.Clients. It never returns.
func (t *TcpClient) HandleCheckTcpHeart() {
	const (
		// How often the client table is scanned; an unpaced loop would spin
		// a CPU core and keep the lock permanently contended.
		scanInterval = time.Second
		// Silence longer than this marks a printer as offline.
		heartTimeout = time.Minute + time.Second
	)

	ticker := time.NewTicker(scanInterval)
	defer ticker.Stop()
	for range ticker.C {
		var expired []string
		// The scan writes v.Status, so it needs the write lock.
		t.Lock()
		for k, v := range t.Clients {
			if time.Since(v.StatusTime) > heartTimeout {
				v.Status = printerStatusOffline
				expired = append(expired, k)
			}
		}
		t.Unlock()

		for _, printNo := range expired {
			changePrinterStatus(printNo, printerStatusOffline)
			if t.isExist(printNo) {
				t.Lock()
				delete(t.Clients, printNo)
				t.Unlock()
			}
		}
	}
}