From d626fcf12d3ccaabc5da4ec78ed19bf7639d6b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E5=AE=97=E6=A5=A0?= Date: Mon, 29 May 2023 16:49:57 +0800 Subject: [PATCH] 1 --- business/jxstore/event/event_tcp.go | 51 ++------------- business/jxstore/event/print_test.go | 17 ----- business/jxstore/event/timing_task.go | 93 ++++++++++++++------------- 3 files changed, 51 insertions(+), 110 deletions(-) diff --git a/business/jxstore/event/event_tcp.go b/business/jxstore/event/event_tcp.go index c3c108e15..a0a829e6f 100644 --- a/business/jxstore/event/event_tcp.go +++ b/business/jxstore/event/event_tcp.go @@ -16,8 +16,6 @@ import ( "unicode/utf8" ) -var pool = NewSimplePoll(100) - //入口 func ListenTcp() { l, err := net.Listen("tcp", ":8000") @@ -33,7 +31,7 @@ func ListenTcp() { break } - printFunc := func() { + go func() { defer func() { // 捕获异常 防止waitGroup阻塞 if err := recover(); err != nil { @@ -47,24 +45,7 @@ func ListenTcp() { t = nil return } - } - pool.Add(printFunc) - pool.Run() - //go func() { - // defer func() { - // // 捕获异常 防止waitGroup阻塞 - // if err := recover(); err != nil { - // fmt.Println("recover err = ", err) - // return - // } - // }() - // - // t := NewTcpClient() - // if err := handleConn(c, t); err != nil { - // t = nil - // return - // } - //}() + }() } } @@ -204,7 +185,7 @@ func (t *TcpClient) HandleTcpMessages(printNo string) { return } - workFunc := func() { + go func() { for { if t.TimeoutMap[printNo] == true { timeNow := time.Now() @@ -225,31 +206,7 @@ func (t *TcpClient) HandleTcpMessages(printNo string) { return } } - } - pool.Add(workFunc) - pool.Run() - //go func() { - // for { - // if t.TimeoutMap[printNo] == true { - // timeNow := time.Now() - // timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location()) - // timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location()) - // prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize) - // for _, printMsg := range prints { - // printMsg.Status = printMsgAlreadyLoad - // //先避免重复读再插到channel? - // if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil { - // if err = t.addMsgChan(printMsg); err != nil { - // globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err) - // } - // } - // } - // } else { - // globals.SugarLogger.Debugf("HandleTcpMessages timeout") - // return - // } - // } - //}() + }() } func (t *TcpClient) readTimeoutMap(key string) bool { diff --git a/business/jxstore/event/print_test.go b/business/jxstore/event/print_test.go index 5d068d1b0..542f93afb 100644 --- a/business/jxstore/event/print_test.go +++ b/business/jxstore/event/print_test.go @@ -8,7 +8,6 @@ import ( "strconv" "strings" "testing" - "time" "unicode/utf8" ) @@ -83,19 +82,3 @@ func TestTen216(t *testing.T) { fmt.Println(gg) fmt.Println(kk) } - -func TestSimplePool(t *testing.T) { - p := NewSimplePoll(20) - for i := 0; i < 100; i++ { - p.Add(parseTask(i)) - } - p.Run() -} - -func parseTask(i int) func() { - return func() { - // 模拟抓取数据的过程 - time.Sleep(time.Second * 1) - fmt.Println("finish parse ", i) - } -} diff --git a/business/jxstore/event/timing_task.go b/business/jxstore/event/timing_task.go index af48a5c58..121c06918 100644 --- a/business/jxstore/event/timing_task.go +++ b/business/jxstore/event/timing_task.go @@ -1,48 +1,49 @@ package event -import ( - "fmt" - "sync" -) - -type SimplePool struct { - wg sync.WaitGroup - work chan func() //任务队列 -} - -func NewSimplePoll(workers int) *SimplePool { - p := &SimplePool{ - wg: sync.WaitGroup{}, - work: make(chan func()), - } - p.wg.Add(workers) - //根据指定的并发量去读取管道并执行 - for i := 0; i < workers; i++ { - go func() { - defer func() { - // 捕获异常 防止waitGroup阻塞 - if err := recover(); err != nil { - fmt.Println(err) - p.wg.Done() - } - }() - // 从workChannel中取出任务执行 - for fn := range p.work { - fn() - } - p.wg.Done() - }() - } - return p -} - -// 添加任务 -func (p *SimplePool) Add(fn func()) { - p.work <- fn -} - -// 执行 -func (p *SimplePool) Run() { - close(p.work) - p.wg.Wait() -} +// +//import ( +// "fmt" +// "sync" +//) +// +//type SimplePool struct { +// wg sync.WaitGroup +// work chan func() //任务队列 +//} +// +//func NewSimplePoll(workers int) *SimplePool { +// p := &SimplePool{ +// wg: sync.WaitGroup{}, +// work: make(chan func()), +// } +// p.wg.Add(workers) +// //根据指定的并发量去读取管道并执行 +// for i := 0; i < workers; i++ { +// go func() { +// defer func() { +// // 捕获异常 防止waitGroup阻塞 +// if err := recover(); err != nil { +// fmt.Println(err) +// p.wg.Done() +// } +// }() +// // 从workChannel中取出任务执行 +// for fn := range p.work { +// fn() +// } +// p.wg.Done() +// }() +// } +// return p +//} +// +//// 添加任务 +//func (p *SimplePool) Add(fn func()) { +// p.work <- fn +//} +// +//// 执行 +//func (p *SimplePool) Run() { +// close(p.work) +// p.wg.Wait() +//}