ES全量索引校验-python

python

import unittest

import json

import requests

import ddt

from pymysql import connect

def getMySqlData():
    """Fetch the SKU rows that drive the data-driven index comparison.

    Runs one fixed SELECT against the ``gb_goods`` database and returns every
    row. Each row is indexable by position: ``row[0]`` is ``good_sn`` and
    ``row[1]`` is ``v_wh_code`` (the order of the SELECT list).

    Returns:
        The rows as returned by ``cursor.fetchall()``. If connecting or
        querying fails, the error is printed and an empty tuple is returned,
        so callers that unpack the result (``*getMySqlData()``) simply get
        no rows instead of an ``UnboundLocalError``.
    """
    # Replace this statement with the SQL for the target test environment.
    # For sharded tables, format the table suffix into the statement and run
    # it once per shard.
    # NOTE(review): the alias ``goodsSn`` is attached to ``v_wh_code``, not to
    # ``good_sn``; it is harmless only as long as rows are read by position.
    sql = "SELECT gie.good_sn,gie.v_wh_code AS goodsSn FROM goods_info_extend_s_7 gie WHERE gie.goods_status IN (2,4,5) AND gie.platform IN (1,2,4) AND gie.site_code = 'GB' GROUP BY gie.good_sn, gie.v_wh_code;"

    # Default returned when anything below fails.
    result = ()
    try:
        db = connect(host="xx.xx.xx.xx", user="xxx", password="xxx", db="gb_goods", port=3306,
                     charset="utf8")
        try:
            cur = db.cursor()
            try:
                cur.execute(sql)
                result = cur.fetchall()
            finally:
                # Always release the cursor, even when the query fails.
                cur.close()
        finally:
            # Always release the connection, even when the query fails.
            db.close()
    except Exception as e:
        # Best effort: report the problem and fall back to whatever was
        # fetched so far (normally the empty default).
        print(e)
    return result

