Compare commits

...

3 Commits

Author SHA1 Message Date
邹宗楠
35c8468789 1 2026-05-11 10:03:21 +08:00
邹宗楠
040f0b7d8d 1 2026-05-09 17:25:32 +08:00
邹宗楠
d339ebb47b 1 2026-05-09 16:02:04 +08:00
5 changed files with 453 additions and 453 deletions

View File

@@ -386,11 +386,11 @@ func DoPrintMsg(appID int, msgID, printNo, content string, orderNo string) (err
Status: event.PrintMsgWait,
}
t, ok := event.PrintObject.GetPrintObj(printNo)
t, ok := event.TcpClientList.Load(printNo)
if ok {
t.Lock()
defer t.Unlock()
t.MsgMap[printNo] <- printMsg
t.(*event.TcpClient).Lock()
defer t.(*event.TcpClient).Unlock()
t.(*event.TcpClient).MsgMap[printNo] <- printMsg
printMsg.Status = event.PrintMsgAlreadyLoad
} /*else {
t = event.NewTcpClient()

View File

@@ -1,97 +1,99 @@
package event
import (
"fmt"
"sync"
)
var (
PrintObject *PrintObjectStruct // 缓存的打印机对象
PrintAddrAndIp *PrintAddrAndIpStruct // 缓存打印机地址:[ip:printNo] event 文件包,connect只能获取到addr
PrintIpAndAddr *PrintIpAndAddrStruct // 缓存打印机地址:[printNo:ip] api_controller 只能获取到printNo
)
func init() {
fmt.Println("初始化打印机对象")
PrintObject = &PrintObjectStruct{
PrintObject: make(map[string]*TcpClient),
RWMutex: new(sync.RWMutex),
}
PrintAddrAndIp = &PrintAddrAndIpStruct{
PrintObject: make(map[string]string),
RWMutex: new(sync.RWMutex),
}
PrintIpAndAddr = &PrintIpAndAddrStruct{
PrintObject: make(map[string]string),
RWMutex: new(sync.RWMutex),
}
}
type PrintObjectStruct struct {
PrintObject map[string]*TcpClient
*sync.RWMutex
}
func (p *PrintObjectStruct) GetPrintObj(printNo string) (*TcpClient, bool) {
p.RLock()
defer p.RUnlock()
tcpObj, ok := PrintObject.PrintObject[printNo]
return tcpObj, ok
}
func (p *PrintObjectStruct) SetPrintObj(printNo string, tcpObj *TcpClient) {
p.RLock()
defer p.RUnlock()
PrintObject.PrintObject[printNo] = tcpObj
}
func (p *PrintObjectStruct) DelPrintObj(printNo string) {
p.RLock()
defer p.RUnlock()
delete(PrintObject.PrintObject, printNo)
}
type PrintAddrAndIpStruct struct {
PrintObject map[string]string
*sync.RWMutex
}
func (p *PrintAddrAndIpStruct) GetPrintAddrAndIp(ip string) (string, bool) {
p.RLock()
defer p.RUnlock()
printNo, ok := PrintAddrAndIp.PrintObject[ip]
return printNo, ok
}
func (p *PrintAddrAndIpStruct) SetPrintAddrAndIp(ip string, printNo string) {
p.RLock()
defer p.RUnlock()
PrintAddrAndIp.PrintObject[ip] = printNo
}
func (p *PrintAddrAndIpStruct) DelPrintAddrAndIp(ip string) {
p.RLock()
defer p.RUnlock()
delete(PrintAddrAndIp.PrintObject, ip)
}
type PrintIpAndAddrStruct struct {
PrintObject map[string]string
*sync.RWMutex
}
func (p *PrintIpAndAddrStruct) GetPrintIpAndAddr(printNo string) (string, bool) {
p.RLock()
defer p.RUnlock()
tcpObj, ok := PrintIpAndAddr.PrintObject[printNo]
return tcpObj, ok
}
func (p *PrintIpAndAddrStruct) SetPrintIpAndAddr(printNo string, ip string) {
p.RLock()
defer p.RUnlock()
PrintIpAndAddr.PrintObject[printNo] = ip
}
func (p *PrintIpAndAddrStruct) DelPrintIpAndAddr(printNo string) {
p.RLock()
defer p.RUnlock()
delete(PrintIpAndAddr.PrintObject, printNo)
}
//package event
//
//import (
// "fmt"
// "sync"
//)
//
//var (
// PrintObject *PrintObjectStruct // 缓存打印机对象
// PrintAddrAndIp *PrintAddrAndIpStruct // 缓存打印机地址:[ip:printNo] event 文件包,connect只能获取到addr
// PrintIpAndAddr *PrintIpAndAddrStruct // 缓存打印机地址:[printNo:ip] api_controller 只能获取到printNo
//)
//
//func init() {
// fmt.Println("初始化打印机对象")
// PrintObject = &PrintObjectStruct{
// PrintObject: make(map[string]*TcpClient),
// RWMutex: new(sync.RWMutex),
// }
// PrintAddrAndIp = &PrintAddrAndIpStruct{
// PrintObject: make(map[string]string),
// RWMutex: new(sync.RWMutex),
// }
// PrintIpAndAddr = &PrintIpAndAddrStruct{
// PrintObject: make(map[string]string),
// RWMutex: new(sync.RWMutex),
// }
//}
//
//type PrintObjectStruct struct {
// PrintObject map[string]*TcpClient
// *sync.RWMutex
//}
//
//func (p *PrintObjectStruct) GetPrintObj(printNo string) (*TcpClient, bool) {
// p.RLock()
// defer p.RUnlock()
// tcpObj, ok := PrintObject.PrintObject[printNo]
// return tcpObj, ok
//}
//func (p *PrintObjectStruct) SetPrintObj(printNo string, tcpObj *TcpClient) {
// p.RLock()
// defer p.RUnlock()
// PrintObject.PrintObject[printNo] = tcpObj
//}
//
//func (p *PrintObjectStruct) DelPrintObj(printNo string) {
// p.RLock()
// defer p.RUnlock()
// delete(PrintObject.PrintObject, printNo)
//}
//
//type PrintAddrAndIpStruct struct {
// PrintObject map[string]string
// *sync.RWMutex
//}
//
//func (p *PrintAddrAndIpStruct) GetPrintAddrAndIp(ip string) (string, bool) {
// p.RLock()
// defer p.RUnlock()
// printNo, ok := PrintAddrAndIp.PrintObject[ip]
// return printNo, ok
//}
//func (p *PrintAddrAndIpStruct) SetPrintAddrAndIp(ip string, printNo string) {
// p.RLock()
// defer p.RUnlock()
// PrintAddrAndIp.PrintObject[ip] = printNo
//}
//
//func (p *PrintAddrAndIpStruct) DelPrintAddrAndIp(ip string) {
// p.RLock()
// defer p.RUnlock()
// delete(PrintAddrAndIp.PrintObject, ip)
//}
//
//type PrintIpAndAddrStruct struct {
// PrintObject map[string]string
// *sync.RWMutex
//}
//
//func (p *PrintIpAndAddrStruct) GetPrintIpAndAddr(printNo string) (string, bool) {
// p.RLock()
// defer p.RUnlock()
// tcpObj, ok := PrintIpAndAddr.PrintObject[printNo]
// return tcpObj, ok
//}
//func (p *PrintIpAndAddrStruct) SetPrintIpAndAddr(printNo string, ip string) {
// p.RLock()
// defer p.RUnlock()
// PrintIpAndAddr.PrintObject[printNo] = ip
//}
//
//func (p *PrintIpAndAddrStruct) DelPrintIpAndAddr(printNo string) {
// p.RLock()
// defer p.RUnlock()
// delete(PrintIpAndAddr.PrintObject, printNo)
//}

View File

@@ -4,15 +4,11 @@ import (
"encoding/hex"
"errors"
"fmt"
"git.rosy.net.cn/baseapi/utils"
"git.rosy.net.cn/jx-callback/business/model"
"git.rosy.net.cn/jx-callback/business/model/dao"
"git.rosy.net.cn/jx-callback/globals"
"io"
"net"
"strings"
"time"
"unicode/utf8"
)
// ConnRead 获取链接数据
@@ -48,12 +44,12 @@ func ListenTcp() {
if err := handleConn(c); err != nil {
c.Close()
Poll.Wait()
Poll.Stop()
Pool.Wait()
Pool.Stop()
return
}
}
Poll.AddJob(fn)
Pool.AddJob(fn)
}
}
@@ -65,40 +61,23 @@ func handleConn(c net.Conn) error {
buffer, n, err := ConnRead(c)
printRemoteAddr := c.RemoteAddr().String()
printRemoteAddr = strings.Split(printRemoteAddr, ":")[0]
printNoByIP, _ := PrintAddrAndIp.GetPrintAddrAndIp(printRemoteAddr)
if err != nil {
if err == io.EOF {
fmt.Println("connection close")
} else {
fmt.Println("ReadString err:", err)
}
globals.SugarLogger.Debugf("--------printRemoteAddr := %s,printNo := %s", printRemoteAddr, printNoByIP)
if printNo, ok := PrintAddrAndIp.GetPrintAddrAndIp(printRemoteAddr); ok {
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddr)
PrintObject.DelPrintObj(printNo)
PrintIpAndAddr.DelPrintIpAndAddr(printRemoteAddr)
dao.ExecuteSQL(dao.GetDB(), `UPDATE printer SET status = -1,is_online = -1 WHERE print_no = ? `, []interface{}{printNo}...)
} else {
printStatusOff := make(map[string]int, 0)
for ip, pn := range PrintAddrAndIp.PrintObject {
if ip == printRemoteAddr {
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddr)
PrintIpAndAddr.DelPrintIpAndAddr(pn)
PrintObject.DelPrintObj(printNo)
} else if pn != "" {
printStatusOff[pn] = 1
}
}
}
return err
}
//看是心跳还是打印回调
data := hex.EncodeToString(buffer[:n])
var (
printNo string = "" //打印机编号
heartbeat bool = false
callback bool = false
printNo string = "" //打印机编号
heartbeat bool = false
callback bool = false
t *TcpClient = nil
)
if strings.Contains(data, heartText) || strings.Contains(data, heartTextNew) {
printNoData, _ := hex.DecodeString(data[len(heartText) : len(data)-8])
@@ -109,11 +88,12 @@ func handleConn(c net.Conn) error {
callback = true
}
t, ok := PrintObject.GetPrintObj(printNo)
if !ok || t.Clients[printNo] == nil || time.Now().Sub(t.Clients[printNo].StatusTime).Seconds() >= 120 {
if value, have := TcpClientList.Load(printNo); !have {
t = NewTcpClient()
TcpClientList.Store(printNo, t)
} else {
t = value.(*TcpClient)
}
if heartbeat {
// 证明是心跳
Heartbeat(c, t, data, printNo, printRemoteAddr)
@@ -136,238 +116,8 @@ func (t *TcpClient) printFail() (err error) {
return err
}
func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) {
var (
db = dao.GetDB()
comment string
status int
)
//1、先找出打印机编号和订单序列号这两个确定唯一一条消息?
//通过参数传进来
//2、打印成功改变打印表的状态
if strings.Contains(data, printSuccessText) || strings.Contains(data, printSuccessTextNew) {
status = printMsgSuccess
comment = "回调成功,修改打印状态"
} else {
//打印失败也改变状态并更新失败原因
status = printMsgFail
comment = printErrMap[data[12:14]]
}
//这里序号重复会有问题
if printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo); err != nil {
globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err)
return err
} else if len(printMsgs) == 0 {
globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo)
} else if len(printMsgs) > 0 {
for _, v := range printMsgs {
v.Comment = comment
v.Status = status
dao.UpdateEntity(db, v, "Comment", "Status")
}
}
return err
}
func HandleTcpMessages(t *TcpClient, printNo string) {
var (
db = dao.GetDB()
offset, pageSize = 0, 10
)
if !t.isExistMsg(printNo) {
return
}
fn := func() {
//for {
// time.Sleep(2 * time.Second)
if t.TimeoutMap[printNo] == true {
timeNow := time.Now()
timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
prints, _ := dao.GetPrintMsgs(db, printNo, []int{PrintMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
for _, printMsg := range prints {
printMsg.Status = PrintMsgAlreadyLoad
//先避免重复读再插到channel
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
if err = t.addMsgChan(printMsg); err != nil {
globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
}
}
}
} else {
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
return
}
}
Poll.AddJob(fn)
}
func (t *TcpClient) readTimeoutMap(key string) bool {
t.Lock()
defer t.Unlock()
return t.TimeoutMap[key]
}
func doPrint(t *TcpClient, key string) (err error) {
var (
db = dao.GetDB()
)
if !t.isExistMsg(key) {
return err
}
fn := func() {
for {
if t.TimeoutMap[key] == true {
select {
case printMsg, ok := <-t.MsgMap[key]:
if !ok {
globals.SugarLogger.Debugf("doPrint err !ok ...")
return
}
var (
data []byte
c net.Conn
)
if printMsg == nil {
globals.SugarLogger.Debugf("print msg is nil")
continue
}
if err = checkPrintMsg(db, printMsg); err == nil {
status := t.getPrintStatus(printMsg.PrintNo)
switch status {
//只有在线才打印内容
case printerStatusOnline:
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
data, err = buildMsg(printMsg)
}
case printerStatusOffline:
err = fmt.Errorf("打印机离线!")
case printerStatusOnlineWithoutPaper:
err = fmt.Errorf("打印机缺纸!")
default:
err = fmt.Errorf("打印机状态未知!")
}
}
if c == nil {
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
return
}
if err != nil {
printMsg.Status = printMsgErr
printMsg.Comment = err.Error()
dao.UpdateEntity(db, printMsg, "Status", "Comment")
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
return
}
if _, err = c.Write(data); err != nil {
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
} else {
//等待回调
dataStr := <-t.CallBackMap[key]
if dataStr != "" {
a, b := getCallbackMsgInfo(dataStr)
t.changePrintMsg(dataStr, a, b)
// 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理
have, err2 := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a))
if err2 == nil && !have {
// 扣除打印机账号金额
if err = dao.DeductionPrintBalance(db, b); err != nil {
globals.SugarLogger.Debugf("扣除用户打印机金额错误 %s", err)
} else {
// 添加打印记录(支出记录)
if err = dao.AddPrintRecord(db, &model.PrintBillRecord{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
PrintNo: b,
PayType: 2,
PayMoney: 1, // 固定支出一分钱
OrderId: utils.Int64ToStr(a),
UserId: "",
}); err != nil {
globals.SugarLogger.Debugf("添加打印机订单支付记录错误 %s", err)
}
}
} else {
globals.SugarLogger.Debugf("今天已经扣除过了! %v %d %s", err2, a, b)
}
// 回调重置打印机状态时间
t.Clients[b].StatusTime = time.Now()
//判断音频暂停?
//收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完
//暂停时间就暂时取的sound标签内内容长度/2
if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 {
sound := sounds[1]
lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second
time.Sleep(lenTime / 2)
}
}
}
}
} else {
globals.SugarLogger.Debugf("doPrint timeout")
return
}
}
}
Poll.AddJob(fn)
return err
}
// HandleCheckTcpHeart 检测心跳
func HandleCheckTcpHeart(t *TcpClient, key string) {
if t.TimeoutMap[key] == true {
statusTime := t.getPrintStatusTime(key)
if !utils.IsTimeZero(statusTime) {
//1分钟内没心跳判断打印机掉线了
if time.Now().Sub(statusTime) > time.Second*75 {
globals.SugarLogger.Debugf("超过一分十秒没有心跳的打印机[%s],当前心跳时间: %s ,上一次心跳时间 : %s", key, utils.Time2TimeStr(time.Now()), utils.Time2TimeStr(statusTime))
changePrinterStatus(key, printerStatusOffline)
// 链接出错,彻底删除换成
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
}
}
} else {
t.getClients(key).C.Close()
close(t.MsgMap[key])
close(t.CallBackMap[key])
//t.delConn(key)
t.clear(key)
// 链接出错,彻底删除换成
if printRemoteAddrIP, have := PrintIpAndAddr.GetPrintIpAndAddr(key); have {
PrintIpAndAddr.DelPrintIpAndAddr(key)
PrintAddrAndIp.DelPrintAddrAndIp(printRemoteAddrIP)
PrintObject.DelPrintObj(key)
}
return
}
}

View File

@@ -14,8 +14,11 @@ import (
"strings"
"sync"
"time"
"unicode/utf8"
)
var TcpClientList = new(sync.Map)
const (
heartText = "1e000f02000151" // 老版心跳
heartTextNew = "1e001a02000151" // 新版心跳
@@ -149,7 +152,6 @@ func (t *TcpClient) delConn(key string) {
t.Lock()
defer t.Unlock()
if t.Clients[key].C != nil {
globals.SugarLogger.Debugf("-------close2 := %s", key)
t.Clients[key].C.Close()
}
delete(t.Clients, key)
@@ -394,38 +396,6 @@ func changePrinterStatus(printNo string, status int) {
}
}
//按打印机方提供的文档来的
func buildMsg(printMsg *model.PrintMsg) (data []byte, err error) {
var (
content = printMsg.Content
orderNo = printMsg.OrderNo
str = "1e"
const1 = "0200ff50"
printInit = "1b40" //打印机初始化
//voice = "1d6b401dfd001a01015b7631365d736f756e64622cc4fad3d0d0c2b6a9b5a5c0b1" //语音,中国
//qr = "1d58021b5a0001061600747470733a2f2f7777772e62616964752e636f6d2f1b000A0A0A1B40"
orderNoHexH, orderNoHexL, printData string
)
//写入数据
no, err := strconv.ParseInt(orderNo, 10, 64)
if err != nil {
globals.SugarLogger.Debug("order_msg Order_no 转换异常")
}
orderNoHexH, orderNoHexL = int2h8l8(no)
// 将数据与模板组装
if strings.Contains(content, "•") {
content = strings.ReplaceAll(content, "•", "-")
}
printDataGBK, _ := jxutils.Utf8ToGbk([]byte(utils.FilterEmoji(content)))
printData = hex.EncodeToString(printDataGBK)
printData = replaceContent(printData, printMsg)
lenData := int64(len(str) + len(const1) + len(orderNoHexH) + len(orderNoHexL) + len(printInit) + 2 + 4 + len(printData))
x1, x2 := int2h8l8(lenData / 2)
dataStr := str + x1 + x2 + const1 + orderNoHexH + orderNoHexL + printInit + printData
check := getCheckSum(dataStr)
return jxutils.Hextob(dataStr + check), err
}
func getCheckSum(str string) (check string) {
var sum int64
for i := 0; i < len(str); i = i + 2 {
@@ -647,7 +617,7 @@ func Heartbeat(c net.Conn, t *TcpClient, data string, printNo string, printRemot
//4、读打印channel并打印并切等待回调channel中的消息
//5、修改数据库中打印机状态没在连接池中说明是重新连接的
//6、监听心跳时间超过1分多钟就clear掉
if t.getClients(printNo) == nil || t == nil || t.getPrintStatusTime(printNo).IsZero() || time.Now().Sub(t.Clients[printNo].StatusTime).Seconds() >= 120 {
if t.getClients(printNo) == nil || t.getPrintStatusTime(printNo).IsZero() || time.Now().Sub(t.Clients[printNo].StatusTime).Seconds() >= 120 {
addConn(c, t, printNo, status)
buildAllMap(t, printNo)
//t.TimeoutMap[printNo] <- true
@@ -663,10 +633,6 @@ func Heartbeat(c net.Conn, t *TcpClient, data string, printNo string, printRemot
if err := dao.NotExistsCreate(printNo); err != nil {
globals.SugarLogger.Debugf("监听打印机心跳,不存在则创建 :[%v],printNo[%s]", err, printNo)
}
PrintObject.SetPrintObj(printNo, t)
PrintAddrAndIp.SetPrintAddrAndIp(printRemoteAddr, printNo)
PrintIpAndAddr.SetPrintIpAndAddr(printNo, printRemoteAddr)
} else {
//在加到连接池中已经更新了时间所以放在else里
t.setPrintStatusTime(printNo)
@@ -685,3 +651,232 @@ func Callback(c net.Conn, t *TcpClient, data string, printNo string) {
t.setPrintStatusTime(printNo)
t.addCallbackChan(printNo, data)
}
func HandleTcpMessages(t *TcpClient, printNo string) {
var (
db = dao.GetDB()
offset, pageSize = 0, 10
)
if !t.isExistMsg(printNo) {
return
}
fn := func() {
//for {
// time.Sleep(2 * time.Second)
if t.TimeoutMap[printNo] == true {
timeNow := time.Now()
timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
prints, _ := dao.GetPrintMsgs(db, printNo, []int{PrintMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
for _, printMsg := range prints {
printMsg.Status = PrintMsgAlreadyLoad
//先避免重复读再插到channel
if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
if err = t.addMsgChan(printMsg); err != nil {
globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
}
}
}
} else {
globals.SugarLogger.Debugf("HandleTcpMessages timeout")
return
}
}
Pool.AddJob(fn)
}
func doPrint(t *TcpClient, key string) (err error) {
var (
db = dao.GetDB()
)
if !t.isExistMsg(key) {
return err
}
fn := func() {
for {
if t.TimeoutMap[key] == true {
select {
case printMsg, ok := <-t.MsgMap[key]:
if !ok {
globals.SugarLogger.Debugf("doPrint err !ok ...")
return
}
var (
data []byte
c net.Conn
)
if printMsg == nil {
globals.SugarLogger.Debugf("print msg is nil")
continue
}
if err = checkPrintMsg(db, printMsg); err == nil {
status := t.getPrintStatus(printMsg.PrintNo)
switch status {
//只有在线才打印内容
case printerStatusOnline:
if c = t.getPrintConn(printMsg.PrintNo); c != nil {
data, err = buildMsg(printMsg)
}
case printerStatusOffline:
err = fmt.Errorf("打印机离线!")
case printerStatusOnlineWithoutPaper:
err = fmt.Errorf("打印机缺纸!")
default:
err = fmt.Errorf("打印机状态未知!")
}
}
if c == nil {
return
}
if err != nil {
printMsg.Status = printMsgErr
printMsg.Comment = err.Error()
dao.UpdateEntity(db, printMsg, "Status", "Comment")
return
}
if _, err = c.Write(data); err != nil {
globals.SugarLogger.Debugf("handleTcpMessages err [%v]", err)
} else {
//等待回调
dataStr := <-t.CallBackMap[key]
if dataStr != "" {
a, b := getCallbackMsgInfo(dataStr)
t.changePrintMsg(dataStr, a, b)
// 查询打印机是否扣费,未扣费就扣费,已经扣费不做处理
have, err2 := dao.QueryOrderDeductionRecord(db, b, utils.Int64ToStr(a))
if err2 == nil && !have {
// 扣除打印机账号金额
if err = dao.DeductionPrintBalance(db, b); err != nil {
globals.SugarLogger.Debugf("扣除用户打印机金额错误 %s", err)
} else {
// 添加打印记录(支出记录)
if err = dao.AddPrintRecord(db, &model.PrintBillRecord{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
PrintNo: b,
PayType: 2,
PayMoney: 1, // 固定支出一分钱
OrderId: utils.Int64ToStr(a),
UserId: "",
}); err != nil {
globals.SugarLogger.Debugf("添加打印机订单支付记录错误 %s", err)
}
}
} else {
globals.SugarLogger.Debugf("今天已经扣除过了! %v %d %s", err2, a, b)
}
// 回调重置打印机状态时间
t.Clients[b].StatusTime = time.Now()
//判断音频暂停?
//收到打印成功回调后,如果消息中有音频,需要等待一下,等上一个音频播完
//暂停时间就暂时取的sound标签内内容长度/2
if sounds := regexpSoundSpan.FindStringSubmatch(printMsg.Content); len(sounds) > 0 {
sound := sounds[1]
lenTime := time.Duration(utf8.RuneCountInString(sound)) * time.Second
time.Sleep(lenTime / 2)
}
}
}
}
} else {
globals.SugarLogger.Debugf("doPrint timeout")
return
}
}
}
Pool.AddJob(fn)
return err
}
//按打印机方提供的文档来的
func buildMsg(printMsg *model.PrintMsg) (data []byte, err error) {
var (
content = printMsg.Content
orderNo = printMsg.OrderNo
str = "1e"
const1 = "0200ff50"
printInit = "1b40" //打印机初始化
//voice = "1d6b401dfd001a01015b7631365d736f756e64622cc4fad3d0d0c2b6a9b5a5c0b1" //语音,中国
//qr = "1d58021b5a0001061600747470733a2f2f7777772e62616964752e636f6d2f1b000A0A0A1B40"
orderNoHexH, orderNoHexL, printData string
)
//写入数据
no, err := strconv.ParseInt(orderNo, 10, 64)
if err != nil {
globals.SugarLogger.Debug("order_msg Order_no 转换异常")
}
orderNoHexH, orderNoHexL = int2h8l8(no)
// 将数据与模板组装
if strings.Contains(content, "•") {
content = strings.ReplaceAll(content, "•", "-")
}
printDataGBK, _ := jxutils.Utf8ToGbk([]byte(utils.FilterEmoji(content)))
printData = hex.EncodeToString(printDataGBK)
printData = replaceContent(printData, printMsg)
lenData := int64(len(str) + len(const1) + len(orderNoHexH) + len(orderNoHexL) + len(printInit) + 2 + 4 + len(printData))
x1, x2 := int2h8l8(lenData / 2)
dataStr := str + x1 + x2 + const1 + orderNoHexH + orderNoHexL + printInit + printData
check := getCheckSum(dataStr)
return jxutils.Hextob(dataStr + check), err
}
// HandleCheckTcpHeart 检测心跳
func HandleCheckTcpHeart(t *TcpClient, key string) {
if t.TimeoutMap[key] == true {
statusTime := t.getPrintStatusTime(key)
if !utils.IsTimeZero(statusTime) {
//1分钟内没心跳判断打印机掉线了
if time.Now().Sub(statusTime) > time.Second*75 {
globals.SugarLogger.Debugf("超过一分十秒没有心跳的打印机[%s],当前心跳时间: %s ,上一次心跳时间 : %s", key, utils.Time2TimeStr(time.Now()), utils.Time2TimeStr(statusTime))
changePrinterStatus(key, printerStatusOffline)
}
}
} else {
t.getClients(key).C.Close()
close(t.MsgMap[key])
close(t.CallBackMap[key])
//t.delConn(key)
t.clear(key)
TcpClientList.Delete(key)
return
}
}
func (t *TcpClient) changePrintMsg(data string, orderNo int64, printNo string) (err error) {
var (
db = dao.GetDB()
comment string
status int
)
//1、先找出打印机编号和订单序列号这两个确定唯一一条消息?
//通过参数传进来
//2、打印成功改变打印表的状态
if strings.Contains(data, printSuccessText) || strings.Contains(data, printSuccessTextNew) {
status = printMsgSuccess
comment = "回调成功,修改打印状态"
} else {
//打印失败也改变状态并更新失败原因
status = printMsgFail
comment = printErrMap[data[12:14]]
}
//这里序号重复会有问题
if printMsgs, err := dao.GetPrintMsgNoPage(db, printNo, orderNo); err != nil {
globals.SugarLogger.Debugf("changePrintMsg err :[%v]", err)
return err
} else if len(printMsgs) == 0 {
globals.SugarLogger.Debugf("changePrintMsg err ,not found printMsg printNo:[%v], orderNo :[%v]", printNo, orderNo)
} else if len(printMsgs) > 0 {
for _, v := range printMsgs {
v.Comment = comment
v.Status = status
dao.UpdateEntity(db, v, "Comment", "Status")
}
}
return err
}

View File

@@ -2,103 +2,156 @@ package event
import (
"sync"
"time"
)
var Poll *Pool
// 全局单例线程池初始化500个worker
var Pool *WorkerPool
func init() {
Poll = NewPool(500)
Poll.Start()
Pool = NewWorkerPool(500, 1024) // 500个worker任务队列长度1024
Pool.Start()
}
// Job 任务定义
type Job func()
// Worker 工作协程
type Worker struct {
id int
jobChannel chan Job
quit chan bool
id int
jobChan chan Job
quitChan chan struct{}
closeOnce sync.Once
}
type Pool struct {
workers []*Worker
jobChannel chan Job
wg sync.WaitGroup
// WorkerPool 线程池
type WorkerPool struct {
workers []*Worker
jobChan chan Job
wg sync.WaitGroup
closeOnce sync.Once
closed bool
mu sync.Mutex
}
func NewWorker(id int, jobChannel chan Job) *Worker {
// ----------------------------------------------------
// 创建 Worker
// ----------------------------------------------------
func NewWorker(id int, jobChan chan Job) *Worker {
return &Worker{
id: id,
jobChannel: jobChannel,
quit: make(chan bool),
id: id,
jobChan: jobChan,
quitChan: make(chan struct{}),
}
}
// Start 启动Worker自带panic恢复
func (w *Worker) Start() {
go func() {
defer func() {
if r := recover(); r != nil {
// 打印panic日志防止整个worker崩溃
// log.Printf("worker %d panic recovered: %v", w.id, r)
}
}()
for {
select {
case job := <-w.jobChannel:
case job, ok := <-w.jobChan:
if !ok {
return
}
job()
case <-w.quit:
case <-w.quitChan:
return
}
}
}()
}
// Stop 安全停止Worker
func (w *Worker) Stop() {
w.quit <- true
close(w.quit)
//go func() {
// w.quit <- true
//}()
w.closeOnce.Do(func() {
close(w.quitChan)
})
}
func NewPool(numWorkers int) *Pool {
jobChannel := make(chan Job)
pool := &Pool{
workers: make([]*Worker, numWorkers),
jobChannel: jobChannel,
// ----------------------------------------------------
// 创建线程池
// ----------------------------------------------------
func NewWorkerPool(workerCount int, queueSize int) *WorkerPool {
jobChan := make(chan Job, queueSize) // 带缓冲队列,高并发不阻塞
pool := &WorkerPool{
workers: make([]*Worker, workerCount),
jobChan: jobChan,
}
for i := 0; i < numWorkers; i++ {
worker := NewWorker(i, jobChannel)
pool.workers[i] = worker
for i := 0; i < workerCount; i++ {
pool.workers[i] = NewWorker(i, jobChan)
}
return pool
}
func (p *Pool) Start() {
for _, worker := range p.workers {
worker.Start()
// Start 启动所有worker
func (p *WorkerPool) Start() {
for _, w := range p.workers {
w.Start()
}
}
func (p *Pool) Stop() {
for _, worker := range p.workers {
worker.Stop()
// AddJob 提交任务非阻塞、安全、不panic
func (p *WorkerPool) AddJob(job Job) bool {
p.mu.Lock()
closed := p.closed
p.mu.Unlock()
if closed {
return false
}
}
//
//func (p *Pool) AddJob(job Job) {
// p.wg.Add(1)
// p.jobChannel <- func() {
// job()
// p.wg.Done()
// }
//}
func (p *Pool) AddJob(job Job) {
p.wg.Add(1)
go func() {
p.jobChannel <- func() {
defer p.wg.Done()
job()
}
}()
select {
case p.jobChan <- func() {
defer p.wg.Done()
job()
}:
return true
case <-time.After(10 * time.Millisecond):
// 队列满了,不阻塞,快速失败
p.wg.Done()
return false
}
}
func (p *Pool) Wait() {
// Wait 等待所有任务执行完成
func (p *WorkerPool) Wait() {
p.wg.Wait()
}
// Stop 优雅关闭线程池安全不panic
func (p *WorkerPool) Stop() {
p.closeOnce.Do(func() {
p.mu.Lock()
p.closed = true
p.mu.Unlock()
// 1. 关闭任务通道
close(p.jobChan)
// 2. 停止所有worker
for _, w := range p.workers {
w.Stop()
}
// 3. 等待所有任务执行完
p.Wait()
})
}
// IsClosed 判断线程池是否关闭
func (p *WorkerPool) IsClosed() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.closed
}