diff --git a/business/jxstore/event/event_tcp.go b/business/jxstore/event/event_tcp.go index 6424c6b7c..c3c108e15 100644 --- a/business/jxstore/event/event_tcp.go +++ b/business/jxstore/event/event_tcp.go @@ -16,6 +16,8 @@ import ( "unicode/utf8" ) +var pool = NewSimplePoll(100) + //入口 func ListenTcp() { l, err := net.Listen("tcp", ":8000") @@ -23,19 +25,46 @@ func ListenTcp() { fmt.Println("listen error:", err) return } + for { c, err := l.Accept() if err != nil { fmt.Println("accept error:", err) break } - go func() { + + printFunc := func() { + defer func() { + // 捕获异常 防止waitGroup阻塞 + if err := recover(); err != nil { + fmt.Println("recover err = ", err) + return + } + }() + t := NewTcpClient() if err := handleConn(c, t); err != nil { t = nil return } - }() + } + pool.Add(printFunc) + pool.Run() + //go func() { + // defer func() { + // // 捕获异常 防止waitGroup阻塞 + // if err := recover(); err != nil { + // fmt.Println("recover err = ", err) + // return + // } + // }() + // + // t := NewTcpClient() + // if err := handleConn(c, t); err != nil { + // t = nil + // return + // } + //}() } } @@ -175,7 +204,7 @@ func (t *TcpClient) HandleTcpMessages(printNo string) { return } - go func() { + workFunc := func() { for { if t.TimeoutMap[printNo] == true { timeNow := time.Now() @@ -196,7 +225,31 @@ func (t *TcpClient) HandleTcpMessages(printNo string) { return } } - }() + } + pool.Add(workFunc) + pool.Run() + //go func() { + // for { + // if t.TimeoutMap[printNo] == true { + // timeNow := time.Now() + // timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location()) + // timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location()) + // prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart.AddDate(0, 0, -1), timeEnd, offset, pageSize) + // for _, printMsg := range prints { + // printMsg.Status = printMsgAlreadyLoad + // //先避免重复读再插到channel? + // if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil { + // if err = t.addMsgChan(printMsg); err != nil { + // globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err) + // } + // } + // } + // } else { + // globals.SugarLogger.Debugf("HandleTcpMessages timeout") + // return + // } + // } + //}() } func (t *TcpClient) readTimeoutMap(key string) bool { diff --git a/business/jxstore/event/print_test.go b/business/jxstore/event/print_test.go index 542f93afb..5d068d1b0 100644 --- a/business/jxstore/event/print_test.go +++ b/business/jxstore/event/print_test.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "testing" + "time" "unicode/utf8" ) @@ -82,3 +83,19 @@ func TestTen216(t *testing.T) { fmt.Println(gg) fmt.Println(kk) } + +func TestSimplePool(t *testing.T) { + p := NewSimplePoll(20) + for i := 0; i < 100; i++ { + p.Add(parseTask(i)) + } + p.Run() +} + +func parseTask(i int) func() { + return func() { + // 模拟抓取数据的过程 + time.Sleep(time.Second * 1) + fmt.Println("finish parse ", i) + } +} diff --git a/business/jxstore/event/timing_task.go b/business/jxstore/event/timing_task.go index 868b94407..af48a5c58 100644 --- a/business/jxstore/event/timing_task.go +++ b/business/jxstore/event/timing_task.go @@ -1,58 +1,48 @@ package event -// -//import ( -// "git.rosy.net.cn/baseapi/utils" -// "git.rosy.net.cn/jx-callback/business/model/dao" -// "git.rosy.net.cn/jx-callback/globals" -// "time" -//) -// -//func ScheduleTimerFuncByInterval(handler func(), delay, inerval time.Duration) { -// utils.AfterFuncWithRecover(delay, func() { -// beginTime := time.Now() -// handler() -// delay = inerval - time.Now().Sub(beginTime) -// if delay < time.Second { -// delay = time.Second -// } -// ScheduleTimerFuncByInterval(handler, delay, inerval) -// }) -//} -// -//func (t *TcpClient) HandleTcpMessages2() { -// var ( -// db = dao.GetDB() -// offset, pageSize = 0, 10 -// ) -//for printNo,have := range -// select { -// case <-t.TimeoutMap[printNo]: -// globals.SugarLogger.Debugf("HandleTcpMessages timeout") -// return -// default: -// //一直读? -// timeNow := time.Now() -// timeStart := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 0, 0, 0, 0, timeNow.Location()) -// timeEnd := time.Date(timeNow.Year(), timeNow.Month(), timeNow.Day(), 23, 59, 59, 0, timeNow.Location()) -// prints, _ := dao.GetPrintMsgs(db, printNo, []int{printMsgWait}, timeStart, timeEnd, offset, pageSize) -// for _, printMsg := range prints { -// printMsg.Status = printMsgAlreadyLoad -// //先避免重复读再插到channel? -// if _, err := dao.UpdateEntity(db, printMsg, "Status"); err == nil { -// if err = t.addMsgChan(printMsg); err != nil { -// globals.SugarLogger.Debugf("HandleTcpMessages addMsgChan Err: %v", err) -// } -// } -// } -// } -//} -// -//func (t *TcpClient) GetTimeOutMap() map[string]chan bool { -// t.RLock() -// defer t.RUnlock() -// if len(t.TimeoutMap) > 0 { -// return t.TimeoutMap -// } -// return nil -//} +import ( + "fmt" + "sync" +) + +type SimplePool struct { + wg sync.WaitGroup + work chan func() //任务队列 +} + +func NewSimplePoll(workers int) *SimplePool { + p := &SimplePool{ + wg: sync.WaitGroup{}, + work: make(chan func()), + } + p.wg.Add(workers) + //根据指定的并发量去读取管道并执行 + for i := 0; i < workers; i++ { + go func() { + defer func() { + // 捕获异常 防止waitGroup阻塞 + if err := recover(); err != nil { + fmt.Println(err) + p.wg.Done() + } + }() + // 从workChannel中取出任务执行 + for fn := range p.work { + fn() + } + p.wg.Done() + }() + } + return p +} + +// 添加任务 +func (p *SimplePool) Add(fn func()) { + p.work <- fn +} + +// 执行 +func (p *SimplePool) Run() { + close(p.work) + p.wg.Wait() +}