# ES全量索引校验-python (Elasticsearch full-index verification, Python)
import json
import unittest

import ddt
import requests
from pymysql import connect
def getMySqlData():
    """Load the rows that parameterize the data-driven test.

    Runs one fixed query against the ``gb_goods`` MySQL database and returns
    whatever ``cursor.fetchall()`` yields: one row per
    ``(good_sn, v_wh_code)`` group.

    Returns:
        The fetched rows, or an empty tuple when connecting or querying
        fails (the error is printed, not raised).
    """
    # Replace this SQL with the one matching the environment under test.
    # NOTE(review): the alias ``goodsSn`` is attached to ``v_wh_code``, which
    # looks misplaced; the visible consumer reads rows by position, so it has
    # no effect there — confirm before relying on column names.
    # For sharded tables the same approach works with one query per shard,
    # e.g. "select goods_sn from price_%s where ..." % table_num for each of
    # the shard numbers.
    sql = (
        "SELECT gie.good_sn,gie.v_wh_code AS goodsSn "
        "FROM goods_info_extend_s_7 gie "
        "WHERE gie.goods_status IN (2,4,5) AND gie.platform IN (1,2,4) "
        "AND gie.site_code = 'GB' "
        "GROUP BY gie.good_sn, gie.v_wh_code;"
    )

    # Defaults make the failure path well-defined: previously ``result`` was
    # unbound when anything raised, so the function died with
    # UnboundLocalError right after printing the real error.
    result = ()
    db = None
    try:
        db = connect(host="xx.xx.xx.xx", user="xxx", password="xxx", db="gb_goods", port=3306,
                     charset="utf8")
        cur = db.cursor()
        try:
            cur.execute(sql)
            result = cur.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cur.close()
    except Exception as e:
        # Best effort, as in the original: report the problem and fall
        # through with no rows.
        print(e)
    finally:
        # Close the connection on both the success and the failure path.
        if db is not None:
            db.close()
    return result
