This commit is contained in:
邹宗楠
2024-01-10 09:28:12 +08:00
parent 31ca7605c8
commit 5cd7764c6e
7 changed files with 223 additions and 202 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/jxstore/event"
"git.rosy.net.cn/jx-callback/business/jxutils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
@@ -379,10 +380,19 @@ func DoPrintMsg(appID int, msgID, printNo, content string, orderNo string) (err
OrderNo: orderNo,
MsgID: msgID,
}
dao.WrapAddIDCULDEntity(printMsg, "")
if err = dao.CreateEntity(db, printMsg); err != nil {
return err
}
t, ok := event.PrintObject[printNo]
if ok {
t.MsgMap[printNo] <- printMsg
printMsg.Status = event.PrintMsgAlreadyLoad
dao.UpdateEntity(db, printMsg, "Status")
}
return err
}

View File

@@ -0,0 +1,11 @@
package event
import (
"fmt"
)
var PrintObject = make(map[string]*TcpClient, 0)
func init() {
fmt.Println("初始化打印机对象")
}

View File

@@ -16,17 +16,14 @@ import (
"unicode/utf8"
)
var Poll *Pool
//var PrintPool map[string]*PrintPoolMap
func init() {
Poll = NewPool(500)
Poll.Start()
//PrintPool = make(map[string]*PrintPoolMap, 10000)
// ConnRead 获取链接数据
func ConnRead(c net.Conn) ([]byte, int, error) {
buffer := make([]byte, 1024*2)
n, err := c.Read(buffer)
return buffer, n, err
}
//入口
// ListenTcp 入口
func ListenTcp() {
l, err := net.Listen("tcp", ":8000")
if err != nil {
@@ -36,24 +33,22 @@ func ListenTcp() {
for {
c, err := l.Accept()
if err != nil {
if err != nil || c == nil {
fmt.Println("accept error:", err)
break
}
fn := func() {
// 捕获异常 防止waitGroup阻塞
defer func() {
// 捕获异常 防止waitGroup阻塞
if err := recover(); err != nil {
fmt.Println("recover err = ", err)
return
}
}()
t := NewTcpClient()
if err := handleConn(c, t); err != nil {
if err := handleConn(c); err != nil {
c.Close()
t = nil
Poll.Wait()
Poll.Stop()
return
@@ -63,13 +58,7 @@ func ListenTcp() {
}
}
func ConnRead(c net.Conn) ([]byte, int, error) {
buffer := make([]byte, 1024*2)
n, err := c.Read(buffer)
return buffer, n, err
}
func handleConn(c net.Conn, t *TcpClient) error {
func handleConn(c net.Conn) error {
if c == nil {
return errors.New("conn is nil")
}
@@ -87,6 +76,23 @@ func handleConn(c net.Conn, t *TcpClient) error {
//看是心跳还是打印回调
data := hex.EncodeToString(buffer[:n])
var printNo string //打印机编号
var heartbeat bool = false
var callback bool = false
if strings.Contains(data, heartText) || strings.Contains(data, heartTextNew) {
printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
printNo = string(printNoData)
heartbeat = true
} else if strings.Contains(data, printText) || strings.Contains(data, printTextNew) { //打印回调
_, printNo = getCallbackMsgInfo(data)
callback = true
}
t, ok := PrintObject[printNo]
if !ok {
t = NewTcpClient()
PrintObject[printNo] = t
}
if strings.Contains(string(buffer[0:n]), "print_no_clear") { // 清理缓存
param := struct {
PrintNoClear json.Number `json:"print_no_clear"`
@@ -100,62 +106,12 @@ func handleConn(c net.Conn, t *TcpClient) error {
return err
}
var printNo string //打印机编号
var heartbeat bool = false
var callback bool = false
if strings.Contains(data, heartText) || strings.Contains(data, heartTextNew) {
printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
printNo = string(printNoData)
heartbeat = true
} else if strings.Contains(data, printText) || strings.Contains(data, printTextNew) { //打印回调
_, printNo = getCallbackMsgInfo(data)
callback = true
}
//obj, ok := PrintPool[printNo]
//证明是心跳
if heartbeat {
//printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
//printNo = string(printNoData)
status := printStatus2JxStatus(data[len(data)-8 : len(data)-6])
//如果没在连接池里
//1、加到连接池中不同的打印机no开不同的goroutine
//2、初始化channel每个打印机一个放打印消息和打印回调消息
//3、读数据库里的待打印信息放到打印channel中
//4、读打印channel并打印并切等待回调channel中的消息
//5、修改数据库中打印机状态没在连接池中说明是重新连接的
//6、监听心跳时间超过1分多钟就clear掉
if t.getClients(printNo) == nil {
addConn(c, t, printNo, status)
buildAllMap(t, printNo)
//t.TimeoutMap[printNo] <- true
HandleTcpMessages(t, printNo)
doPrint(t, printNo)
if status == printerStatusOnline {
//t.printFail()
}
changePrinterStatus(printNo, status)
// todo 暂时关闭心跳检测
HandleCheckTcpHeart(t, printNo)
// todo 证明打印机已经被激活,将激活打印机存入数据库,保证用户不能无限制绑定打印机
if err := dao.NotExistsCreate(printNo); err != nil {
globals.SugarLogger.Debugf("监听打印机心跳,不存在则创建 :[%v],printNo[%s]", err, printNo)
}
} else {
//在加到连接池中已经更新了时间所以放在else里
t.setPrintStatusTime(printNo)
}
//状态不一致再更新状态(可能缺纸了,过热了等)
t.setPrintStatus(printNo, status)
changePrinterStatus(printNo, status)
} else if callback { //打印回调
//打印消息发送后打印机会回调该条打印消息的状态打印成功or失败失败原因..
//将回调的信息放到回调channel中打印成功后再打印下一条消息
//_, printNo = getCallbackMsgInfo(data)
//更新打印机心跳时间(打印机本身不会在打印的同时,或回调的同时发心跳消息,会导致心跳判断超时,这里更新一下)
t.setPrintStatusTime(printNo)
t.addCallbackChan(printNo, data)
// 证明是心跳
Heartbeat(c, t, data, printNo)
} else if callback {
// 打印回调
Callback(c, t, data, printNo)
}
}
}
@@ -165,7 +121,7 @@ func (t *TcpClient) printFail() (err error) {
var (
db = dao.GetDB()
)
prints, _ := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, printMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999)
prints, _ := dao.GetPrintMsgs(db, "", []int{printMsgFail, printMsgErr, PrintMsgAlreadyLoad, printMsgAlreadySend}, time.Now().Add(-time.Hour*3), time.Now(), 0, 999)
for _, printMsg := range prints {
t.addMsgChan(printMsg)
}
@@ -215,28 +171,28 @@ func HandleTcpMessages(t *TcpClient, printNo string) {
}
fn := func() {
for {
time.Sleep(2 * time.Second)
if t.TimeoutMap[printNo] == true {
timeNow := time.Now()
timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
for _, printMsg := range prints {
printMsg.Status = printMsgAlreadyLoad
//先避免重复读再插到channel
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
if err = t.addMsgChan(printMsg); err != nil {
globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
}
//for {
// time.Sleep(2 * time.Second)
if t.TimeoutMap[printNo] == true {
timeNow := time.Now()
timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
for _, printMsg := range prints {
printMsg.Status = PrintMsgAlreadyLoad
//先避免重复读再插到channel
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
if err = t.addMsgChan(printMsg); err != nil {
globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
}
}
} else {
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
//Poll.Stop()
return
}
} else {
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
//Poll.Stop()
return
}
//}
}
Poll.AddJob(fn)
@@ -248,7 +204,7 @@ func HandleTcpMessages(t *TcpClient, printNo string) {
// timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
// prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
// for _, printMsg := range prints {
// printMsg.Status = printMsgAlreadyLoad
// printMsg.Status = PrintMsgAlreadyLoad
// //先避免重复读再插到channel
// if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
// if err = t.addMsgChan(printMsg); err != nil {
@@ -292,81 +248,85 @@ func doPrint(t *TcpClient, key string) (err error) {
data []byte
c net.Conn
)
if printMsg != nil {
if err = checkPrintMsg(db, printMsg); err == nil {
status := t.getPrintStatus(printMsg.PrintNo)
switch status {
//只有在线才打印内容
case printerStatusOnline:
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
data, err = buildMsg(printMsg)
}
case printerStatusOffline:
err = fmt.Errorf("打印机离线!")
case printerStatusOnlineWithoutPaper:
err = fmt.Errorf("打印机缺纸!")
default:
err = fmt.Errorf("打印机状态未知!")
if printMsg == nil {
globals.SugarLogger.Debugf("print msg is nil")
continue
}
if err = checkPrintMsg(db, printMsg); err == nil {
status := t.getPrintStatus(printMsg.PrintNo)
switch status {
//只有在线才打印内容
case printerStatusOnline:
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
data, err = buildMsg(printMsg)
}
case printerStatusOffline:
err = fmt.Errorf("打印机离线!")
case printerStatusOnlineWithoutPaper:
err = fmt.Errorf("打印机缺纸!")
default:
err = fmt.Errorf("打印机状态未知!")
}
if err != nil {
printMsg.Status = printMsgErr
printMsg.Comment = err.Error()
dao.UpdateEntity(db, printMsg, "Status", "Comment")
delete(t.TimeoutMap, key)
} else {
if c != nil {
if _, err = c.Write(data); err != nil {
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
//close(t.TimeoutMap[key])
delete(t.TimeoutMap, key)
}
if c == nil {
delete(PrintObject, key)
return
}
if err != nil {
printMsg.Status = printMsgErr
printMsg.Comment = err.Error()
dao.UpdateEntity(db, printMsg, "Status", "Comment")
delete(t.TimeoutMap, key)
return
}
if _, err = c.Write(data); err != nil {
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
//close(t.TimeoutMap[key])
delete(t.TimeoutMap, key)
} else {
//等待回调
dataStr := <-t.CallBackMap[key]
if dataStr != "" {
a, b := getCallbackMsgInfo(dataStr)
t.changePrintMsg(dataStr, a, b)
// 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理
have, err := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a))
if err == nil && !have {
// 扣除打印机账号金额
if err = dao.DeductionPrintBalance(db, b); err != nil {
globals.SugarLogger.Debugf("扣除用户打印机金额错误 %s", err)
} else {
//等待回调
dataStr := <-t.CallBackMap[key]
if dataStr != "" {
a, b := getCallbackMsgInfo(dataStr)
t.changePrintMsg(dataStr, a, b)
// 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理
have, err := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a))
if err != nil && !have {
// 扣除打印机账号金额
if err = dao.DeductionPrintBalance(db, b); err != nil {
globals.SugarLogger.Debugf("扣除用户打印机金额错误 %s", err)
} else {
// 添加打印记录(支出记录)
if err = dao.AddPrintRecord(db, &model.PrintBillRecord{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
PrintNo: b,
PayType: 2,
PayMoney: 1, // 固定支出一分钱
OrderId: utils.Int64ToStr(a),
UserId: "",
}); err != nil {
globals.SugarLogger.Debugf("添加打印机订单支付记录错误 %s", err)
}
}
} else {
globals.SugarLogger.Debugf("查询打印机扣费记录错误 %s", err)
}
// 回调重置打印机状态时间
t.Clients[b].StatusTime = time.Now()
//判断音频暂停?
//收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完
//暂停时间就暂时取的sound标签内内容长度/2
if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 {
sound := sounds[1]
lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second
time.Sleep(lenTime / 2)
}
// 添加打印记录(支出记录)
if err = dao.AddPrintRecord(db, &model.PrintBillRecord{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
PrintNo: b,
PayType: 2,
PayMoney: 1, // 固定支出一分钱
OrderId: utils.Int64ToStr(a),
UserId: "",
}); err != nil {
globals.SugarLogger.Debugf("添加打印机订单支付记录错误 %s", err)
}
}
} else {
globals.SugarLogger.Debugf("查询打印机扣费记录错误 %s", err)
}
// 回调重置打印机状态时间
t.Clients[b].StatusTime = time.Now()
//判断音频暂停?
//收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完
//暂停时间就暂时取的sound标签内内容长度/2
if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 {
sound := sounds[1]
lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second
time.Sleep(lenTime / 2)
}
}
} else {
t = nil
//Poll.Stop()
globals.SugarLogger.Debugf("msgMap is nil")
}
}
} else {
@@ -473,29 +433,30 @@ func doPrint(t *TcpClient, key string) (err error) {
return err
}
//检测心跳
// HandleCheckTcpHeart 检测心跳
func HandleCheckTcpHeart(t *TcpClient, key string) {
fn := func() {
for {
if t.TimeoutMap[key] == true {
statusTime := t.getPrintStatusTime(key)
if !utils.IsTimeZero(statusTime) {
//1分钟内没心跳判断打印机掉线了
if time.Now().Sub(statusTime) > time.Second*75 {
globals.SugarLogger.Debugf("超过一分十秒没有心跳的打印机[%s],当前心跳时间: %s ,上一次心跳时间 : %s", key, utils.Time2TimeStr(time.Now()), utils.Time2TimeStr(statusTime))
changePrinterStatus(key, printerStatusOffline)
delete(t.TimeoutMap, key)
}
}
} else {
t.getClients(key).C.Close()
close(t.MsgMap[key])
close(t.CallBackMap[key])
t.delConn(key)
t = nil
return
//fn := func() {
//for {
if t.TimeoutMap[key] == true {
statusTime := t.getPrintStatusTime(key)
if !utils.IsTimeZero(statusTime) {
//1分钟内没心跳判断打印机掉线了
if time.Now().Sub(statusTime) > time.Second*75 {
globals.SugarLogger.Debugf("超过一分十秒没有心跳的打印机[%s],当前心跳时间: %s ,上一次心跳时间 : %s", key, utils.Time2TimeStr(time.Now()), utils.Time2TimeStr(statusTime))
changePrinterStatus(key, printerStatusOffline)
delete(t.TimeoutMap, key)
delete(PrintObject, key)
}
}
} else {
t.getClients(key).C.Close()
close(t.MsgMap[key])
close(t.CallBackMap[key])
t.delConn(key)
delete(PrintObject, key)
return
}
Poll.AddJob(fn)
//}
//}
//Poll.AddJob(fn)
}

View File

@@ -31,7 +31,7 @@ const (
printMsgWait = 0 //待打印
printMsgFail = -1 //打印失败(打印机报出)
printMsgErr = -2 //京西报出
printMsgAlreadyLoad = 3 //已放入队列
PrintMsgAlreadyLoad = 3 //已放入队列
heartErrNormal = "00" //正常
heartErrWithoutPaper = "04" //心跳错,缺纸
@@ -409,7 +409,7 @@ func buildMsg(printMsg *model.PrintMsg) (data []byte, err error) {
orderNoHexH, orderNoHexL = int2h8l8(no)
// 将数据与模板组装
printDataGBK, _ := jxutils.Utf8ToGbk([]byte(replaceContentOther(content)))
printDataGBK, _ := jxutils.Utf8ToGbk([]byte(utils.FilterEmoji(content)))
printData = hex.EncodeToString(printDataGBK)
printData = replaceContent(printData, printMsg)
lenData := int64(len(str) + len(const1) + len(orderNoHexH) + len(orderNoHexL) + len(printInit) + 2 + 4 + len(printData))
@@ -419,11 +419,6 @@ func buildMsg(printMsg *model.PrintMsg) (data []byte, err error) {
return jxutils.Hextob(dataStr + check), err
}
//替换特殊字符上面那个hextob转不了先替换一下
func replaceContentOther(content string) string {
return strings.ReplaceAll(strings.ReplaceAll(content, "⃣️", " "), "•", "-")
}
func getCheckSum(str string) (check string) {
var sum int64
for i := 0; i < len(str); i = i + 2 {
@@ -632,3 +627,50 @@ func xtob(x string) string {
base, _ := strconv.ParseInt(x, 16, 10)
return strconv.FormatInt(base, 2)
}
// Heartbeat 心跳回调
func Heartbeat(c net.Conn, t *TcpClient, data string, printNo string) {
//printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
//printNo = string(printNoData)
status := printStatus2JxStatus(data[len(data)-8 : len(data)-6])
//如果没在连接池里
//1、加到连接池中不同的打印机no开不同的goroutine
//2、初始化channel每个打印机一个放打印消息和打印回调消息
//3、读数据库里的待打印信息放到打印channel中
//4、读打印channel并打印并切等待回调channel中的消息
//5、修改数据库中打印机状态没在连接池中说明是重新连接的
//6、监听心跳时间超过1分多钟就clear掉
if t.getClients(printNo) == nil {
addConn(c, t, printNo, status)
buildAllMap(t, printNo)
//t.TimeoutMap[printNo] <- true
HandleTcpMessages(t, printNo)
doPrint(t, printNo)
if status == printerStatusOnline {
//t.printFail()
}
changePrinterStatus(printNo, status)
// todo 暂时关闭心跳检测
// HandleCheckTcpHeart(t, printNo)
// todo 证明打印机已经被激活,将激活打印机存入数据库,保证用户不能无限制绑定打印机
if err := dao.NotExistsCreate(printNo); err != nil {
globals.SugarLogger.Debugf("监听打印机心跳,不存在则创建 :[%v],printNo[%s]", err, printNo)
}
} else {
//在加到连接池中已经更新了时间所以放在else里
t.setPrintStatusTime(printNo)
}
//状态不一致再更新状态(可能缺纸了,过热了等)
t.setPrintStatus(printNo, status)
changePrinterStatus(printNo, status)
}
// Callback 打印成功回调
func Callback(c net.Conn, t *TcpClient, data string, printNo string) {
//打印消息发送后打印机会回调该条打印消息的状态打印成功or失败失败原因..
//将回调的信息放到回调channel中打印成功后再打印下一条消息
//_, printNo = getCallbackMsgInfo(data)
//更新打印机心跳时间(打印机本身不会在打印的同时,或回调的同时发心跳消息,会导致心跳判断超时,这里更新一下)
t.setPrintStatusTime(printNo)
t.addCallbackChan(printNo, data)
}

View File

@@ -4,6 +4,13 @@ import (
"sync"
)
var Poll *Pool
func init() {
Poll = NewPool(500)
Poll.Start()
}
type Job func()
type Worker struct {

View File

@@ -11,10 +11,5 @@ func NotExistsCreate(printNo string) error {
PrintNo: printNo,
}
//sql := `INSERT INTO print_activation (print_no,created_at) SELECT ?,? FROM dual WHERE NOT EXISTS (SELECT * FROM print_activation WHERE print_no = ?)`
//param := []interface{}{printNo, time.Now(), printNo}
//if _, err := ExecuteSQL(GetDB(), sql, param...); err != nil {
// return err
//}
return CreateOrUpdate(GetDB(), item)
}