python批量插入数据到es和读取es数据

python

一、插入数据

1、首先准备类似如下数据

{"_type": "type1", "_id": 1, "_index": "test", "_source": {"JOBNAME0": "guba_eastmoney_com_265162", "JOBNAME1": "guba_eastmoney_com_265162"}}

2、调用es相关模块插入数据到es中

#!/usr/bin/python
"""Bulk-insert action dicts from insert.json (one JSON object per line)
into Elasticsearch via helpers.parallel_bulk."""

import threading

import queue

import json

import time

from elasticsearch import Elasticsearch

from elasticsearch import helpers

import os

import sys

# Alternative multi-node cluster configuration, kept for reference:
#
# host_list = [
#     {"host": "10.58.7.190", "port": 9200},
#     {"host": "10.58.55.191", "port": 9200},
#     {"host": "10.58.55.192", "port": 9200},
# ]

host_list = [
    {"host": "10.87.7.190", "port": 9200},
]

# Create an Elasticsearch client object.
client = Elasticsearch(host_list)

# Each line of insert.json is one bulk action, e.g.
# {"_type": "type1", "_id": 1, "_index": "test", "_source": {...}}.
# NOTE: actions passed to the bulk helpers must be a list (iterable) whose
# elements are dicts. Accumulate ALL lines first so a single parallel_bulk
# call can batch them — the original rebuilt a one-element list per line,
# which defeated bulk batching entirely.
actions = []
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "insert.json"), "r") as f:
    for line in f:
        actions.append(json.loads(line))

try:
    # parallel_bulk is a generator yielding (ok, info) per action; it must
    # be consumed for the inserts to actually run.
    for ok, info in helpers.parallel_bulk(client=client, thread_count=1, actions=actions):
        if not ok:
            sys.stderr.write("bulk action failed: {info}\n".format(info=info))
except Exception as e:
    # BUG FIX: the original called sys.stderr(e) — sys.stderr is a file
    # object, not a callable, so that line itself raised TypeError.
    sys.stderr.write(str(e) + "\n")

3、查看es索引中的文档数

[root@test1 cdrom]# curl -XGET 'http://10.87.7.190:9200/_cat/indices?v&pretty'

health status index uuid pri rep docs.count docs.deleted store.size pri.store.size

yellow open test r91GhsFVT7iF6M3iAuNEKg 2 5 19362 0 1.3mb 499.7kb

 

二、读取es的数据

#!/usr/bin/python

from kafka import KafkaProducer

import threading

import json

import time

from elasticsearch import Elasticsearch

from elasticsearch import helpers

import os

import sys

import argparse

import random

def get_data_from_es():

host_list = [

{"host": "10.58.55.1", "port": 9200},

{"host": "10.58.55.2", "port": 9200},

{"host": "10.58.55.7", "port": 9200},

# {"host": "10.58.55.201", "port": 9200},

# {"host": "10.58.55.202", "port": 9200},

# {"host": "10.58.55.203", "port": 9200},

]

es = Elasticsearch(host_list)

err_data_num = 0

correct_data_num = 0

size = 100

query = es.search(index='full_sight', scroll='1m', size=size)

for m in query['hits']['hits']:

# print(m)

d_id = m["_id"]

if "LASTOPER" in m["_source"].keys():

# if "UPDATE_TEST" in m["_source"].keys():

if m["_source"]["LASTOPER"] == "" + str(d_id):

correct_data_num += 1

else:

err_data_num += 1

else:

err_data_num += 1

# print("id为{d_id}数据未更新成功,错误的条数为{num}".format(d_id=d_id, num=err_data_num))

results = query['hits']['hits']

total = query['hits']['total']

scroll_id = query['_scroll_id']

page = divmod(total, size)

if page[1] == 0:

page = page[0]

else:

page = page[0] + 1

for i in range(0, page):

try:

query_scroll = es.scroll(scroll_id=scroll_id, scroll='1m', )['hits']['hits']

except Exception as e:

continue

else:

for m in query_scroll:

d_id = m.get("_id",None)

if "LASTOPER" in m["_source"].keys():

if m["_source"]["LASTOPER"] == "test" + str(d_id):

correct_data_num += 1

else:

err_data_num += 1

else:

err_data_num += 1

return err_data_num,correct_data_num

if __name__ == '__main__':
    # Poll the index repeatedly, printing progress, until no un-updated
    # documents remain.
    while True:
        error, correct = get_data_from_es()
        print("未更新的数据的条数为:{num}".format(num=error))
        print("已更新的数据的条数为:{num}".format(num=correct))
        print("=" * 200)
        # get_data_from_es already returns ints, so the original int(error)
        # cast and the trailing `else: continue` were redundant and dropped.
        if error == 0:
            break

以上是 python批量插入数据到es和读取es数据 的全部内容, 来源链接: utcz.com/z/389435.html

回到顶部