package netspider import ( "fmt" "git.rosy.net.cn/jx-callback/globals" "git.rosy.net.cn/baseapi/utils" "git.rosy.net.cn/jx-callback/business/jxutils/ditu" "git.rosy.net.cn/jx-callback/business/jxutils/jxcontext" "git.rosy.net.cn/jx-callback/business/jxutils/tasksch" "git.rosy.net.cn/jx-callback/business/model" "git.rosy.net.cn/jx-callback/business/model/dao" "git.rosy.net.cn/jx-callback/business/partner" ) const ( DefRadius = 8000 DefGridWith = 2000 ) func GetCityShops(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorIDs []int, cityCode, radius, gridWith int) (pageStoreList []*model.PageShop, err error) { coordList := ditu.GetCityCoordinateList(cityCode, radius, gridWith) if len(coordList) > 0 { task := tasksch.NewParallelTask("GetCityShops", tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { vendorID := batchItemList[0].(int) storeList, err2 := getStoreListByCoordinates(ctx, task, vendorID, utils.Int2Str(cityCode), coordList) if err = err2; err == nil { retVal = storeList } globals.SugarLogger.Debugf("GetCityShops vendorID:%d, cityCode:%d, len(storeList):%d, err:%v", vendorID, cityCode, len(storeList), err) return retVal, err }, vendorIDs) tasksch.AddChild(parentTask, task).Run() list, err2 := task.GetResult(0) if err = err2; err != nil && len(list) == 0 { return nil, err } err = nil for _, v := range list { v2 := v.(*model.PageShop) v2.CityCode = cityCode dao.WrapAddIDCULDEntity(v, ctx.GetUserName()) pageStoreList = append(pageStoreList, v.(*model.PageShop)) } } return pageStoreList, err } func getStoreListByCoordinates(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID int, cityInfo string, coordList []*ditu.Coordinate) (storeList []*model.PageShop, err error) { if len(coordList) > 0 { if handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IPurchasePlatformNetSpiderHandler); handler != nil { mainStoreIDList, _ := handler.GetStoreIDListByCoordinates(ctx, coordList[0]) if len(mainStoreIDList) > 0 { task1 := tasksch.NewParallelTask(fmt.Sprintf("GetStoreListByCoordinate[%s] get list", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true).SetParallelCount(1), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { pos := batchItemList[0].(*ditu.Coordinate) storeIDList, err := handler.GetStoreIDListByCoordinates(ctx, pos) if err != nil { return nil, err } return storeIDList, nil }, coordList) tasksch.AddChild(parentTask, task1).Run() fullStoreIDs, err2 := task1.GetResult(0) if err = err2; err != nil && len(fullStoreIDs) == 0 { return nil, err } storeIDMap := make(map[string]int) for _, v := range fullStoreIDs { storeIDMap[v.(string)] = 1 } var storeIDs []string for storeID := range storeIDMap { storeIDs = append(storeIDs, storeID) } task2 := tasksch.NewParallelTask(fmt.Sprintf("GetStoreListByCoordinate[%s] get detail", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx, func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) { storeID := batchItemList[0].(string) storePageInfo, err := handler.GetStorePageInfo(ctx, cityInfo, storeID) if err == nil && storePageInfo != nil { return []interface{}{storePageInfo}, nil } return nil, err }, storeIDs) tasksch.AddChild(parentTask, task2).Run() shopList, err2 := task2.GetResult(0) if err = err2; err != nil && len(shopList) == 0 { return nil, err } err = nil for _, v := range shopList { storeList = append(storeList, v.(*model.PageShop)) } } } } return storeList, err } func GetAndStoreCitiesShops(ctx *jxcontext.Context, vendorIDs []int, cityCodeList []int, radius, gridWith int, isAsync bool) (hint string, err error) { db := dao.GetDB() if len(cityCodeList) == 0 { placeList, err2 := dao.GetPlacesByCond(db, dao.EnableCondAll) if err = err2; err != nil { return "", err } for _, v := range placeList { cityCodeList = append(cityCodeList, v.Code) } // cityCodeList = []int{510100} } if len(vendorIDs) == 0 { vendorIDs = []int{model.VendorIDJD, model.VendorIDEBAI} } if radius <= 0 { radius = DefRadius } if gridWith <= 0 { gridWith = DefGridWith } task := tasksch.NewSeqTask("GetAndStoreCitiesShops", ctx, func(task *tasksch.SeqTask, step int, params ...interface{}) (result interface{}, err error) { cityCode := cityCodeList[step] globals.SugarLogger.Debugf("process city:%d", cityCode) shopList, err := GetCityShops(ctx, task, vendorIDs, cityCode, radius, gridWith) if err == nil { dao.Begin(db) defer func() { if r := recover(); r != nil { dao.Rollback(db) panic(r) } }() for _, v := range shopList { tmpShop := *v dao.DeleteEntity(db, &tmpShop, model.FieldVendorStoreID, model.FieldVendorID) if err = dao.CreateEntity(db, v); err != nil { break } } if err != nil { dao.Rollback(db) } else { hint = utils.Int2Str(len(shopList)) dao.Commit(db) } } globals.SugarLogger.Debugf("process city:%d, len(shopList):%d, err:%v", cityCode, len(shopList), err) return nil, nil // 强制继续 }, len(cityCodeList)) tasksch.ManageTask(task).Run() if !isAsync { _, err = task.GetResult(0) } else { hint = task.GetID() } return hint, err } func RefreshPageStore(ctx *jxcontext.Context) (err error) { sql := ` SELECT * FROM page_shop t1 WHERE t1.vendor_id = 0 ` db := dao.GetDB() var shopList []*model.PageShop if err = dao.GetRows(db, &shopList, sql); err != nil { return err } for _, v := range shopList { if handler, _ := partner.GetPurchasePlatformFromVendorID(v.VendorID).(partner.IPurchasePlatformNetSpiderHandler); handler != nil { storePageInfo, err2 := handler.GetStorePageInfo(ctx, "", v.VendorStoreID) if err2 == nil { v.RecentOrderNum = storePageInfo.RecentOrderNum v.SkuCount = storePageInfo.SkuCount dao.UpdateEntity(db, v, "RecentOrderNum", "SkuCount") } else { globals.SugarLogger.Debugf("RefreshPageStore err:%v", err) } } } return err }