257 lines
9.3 KiB
Go
257 lines
9.3 KiB
Go
package netspider
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
|
|
"git.rosy.net.cn/jx-callback/globals"
|
|
"git.rosy.net.cn/jx-callback/globals/api"
|
|
|
|
"git.rosy.net.cn/baseapi/utils"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils/ditu"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils/jxcontext"
|
|
"git.rosy.net.cn/jx-callback/business/jxutils/tasksch"
|
|
"git.rosy.net.cn/jx-callback/business/model"
|
|
"git.rosy.net.cn/jx-callback/business/model/dao"
|
|
"git.rosy.net.cn/jx-callback/business/partner"
|
|
)
|
|
|
|
// Defaults applied by GetAndStoreCitiesShops when the caller passes a
// non-positive value for the corresponding argument.
const (
	// DefRadius is the fallback search radius handed to the city coordinate
	// generator (unit presumably metres — TODO confirm against ditu).
	DefRadius = 8000
	// DefGridWith is the fallback grid width (sic: "With") handed to the city
	// coordinate generator (same unit as DefRadius, presumably — TODO confirm).
	DefGridWith = 2000
)
|
|
|
|
func GetCityShops(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorIDs []int, cityCode, radius, gridWith int) (pageStoreList []*model.PageShop, err error) {
|
|
coordList := ditu.GetCityCoordinateList(cityCode, radius, gridWith)
|
|
if len(coordList) > 0 {
|
|
task := tasksch.NewParallelTask(fmt.Sprintf("GetCityShops:%d", cityCode), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
|
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
vendorID := batchItemList[0].(int)
|
|
storeList, err2 := getStoreListByCoordinates(ctx, task, vendorID, cityCode, coordList)
|
|
if err = err2; err == nil {
|
|
retVal = storeList
|
|
}
|
|
globals.SugarLogger.Debugf("GetCityShops vendorID:%d, cityCode:%d, len(storeList):%d, err:%v", vendorID, cityCode, len(storeList), err)
|
|
return retVal, err
|
|
}, vendorIDs)
|
|
tasksch.AddChild(parentTask, task).Run()
|
|
list, err2 := task.GetResult(0)
|
|
if err = err2; err != nil && len(list) == 0 {
|
|
return nil, err
|
|
}
|
|
err = nil
|
|
for _, v := range list {
|
|
dao.WrapAddIDCULDEntity(v, ctx.GetUserName())
|
|
pageStoreList = append(pageStoreList, v.(*model.PageShop))
|
|
}
|
|
}
|
|
return pageStoreList, err
|
|
}
|
|
|
|
func getStorePageInfo(ctx *jxcontext.Context, handler partner.IPurchasePlatformNetSpiderHandler, cityCode int, vendorStoreID string) (storePageInfo *model.PageShop, err error) {
|
|
storePageInfo, err = handler.GetStorePageInfo(ctx, vendorStoreID)
|
|
if err == nil && storePageInfo != nil {
|
|
updatePageShopCityDistrictInfo(ctx, storePageInfo, cityCode)
|
|
}
|
|
return storePageInfo, err
|
|
}
|
|
|
|
func updatePageShopCityDistrictInfo(ctx *jxcontext.Context, storePageInfo *model.PageShop, cityCode int) {
|
|
if !(storePageInfo.Lng != 0 && storePageInfo.Lat != 0) {
|
|
storePageInfo.Lng, storePageInfo.Lat, storePageInfo.DistrictCode = api.AutonaviAPI.GetCoordinateFromAddress(storePageInfo.Address, utils.Int2Str(cityCode))
|
|
if storePageInfo.DistrictCode == 0 && cityCode != 0 {
|
|
if place, err := dao.GetPlaceByCode(dao.GetDB(), cityCode); err == nil {
|
|
storePageInfo.Lng, storePageInfo.Lat, storePageInfo.DistrictCode = api.AutonaviAPI.GetCoordinateFromAddress(storePageInfo.Address, utils.Int2Str(place.ParentCode))
|
|
}
|
|
}
|
|
} else if storePageInfo.DistrictCode == 0 {
|
|
storePageInfo.DistrictCode = api.AutonaviAPI.GetCoordinateDistrictCode(storePageInfo.Lng, storePageInfo.Lat)
|
|
}
|
|
if storePageInfo.CityCode == 0 {
|
|
if storePageInfo.DistrictCode != 0 {
|
|
if place, err := dao.GetPlaceByCode(dao.GetDB(), storePageInfo.DistrictCode); err == nil {
|
|
storePageInfo.CityCode = place.ParentCode
|
|
}
|
|
}
|
|
if storePageInfo.CityCode == 0 {
|
|
storePageInfo.CityCode = cityCode
|
|
}
|
|
}
|
|
}
|
|
|
|
func getStoreListByCoordinates(ctx *jxcontext.Context, parentTask tasksch.ITask, vendorID, cityCode int, coordList []*ditu.Coordinate) (storeList []*model.PageShop, err error) {
|
|
if len(coordList) > 0 {
|
|
if handler, _ := partner.GetPurchasePlatformFromVendorID(vendorID).(partner.IPurchasePlatformNetSpiderHandler); handler != nil {
|
|
mainStoreIDList, _ := handler.GetStoreIDListByCoordinates(ctx, coordList[0])
|
|
if len(mainStoreIDList) > 0 {
|
|
task1 := tasksch.NewParallelTask(fmt.Sprintf("GetStoreListByCoordinate[%s] get list", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true).SetParallelCount(1), ctx,
|
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
pos := batchItemList[0].(*ditu.Coordinate)
|
|
storeIDList, err := handler.GetStoreIDListByCoordinates(ctx, pos)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return storeIDList, nil
|
|
}, coordList)
|
|
tasksch.AddChild(parentTask, task1).Run()
|
|
fullStoreIDs, err2 := task1.GetResult(0)
|
|
if err = err2; err != nil && len(fullStoreIDs) == 0 {
|
|
return nil, err
|
|
}
|
|
storeIDMap := make(map[string]int)
|
|
for _, v := range fullStoreIDs {
|
|
storeIDMap[v.(string)] = 1
|
|
}
|
|
var storeIDs []string
|
|
for storeID := range storeIDMap {
|
|
storeIDs = append(storeIDs, storeID)
|
|
}
|
|
|
|
task2 := tasksch.NewParallelTask(fmt.Sprintf("GetStoreListByCoordinate[%s] get detail", model.VendorChineseNames[vendorID]), tasksch.NewParallelConfig().SetIsContinueWhenError(true), ctx,
|
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
storeID := batchItemList[0].(string)
|
|
storePageInfo, err := getStorePageInfo(ctx, handler, cityCode, storeID)
|
|
if err == nil && storePageInfo != nil {
|
|
return []interface{}{storePageInfo}, nil
|
|
}
|
|
return nil, err
|
|
}, storeIDs)
|
|
tasksch.AddChild(parentTask, task2).Run()
|
|
shopList, err2 := task2.GetResult(0)
|
|
if err = err2; err != nil && len(shopList) == 0 {
|
|
return nil, err
|
|
}
|
|
err = nil
|
|
for _, v := range shopList {
|
|
storeList = append(storeList, v.(*model.PageShop))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return storeList, err
|
|
}
|
|
|
|
func GetAndStoreCitiesShops(ctx *jxcontext.Context, vendorIDs []int, cityCodeList []int, radius, gridWith int, isShuffle, isAsync bool) (hint string, err error) {
|
|
db := dao.GetDB()
|
|
if len(cityCodeList) == 0 {
|
|
placeList, err2 := dao.GetPlacesByCond(db, dao.EnableCondAll)
|
|
if err = err2; err != nil {
|
|
return "", err
|
|
}
|
|
for _, v := range placeList {
|
|
cityCodeList = append(cityCodeList, v.Code)
|
|
}
|
|
}
|
|
if isShuffle {
|
|
rand.Shuffle(len(cityCodeList), func(i, j int) {
|
|
cityCodeList[i], cityCodeList[j] = cityCodeList[j], cityCodeList[i]
|
|
})
|
|
} else {
|
|
sql := `
|
|
SELECT *
|
|
FROM page_shop t1
|
|
ORDER BY t1.id DESC
|
|
LIMIT 1`
|
|
var lastShop *model.PageShop
|
|
if err2 := dao.GetRow(db, &lastShop, sql); err2 == nil {
|
|
index := -1
|
|
for k, v := range cityCodeList {
|
|
if v >= lastShop.CityCode {
|
|
index = k
|
|
if v == lastShop.CityCode {
|
|
index++
|
|
}
|
|
if index >= len(cityCodeList) {
|
|
index = -1
|
|
}
|
|
break
|
|
}
|
|
}
|
|
if index > 0 {
|
|
var cityCodeList2 []int
|
|
cityCodeList2 = append(cityCodeList2, cityCodeList[index:]...)
|
|
cityCodeList2 = append(cityCodeList2, cityCodeList[:index]...)
|
|
cityCodeList = cityCodeList2
|
|
}
|
|
globals.SugarLogger.Debugf("GetAndStoreCitiesShops last cityCode:%d, cityCodeList:%v", lastShop.CityCode, cityCodeList)
|
|
} else {
|
|
globals.SugarLogger.Debugf("GetAndStoreCitiesShops get lastest city code error:%v", err2)
|
|
}
|
|
}
|
|
if len(vendorIDs) == 0 {
|
|
vendorIDs = []int{model.VendorIDJD, model.VendorIDEBAI}
|
|
}
|
|
if radius <= 0 {
|
|
radius = DefRadius
|
|
}
|
|
if gridWith <= 0 {
|
|
gridWith = DefGridWith
|
|
}
|
|
|
|
task := tasksch.NewParallelTask(fmt.Sprintf("GetAndStoreCitiesShops:%v", vendorIDs), tasksch.NewParallelConfig().SetParallelCount(1).SetIsContinueWhenError(true), ctx,
|
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
cityCode := batchItemList[0].(int)
|
|
globals.SugarLogger.Debugf("process city:%d", cityCode)
|
|
shopList, err := GetCityShops(ctx, task, vendorIDs, cityCode, radius, gridWith)
|
|
if err == nil {
|
|
txDB , _ := dao.Begin(db)
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
dao.Rollback(db, txDB)
|
|
panic(r)
|
|
}
|
|
}()
|
|
for _, v := range shopList {
|
|
globals.SugarLogger.Debugf("GetAndStoreCitiesShops cityCode:%d, 平台:%s, shopID:%s, districtCode:%d", cityCode, model.VendorChineseNames[v.VendorID], v.VendorStoreID, v.DistrictCode)
|
|
if v.DistrictCode > 0 {
|
|
tmpShop := *v
|
|
dao.DeleteEntity(db, &tmpShop, model.FieldVendorStoreID, model.FieldVendorID)
|
|
}
|
|
if err = dao.CreateEntity(db, v); err != nil {
|
|
if dao.IsDuplicateError(err) {
|
|
err = nil
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
dao.Rollback(db, txDB)
|
|
} else {
|
|
hint = utils.Int2Str(len(shopList))
|
|
dao.Commit(db, txDB)
|
|
}
|
|
}
|
|
globals.SugarLogger.Debugf("process city:%d, len(shopList):%d, err:%v", cityCode, len(shopList), err)
|
|
return nil, err
|
|
}, cityCodeList)
|
|
tasksch.ManageTask(task).Run()
|
|
if !isAsync {
|
|
_, err = task.GetResult(0)
|
|
} else {
|
|
hint = task.GetID()
|
|
}
|
|
return hint, err
|
|
}
|
|
|
|
func RefreshPageShops(ctx *jxcontext.Context) (err error) {
|
|
sql := `
|
|
SELECT *
|
|
FROM page_shop t1
|
|
WHERE t1.district_code = 0 AND t1.lng != 0 AND t1.lat != 0`
|
|
db := dao.GetDB()
|
|
var shopList []*model.PageShop
|
|
if err = dao.GetRows(db, &shopList, sql); err != nil {
|
|
return err
|
|
}
|
|
task := tasksch.NewParallelTask(fmt.Sprintf("刷新网页门店信息:%d", len(shopList)), nil, ctx,
|
|
func(task *tasksch.ParallelTask, batchItemList []interface{}, params ...interface{}) (retVal interface{}, err error) {
|
|
pageShop := batchItemList[0].(*model.PageShop)
|
|
updatePageShopCityDistrictInfo(ctx, pageShop, pageShop.CityCode)
|
|
_, err = dao.UpdateEntity(db, pageShop, "CityCode", "DistrictCode", "Lng", "Lat")
|
|
return retVal, err
|
|
}, shopList)
|
|
tasksch.HandleTask(task, nil, true).Run()
|
|
return err
|
|
}
|