Throughput and latency in Apache Flink

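I wrote a very simple Java program for Apache Flink, and now I am interested in measuring statistics such as throughput (the number of tuples processed per second) and latency (the time the program needs to process each input tuple).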

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
    .map(new MyMapper())
    .writeAsCsv("/home/LizardKing/Results.csv");

JobExecutionResult res = env.execute();

I know that Flink exposes some metrics:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

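But I am not sure how to use them to get what I want. From the link I read that a "Meter" can be used to measure average throughput, but once I have defined one, how do I actually use it?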

Answer:

We run custom metrics, such as meters and gauges, in our production streaming jobs running on YARN.

The steps are as follows:

Add an extra dependency to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>${flink.version}</version>
</dependency>

We are using version 1.2.1.

Then add a meter to the MyMapper class:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
            .readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
            .map(new MyMapper())
            .writeAsCsv("/home/LizardKing/Results.csv");

        JobExecutionResult res = env.execute();
    }

    private static class MyMapper extends RichMapFunction<String, Object> {

        // transient: the meter is created in open() on the task,
        // it is not serialized together with the function
        private transient Meter meter;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register a Dropwizard-backed meter named "myMeter"
            // in this operator's metric group
            this.meter = getRuntimeContext()
                .getMetricGroup()
                .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        }

        @Override
        public Object map(String value) throws Exception {
            // Mark one event per input record; the meter derives a rate from these marks
            this.meter.markEvent();
            return value;
        }
    }
}
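To make the two quantities from the question concrete, here is a minimal plain-Java sketch of the bookkeeping involved. It is not Flink or Dropwizard code: the class MeasuredFunction and all of its methods are hypothetical names made up for illustration. It counts records, times each call with System.nanoTime(), and derives an average throughput over a window you supply. Note that it only captures the time spent inside the wrapped function, not end-to-end latency through a pipeline.

```java
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): wraps a per-record function and
 * records how many records it processed and how long each call took.
 */
class MeasuredFunction<I, O> {
    private final Function<I, O> delegate;
    private long count;       // records processed so far
    private long totalNanos;  // summed processing time of all records
    private long maxNanos;    // slowest single record seen so far

    MeasuredFunction(Function<I, O> delegate) {
        this.delegate = delegate;
    }

    /** Applies the wrapped function and records the time the call took. */
    O apply(I value) {
        long start = System.nanoTime();
        O result = delegate.apply(value);
        record(System.nanoTime() - start);
        return result;
    }

    /** Bookkeeping, kept separate so it can be exercised with known durations. */
    void record(long elapsedNanos) {
        count++;
        totalNanos += elapsedNanos;
        maxNanos = Math.max(maxNanos, elapsedNanos);
    }

    long getCount() {
        return count;
    }

    /** Mean per-record latency in nanoseconds, or 0 if nothing was processed. */
    double meanLatencyNanos() {
        return count == 0 ? 0.0 : (double) totalNanos / count;
    }

    long maxLatencyNanos() {
        return maxNanos;
    }

    /** Average throughput in records per second over the given wall-clock window. */
    double throughputPerSecond(long windowNanos) {
        return windowNanos <= 0 ? 0.0 : count * 1_000_000_000.0 / windowNanos;
    }

    public static void main(String[] args) {
        MeasuredFunction<String, String> f = new MeasuredFunction<>(String::toUpperCase);
        long start = System.nanoTime();
        for (String line : new String[] {"a,1", "b,2", "c,3"}) {
            f.apply(line);
        }
        long window = System.nanoTime() - start;
        System.out.printf("count=%d, mean latency=%.0f ns, throughput=%.1f records/s%n",
                f.getCount(), f.meanLatencyNanos(), f.throughputPerSecond(window));
    }
}
```

In a real job the same idea would live inside the map function, with the numbers handed to registered metrics (as the meter above does for the event count) rather than kept in plain fields.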

Hope this helps.

Source: utcz.com/qa/429388.html
