为嵌套的Elasticsearch结构格式化Postgres JSON输出

我已经意识到使用SQL数据库(Postgres)是将我的关系数据(24个CSV文件中的40+

GB)移植到具有嵌套结构的

Elasticsearch中的最有效方法之一。但是我仍然对Postgres的JSON输出的格式存在一些问题:1)不需要的换行符(\

n),2)不需要的标题行和3)不需要的日期格式。这是一个基本示例进行演示:

file1

id,age,gender,wave

1,49,M,1

2,72,F,0

file2

id,time,event1

1,2095-04-20 12:28:55,V39

1,2095-04-21 2:27:45,T21

2,2094-05-17 18:17:25,V39

file3

id,time,event2

1,2095-04-22 3:48:53,P90

2,2094-05-18 1:28:23,RT4

2,2094-05-18 4:23:53,W3

将这些CSV添加到名为Forum的架构并运行以下SQL代码后:

with f_1 as(

SELECT id, json_agg(file1.*) AS tag

FROM forum.file1

GROUP BY id

), f_2 as (

SELECT id, json_agg(file2.*) AS tag

FROM forum.file2

GROUP BY id

), f_3 as (

SELECT id, json_agg(file3.*) AS tag

FROM forum.file3

GROUP BY id

)

SELECT ('{"id":' || a.id), ('"file1":' || a.tag), ('"file2":' || b.tag), ('"file3":' || c.tag ||'}')

FROM f_1 AS a, f_2 AS b, f_3 AS c

WHERE b.id = a.id AND c.id = a.id;

我得到以下输出(pgAdminIII-导出到文件-没有报价):

?column?,?column?,?column?,?column?

{"id":1,"file1":[{"id":1,"age":49,"gender":"M","wave":1}],"file2":[{"id":1,"time":"2095-04-20T12:28:55","event1":"V39"},

{"id":1,"time":"2095-04-21T02:27:45","event1":"T21"}],"file3":[{"id":1,"time":"2095-04-22T03:48:53","event2":"P90"}]}

{"id":2,"file1":[{"id":2,"age":72,"gender":"F","wave":0}],"file2":[{"id":2,"time":"2094-05-17T18:17:25","event1":"V39"}],"file3":[{"id":2,"time":"2094-05-18T01:28:23","event2":"RT4"},

{"id":2,"time":"2094-05-18T04:23:53","event2":"W3"}]}

您可以看到,对于给定的ID,多行数据。对于给定的ID(即没有\

n),我需要所有数据都在一行上。我没有花很多时间但想更改的其他几个小问题是不需要第一行,我想摆脱它?column?,?column?,?column?,?column?而不必在处理完成后打开文件。理想情况下,我也希望T日期输出中没有任何内容。我应该能够T在Elasticsearch中容纳它,但是到目前为止还没有被它接受。这是我希望从Postgres获得的输出,该输出可用于Elasticsearch的输入(使用stream2es和嵌套的映射结构):

{"id":1,"file1":[{"id":1,"age":49,"gender":"M","wave":1}],"file2":[{"id":1,"time":"2095-04-20 12:28:55","event1":"V39"},{"id":1,"time":"2095-04-21 02:27:45","event1":"T21"}],"file3":[{"id":1,"time":"2095-04-22 03:48:53","event2":"P90"}]}

{"id":2,"file1":[{"id":2,"age":72,"gender":"F","wave":0}],"file2":[{"id":2,"time":"2094-05-17 18:17:25","event1":"V39"}],"file3":[{"id":2,"time":"2094-05-18 01:28:23","event2":"RT4"},{"id":2,"time":"2094-05-18 04:23:53","event2":"W3"}]}

添加to_json确实可以修复不需要的换行符,但它可以\"代替"stream2es解析器所不喜欢的:

SELECT to_json('{"id":' || a.id), to_json('"file1":' || a.tag::json),

to_json('"file2":' || b.tag::json), to_json('"file3":' || c.tag::json ||'}')

"{\"id\":1","\"file1\":[{\"id\":1,\"age\":49,\"gender\":\"M\",\"wave\":1}]"

es2stream异常: Exception in thread "stream dispatcher"

java.lang.ClassCastException: java.lang.String cannot be cast to

clojure.lang.IPersistentMap

回答:

全选一列(而不是四列)。函数format()将帮助您更清楚地记录下来。用

regexp_replace (str, '(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g')

更正日期格式,并

replace (str, e' \n ', '')

跳过换行符。

使用COPY命令可以简化问题:

COPY (

with f_1 as(

SELECT id, json_agg(file1.*) AS tag

FROM forum.file1

GROUP BY id

), f_2 as (

SELECT id, json_agg(file2.*) AS tag

FROM forum.file2

GROUP BY id

), f_3 as (

SELECT id, json_agg(file3.*) AS tag

FROM forum.file3

GROUP BY id

)

SELECT

replace(

regexp_replace(

format('{"id":%s,"file1":%s,"file2":%s,"file3":%s}',

a.id, a.tag, b.tag, c.tag),

'(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g'),

e' \n ', '')

FROM f_1 AS a, f_2 AS b, f_3 AS c

WHERE b.id = a.id AND c.id = a.id

) TO '/full/path/to/your/file';


要在每行数据前添加命令行,可以使用带有返回两行功能的技巧。格式化的某些部分可以偶尔移至该功能。

create or replace function format_data_line(command text, data_str text)

returns setof text language plpgsql as $$

begin

return next command;

return next

replace(

regexp_replace(data_str,

'(\d\d\d\d-\d\d-\d\d)T', '\1 ', 'g'),

e' \n ', '');

end $$;

COPY (

with f_1 as(

SELECT id, json_agg(file1.*) AS tag

FROM forum.file1

GROUP BY id

), f_2 as (

SELECT id, json_agg(file2.*) AS tag

FROM forum.file2

GROUP BY id

), f_3 as (

SELECT id, json_agg(file3.*) AS tag

FROM forum.file3

GROUP BY id

)

SELECT

format_data_line(

'my command',

format('{"id":%s,"file1":%s,"file2":%s,"file3":%s}',

a.id, a.tag, b.tag, c.tag))

FROM f_1 AS a, f_2 AS b, f_3 AS c

WHERE b.id = a.id AND c.id = a.id

) TO '/full/path/to/your/file';

以上是 为嵌套的Elasticsearch结构格式化Postgres JSON输出 的全部内容, 来源链接: utcz.com/qa/424770.html

回到顶部