elasticsearch python parallel_bulk fails to insert data

I was using parallel_bulk in Python to insert data into Elasticsearch, but parallel_bulk inserts nothing. My code:

class CreateIndex(object):

    def _gen_data(self, index, doc_type, chunk_size):
        sql = """select * from tem_search_engine_1 where rownum <= 10000"""
        self.cursor.execute(sql)
        col_name_list = [col[0].lower() for col in self.cursor.description]
        col_name_len = len(col_name_list)
        actions = []

        start = time.time()
        for row in self.cursor:
            source = {}
            tbl_id = ""
            for i in range(col_name_len):
                source.update({col_name_list[i]: str(row[i])})
                if col_name_list[i] == "tbl_id":
                    tbl_id = row[i]
            action = {
                "_index": index,
                "_type": doc_type,
                "_id": tbl_id,
                "_source": source
            }
            actions.append(action)
            if len(actions) == chunk_size:
                print("actions time:", time.time() - start)
                yield actions
                actions = []
        print("for time:", time.time() - start)
        yield actions

    def bulk_data(self, index, doc_type, chunk_size=1000, is_parallel=True, threads_counts=4):
        t1 = time.time()
        gen_action = self._gen_data(index, doc_type, chunk_size)
        if is_parallel is None or is_parallel == True:
            for success, info in helpers.parallel_bulk(client=self.es, actions=gen_action,
                                                       thread_count=threads_counts):
                if not success:
                    print("Insert failed: ", info)


if __name__ == "__main__":
    createindex = CreateIndex()
    createindex.create_index(index="se", doc_type="se_doc")
    createindex.bulk_data(index="se", doc_type="se_doc")

When I run bulk_data, no data is inserted at all. How should I handle this? The error is:

Traceback (most recent call last):
  File "F:/programs/ElasticSearch/CreateIndex.py", line 287, in <module>
    createindex.bulk_data(index="se", doc_type="se_doc")
  File "F:/programs/ElasticSearch/CreateIndex.py", line 179, in bulk_data
    thread_count=threads_counts, chunk_size=chunk_size):
  File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 306, in parallel_bulk
    _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
  File "D:\anacond\lib\multiprocessing\pool.py", line 735, in next
    raise value
  File "D:\anacond\lib\multiprocessing\pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "D:\anacond\lib\multiprocessing\pool.py", line 138, in _helper_reraises_exception
    raise ex
  File "D:\anacond\lib\multiprocessing\pool.py", line 290, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 58, in _chunk_actions
    for action, data in actions:
  File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 37, in expand_action
    op_type = data.pop('_op_type', 'index')
TypeError: pop() takes at most 1 argument (2 given)

Answer:

With the parallel_bulk method you can pass either a list of dicts or a generator that yields one dict at a time, as explained in the helpers documentation. A generator in Python is used to avoid loading everything into RAM; if you are going to collect every element into a list first, the generator buys you nothing, since building the list holds all of its elements in memory anyway. In your case the generator you pass does not yield single action dicts: it yields entire lists of actions. expand_action then treats each yielded list as if it were a dict and calls pop('_op_type', 'index') on it, which is exactly the TypeError in your traceback.
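To see the mismatch in isolation, here is a minimal reproduction that needs no Elasticsearch at all: dict.pop() accepts a key and a default, while list.pop() accepts at most one integer index.

# dict.pop(key, default) is what expand_action relies on:
action = {"_op_type": "index", "_index": "se"}
action.pop("_op_type", "index")    # fine: returns "index"

# Passing a list of actions where a single dict is expected:
actions = [action]
actions.pop("_op_type", "index")   # TypeError: pop() takes at most 1 argument (2 given)
                                   # (exact wording varies by Python version)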

So either change _gen_data so that it returns a list, that is, a plain list of action dicts:

def _gen_data(self, index, doc_type, chunk_size):
    sql = """select * from tem_search_engine_1 where rownum <= 10000"""
    self.cursor.execute(sql)
    col_name_list = [col[0].lower() for col in self.cursor.description]
    col_name_len = len(col_name_list)
    actions = []

    start = time.time()
    for row in self.cursor:
        source = {}
        tbl_id = ""
        for i in range(col_name_len):
            source.update({col_name_list[i]: str(row[i])})
            if col_name_list[i] == "tbl_id":
                tbl_id = row[i]
        action = {
            "_index": index,
            "_type": doc_type,
            "_id": tbl_id,
            "_source": source
        }
        actions.append(action)
    return actions
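Note that this variant materializes all 10,000 actions in memory before anything is sent, which is fine at this scale but gives up the streaming behaviour a generator is meant to provide.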

Or, better, do not build an actions list at all; yield each action dict directly:

def _gen_data(self, index, doc_type, chunk_size):
    sql = """select * from tem_search_engine_1 where rownum <= 10000"""
    self.cursor.execute(sql)
    col_name_list = [col[0].lower() for col in self.cursor.description]
    col_name_len = len(col_name_list)

    start = time.time()
    for row in self.cursor:
        source = {}
        tbl_id = ""
        for i in range(col_name_len):
            source.update({col_name_list[i]: str(row[i])})
            if col_name_list[i] == "tbl_id":
                tbl_id = row[i]
        yield {
            "_index": index,
            "_type": doc_type,
            "_id": tbl_id,
            "_source": source
        }
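With the generator version there is no manual batching left to do: parallel_bulk groups the yielded dicts into bulk requests on its own, controlled by its chunk_size parameter (your traceback shows you already pass it). A minimal sketch of the matching bulk_data, assuming self.es is the Elasticsearch client from your class:

def bulk_data(self, index, doc_type, chunk_size=1000, threads_counts=4):
    gen_action = self._gen_data(index, doc_type, chunk_size)
    # parallel_bulk chunks the stream itself: chunk_size is the number
    # of actions per bulk request, thread_count the number of worker
    # threads sending requests in parallel.
    for success, info in helpers.parallel_bulk(client=self.es,
                                               actions=gen_action,
                                               thread_count=threads_counts,
                                               chunk_size=chunk_size):
        if not success:
            print("Insert failed: ", info)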
