Files
jx-callback/business/jxstore/event/event_tcp.go
邹宗楠 637a878808 1
2026-02-03 17:52:04 +08:00

374 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package event
import (
"encoding/hex"
"errors"
"fmt"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"io"
"net"
"strings"
"time"
"unicode/utf8"
)
// ConnRead 获取链接数据
func ConnRead(c net.Conn) ([]byte, int, error) {
buffer := make([]byte, 1024*2)
n, err := c.Read(buffer)
return buffer, n, err
}
// ListenTcp 入口
func ListenTcp() {
l, err := net.Listen("tcp", ":8000")
if err != nil {
fmt.Println("listen error:", err)
return
}
for {
c, err := l.Accept()
if err != nil || c == nil {
fmt.Println("accept error:", err)
break
}
fn := func() {
// 捕获异常 防止waitGroup阻塞
defer func() {
if err := recover(); err != nil {
fmt.Println("recover err = ", err)
return
}
}()
if err := handleConn(c); err != nil {
c.Close()
Poll.Wait()
Poll.Stop()
return
}
}
Poll.AddJob(fn)
}
}
func handleConn(c net.Conn) error {
if c == nil {
return errors.New("conn is nil")
}
for {
buffer, n, err := ConnRead(c)
printRemoteAddr := c.RemoteAddr().String()
printRemoteAddr = strings.Split(printRemoteAddr, ":")[0]
printNoByIP, _ := PrintAddrAndIp.GetPrintAddrAndIp(printRemoteAddr)
if err != nil {
if err == io.EOF {
fmt.Println("connection close")
} else {
fmt.Println("ReadString err:", err)
}
globals.SugarLogger.Debugf("--------printRemoteAddr := %s,printNo := %s", printRemoteAddr, printNoByIP)
if printNo, ok := PrintAddrAndIp.GetPrintAddrAndIp(printRemoteAddr); ok {
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddr)
PrintObject.DelPrintObj(printNo)
PrintIpAndAddr.DelPrintIpAndAddr(printRemoteAddr)
dao.ExecuteSQL(dao.GetDB(), `UPDATE printer SET status = -1,is_online = -1 WHERE print_no = ? `, []interface{}{printNo}...)
} else {
printStatusOff := make(map[string]int, 0)
for ip, pn := range PrintAddrAndIp.PrintObject {
if ip == printRemoteAddr {
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddr)
PrintIpAndAddr.DelPrintIpAndAddr(pn)
PrintObject.DelPrintObj(printNo)
} else if pn != "" {
printStatusOff[pn] = 1
}
}
}
return err
}
//看是心跳还是打印回调
data := hex.EncodeToString(buffer[:n])
var (
printNo string = "" //打印机编号
heartbeat bool = false
callback bool = false
)
if strings.Contains(data, heartText) || strings.Contains(data, heartTextNew) {
printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
printNo = string(printNoData)
heartbeat = true
} else if strings.Contains(data, printText) || strings.Contains(data, printTextNew) { //打印回调
_, printNo = getCallbackMsgInfo(data)
callback = true
}
t, ok := PrintObject.GetPrintObj(printNo)
if !ok || t.Clients[printNo] == nil || time.Now().Sub(t.Clients[printNo].StatusTime).Seconds() >= 120 {
t = NewTcpClient()
}
if heartbeat {
// 证明是心跳
Heartbeat(c, t, data, printNo, printRemoteAddr)
} else if callback {
// 打印回调
Callback(c, t, data, printNo)
}
}
}
func (t *TcpClient) printFail() (err error) {
//新开机的打印失败和错误的
var (
db = dao.GetDB()
)
prints, _ := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, PrintMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999)
for _, printMsg := range prints {
t.addMsgChan(printMsg)
}
return err
}
func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) {
var (
db = dao.GetDB()
comment string
status int
)
//1、先找出打印机编号和订单序列号这两个确定唯一一条消息?
//通过参数传进来
//2、打印成功改变打印表的状态
if strings.Contains(data, printSuccessText) || strings.Contains(data, printSuccessTextNew) {
status = printMsgSuccess
comment = "回调成功,修改打印状态"
} else {
//打印失败也改变状态并更新失败原因
status = printMsgFail
comment = printErrMap[data[12:14]]
}
//这里序号重复会有问题
if printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo); err != nil {
globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err)
return err
} else if len(printMsgs) == 0 {
globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo)
} else if len(printMsgs) > 0 {
for _, v := range printMsgs {
v.Comment = comment
v.Status = status
dao.UpdateEntity(db, v, "Comment", "Status")
}
}
return err
}
func HandleTcpMessages(t *TcpClient, printNo string) {
var (
db = dao.GetDB()
offset, pageSize = 0, 10
)
if !t.isExistMsg(printNo) {
return
}
fn := func() {
//for {
// time.Sleep(2 * time.Second)
if t.TimeoutMap[printNo] == true {
timeNow := time.Now()
timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
prints, _ := dao.GetPrintMsgs(db, printNo, []int{PrintMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
for _, printMsg := range prints {
printMsg.Status = PrintMsgAlreadyLoad
//先避免重复读再插到channel
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
if err = t.addMsgChan(printMsg); err != nil {
globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
}
}
}
} else {
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
return
}
}
Poll.AddJob(fn)
}
func (t *TcpClient) readTimeoutMap(key string) bool {
t.Lock()
defer t.Unlock()
return t.TimeoutMap[key]
}
func doPrint(t *TcpClient, key string) (err error) {
var (
db = dao.GetDB()
)
if !t.isExistMsg(key) {
return err
}
fn := func() {
for {
if t.TimeoutMap[key] == true {
select {
case printMsg, ok := <-t.MsgMap[key]:
if !ok {
globals.SugarLogger.Debugf("doPrint err !ok ...")
return
}
var (
data []byte
c net.Conn
)
if printMsg == nil {
globals.SugarLogger.Debugf("print msg is nil")
continue
}
if err = checkPrintMsg(db, printMsg); err == nil {
status := t.getPrintStatus(printMsg.PrintNo)
switch status {
//只有在线才打印内容
case printerStatusOnline:
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
data, err = buildMsg(printMsg)
}
case printerStatusOffline:
err = fmt.Errorf("打印机离线!")
case printerStatusOnlineWithoutPaper:
err = fmt.Errorf("打印机缺纸!")
default:
err = fmt.Errorf("打印机状态未知!")
}
}
if c == nil {
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
return
}
if err != nil {
printMsg.Status = printMsgErr
printMsg.Comment = err.Error()
dao.UpdateEntity(db, printMsg, "Status", "Comment")
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
return
}
if _, err = c.Write(data); err != nil {
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
} else {
//等待回调
dataStr := <-t.CallBackMap[key]
if dataStr != "" {
a, b := getCallbackMsgInfo(dataStr)
t.changePrintMsg(dataStr, a, b)
// 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理
have, err2 := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a))
if err2 == nil && !have {
// 扣除打印机账号金额
if err = dao.DeductionPrintBalance(db, b); err != nil {
globals.SugarLogger.Debugf("扣除用户打印机金额错误 %s", err)
} else {
// 添加打印记录(支出记录)
if err = dao.AddPrintRecord(db, &model.PrintBillRecord{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
PrintNo: b,
PayType: 2,
PayMoney: 1, // 固定支出一分钱
OrderId: utils.Int64ToStr(a),
UserId: "",
}); err != nil {
globals.SugarLogger.Debugf("添加打印机订单支付记录错误 %s", err)
}
}
} else {
globals.SugarLogger.Debugf("今天已经扣除过了! %v %d %s", err2, a, b)
}
// 回调重置打印机状态时间
t.Clients[b].StatusTime = time.Now()
//判断音频暂停?
//收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完
//暂停时间就暂时取的sound标签内内容长度/2
if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 {
sound := sounds[1]
lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second
time.Sleep(lenTime / 2)
}
}
}
}
} else {
globals.SugarLogger.Debugf("doPrint timeout")
return
}
}
}
Poll.AddJob(fn)
return err
}
// HandleCheckTcpHeart 检测心跳
func HandleCheckTcpHeart(t *TcpClient, key string) {
if t.TimeoutMap[key] == true {
statusTime := t.getPrintStatusTime(key)
if !utils.IsTimeZero(statusTime) {
//1分钟内没心跳判断打印机掉线了
if time.Now().Sub(statusTime) > time.Second*75 {
globals.SugarLogger.Debugf("超过一分十秒没有心跳的打印机[%s],当前心跳时间: %s ,上一次心跳时间 : %s", key, utils.Time2TimeStr(time.Now()), utils.Time2TimeStr(statusTime))
changePrinterStatus(key, printerStatusOffline)
// 链接出错,彻底删除换成
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
}
}
} else {
t.getClients(key).C.Close()
close(t.MsgMap[key])
close(t.CallBackMap[key])
//t.delConn(key)
t.clear(key)
// 链接出错,彻底删除换成
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
return
}
}