java - java中带有spark文件流的检查点
问题描述
如果在任何情况下我的火花流应用程序停止/终止,我想使用火花文件流应用程序实现检查点以处理来自 hadoop 的所有未处理文件。我正在关注这个:流式编程指南,但没有找到 JavaStreamingContextFactory。请帮助我该怎么办。
我的代码是
public class StartAppWithCheckPoint {
public static void main(String[] args) {
try {
String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
jssc.checkpoint(checkpointDirectory);
return jssc;
}
};
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
context.start();
context.awaitTermination();
context.close();
sparkSession.close();
} catch(Exception e) {
e.printStackTrace();
}
}
}
解决方案
您必须使用检查点
对于检查点,请使用有状态转换updateStateByKey
或reduceByKeyAndWindow
。spark-examples中有很多示例,以及 git-hub 中的预构建 spark 和 spark 源。对于您的具体情况,请参阅JavaStatefulNetworkWordCount.java;
推荐阅读
- opensuse - 出于审计目的,需要提供通过 zypper patch 命令应用的所有补丁。如何列出最近 3 个月内安装的所有补丁
- c++ - 为什么我不能从函数返回的流的引用中读取?
- c# - 我正在使用 ASP.Net 实现 DocuSign API,但是在每次发送信封并重定向到 DocuSign 站点时需要登录到 DocuSign 站点?
- java - Liferay:执行索引操作时发生错误
- agora.io - 在 Android 中未收到来自 iOS 的 RTM 消息
- xgboost - xgboost.core.XGBoostError:不支持 Unicode
- batch-file - 批处理命令删除文件中的“
- c# - Nuget - “无法加载文件或程序集‘{类名},版本 = xxx,文化 = 中性,PublicKeyToken = null’或其依赖项之一”
- javascript - 如何用 split() 为表拆分字符串
- r - 从 R 中同一图表上的数据集创建多个箱线图