首页 > 解决方案 > 如何仅在收到数据时制作 Spark Streaming 保存文件?

问题描述

我正在开发一个非常简单的 spark 流应用程序,它从 s3 位置获取 inputTextFiles 并进行一些转换,然后再次将结果输出到某个 s3 位置。但问题是

JavaPairDStream<String, String> pairs = lines.mapToPair(
            line -> new Tuple2<String, String>(line, fetcher.fetchAndSaveUrl(process(line) ))
            );
pairs.dstream().saveAsTextFiles(s3Path + "/output", "output");

如果在特定的批处理间隔中未使用名为 _SUCCESS 的空文件进行处理,则保存文件事件。如何确保仅在读取某些输入时才保存文件?

标签: apache-sparkspark-streaming

解决方案


您需要过滤我们的空 RDD(即检查 rdd.count() 或 rdd,isEmpty(),无论您发现什么更合适)。为此,您需要以使用 foreachRDD() 的方式重写对 DStream 的操作。


推荐阅读