Files
jx-callback/business/jxstore/event/event_tcp.go
邹宗楠 bab030bf39 1'
2022-11-04 14:58:14 +08:00

339 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package event
import (
"encoding/hex"
"fmt"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"io"
"net"
"strings"
"time"
"unicode/utf8"
)
//入口
func ListenTcp() {
t := NewTcpClient()
l, err := net.Listen("tcp", ":8000")
if err != nil {
fmt.Println("listen error:", err)
return
}
globals.SugarLogger.Debugf("begin listenTcp port 8000......")
for {
c, err := l.Accept()
if err != nil {
fmt.Println("accept error:", err)
break
}
go t.handleConn(c)
}
}
func (t *TcpClient) handleConn(c net.Conn) {
globals.SugarLogger.Debugf("conn time %v", time.Now())
if c == nil {
globals.SugarLogger.Debugf("conn is nil")
return
}
defer c.Close()
for {
var (
printNo string //打印机编号
buffer = make([]byte, 1024)
)
n, err := c.Read(buffer)
if err != nil {
if err == io.EOF {
fmt.Println("connection close")
} else {
fmt.Println("ReadString err", err)
}
return
}
//看是心跳还是打印回调
data := hex.EncodeToString(buffer[:n])
globals.SugarLogger.Debugf("data=============:%s", data)
//证明是心跳
if strings.Contains(data, heartText) || strings.Contains(data, heartTextNew) {
printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
printNo = string(printNoData)
status := printStatus2JxStatus(data[len(data)-8 : len(data)-6])
globals.SugarLogger.Debugf("输出打印机编号printNo:[%s],和状态printStatus[%d],打印时间time:[%v]", printNo, status, time.Now())
//如果没在连接池里
//1、加到连接池中不同的打印机no开不同的goroutine
//2、初始化channel每个打印机一个放打印消息和打印回调消息
//3、读数据库里的待打印信息放到打印channel中
//4、读打印channel并打印并切等待回调channel中的消息
//5、修改数据库中打印机状态没在连接池中说明是重新连接的
//6、监听心跳时间超过1分多钟就clear掉
//globals.SugarLogger.Debugf("handleConn timeout channel...: %v", t.TimeoutMap[printNo])
if t.Clients[printNo] == nil {
t.addConn(c, printNo, status)
t.buildAllMap(printNo)
t.HandleTcpMessages(printNo)
t.doPrint(printNo)
if status == printerStatusOnline {
//t.printFail()
}
changePrinterStatus(printNo, status)
t.HandleCheckTcpHeart(printNo)
// todo 证明打印机已经被激活,将激活打印机存入数据库,保证用户不能无限制绑定打印机
if err := dao.NotExistsCreate(printNo); err != nil {
globals.SugarLogger.Debugf("监听打印机心跳,不存在则创建 :[%v]", err)
}
} else {
//在加到连接池中已经更新了时间所以放在else里
t.setPrintStatusTime(printNo)
}
//状态不一致再更新状态(可能缺纸了,过热了等)
globals.SugarLogger.Debugf("比较缓存状态和真实状态缓存状态[%d],真实状态[%d]", t.getPrintStatus(printNo), status)
if t.getPrintStatus(printNo) != status {
globals.SugarLogger.Debugf("更新打印机状态到数据库和缓存[%s:%d]", printNo, status)
t.setPrintStatus(printNo, status)
changePrinterStatus(printNo, status)
}
} else if strings.Contains(data, printText) || strings.Contains(data, printTextNew) { //打印回调
//打印消息发送后打印机会回调该条打印消息的状态打印成功or失败失败原因..
//将回调的信息放到回调channel中打印成功后再打印下一条消息
_, printNo = getCallbackMsgInfo(data)
//更新打印机心跳时间(打印机本身不会在打印的同时,或回调的同时发心跳消息,会导致心跳判断超时,这里更新一下)
t.setPrintStatusTime(printNo)
t.addCallbackChan(printNo, data)
}
}
}
func (t *TcpClient) printFail() (err error) {
//新开机的打印失败和错误的
var (
db = dao.GetDB()
)
prints, _ := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, printMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999)
for _, printMsg := range prints {
t.addMsgChan(printMsg)
}
return err
}
func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) {
var (
db = dao.GetDB()
comment string
status int
)
//1、先找出打印机编号和订单序列号这两个确定唯一一条消息?
//通过参数传进来
//2、打印成功改变打印表的状态
if strings.Contains(data, printSuccessText) || strings.Contains(data, printSuccessTextNew) {
//1e001802000150323032313036313530303030313000013c 成功消息例子
status = printMsgSuccess
} else {
//打印失败也改变状态并更新失败原因
status = printMsgFail
comment = printErrMap[data[12:14]]
}
//这里序号重复会有问题
if printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo); err != nil {
globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err)
return err
} else if len(printMsgs) == 0 {
globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo)
} else if len(printMsgs) > 0 {
for _, v := range printMsgs {
v.Comment = comment
v.Status = status
dao.UpdateEntity(db, v, "Comment", "Status")
}
}
return err
}
func (t *TcpClient) HandleTcpMessages(printNo string) {
var (
db = dao.GetDB()
offset, pageSize = 0, 1
)
if !t.isExistMsg(printNo) {
return
}
globals.SugarLogger.Debugf("HandleTcpMessages printNo: %s", printNo)
go func() {
for {
select {
case <-t.TimeoutMap[printNo]:
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
return
default:
//一直读?
prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, time.Now().Add(-time.Hour*3), time.Now(), offset, pageSize)
for _, printMsg := range prints {
printMsg.Status = printMsgAlreadyLoad
//先避免重复读再插到channel
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
if err = t.addMsgChan(printMsg); err != nil {
globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
}
}
}
}
}
}()
}
func (t *TcpClient) doPrint(key string) (err error) {
var (
db = dao.GetDB()
)
if !t.isExistMsg(key) {
return err
}
go func() {
for {
select {
case <-t.TimeoutMap[key]:
globals.SugarLogger.Debugf("doPrint timeout")
return
default:
select {
case printMsg, ok := <-t.MsgMap[key]:
if !ok {
globals.SugarLogger.Debugf("doPrint err !ok ...")
return
}
var (
data []byte
c net.Conn
)
if printMsg != nil {
if err = checkPrintMsg(db, printMsg); err == nil {
status := t.getPrintStatus(printMsg.PrintNo)
switch status {
//只有在线才打印内容
case printerStatusOnline:
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
data, err = buildMsg(printMsg)
}
case printerStatusOffline:
err = fmt.Errorf("打印机离线!")
case printerStatusOnlineWithoutPaper:
err = fmt.Errorf("打印机缺纸!")
default:
err = fmt.Errorf("打印机状态未知!")
}
}
if err != nil {
//t.delConn(printMsg.PrintNo)
printMsg.Status = printMsgErr
printMsg.Comment = err.Error()
dao.UpdateEntity(db, printMsg, "Status", "Comment")
close(t.TimeoutMap[key])
} else {
if c != nil {
encryption := ""
for _, v := range data {
encryption += fmt.Sprintf("%d", v)
}
printMsg.ContentEncryption = encryption
dao.UpdateEntity(db, printMsg, "ContentEncryption")
if _, err = c.Write(data); err != nil {
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
//t.delConn(printMsg.PrintNo)
close(t.TimeoutMap[key])
} else {
//等待回调
dataStr := <-t.CallBackMap[printMsg.PrintNo]
if dataStr != "" {
a, b := getCallbackMsgInfo(dataStr)
t.changePrintMsg(dataStr, a, b)
// 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理
have, err := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a))
if err != nil && !have {
// 扣除打印机账号金额
if err = dao.DeductionPrintBalance(db, b); err != nil {
globals.SugarLogger.Debugf("扣除用户打印机金额错误 %s", err)
} else {
// 添加打印记录(支出记录)
if err = dao.AddPrintRecord(db, &model.PrintBillRecord{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
PrintNo: b,
PayType: 2,
PayMoney: 1, // 固定支出一分钱
OrderId: utils.Int64ToStr(a),
UserId: "",
}); err != nil {
globals.SugarLogger.Debugf("添加打印机订单支付记录错误 %s", err)
}
}
} else {
globals.SugarLogger.Debugf("查询打印机扣费记录错误 %s", err)
}
//判断音频暂停?
//收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完
//暂停时间就暂时取的sound标签内内容长度/2
if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 {
sound := sounds[1]
lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second
time.Sleep(lenTime / 2)
}
}
}
}
}
}
}
}
}
}()
return err
}
//检测心跳
func (t *TcpClient) HandleCheckTcpHeart(key string) {
globals.SugarLogger.Debugf("HandleCheckTcpHeart begin key: %s", key)
go func() {
for {
select {
case <-t.TimeoutMap[key]:
t.Clients[key].C.Close()
close(t.MsgMap[key])
close(t.CallBackMap[key])
delete(t.Clients, key)
return
default:
statusTime := t.getPrintStatusTime(key)
if !utils.IsTimeZero(statusTime) {
//1分钟内没心跳判断打印机掉线了
if time.Now().Sub(statusTime) > time.Minute+time.Second*10 {
changePrinterStatus(key, printerStatusOffline)
//t.clear(key)
close(t.TimeoutMap[key])
}
}
}
//keys := []string{}
//t.RLock()
//for k, v := range t.Clients {
// if time.Now().Sub(v.StatusTime) > time.Minute+time.Second {
// v.Status = printerStatusOffline
// keys = append(keys, k)
// }
//}
//t.RUnlock()
//globals.SugarLogger.Debugf("HandleCheckTcpHeart del keys: %v", keys)
//for _, v := range keys {
// changePrinterStatus(v, printerStatusOffline)
// if t.isExist(v) {
// t.clear(v)
// }
//}
}
}()
}