package netspider

import (
	"fmt"
	"math/rand"

	"git.rosy.net.cn/baseapi/utils"
	"git.rosy.net.cn/jx-callback/business/jxutils/ditu"
	"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
	"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
	"git.rosy.net.cn/jx-callback/business/model"
	"git.rosy.net.cn/jx-callback/business/model/dao"
	"git.rosy.net.cn/jx-callback/business/partner"
	"git.rosy.net.cn/jx-callback/globals"
	"git.rosy.net.cn/jx-callback/globals/api"
)

// Fallback values used by GetAndStoreCitiesShops when the caller passes a
// non-positive radius or grid width.
// NOTE(review): the unit is whatever ditu.GetCityCoordinateList expects
// (presumably metres) - confirm.
const (
	DefRadius   = 8000
	DefGridWith = 2000
)

// GetCityShops collects the page shops of one city for every vendor in
// vendorIDs.
//
// The city is turned into a list of coordinates by ditu.GetCityCoordinateList
// and one child task per vendor is attached to parentTask; each child calls
// getStoreListByCoordinates. Every returned shop is passed through
// dao.WrapAddIDCULDEntity with the current user name.
//
// An error is returned only when the task failed and produced no result at
// all; if some results are available the error is dropped and the partial list
// is returned. When the city yields no coordinates the result is (nil, nil).
func GetCityShops(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorIDs []int, cityCode, radius, gridWith int) (pageStoreList []*model.PageShop, err error) {
	coordList := ditu.GetCityCoordinateList(cityCode, radius, gridWith)
	if len(coordList) == 0 {
		return nil, nil
	}

	task := tasksch.NewParallelTask(fmt.Sprintf("GetCityShops:%d", cityCode), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			vendorID := batchItemList[0].(int)
			storeList, err2 := getStoreListByCoordinates(ctx, task, vendorID, cityCode, coordList)
			// Only hand the list back on success so that a failure yields an
			// untyped nil result rather than a typed nil slice.
			if err = err2; err == nil {
				retVal = storeList
			}
			globals.SugarLogger.Debugf("GetCityShops vendorID:%d, cityCode:%d, len(storeList):%d, err:%v", vendorID, cityCode, len(storeList), err)
			return retVal, err
		}, vendorIDs)
	tasksch.AddChild(parentTask, task).Run()

	list, err2 := task.GetResult(0)
	if err2 != nil && len(list) == 0 {
		return nil, err2
	}
	// Partial results are good enough: any error is dropped from here on.
	for _, v := range list {
		dao.WrapAddIDCULDEntity(v, ctx.GetUserName())
		pageStoreList = append(pageStoreList, v.(*model.PageShop))
	}
	return pageStoreList, nil
}

// getStorePageInfo fetches the page information of one vendor store through
// handler and, when a shop was returned without error, fills in its missing
// location fields via updatePageShopCityDistrictInfo.
func getStorePageInfo(ctx *jxcontext.Context, handler partner.IPurchasePlatformNetSpiderHandler, cityCode int, vendorStoreID string) (storePageInfo *model.PageShop, err error) {
	storePageInfo, err = handler.GetStorePageInfo(ctx, vendorStoreID)
	if err == nil && storePageInfo != nil {
		updatePageShopCityDistrictInfo(ctx, storePageInfo, cityCode)
	}
	return storePageInfo, err
}

// updatePageShopCityDistrictInfo fills in the missing Lng/Lat, DistrictCode
// and CityCode of storePageInfo in place.
//
//   - If either coordinate is zero, the address is geocoded within cityCode;
//     when that yields no district and cityCode is non-zero, the lookup is
//     retried with the parent code of cityCode.
//   - If both coordinates are present but the district is unknown, the
//     district is resolved from the coordinates.
//   - If CityCode is still zero it is taken from the parent of the district,
//     falling back to cityCode.
//
// ctx is currently unused.
func updatePageShopCityDistrictInfo(ctx *jxcontext.Context, storePageInfo *model.PageShop, cityCode int) {
	if storePageInfo.Lng == 0 || storePageInfo.Lat == 0 {
		storePageInfo.Lng, storePageInfo.Lat, storePageInfo.DistrictCode = api.AutonaviAPI.GetCoordinateFromAddress(storePageInfo.Address, utils.Int2Str(cityCode))
		if storePageInfo.DistrictCode == 0 && cityCode != 0 {
			// Retry against the parent place of the city; a failed place
			// lookup leaves the first geocoding result untouched.
			if place, err := dao.GetPlaceByCode(dao.GetDB(), cityCode); err == nil {
				storePageInfo.Lng, storePageInfo.Lat, storePageInfo.DistrictCode = api.AutonaviAPI.GetCoordinateFromAddress(storePageInfo.Address, utils.Int2Str(place.ParentCode))
			}
		}
	} else if storePageInfo.DistrictCode == 0 {
		storePageInfo.DistrictCode = api.AutonaviAPI.GetCoordinateDistrictCode(storePageInfo.Lng, storePageInfo.Lat)
	}

	if storePageInfo.CityCode != 0 {
		return
	}
	if storePageInfo.DistrictCode != 0 {
		if place, err := dao.GetPlaceByCode(dao.GetDB(), storePageInfo.DistrictCode); err == nil {
			storePageInfo.CityCode = place.ParentCode
		}
	}
	if storePageInfo.CityCode == 0 {
		storePageInfo.CityCode = cityCode
	}
}

