Spark Streaming re-reads messages after failure despite checkpoints

I am trying to implement a fault-tolerant Spark Streaming application that consumes from Kafka. When I restart the application, it reads the messages that were already consumed before the restart, and my computations go wrong. Please help me solve this problem.

Here is the code, written in Java.

public static JavaStreamingContext createContextFunc() {
    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();
    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    JavaStreamingContext streamingContext = app.getStreamingContext(checkpointDir);

    // All transformations and output operations on the stream must be defined
    // inside this factory so they can be recovered from the checkpoint.
    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);

    return streamingContext;
}

public static void main(String[] args) throws InterruptedException {
    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    // Rebuild the context from the checkpoint if one exists, otherwise create a new one.
    Function0<JavaStreamingContext> createContextFunc = () -> createContextFunc();

    JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc);

    streamingContext.start();
    streamingContext.awaitTermination();
}
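Note that with JavaStreamingContext.getOrCreate, the whole DStream lineage, including output operations, has to be set up inside the factory function; anything wired up after getOrCreate returns is not restored from the checkpoint. As a minimal sketch of what that could look like, with print() standing in as a placeholder for the real processing (not the project's actual logic):

public static JavaStreamingContext createContextFunc() {
    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();
    ApplicationConf conf = new ApplicationConf();

    JavaStreamingContext streamingContext = app.getStreamingContext(conf.getCheckpointDirectory());

    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);
    kafkaInputStream.print();  // placeholder output operation; real transformations go here

    return streamingContext;
}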

public JavaStreamingContext getStreamingContext(String checkpointDir) {
    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String master = conf.getMaster();
    int duration = conf.getDuration();

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
    streamingContext.checkpoint(checkpointDir);

    return streamingContext;
}
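For recovery after a failure, the checkpoint directory also needs to live on fault-tolerant storage such as HDFS or S3 rather than a local path. The project's ApplicationConf is not shown here, so the value below is only a hypothetical illustration of what getCheckpointDirectory might return:

// Hypothetical value returned by ApplicationConf.getCheckpointDirectory().
String checkpointDir = "hdfs://namenode:8020/spark/checkpoints/summary-of-transactions";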

public SparkSession getSession() {
    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String hiveConf = conf.getHiveConf();
    String thriftConf = conf.getThriftConf();
    int shufflePartitions = conf.getShuffle();

    SparkSession spark = SparkSession
            .builder()
            .appName(appName)
            .config("spark.sql.warehouse.dir", hiveConf)
            .config("hive.metastore.uris", thriftConf)
            .enableHiveSupport()
            .getOrCreate();

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions);

    return spark;
}

public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) {
    KafkaConfig kafkaConfig = new KafkaConfig();
    Set<String> topicsSet = kafkaConfig.getTopicSet();
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams();

    // Create a direct Kafka stream with the configured brokers and topics.
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value);

    return logdata;
}
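Whether already-consumed records are delivered again after a restart also depends on what KafkaConfig.getKafkaParams returns. That class is not shown here, so the following is only an assumed illustration of typical consumer parameters; in particular, auto.offset.reset and enable.auto.commit affect the re-reading behaviour:

// Hypothetical example of what KafkaConfig.getKafkaParams might return.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");   // assumed broker address
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "summary-of-transactions");   // assumed consumer group
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);             // let Spark manage offsets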

Here is the link to the GitHub project: https://github.com/ThisaST/Spark-Fault-Tolerance

Answer:

I solved this problem by adding the following configuration to the code.

sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
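For reference, a minimal sketch of where this setting could sit, assuming it is added in getStreamingContext next to the existing write-ahead-log setting before the context is constructed:

SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
// Shut the streaming context down gracefully so in-flight batches finish
// and checkpoints are written before the JVM exits.
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");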

