310 lines
8.6 KiB
Go
310 lines
8.6 KiB
Go
package event
|
||
|
||
import (
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"fmt"
|
||
"git.rosy.net.cn/baseapi/utils"
|
||
"git.rosy.net.cn/jx-callback/business/model/dao"
|
||
"git.rosy.net.cn/jx-callback/globals"
|
||
"io"
|
||
"net"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
//入口
|
||
func ListenTcp() {
|
||
t := NewTcpClient()
|
||
l, err := net.Listen("tcp", ":8000")
|
||
if err != nil {
|
||
fmt.Println("listen error:", err)
|
||
return
|
||
}
|
||
globals.SugarLogger.Debugf("begin listenTcp port 8000......")
|
||
for {
|
||
c, err := l.Accept()
|
||
if err != nil {
|
||
fmt.Println("accept error:", err)
|
||
break
|
||
}
|
||
go t.handleConn(c)
|
||
}
|
||
}
|
||
|
||
func (t *TcpClient) handleConn(c net.Conn) {
|
||
var (
|
||
printNo string //打印机编号
|
||
printStatus *GetPrintStatus
|
||
)
|
||
if c == nil {
|
||
globals.SugarLogger.Debugf("conn is nil")
|
||
return
|
||
}
|
||
defer c.Close()
|
||
buffer := make([]byte, 1024)
|
||
for {
|
||
n, err := c.Read(buffer)
|
||
if err != nil {
|
||
if err == io.EOF {
|
||
fmt.Println("connection close")
|
||
} else {
|
||
fmt.Println("ReadString err", err)
|
||
}
|
||
return
|
||
}
|
||
//也可能是查状态的(openAPI外部查询,一次性的)
|
||
if err = json.Unmarshal(buffer[:n], &printStatus); err == nil {
|
||
fmt.Println("handleConn msg: ", string(buffer[:n]))
|
||
if printStatus != nil {
|
||
status := t.getPrintStatus(printStatus.PrintNo)
|
||
c.Write([]byte(utils.Int2Str(status)))
|
||
c.Close()
|
||
}
|
||
}
|
||
//看是心跳还是打印回调
|
||
data := hex.EncodeToString(buffer[:n])
|
||
//证明是心跳
|
||
if strings.Contains(data, heartText) {
|
||
globals.SugarLogger.Debugf("handleConn heart: %v", data)
|
||
printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
|
||
printNo = string(printNoData)
|
||
globals.SugarLogger.Debugf("handleConn printno :[%v]", printNo)
|
||
status := printStatus2JxStatus(data[len(data)-8 : len(data)-6])
|
||
//如果没在连接池里
|
||
//1、加到连接池中,不同的打印机no开不同的go
|
||
//2、初始化channel,每个打印机一个,放打印消息和打印回调消息
|
||
//3、读数据库里的待打印信息,放到打印channel中
|
||
//4、读打印channel并打印,并切等待回调channel中的消息
|
||
//5、修改数据库中打印机状态(没在连接池中说明是重新连接的)
|
||
//6、监听心跳时间,超过2分钟就clear掉
|
||
if t.Clients[printNo] == nil {
|
||
t.addConn(c, printNo, status)
|
||
t.buildCallBackMap(printNo)
|
||
t.buildMsgMap(printNo)
|
||
t.HandleTcpMessages(printNo)
|
||
t.doPrint(printNo)
|
||
if status == printerStatusOnline {
|
||
//t.printFail()
|
||
}
|
||
changePrinterStatus(printNo, status)
|
||
t.HandleCheckTcpHeart(printNo)
|
||
} else {
|
||
//在加到连接池中已经更新了时间,所以放在else里
|
||
t.setPrintStatusTime(printNo)
|
||
}
|
||
//状态不一致再更新状态(可能缺纸了,过热了等)
|
||
if t.getPrintStatus(printNo) != status {
|
||
t.setPrintStatus(printNo, status)
|
||
changePrinterStatus(printNo, status)
|
||
}
|
||
globals.SugarLogger.Debugf("handleConn print model %v", utils.Format4Output(t.Clients[printNo], true))
|
||
} else if strings.Contains(data, printText) { //打印回调
|
||
globals.SugarLogger.Debugf("handleConn print callback: %v", data)
|
||
//打印消息发送后,打印机会回调该条打印消息的状态(打印成功or失败,失败原因..)
|
||
//将回调的信息放到回调channel中,打印成功后再打印下一条消息
|
||
_, printNo = getCallbackMsgInfo(data)
|
||
t.addCallbackChan(printNo, data)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (t *TcpClient) printFail() (err error) {
|
||
//新开机的打印失败和错误的
|
||
var (
|
||
db = dao.GetDB()
|
||
)
|
||
prints, _ := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, printMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999)
|
||
for _, printMsg := range prints {
|
||
t.addMsgChan(printMsg)
|
||
}
|
||
return err
|
||
}
|
||
|
||
func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) {
|
||
var (
|
||
db = dao.GetDB()
|
||
comment string
|
||
status int
|
||
)
|
||
//1、先找出打印机编号和订单序列号,这两个确定唯一一条消息?
|
||
//通过参数传进来
|
||
//2、打印成功改变打印表的状态
|
||
if strings.Contains(data, printSuccessText) {
|
||
//1e001802000150323032313036313530303030313000013c 成功消息例子
|
||
status = printMsgSuccess
|
||
} else {
|
||
//打印失败也改变状态并更新失败原因
|
||
status = printMsgFail
|
||
comment = printErrMap[data[12:14]]
|
||
}
|
||
//这里序号重复会有问题
|
||
if printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo); err != nil {
|
||
globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err)
|
||
return err
|
||
} else if len(printMsgs) == 0 {
|
||
globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo)
|
||
} else if len(printMsgs) > 0 {
|
||
for _, v := range printMsgs {
|
||
v.Comment = comment
|
||
v.Status = status
|
||
dao.UpdateEntity(db, v, "Comment", "Status")
|
||
}
|
||
}
|
||
return err
|
||
}
|
||
|
||
func (t *TcpClient) HandleTcpMessages(printNo string) {
|
||
var (
|
||
db = dao.GetDB()
|
||
offset, pageSize = 0, 1
|
||
)
|
||
if !t.isExistMsg(printNo) {
|
||
return
|
||
}
|
||
globals.SugarLogger.Debugf("build HandleTcpMessages printNo: %s", printNo)
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-timeoutChan:
|
||
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
|
||
return
|
||
default:
|
||
//一直读?
|
||
prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, time.Now().Add(-time.Hour*3), time.Now(), offset, pageSize)
|
||
for _, printMsg := range prints {
|
||
printMsg.Status = printMsgAlreadyLoad
|
||
//先避免重复读再插到channel?
|
||
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
|
||
t.addMsgChan(printMsg)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
func (t *TcpClient) doPrint(key string) (err error) {
|
||
var (
|
||
db = dao.GetDB()
|
||
)
|
||
if !t.isExistMsg(key) {
|
||
return err
|
||
}
|
||
globals.SugarLogger.Debugf("doPrint printNo: %s", key)
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-timeoutChan:
|
||
globals.SugarLogger.Debugf("doPrint timeout")
|
||
return
|
||
default:
|
||
printMsg, ok := <-t.MsgMap[key]
|
||
if !ok {
|
||
return
|
||
}
|
||
var (
|
||
data []byte
|
||
c net.Conn
|
||
)
|
||
if printMsg != nil {
|
||
if err = checkPrintMsg(db, printMsg); err == nil {
|
||
status := t.getPrintStatus(printMsg.PrintNo)
|
||
switch status {
|
||
//只有在线才打印内容
|
||
case printerStatusOnline:
|
||
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
|
||
data, err = buildMsg(printMsg)
|
||
}
|
||
case printerStatusOffline:
|
||
err = fmt.Errorf("打印机离线!")
|
||
case printerStatusOnlineWithoutPaper:
|
||
err = fmt.Errorf("打印机缺纸!")
|
||
default:
|
||
err = fmt.Errorf("打印机状态未知!")
|
||
}
|
||
}
|
||
if err != nil {
|
||
globals.SugarLogger.Debugf("doPrint2 err printNo:%s, msgID:%s, err: %s", printMsg.PrintNo, printMsg.MsgID, err.Error())
|
||
//t.delConn(printMsg.PrintNo)
|
||
printMsg.Status = printMsgErr
|
||
printMsg.Comment = err.Error()
|
||
dao.UpdateEntity(db, printMsg, "Status", "Comment")
|
||
if t.isExist(key) {
|
||
timeoutChan <- 1
|
||
timeoutChan <- 1
|
||
timeoutChan <- 1
|
||
t.clear(key)
|
||
}
|
||
return
|
||
} else {
|
||
if c != nil {
|
||
if _, err = c.Write(data); err != nil {
|
||
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
|
||
//t.delConn(printMsg.PrintNo)
|
||
if t.isExist(key) {
|
||
timeoutChan <- 1
|
||
timeoutChan <- 1
|
||
timeoutChan <- 1
|
||
t.clear(key)
|
||
}
|
||
return
|
||
} else {
|
||
globals.SugarLogger.Debugf("handleTcpMessages success, data: %v", hex.EncodeToString(data))
|
||
//等待回调
|
||
dataStr := <-t.CallBackMap[printMsg.PrintNo]
|
||
a, b := getCallbackMsgInfo(dataStr)
|
||
t.changePrintMsg(dataStr, a, b)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
return err
|
||
}
|
||
|
||
//检测心跳
|
||
func (t *TcpClient) HandleCheckTcpHeart(key string) {
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-timeoutChan:
|
||
globals.SugarLogger.Debugf("HandleCheckTcpHeart timeout")
|
||
return
|
||
default:
|
||
if !utils.IsTimeZero(t.getPrintStatusTime(key)) {
|
||
//2分钟内没心跳判断打印机掉线了
|
||
if time.Now().Sub(t.getPrintStatusTime(key)) > time.Minute*2 {
|
||
changePrinterStatus(key, printerStatusOffline)
|
||
if t.isExist(key) {
|
||
timeoutChan <- 1
|
||
timeoutChan <- 1
|
||
timeoutChan <- 1
|
||
t.clear(key)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
//keys := []string{}
|
||
//t.RLock()
|
||
//for k, v := range t.Clients {
|
||
// if time.Now().Sub(v.StatusTime) > time.Minute+time.Second {
|
||
// v.Status = printerStatusOffline
|
||
// keys = append(keys, k)
|
||
// }
|
||
//}
|
||
//t.RUnlock()
|
||
//globals.SugarLogger.Debugf("HandleCheckTcpHeart del keys: %v", keys)
|
||
//for _, v := range keys {
|
||
// changePrinterStatus(v, printerStatusOffline)
|
||
// if t.isExist(v) {
|
||
// t.clear(v)
|
||
// }
|
||
//}
|
||
}
|
||
}()
|
||
}
|