// getStoreListByCoordinates returns the page shops of one vendor that are
// reachable from the given coordinates.
//
// It works in two child tasks of parentTask: the first collects store IDs for
// every coordinate, the second fetches the page information of each distinct
// store ID. Both tasks continue on error, and an error is returned only when a
// task failed without producing any result.
//
// (nil, nil) is returned when coordList is empty, when the vendor has no
// net-spider handler, or when the first coordinate yields no store at all.
func getStoreListByCoordinates(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, cityCode int, coordList []*ditu.Coordinate) (storeList []*model.PageShop, err error) {
	if len(coordList) == 0 {
		return nil, nil
	}
	handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IPurchasePlatformNetSpiderHandler)
	if handler == nil {
		return nil, nil
	}
	// Probe with the first coordinate only; the error is deliberately ignored
	// because an empty answer already means "skip this vendor".
	// NOTE(review): presumably this avoids scanning the whole grid for vendors
	// that do not serve the city - confirm.
	mainStoreIDList, _ := handler.GetStoreIDListByCoordinates(ctx, coordList[0])
	if len(mainStoreIDList) == 0 {
		return nil, nil
	}

	vendorName := model.VendorChineseNames[vendorID]

	task1 := tasksch.NewParallelTask(fmt.Sprintf("GetStoreListByCoordinate[%s] get list", vendorName), tasksch.NewParallelConfig().SetIsContinueWhenError(true).SetParallelCount(1), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			pos := batchItemList[0].(*ditu.Coordinate)
			storeIDList, err := handler.GetStoreIDListByCoordinates(ctx, pos)
			if err != nil {
				return nil, err
			}
			return storeIDList, nil
		}, coordList)
	tasksch.AddChild(parentTask, task1).Run()
	fullStoreIDs, err2 := task1.GetResult(0)
	if err2 != nil && len(fullStoreIDs) == 0 {
		return nil, err2
	}

	// Neighbouring coordinates report overlapping stores: keep each ID once,
	// in first-seen order.
	seen := make(map[string]struct{}, len(fullStoreIDs))
	var storeIDs []string
	for _, v := range fullStoreIDs {
		storeID := v.(string)
		if _, dup := seen[storeID]; dup {
			continue
		}
		seen[storeID] = struct{}{}
		storeIDs = append(storeIDs, storeID)
	}

	task2 := tasksch.NewParallelTask(fmt.Sprintf("GetStoreListByCoordinate[%s] get detail", vendorName), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			storeID := batchItemList[0].(string)
			storePageInfo, err := getStorePageInfo(ctx, handler, cityCode, storeID)
			if err == nil && storePageInfo != nil {
				return []interface{}{storePageInfo}, nil
			}
			return nil, err
		}, storeIDs)
	tasksch.AddChild(parentTask, task2).Run()
	shopList, err2 := task2.GetResult(0)
	if err2 != nil && len(shopList) == 0 {
		return nil, err2
	}
	// Partial results are accepted; the error is dropped.
	for _, v := range shopList {
		storeList = append(storeList, v.(*model.PageShop))
	}
	return storeList, nil
}