@ddt.ddt
class MyTest(unittest.TestCase):
    """Data-driven comparison of one SKU document across two ES indexes.

    For every row returned by ``getMySqlData()`` the SKU is looked up by
    ``goodsId`` in both indexes, each hit is reduced to the fields listed in
    ``_COMPARED_FIELDS``, and the two reductions are compared. The goodsId of
    every SKU whose reductions differ is appended to ``MISMATCH_FILE``.

    NOTE: a difference is only recorded in that file; no assertion is made,
    so a mismatch does not fail the test case.
    """

    # Text file that collects the goodsId of every mismatching SKU.
    MISMATCH_FILE = r"D:\goodsId_GB.txt"

    # Seconds to wait for Elasticsearch before giving up on a request, so a
    # stalled node cannot hang the whole run.
    REQUEST_TIMEOUT = 30

    # Fields that are not read from the index yet: both sides get the same
    # hard-coded placeholder value.
    _FIXED_FIELDS = {"grossMargin": "0", "defaultWh": "0", "appDefaultWh": "0"}

    # Nested-object arrays: field -> (keys to keep, first sort key, second
    # sort key). Sorting makes the comparison independent of array order.
    _NESTED_FIELDS = {
        "categories": (("level", "catId", "catName", "isDefault"), "catId", "isDefault"),
        "shopGroups": (("groupId", "level", "groupName", "path"), "level", "groupId"),
    }

    # Every field that takes part in the comparison, in output order.
    # Deliberately NOT compared (disabled in the original script): skuAttrs,
    # skuDescAttrs, labelFlags, coupons, activityIds, tags, activities.
    _COMPARED_FIELDS = (
        "week2SalesVolume", "payEndTime", "appSwellAmount", "appPriceType",
        "goodsId", "youtube", "week2Sales", "shopPrice", "grossMargin",
        "baseScore2", "subTitle", "exposureFlag", "originalUrl", "discount",
        "whCode", "baseScore5", "payStartTime", "recommendedLevel",
        "thumbExtendUrl", "appStatus", "passAvgScore", "stockFlag",
        "brandName", "appDisplayPrice", "centerWord", "goodsTitle",
        "brandCode", "appDeposit", "vWhCode", "catId", "priceRates",
        "totalFavoriteCount", "baseScore4", "appPayEndTime", "urlTitle",
        "categories", "firstUpTime", "appPayStartTime", "expiredTime",
        "exposureSalesVolume", "appDiscount", "baseScore1", "lang", "isCod",
        "searchWords", "defaultWh", "mStatus", "dailyRate",
        "exposureSalesRate", "sortOrder", "shopGroups", "passTotalNum",
        "shopCode", "appExpiredTime", "isTort", "baseScore3", "priceType",
        "createTime", "swellAmount", "saleMark", "deposit", "goodsModelWord",
        "appDefaultWh", "goodsWebSku", "imgExtendUrl", "displayPrice",
        "goodsWebSpu", "isPlatform", "webStatus", "goodsSn", "imgUrl",
        "yesterdaySales",
        # Mail-exclusive price data.
        "mailPrice", "appMailPriceDiscount", "appMailPriceActive",
        "mailPriceActive", "mailPriceCipherText", "mailPriceDiscount",
    )

    # The data rows are fetched while the class body is being executed, i.e.
    # at import time of this module.
    @ddt.data(*getMySqlData())
    def test_gb(self, data):
        """Compare one SKU between the two indexes and record a mismatch."""
        goodsSn, whCode = data[0], data[1]
        goodsId = f"{goodsSn}#{whCode}"
        print(goodsId)

        url_es = "http://10.4.4.80:9200/gearbest20200212201449/sku/_search"
        url_ai = "http://10.4.4.80:9200/GB_daily_full/sku/_search"
        headers = {"Content-Type": "application/json"}
        params = {"query": {"term": {"goodsId": {"value": goodsId}}}}

        es_goodsInfo, goodsId_es = self.gb_index(url_es, params, headers)
        ai_goodsInfo, _ = self.gb_index(url_ai, params, headers)

        if es_goodsInfo != ai_goodsInfo:
            # gb_index() reports a missing document as (0, 0); fall back to
            # the queried id in that case instead of writing the integer 0,
            # which used to raise TypeError in writeGoodsSn().
            self.writeGoodsSn(goodsId_es or goodsId)

    def writeGoodsSn(self, goodsId):
        """Append ``goodsId`` as one line to ``MISMATCH_FILE``."""
        with open(self.MISMATCH_FILE, "a", encoding="utf-8") as f:
            f.write(f"{goodsId}\n")
        print("数据写入成功...")

    def func_sort(self, jsonArray, list, sort_key1, sort_key2):  # noqa: A002
        """Sort an array of objects and project each object onto fixed keys.

        Args:
            jsonArray: list of dicts; it is sorted in place.
            list: the keys to keep, in the order they should appear in each
                resulting dict. (The name shadows the builtin; it is kept so
                existing callers are unaffected.)
            sort_key1: primary sort key.
            sort_key2: secondary sort key.

        Returns:
            A new list of dicts that contain only the requested keys.

        Raises:
            KeyError: if an object lacks a sort key or a requested key.
        """
        jsonArray.sort(key=lambda item: (item[sort_key1], item[sort_key2]))
        # The JSON round trip copies the values, so the result shares no
        # nested objects with the input.
        copied = json.loads(json.dumps(jsonArray))
        return [{key: item[key] for key in list} for item in copied]

    def gb_index(self, url, params, headers):
        """Query one index and reduce the first hit to the compared fields.

        Args:
            url: ``_search`` endpoint of the index.
            params: query body as a dict; it is sent JSON-encoded.
            headers: HTTP headers for the request.

        Returns:
            ``(goods_info, goodsId)`` for the first hit, where ``goods_info``
            maps every name in ``_COMPARED_FIELDS`` to its value, or
            ``(0, 0)`` when the search returns no hit.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            KeyError: if the hit lacks one of the compared fields.
        """
        r = requests.post(url, data=json.dumps(params), headers=headers,
                          timeout=self.REQUEST_TIMEOUT)
        # Fail with the HTTP error itself rather than a KeyError on "hits".
        r.raise_for_status()
        r_json = r.json()  # parsed once and reused below
        print("响应对象:", r_json)

        hits = r_json["hits"]["hits"]
        if not hits:
            print("无sku数据")
            return 0, 0

        source = hits[0]["_source"]
        goods_info = {}
        for field in self._COMPARED_FIELDS:
            if field in self._FIXED_FIELDS:
                goods_info[field] = self._FIXED_FIELDS[field]
            elif field in self._NESTED_FIELDS:
                keys, sort_key1, sort_key2 = self._NESTED_FIELDS[field]
                nested = source[field]
                # An empty value is copied through untouched.
                goods_info[field] = (
                    self.func_sort(nested, keys, sort_key1, sort_key2) if nested else nested
                )
            elif field == "yesterdaySales":
                # Anything that is not a positive number is normalized to 0.
                sales = source[field]
                goods_info[field] = sales if sales > 0 else 0
            else:
                goods_info[field] = source[field]

        print("goods_info:", goods_info)
        return goods_info, source["goodsId"]
# Script entry point: when the file is executed directly, hand control to
# unittest's command-line runner, which discovers and runs the test cases
# defined in this module.
if __name__ == "__main__":
    unittest.main()
# 以上是 ES全量索引校验-python 的全部内容, 来源链接: utcz.com/z/387811.html (end of article; source link)