This commit is contained in:
邹宗楠
2023-05-29 16:49:57 +08:00
parent 958ab4686d
commit d626fcf12d
3 changed files with 51 additions and 110 deletions

View File

@@ -16,8 +16,6 @@ import (
"unicode/utf8"
)
var pool = NewSimplePoll(100)
//入口
func ListenTcp() {
l, err := net.Listen("tcp", ":8000")
@@ -33,7 +31,7 @@ func ListenTcp() {
break
}
printFunc := func() {
go func() {
defer func() {
// 捕获异常 防止waitGroup阻塞
if err := recover(); err != nil {
@@ -47,24 +45,7 @@ func ListenTcp() {
t = nil
return
}
}
pool.Add(printFunc)
pool.Run()
//go func() {
// defer func() {
// // 捕获异常 防止waitGroup阻塞
// if err := recover(); err != nil {
// fmt.Println("recover err = ", err)
// return
// }
// }()
//
// t := NewTcpClient()
// if err := handleConn(c, t); err != nil {
// t = nil
// return
// }
//}()
}()
}
}
@@ -204,7 +185,7 @@ func (t *TcpClient) HandleTcpMessages(printNo string) {
return
}
workFunc := func() {
go func() {
for {
if t.TimeoutMap[printNo] == true {
timeNow := time.Now()
@@ -225,31 +206,7 @@ func (t *TcpClient) HandleTcpMessages(printNo string) {
return
}
}
}
pool.Add(workFunc)
pool.Run()
//go func() {
// for {
// if t.TimeoutMap[printNo] == true {
// timeNow := time.Now()
// timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
// timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
// prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
// for _, printMsg := range prints {
// printMsg.Status = printMsgAlreadyLoad
// //先避免重复读再插到channel
// if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
// if err = t.addMsgChan(printMsg); err != nil {
// globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
// }
// }
// }
// } else {
// globals.SugarLogger.Debugf("HandleTcpMessages timeout")
// return
// }
// }
//}()
}()
}
func (t *TcpClient) readTimeoutMap(key string) bool {

View File

@@ -8,7 +8,6 @@ import (
"strconv"
"strings"
"testing"
"time"
"unicode/utf8"
)
@@ -83,19 +82,3 @@ func TestTen216(t *testing.T) {
fmt.Println(gg)
fmt.Println(kk)
}
func TestSimplePool(t *testing.T) {
p := NewSimplePoll(20)
for i := 0; i < 100; i++ {
p.Add(parseTask(i))
}
p.Run()
}
func parseTask(i int) func() {
return func() {
// 模拟抓取数据的过程
time.Sleep(time.Second * 1)
fmt.Println("finish parse ", i)
}
}

View File

@@ -1,48 +1,49 @@
package event
import (
"fmt"
"sync"
)
type SimplePool struct {
wg sync.WaitGroup
work chan func() //任务队列
}
func NewSimplePoll(workers int) *SimplePool {
p := &SimplePool{
wg: sync.WaitGroup{},
work: make(chan func()),
}
p.wg.Add(workers)
//根据指定的并发量去读取管道并执行
for i := 0; i < workers; i++ {
go func() {
defer func() {
// 捕获异常 防止waitGroup阻塞
if err := recover(); err != nil {
fmt.Println(err)
p.wg.Done()
}
}()
// 从workChannel中取出任务执行
for fn := range p.work {
fn()
}
p.wg.Done()
}()
}
return p
}
// 添加任务
func (p *SimplePool) Add(fn func()) {
p.work <- fn
}
// 执行
func (p *SimplePool) Run() {
close(p.work)
p.wg.Wait()
}
//
//import (
// "fmt"
// "sync"
//)
//
//type SimplePool struct {
// wg sync.WaitGroup
// work chan func() //任务队列
//}
//
//func NewSimplePoll(workers int) *SimplePool {
// p := &SimplePool{
// wg: sync.WaitGroup{},
// work: make(chan func()),
// }
// p.wg.Add(workers)
// //根据指定的并发量去读取管道并执行
// for i := 0; i < workers; i++ {
// go func() {
// defer func() {
// // 捕获异常 防止waitGroup阻塞
// if err := recover(); err != nil {
// fmt.Println(err)
// p.wg.Done()
// }
// }()
// // 从workChannel中取出任务执行
// for fn := range p.work {
// fn()
// }
// p.wg.Done()
// }()
// }
// return p
//}
//
//// 添加任务
//func (p *SimplePool) Add(fn func()) {
// p.work <- fn
//}
//
//// 执行
//func (p *SimplePool) Run() {
// close(p.work)
// p.wg.Wait()
//}