// GetAndStoreCitiesShops crawls the shops of the given cities for the given
// vendors and stores them in the database, one transaction per city.
//
// Parameters:
//   - vendorIDs: vendors to crawl; empty means model.VendorIDJD and
//     model.VendorIDEBAI.
//   - cityCodeList: cities to crawl; empty means every place returned by
//     dao.GetPlacesByCond(db, dao.EnableCondAll). The caller's slice is never
//     reordered.
//   - radius, gridWith: passed to GetCityShops; non-positive values are
//     replaced by DefRadius / DefGridWith.
//   - isShuffle: process the cities in random order. Otherwise the list is
//     rotated so that processing resumes after the city of the most recently
//     inserted page_shop row (this assumes cityCodeList is in ascending order).
//   - isAsync: return immediately with the task ID as hint instead of waiting.
//
// In synchronous mode hint is the number of shops of the last city that was
// committed successfully and err is the task result error.
func GetAndStoreCitiesShops(ctx *jxcontext.Context, vendorIDs []int, cityCodeList []int, radius, gridWith int, isShuffle, isAsync bool) (hint string, err error) {
	db := dao.GetDB()
	if len(cityCodeList) == 0 {
		placeList, err2 := dao.GetPlacesByCond(db, dao.EnableCondAll)
		if err2 != nil {
			return "", err2
		}
		for _, v := range placeList {
			cityCodeList = append(cityCodeList, v.Code)
		}
	}

	if isShuffle {
		// Shuffle a private copy so the caller's slice keeps its order.
		cityCodeList = append([]int(nil), cityCodeList...)
		rand.Shuffle(len(cityCodeList), func(i, j int) {
			cityCodeList[i], cityCodeList[j] = cityCodeList[j], cityCodeList[i]
		})
	} else {
		sql := ` SELECT * FROM page_shop t1 ORDER BY t1.id DESC LIMIT 1`
		var lastShop *model.PageShop
		if err2 := dao.GetRow(db, &lastShop, sql); err2 != nil {
			globals.SugarLogger.Debugf("GetAndStoreCitiesShops get lastest city code error:%v", err2)
		} else if lastShop != nil {
			// Find the first city after the last stored one: the first code
			// greater than it, or the successor of the code equal to it.
			index := -1
			for k, v := range cityCodeList {
				if v >= lastShop.CityCode {
					index = k
					if v == lastShop.CityCode {
						index++
					}
					if index >= len(cityCodeList) {
						index = -1 // the last city was the final one: start over
					}
					break
				}
			}
			if index > 0 {
				rotated := make([]int, 0, len(cityCodeList))
				rotated = append(rotated, cityCodeList[index:]...)
				rotated = append(rotated, cityCodeList[:index]...)
				cityCodeList = rotated
			}
			globals.SugarLogger.Debugf("GetAndStoreCitiesShops last cityCode:%d, cityCodeList:%v", lastShop.CityCode, cityCodeList)
		}
	}

	if len(vendorIDs) == 0 {
		vendorIDs = []int{model.VendorIDJD, model.VendorIDEBAI}
	}
	if radius <= 0 {
		radius = DefRadius
	}
	if gridWith <= 0 {
		gridWith = DefGridWith
	}

	// Cities are processed with a parallel count of 1 and share the single db
	// handle for their transactions.
	task := tasksch.NewParallelTask(fmt.Sprintf("GetAndStoreCitiesShops:%v", vendorIDs), tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(true), ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			cityCode := batchItemList[0].(int)
			globals.SugarLogger.Debugf("process city:%d", cityCode)
			shopList, err := GetCityShops(ctx, task, vendorIDs, cityCode, radius, gridWith)
			if err == nil {
				dao.Begin(db)
				// Do not leave the transaction open if anything below panics.
				defer func() {
					if r := recover(); r != nil {
						dao.Rollback(db)
						panic(r)
					}
				}()
				for _, v := range shopList {
					globals.SugarLogger.Debugf("GetAndStoreCitiesShops cityCode:%d, 平台:%s, shopID:%s, districtCode:%d", cityCode, model.VendorChineseNames[v.VendorID], v.VendorStoreID, v.DistrictCode)
					if v.DistrictCode > 0 {
						// Replace an existing row for the same vendor store;
						// the delete works on a copy so v itself is untouched.
						tmpShop := *v
						dao.DeleteEntity(db, &tmpShop, model.FieldVendorStoreID, model.FieldVendorID)
					}
					if err = dao.CreateEntity(db, v); err != nil {
						if !dao.IsDuplicateError(err) {
							break
						}
						err = nil // an already stored shop is not a failure
					}
				}
				if err != nil {
					dao.Rollback(db)
				} else {
					// In async mode the caller owns hint (it holds the task
					// ID) and may already have returned, so writing it from
					// the worker would be a data race.
					if !isAsync {
						hint = utils.Int2Str(len(shopList))
					}
					dao.Commit(db)
				}
			}
			globals.SugarLogger.Debugf("process city:%d, len(shopList):%d, err:%v", cityCode, len(shopList), err)
			return nil, err
		}, cityCodeList)
	tasksch.ManageTask(task).Run()

	if isAsync {
		hint = task.GetID()
	} else {
		_, err = task.GetResult(0)
	}
	return hint, err
}

// RefreshPageShops re-resolves the location of every page_shop row that has
// coordinates but no district code, and writes CityCode, DistrictCode, Lng and
// Lat back to the database.
//
// Only the error of the initial query is returned; the update task is started
// without waiting for its result here.
func RefreshPageShops(ctx *jxcontext.Context) (err error) {
	sql := ` SELECT * FROM page_shop t1 WHERE t1.district_code = 0 AND t1.lng != 0 AND t1.lat != 0`
	db := dao.GetDB()
	var shopList []*model.PageShop
	if err = dao.GetRows(db, &shopList, sql); err != nil {
		return err
	}

	task := tasksch.NewParallelTask(fmt.Sprintf("刷新网页门店信息:%d", len(shopList)), nil, ctx,
		func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
			pageShop := batchItemList[0].(*model.PageShop)
			updatePageShopCityDistrictInfo(ctx, pageShop, pageShop.CityCode)
			_, err = dao.UpdateEntity(db, pageShop, "CityCode", "DistrictCode", "Lng", "Lat")
			return retVal, err
		}, shopList)
	tasksch.HandleTask(task, nil, true).Run()
	return nil
}