spring-boot - 同一作业的 Spark 和 Spark 流输出差异
问题描述
我正在为我的项目使用 Spark 和 Spark 流进行一些 POC。所以我所做的就是从 Topic 中读取文件名。从“src/main/sresource”下载文件并执行通常的“WordCount”频率应用程序。
@KafkaListener(topics = Constants.ABCWordTopic, groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID)
public void processTask(@Payload String fileResourcePath) {
log.info("ABC Receiving task from WordProducer filepath {} at time {}", fileResourcePath,
LocalDateTime.now());
// Spark job
/*
* JavaRDD wordRDD =
* sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
* log.info("ABC Map Contents : {}", wordRDD.countByValue().toString());
* wordRDD.coalesce(1,
* true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
*/
// Spark Streaming job
JavaPairDStream wordPairStream = streamingContext
.textFileStream(extractFile(fileResourcePath))
.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
.mapToPair(s -> new Tuple2(s, 1)).reduceByKey((i1, i2) -> i1 + i2);
wordPairStream.foreachRDD(wordRDD -> {
// javaFunctions(wordTempRDD).writerBuilder("vocabulary", "words", mapToRow(String.class))
// .saveToCassandra();
log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
wordRDD.coalesce(1, true)
.saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
});
streamingContext.start();
try {
streamingContext.awaitTerminationOrTimeout(-1);
} catch (InterruptedException e) {
log.error("Terminated streaming context {}", e);
}
}
- 在上面的代码中,我正在收听 Kafka Topic("ABCtopic") 并对其进行处理。“Spark 作业”注释代码工作得非常好。它计算单词并按预期给出结果,但是“火花流作业”代码的行为与预期不同,它输出 null。
- 该行
log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
给出“{}”作为输出。写入文件是空的。从鲜为人知的“Spark 流”中了解 Spark 流是一个额外的库,用于实时连续处理来自任何来源(如文件、主题等)的数据。 - 上面代码中缺少的火花流在突出显示的日志行以及正在写入磁盘的输出数据文件中输出“null”,而 Spark 作业完美地完成了同样的工作。
解决方案
为其他可能被困在这一点上的人添加他的答案。乍一看它应该可以工作,但是在这里阅读 Spark 的文档就是结论。
“streamingContext.textFileStream(..)” API 不会从任何目录读取静态内容。因此,它无法从目录中读取文件,或者更确切地说无法处理它。它旨在读取流数据,因此必须在监视目录中添加或更新数据。因此,我在网上阅读的内容中的一个快速破解方法是在程序执行开始后(即 StreamingContext.start 已执行)将文件或更新文件到 windows 目录(我使用 windows 10)。
请注意,即使在我尝试了所有这些 hack 之后,我也无法让它执行,但考虑到这不是流式传输的正确用例(从文件夹读取和处理可以通过 Spark 作业轻松实现,这就是我的代码演示)我必须保留它。
推荐阅读
- python - 如何基于 CPLEX 上的其他两个约束来实施新约束?
- r - 在大型数据集上运行 R 脚本时如何防止计算机崩溃
- javascript - 使用 javascript 平滑滚动到某些内容的最佳方法是什么?
- c# - 如何在 c# 中使用实体框架 6.1 在遗留模式下添加存储过程?
- python - 如何在 pycharm 上创建多个 django 项目而不会与其他端口冲突
- google-apps-script - 在谷歌应用脚本中迭代过滤数据透视表
- angularjs - 如何使用 api 前缀在 NGINX 中配置 Laravel API 和 Angular 前端
- python - Python 错误 ModuleNotFoundError:没有名为“pypresence”的模块
- c# - 将多个标签添加到 div (C#)
- excel - 在 Excel 中根据 2 个分隔符拆分单元格