ElasticSearch: using an aggregation on the results of another aggregation

There is a list of conversations, and each conversation has a list of messages. Each message has several fields, including an action field. We need to consider that action A is used in the first message of a conversation, action A.1 a few messages later, then A.1.1 a while after that, and so on (there is a list of chatbot intents).

Grouping a conversation's message actions would look like: A > A > A > A.1 > A > A.1 > A.1.1 ...

I need to create a report with ElasticSearch that returns the actions group of each conversation; next, I need to group the similar actions groups and add a count to each one. The end result would be a Map<actionsGroup, count> with keys such as 'A > A.1 > A > A.1 > A.1.1'.


While building the actions group I need to remove the consecutive duplicates in each group; instead of A > A > A > A.1 > A > A.1 > A.1.1 I need to have A > A.1 > A > A.1 > A.1.1.
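Removing the consecutive duplicates is a simple fold over the ordered action list. A minimal sketch in TypeScript (the helper name `collapseActions` is mine, not part of any Elasticsearch API):

```typescript
// Collapse consecutive duplicate actions into a single path string,
// e.g. ["A", "A", "A.1"] becomes "A > A.1".
function collapseActions(actions: string[]): string {
  const collapsed: string[] = [];
  for (const action of actions) {
    // Keep an action only when it differs from the one just before it.
    if (collapsed[collapsed.length - 1] !== action) {
      collapsed.push(action);
    }
  }
  return collapsed.join(" > ");
}
```

For example, `collapseActions(["A", "A", "A", "A.1", "A", "A.1", "A.1.1"])` yields `"A > A.1 > A > A.1 > A.1.1"`.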

{
  "collapse": {
    "field": "context.conversationId",
    "inner_hits": {
      "name": "logs",
      "size": 10000,
      "sort": [
        { "@timestamp": "asc" }
      ]
    }
  },
  "aggs": {
  }
}

  1. I need to map the collapsed results into a single result such as A > A.1 > A > A.1 > A.1.1. I have seen that it is possible to use a scripted aggregation here, and it is possible to build the list of actions I need, but it would operate on all messages, not just on the messages I combined in the collapse. Is it possible to use an aggregation inside the collapse, or some similar solution?
  2. I need to group all the collapsed results like A > A.1 > A > A.1 > A.1.1, add a count, and end up with a Map<actionsGroup, count>.

The approach I have in mind:

  1. Group the conversation messages with an aggregation on the conversationId field (I don't know how to do that).
  2. Use a script to iterate over all the values and create the actions group for each conversation. (Not sure if that is possible.)
  3. Use another aggregation over all the values and group the duplicates, returning a Map<actionsGroup, count>.
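If the grouping cannot be expressed inside Elasticsearch, the same three steps can be run client-side over the fetched messages. A sketch, assuming the messages were already retrieved from the collapsed hits (the `Message` type and function name are mine):

```typescript
interface Message {
  timestamp: number;
  conversationId: string;
  action: string;
}

// Build a Map<actionsGroup, count> from a flat list of messages:
// 1. group messages by conversationId,
// 2. order each group by timestamp and drop consecutive duplicate actions,
// 3. count identical action paths across conversations.
function countActionPaths(messages: Message[]): Map<string, number> {
  const byConversation = new Map<string, Message[]>();
  for (const m of messages) {
    const group = byConversation.get(m.conversationId) ?? [];
    group.push(m);
    byConversation.set(m.conversationId, group);
  }

  const counts = new Map<string, number>();
  for (const group of byConversation.values()) {
    group.sort((a, b) => a.timestamp - b.timestamp);
    const path: string[] = [];
    for (const m of group) {
      // Skip an action that repeats the previous one.
      if (path[path.length - 1] !== m.action) path.push(m.action);
    }
    const key = path.join(" > ");
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  return counts;
}
```

Fed with the sample documents below, this returns a map with "A > A.1 > A.1.1" counted twice and "B > B.1" once. The obvious drawback is that all messages must be transferred to the client, which is exactly what a server-side aggregation would avoid.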

Adding some other details.

The mapping:

"mappings": {
  "properties": {
    "@timestamp": {
      "type": "date",
      "format": "epoch_millis"
    },
    "context": {
      "properties": {
        "action": {
          "type": "keyword"
        },
        "conversationId": {
          "type": "keyword"
        }
      }
    }
  }
}

Sample documents for the conversations:

Conversation 1.

{
  "@timestamp": 1579632745000,
  "context": {
    "action": "A",
    "conversationId": "conv_id1"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "A.1",
    "conversationId": "conv_id1"
  }
},
{
  "@timestamp": 1579632745002,
  "context": {
    "action": "A.1.1",
    "conversationId": "conv_id1"
  }
}

Conversation 2.

{
  "@timestamp": 1579632745000,
  "context": {
    "action": "A",
    "conversationId": "conv_id2"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "A.1",
    "conversationId": "conv_id2"
  }
},
{
  "@timestamp": 1579632745002,
  "context": {
    "action": "A.1.1",
    "conversationId": "conv_id2"
  }
}

Conversation 3.

{
  "@timestamp": 1579632745000,
  "context": {
    "action": "B",
    "conversationId": "conv_id3"
  }
},
{
  "@timestamp": 1579632745001,
  "context": {
    "action": "B.1",
    "conversationId": "conv_id3"
  }
}

Expected results:

{
  "A -> A.1 -> A.1.1": 2,
  "B -> B.1": 1
}

Something similar; this or any other format would work.

Answer:

I solved it with Elastic's scripted_metric. Note that the index structure has also changed from the initial one shown above.

The script:

{
  "size": 0,
  "aggs": {
    "intentPathsCountAgg": {
      "scripted_metric": {
        "init_script": "state.messagesList = new ArrayList();",
        "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",
        "combine_script": "return state",
        "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet())"
      }
    }
  }
}

The formatted scripts (for better readability, written as .ts template strings):

scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];
    for (state in states) {
      messages.addAll(state.messagesList);
    }
    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if (existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });
    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });
    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`
}

The response:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 11,
      "relation": "eq"
    },
    "max_score": null,
    "hits": []
  },
  "aggregations": {
    "intentPathsCountAgg": {
      "value": [
        {
          "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
        },
        {
          "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3 -> smallTalk.greet4": 1
        },
        {
          "smallTalk.greet -> smallTalk.greet2": 1
        }
      ]
    }
  }
}
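Because the reduce_script returns a set of single-entry maps, the client still has to merge the aggregation's `value` array into one Map<actionsGroup, count>. A minimal sketch (the `value` shape is taken from the response above; the function name is mine):

```typescript
// Merge the aggregation's "value" array (a list of single-entry
// objects such as {"A -> A.1": 2}) into one flat Map<string, number>.
// If the same path appears in several entries, its counts are summed.
function mergePathCounts(value: Record<string, number>[]): Map<string, number> {
  const merged = new Map<string, number>();
  for (const entry of value) {
    for (const [path, count] of Object.entries(entry)) {
      merged.set(path, (merged.get(path) ?? 0) + count);
    }
  }
  return merged;
}
```

In practice you would pass `response.aggregations.intentPathsCountAgg.value` to this helper after deserializing the search response.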
