Apache Flink与Elasticsearch的集成

我正在尝试将Flink与Elasticsearch 2.1.1集成,我正在使用Maven依赖项

     <dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-elasticsearch2_2.10</artifactId>

<version>1.1-SNAPSHOT</version>

</dependency>

这是我从Kafka队列中读取事件的Java代码(工作正常),但是无论如何,如果我更改了任何相关设置,则事件不会在Elasticsearch中发布,也没有错误,在以下代码中到ElasticSearch的端口,主机名,集群名称或索引名称,然后立即看到错误,但当前它不显示任何错误,也没有在ElasticSearch中创建任何新文档

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// parse user parameters

ParameterTool parameterTool = ParameterTool.fromArgs(args);

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

messageStream.print();

Map<String, String> config = new HashMap<>();

config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");

config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");

config.put("cluster.name", "FlinkDemo");

List<InetSocketAddress> transports = new ArrayList<>();

transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));

env.execute();

}

private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {

private static final long serialVersionUID = 1L;

public IndexRequest createIndexRequest(String element) {

Map<String, Object> json = new HashMap<>();

json.put("data", element);

return Requests.indexRequest()

.index("flink").id("hash"+element).source(json);

}

@Override

public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {

indexer.add(createIndexRequest(element));

}

}

回答:

我确实确实在本地计算机上运行它并进行调试,但是,我缺少的唯一一件事就是正确配置日志记录,因为大多数弹性问题在“

log.warn”语句中进行了描述。问题是elasticsearch-2.2.1客户端API中“

BulkRequestHandler.java”内部的异常,该异常引发错误-“

org.elasticsearch.action.ActionRequestValidationException:验证失败:1:类型丢失;”。因为我创建了索引但没有创建一个类型,但我发现它很奇怪,因为它应该主要与索引有关,并默认创建该类型。

以上是 Apache Flink与Elasticsearch的集成 的全部内容, 来源链接: utcz.com/qa/430037.html

回到顶部