package event import ( "encoding/hex" "fmt" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/globals" "io" "net" "strings" "time" "unicode/utf8" ) //入口 func ListenTcp() { t := NewTcpClient() l, err := net.Listen("tcp", ":8000") if err != nil { fmt.Println("listen error:", err) return } globals.SugarLogger.Debugf("begin listenTcp port 8000......") for { c, err := l.Accept() if err != nil { fmt.Println("accept error:", err) break } go t.handleConn(c) } } func (t *TcpClient) handleConn(c net.Conn) { if c == nil { globals.SugarLogger.Debugf("conn is nil") return } defer c.Close() for { var ( printNo string //打印机编号 //printStatus *GetPrintStatus buffer = make([]byte, 1024) ) n, err := c.Read(buffer) if err != nil { if err == io.EOF { fmt.Println("connection close") } else { fmt.Println("ReadString err", err) } return } //也可能是查状态的(openAPI外部查询,一次性的) //if err = json.Unmarshal(buffer[:n], &printStatus); err == nil { // fmt.Println("handleConn msg: ", string(buffer[:n])) // if printStatus != nil { // status := t.getPrintStatus(printStatus.PrintNo) // c.Write([]byte(utils.Int2Str(status))) // c.Close() // } //} //看是心跳还是打印回调 data := hex.EncodeToString(buffer[:n]) //证明是心跳 if strings.Contains(data, heartText) { globals.SugarLogger.Debugf("handleConn heart: %v", data) printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8]) printNo = string(printNoData) globals.SugarLogger.Debugf("handleConn printno :[%v]", printNo) status := printStatus2JxStatus(data[len(data)-8 : len(data)-6]) //如果没在连接池里 //1、加到连接池中,不同的打印机no开不同的goroutine //2、初始化channel,每个打印机一个,放打印消息和打印回调消息 //3、读数据库里的待打印信息,放到打印channel中 //4、读打印channel并打印,并切等待回调channel中的消息 //5、修改数据库中打印机状态(没在连接池中说明是重新连接的) //6、监听心跳时间,超过1分多钟就clear掉 //globals.SugarLogger.Debugf("handleConn timeout channel...: %v", t.TimeoutMap[printNo]) if t.Clients[printNo] == nil { t.addConn(c, printNo, status) t.buildAllMap(printNo) t.HandleTcpMessages(printNo) t.doPrint(printNo) if status == printerStatusOnline { //t.printFail() } changePrinterStatus(printNo, status) t.HandleCheckTcpHeart(printNo) } else { //在加到连接池中已经更新了时间,所以放在else里 t.setPrintStatusTime(printNo) } //状态不一致再更新状态(可能缺纸了,过热了等) if t.getPrintStatus(printNo) != status { t.setPrintStatus(printNo, status) changePrinterStatus(printNo, status) } globals.SugarLogger.Debugf("handleConn print model %v", utils.Format4Output(t.Clients[printNo], true)) } else if strings.Contains(data, printText) { //打印回调 globals.SugarLogger.Debugf("handleConn print callback: %v", data) //打印消息发送后,打印机会回调该条打印消息的状态(打印成功or失败,失败原因..) //将回调的信息放到回调channel中,打印成功后再打印下一条消息 _, printNo = getCallbackMsgInfo(data) //更新打印机心跳时间(打印机本身不会在打印的同时,或回调的同时发心跳消息,会导致心跳判断超时,这里更新一下) t.setPrintStatusTime(printNo) globals.SugarLogger.Debugf("handleConn print callback statusTime: %v", t.Clients[printNo].StatusTime) t.addCallbackChan(printNo, data) } } } func (t *TcpClient) printFail() (err error) { //新开机的打印失败和错误的 var ( db = dao.GetDB() ) prints, _ := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, printMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999) for _, printMsg := range prints { t.addMsgChan(printMsg) } return err } func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) { var ( db = dao.GetDB() comment string status int ) //1、先找出打印机编号和订单序列号,这两个确定唯一一条消息? //通过参数传进来 //2、打印成功改变打印表的状态 if strings.Contains(data, printSuccessText) { //1e001802000150323032313036313530303030313000013c 成功消息例子 status = printMsgSuccess } else { //打印失败也改变状态并更新失败原因 status = printMsgFail comment = printErrMap[data[12:14]] } //这里序号重复会有问题 if printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo); err != nil { globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err) return err } else if len(printMsgs) == 0 { globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo) } else if len(printMsgs) > 0 { for _, v := range printMsgs { v.Comment = comment v.Status = status dao.UpdateEntity(db, v, "Comment", "Status") } } return err } func (t *TcpClient) HandleTcpMessages(printNo string) { var ( db = dao.GetDB() offset, pageSize = 0, 1 ) if !t.isExistMsg(printNo) { return } globals.SugarLogger.Debugf("HandleTcpMessages printNo: %s", printNo) go func() { for { select { case <-t.TimeoutMap[printNo]: globals.SugarLogger.Debugf("HandleTcpMessages timeout") return default: //一直读? prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, time.Now().Add(-time.Hour*3), time.Now(), offset, pageSize) for _, printMsg := range prints { printMsg.Status = printMsgAlreadyLoad //先避免重复读再插到channel? if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil { if err = t.addMsgChan(printMsg); err != nil { globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err) } } } } } }() } func (t *TcpClient) doPrint(key string) (err error) { var ( db = dao.GetDB() ) if !t.isExistMsg(key) { return err } globals.SugarLogger.Debugf("doPrint printNo: %s", key) go func() { for { select { case <-t.TimeoutMap[key]: globals.SugarLogger.Debugf("doPrint timeout") return default: select { case printMsg, ok := <-t.MsgMap[key]: if !ok { globals.SugarLogger.Debugf("doPrint err !ok ...") return } var ( data []byte c net.Conn ) if printMsg != nil { if err = checkPrintMsg(db, printMsg); err == nil { status := t.getPrintStatus(printMsg.PrintNo) switch status { //只有在线才打印内容 case printerStatusOnline: if c = t.getPrintConn(printMsg.PrintNo); c != nil { data, err = buildMsg(printMsg) } case printerStatusOffline: err = fmt.Errorf("打印机离线!") case printerStatusOnlineWithoutPaper: err = fmt.Errorf("打印机缺纸!") default: err = fmt.Errorf("打印机状态未知!") } } if err != nil { globals.SugarLogger.Debugf("doPrint2 err printNo:%s, msgID:%s, err: %s", printMsg.PrintNo, printMsg.MsgID, err.Error()) //t.delConn(printMsg.PrintNo) printMsg.Status = printMsgErr printMsg.Comment = err.Error() dao.UpdateEntity(db, printMsg, "Status", "Comment") close(t.TimeoutMap[key]) } else { if c != nil { encryption := "" for _, v := range data { encryption += fmt.Sprintf("%d", v) } printMsg.ContentEncryption = encryption dao.UpdateEntity(db, printMsg, "ContentEncryption") if _, err = c.Write(data); err != nil { globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err) //t.delConn(printMsg.PrintNo) close(t.TimeoutMap[key]) } else { globals.SugarLogger.Debugf("handleTcpMessages success, data: %v", hex.EncodeToString(data)) //等待回调 dataStr := <-t.CallBackMap[printMsg.PrintNo] if dataStr != "" { a, b := getCallbackMsgInfo(dataStr) t.changePrintMsg(dataStr, a, b) // 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理 have, err := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a)) if err != nil && !have { // 扣除打印机账号金额 err = dao.DeductionPrintBalance(db, b) // 添加打印记录(支出记录) err = dao.AddPrintRecord(db, &model.PrintBillRecord{ CreatedAt: time.Now(), UpdatedAt: time.Now(), PrintNo: b, PayType: 2, PayMoney: 1, // 固定支出一分钱 OrderId: utils.Int64ToStr(a), UserId: "", }) globals.SugarLogger.Debugf("扣除用户打印机金额/添加打印机打印记录错误 %s", err) } else { globals.SugarLogger.Debugf("查询打印机扣费记录错误 %s", err) } //判断音频暂停? //收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完 //暂停时间就暂时取的sound标签内内容长度/2 if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 { sound := sounds[1] lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second time.Sleep(lenTime / 2) } } } } } } } } } }() return err } //检测心跳 func (t *TcpClient) HandleCheckTcpHeart(key string) { globals.SugarLogger.Debugf("HandleCheckTcpHeart begin key: %s", key) go func() { for { select { case <-t.TimeoutMap[key]: t.Clients[key].C.Close() close(t.MsgMap[key]) close(t.CallBackMap[key]) delete(t.Clients, key) globals.SugarLogger.Debugf("HandleCheckTcpHeart timeout") return default: statusTime := t.getPrintStatusTime(key) if !utils.IsTimeZero(statusTime) { //1分钟内没心跳判断打印机掉线了 if time.Now().Sub(statusTime) > time.Minute+time.Second*10 { changePrinterStatus(key, printerStatusOffline) globals.SugarLogger.Debugf("HandleCheckTcpHeart clear...") //t.clear(key) close(t.TimeoutMap[key]) } } } //keys := []string{} //t.RLock() //for k, v := range t.Clients { // if time.Now().Sub(v.StatusTime) > time.Minute+time.Second { // v.Status = printerStatusOffline // keys = append(keys, k) // } //} //t.RUnlock() //globals.SugarLogger.Debugf("HandleCheckTcpHeart del keys: %v", keys) //for _, v := range keys { // changePrinterStatus(v, printerStatusOffline) // if t.isExist(v) { // t.clear(v) // } //} } }() }