@ddt.ddt
class MyTest(unittest.TestCase):
    """Compare every SKU document between two Elasticsearch indices.

    One test case is generated per database row. For each row the same term
    query is sent to both indices, a normalised subset of the first hit is
    built for each, and the SKU's goodsId is appended to a text file when the
    two subsets differ. A difference is recorded, not asserted, so it does not
    fail the test case.
    """

    # Seconds to wait for an Elasticsearch response before giving up.
    _REQUEST_TIMEOUT = 30

    # Text file collecting the goodsId of every SKU whose documents differ.
    _MISMATCH_FILE = r"D:\goodsId_GB.txt"

    # ``_source`` fields copied verbatim into the comparison dict.
    # Fields that are deliberately NOT compared: skuAttrs, skuDescAttrs,
    # labelFlags, coupons, activityIds, tags, activities.
    _PLAIN_FIELDS = (
        "week2SalesVolume", "payEndTime", "appSwellAmount", "appPriceType",
        "goodsId", "youtube", "week2Sales", "shopPrice", "baseScore2",
        "subTitle", "exposureFlag", "originalUrl", "discount", "whCode",
        "baseScore5", "payStartTime", "recommendedLevel", "thumbExtendUrl",
        "appStatus", "passAvgScore", "stockFlag", "brandName",
        "appDisplayPrice", "centerWord", "goodsTitle", "brandCode",
        "appDeposit", "vWhCode", "catId", "priceRates", "totalFavoriteCount",
        "baseScore4", "appPayEndTime", "urlTitle", "firstUpTime",
        "appPayStartTime", "expiredTime", "exposureSalesVolume",
        "appDiscount", "baseScore1", "lang", "isCod", "searchWords",
        "mStatus", "dailyRate", "exposureSalesRate", "sortOrder",
        "passTotalNum", "shopCode", "appExpiredTime", "isTort", "baseScore3",
        "priceType", "createTime", "swellAmount", "saleMark", "deposit",
        "goodsModelWord", "goodsWebSku", "imgExtendUrl", "displayPrice",
        "goodsWebSpu", "isPlatform", "webStatus", "goodsSn", "imgUrl",
        # Mail-exclusive price data.
        "mailPrice", "appMailPriceDiscount", "appMailPriceActive",
        "mailPriceActive", "mailPriceCipherText", "mailPriceDiscount",
    )

    # Fields temporarily pinned to a fixed value instead of being read from
    # the document, so they never cause a mismatch.
    _FIXED_FIELDS = {"grossMargin": "0", "defaultWh": "0", "appDefaultWh": "0"}

    # Nested object arrays: field -> (keys to keep, 1st sort key, 2nd sort key).
    _NESTED_FIELDS = {
        "categories": (["level", "catId", "catName", "isDefault"], "catId", "isDefault"),
        "shopGroups": (["groupId", "level", "groupName", "path"], "level", "groupId"),
    }

    @ddt.data(*getMySqlData())
    def test_gb(self, data):
        """Query both indices for one SKU and record it if the documents differ.

        Args:
            data: One database row; ``data[0]`` is the goods SN and
                ``data[1]`` the warehouse code.
        """
        goodsSn, whCode = data[0], data[1]
        goodsId = f"{goodsSn}#{whCode}"
        print(goodsId)

        url_es = "http://10.4.4.80:9200/gearbest20200212201449/sku/_search"
        url_ai = "http://10.4.4.80:9200/GB_daily_full/sku/_search"
        headers = {"Content-Type": "application/json"}
        params = {"query": {"term": {"goodsId": {"value": goodsId}}}}

        es_goodsInfo, _ = self.gb_index(url_es, params, headers)
        ai_goodsInfo, _ = self.gb_index(url_ai, params, headers)

        if es_goodsInfo != ai_goodsInfo:
            # Record the id that was queried. The id returned by gb_index is
            # the sentinel 0 when that index has no hit, which is exactly one
            # of the mismatch cases and cannot be written as text.
            self.writeGoodsSn(goodsId)

    def writeGoodsSn(self, goodsId):
        """Append one goodsId, followed by a newline, to the mismatch file.

        Args:
            goodsId: Identifier to record; formatted as text before writing.
        """
        with open(self._MISMATCH_FILE, "a+", encoding="utf-8") as f:
            f.write(f"{goodsId}\n")
        print("数据写入成功...")

    def func_sort(self, jsonArray, list, sort_key1, sort_key2):
        """Sort a list of dicts and project each dict onto the given keys.

        Args:
            jsonArray: List of dicts. It is sorted IN PLACE by
                ``(sort_key1, sort_key2)``.
            list: Keys to keep, in the order they should appear in each
                returned dict. (The name shadows the builtin; it is kept so
                that existing keyword callers keep working.)
            sort_key1: Primary sort key.
            sort_key2: Secondary sort key.

        Returns:
            A new list of dicts containing only the requested keys.

        Raises:
            KeyError: If an element lacks a sort key or a requested key.
        """
        jsonArray.sort(key=lambda x: (x[sort_key1], x[sort_key2]))
        # The JSON round trip yields an independent deep copy, so the returned
        # dicts share no nested objects with the input.
        data = json.loads(json.dumps(jsonArray))
        return [{key: info[key] for key in list} for info in data]

    def gb_index(self, url, params, headers):
        """POST a search to one index and build the comparison dict of its first hit.

        Args:
            url: ``_search`` endpoint of the index.
            params: Query body as a dict; it is serialised to JSON here.
            headers: HTTP headers for the request.

        Returns:
            ``(goods_info, goodsId)`` for the first hit, where ``goods_info``
            is the normalised field subset and ``goodsId`` the document's own
            goodsId; ``(0, 0)`` when the search returns no hits.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            KeyError: If the hit lacks one of the compared fields.
        """
        r = requests.post(url, data=json.dumps(params), headers=headers,
                          timeout=self._REQUEST_TIMEOUT)
        r.raise_for_status()
        r_json = r.json()  # parsed once; a dict keyed like the JSON response
        print("响应对象:", r_json)

        hits = r_json["hits"]["hits"]
        if not hits:
            print("无sku数据")
            return 0, 0

        source = hits[0]["_source"]
        goods_info = {field: source[field] for field in self._PLAIN_FIELDS}
        goods_info.update(self._FIXED_FIELDS)

        # Nested arrays are sorted and trimmed so that element order and extra
        # keys do not produce false mismatches. Empty or missing-value arrays
        # are stored unchanged.
        for field, (keys, sort_key1, sort_key2) in self._NESTED_FIELDS.items():
            items = source[field]
            if items:
                goods_info[field] = self.func_sort(items, keys, sort_key1, sort_key2)
            else:
                goods_info[field] = items

        # Any non-positive sales figure is normalised to 0.
        yesterdaySales = source["yesterdaySales"]
        goods_info["yesterdaySales"] = yesterdaySales if yesterdaySales > 0 else 0

        goodsId = source["goodsId"]
        print("goods_info:", goods_info)
        return goods_info, goodsId

# Script entry point: hand control to the unittest runner only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    unittest.main()

以上是 ES全量索引校验-python 的全部内容, 来源链接: utcz.com/z/387811.html

回到顶部