This commit is contained in:
邹宗楠
2023-05-29 16:46:26 +08:00
parent 9fee44b6fd
commit 958ab4686d
3 changed files with 120 additions and 60 deletions

View File

@@ -16,6 +16,8 @@ import (
"unicode/utf8" "unicode/utf8"
) )
var pool = NewSimplePoll(100)
//入口 //入口
func ListenTcp() { func ListenTcp() {
l, err := net.Listen("tcp", ":8000") l, err := net.Listen("tcp", ":8000")
@@ -23,19 +25,46 @@ func ListenTcp() {
fmt.Println("listen error:", err) fmt.Println("listen error:", err)
return return
} }
for { for {
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
fmt.Println("accept error:", err) fmt.Println("accept error:", err)
break break
} }
go func() {
printFunc := func() {
defer func() {
// 捕获异常 防止waitGroup阻塞
if err := recover(); err != nil {
fmt.Println("recover err = ", err)
return
}
}()
t := NewTcpClient() t := NewTcpClient()
if err := handleConn(c, t); err != nil { if err := handleConn(c, t); err != nil {
t = nil t = nil
return return
} }
}() }
pool.Add(printFunc)
pool.Run()
//go func() {
// defer func() {
// // 捕获异常 防止waitGroup阻塞
// if err := recover(); err != nil {
// fmt.Println("recover err = ", err)
// return
// }
// }()
//
// t := NewTcpClient()
// if err := handleConn(c, t); err != nil {
// t = nil
// return
// }
//}()
} }
} }
@@ -175,7 +204,7 @@ func (t *TcpClient) HandleTcpMessages(printNo string) {
return return
} }
go func() { workFunc := func() {
for { for {
if t.TimeoutMap[printNo] == true { if t.TimeoutMap[printNo] == true {
timeNow := time.Now() timeNow := time.Now()
@@ -196,7 +225,31 @@ func (t *TcpClient) HandleTcpMessages(printNo string) {
return return
} }
} }
}() }
pool.Add(workFunc)
pool.Run()
//go func() {
// for {
// if t.TimeoutMap[printNo] == true {
// timeNow := time.Now()
// timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location())
// timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location())
// prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize)
// for _, printMsg := range prints {
// printMsg.Status = printMsgAlreadyLoad
// //先避免重复读再插到channel
// if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil {
// if err = t.addMsgChan(printMsg); err != nil {
// globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err)
// }
// }
// }
// } else {
// globals.SugarLogger.Debugf("HandleTcpMessages timeout")
// return
// }
// }
//}()
} }
func (t *TcpClient) readTimeoutMap(key string) bool { func (t *TcpClient) readTimeoutMap(key string) bool {

View File

@@ -8,6 +8,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
"time"
"unicode/utf8" "unicode/utf8"
) )
@@ -82,3 +83,19 @@ func TestTen216(t *testing.T) {
fmt.Println(gg) fmt.Println(gg)
fmt.Println(kk) fmt.Println(kk)
} }
func TestSimplePool(t *testing.T) {
p := NewSimplePoll(20)
for i := 0; i < 100; i++ {
p.Add(parseTask(i))
}
p.Run()
}
func parseTask(i int) func() {
return func() {
// 模拟抓取数据的过程
time.Sleep(time.Second * 1)
fmt.Println("finish parse ", i)
}
}

View File

@@ -1,58 +1,48 @@
package event package event
// import (
//import ( "fmt"
// "git.rosy.net.cn/baseapi/utils" "sync"
// "git.rosy.net.cn/jx-callback/business/model/dao" )
// "git.rosy.net.cn/jx-callback/globals"
// "time" type SimplePool struct {
//) wg sync.WaitGroup
// work chan func() //任务队列
//func ScheduleTimerFuncByInterval(handler func(), delay, inerval time.Duration) { }
// utils.AfterFuncWithRecover(delay, func() {
// beginTime := time.Now() func NewSimplePoll(workers int) *SimplePool {
// handler() p := &SimplePool{
// delay = inerval - time.Now().Sub(beginTime) wg: sync.WaitGroup{},
// if delay < time.Second { work: make(chan func()),
// delay = time.Second }
// } p.wg.Add(workers)
// ScheduleTimerFuncByInterval(handler, delay, inerval) //根据指定的并发量去读取管道并执行
// }) for i := 0; i < workers; i++ {
//} go func() {
// defer func() {
//func (t *TcpClient) HandleTcpMessages2() { // 捕获异常 防止waitGroup阻塞
// var ( if err := recover(); err != nil {
// db = dao.GetDB() fmt.Println(err)
// offset, pageSize = 0, 10 p.wg.Done()
// ) }
//for printNo,have := range }()
// select { // 从workChannel中取出任务执行
// case <-t.TimeoutMap[printNo]: for fn := range p.work {
// globals.SugarLogger.Debugf("HandleTcpMessages timeout") fn()
// return }
// default: p.wg.Done()
// //一直读? }()
// timeNow := time.Now() }
// timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location()) return p
// timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location()) }
// prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart, timeEnd, offset, pageSize)
// for _, printMsg := range prints { // 添加任务
// printMsg.Status = printMsgAlreadyLoad func (p *SimplePool) Add(fn func()) {
// //先避免重复读再插到channel p.work <- fn
// if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil { }
// if err = t.addMsgChan(printMsg); err != nil {
// globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err) // 执行
// } func (p *SimplePool) Run() {
// } close(p.work)
// } p.wg.Wait()
// } }
//}
//
//func (t *TcpClient) GetTimeOutMap() map[string]chan bool {
// t.RLock()
// defer t.RUnlock()
// if len(t.TimeoutMap) > 0 {
// return t.TimeoutMap
// }
// return nil
//}