首页 > 解决方案 > 同一作业的 Spark 和 Spark 流输出差异

问题描述

我正在为我的项目使用 Spark 和 Spark 流进行一些 POC。所以我所做的就是从 Topic 中读取文件名。从“src/main/sresource”下载文件并执行通常的“WordCount”频率应用程序。

 
@KafkaListener(topics = Constants.ABCWordTopic, groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID) 
public void processTask(@Payload String fileResourcePath) {
        log.info("ABC Receiving task from WordProducer filepath {} at time {}", fileResourcePath,
                LocalDateTime.now());
        // Spark job
        /*
         * JavaRDD wordRDD =
         * sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
         * log.info("ABC Map Contents : {}", wordRDD.countByValue().toString());
         * wordRDD.coalesce(1,
         * true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
         */
        // Spark Streaming job
        JavaPairDStream wordPairStream = streamingContext
                .textFileStream(extractFile(fileResourcePath))
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(s -> new Tuple2(s, 1)).reduceByKey((i1, i2) -> i1 + i2);
        wordPairStream.foreachRDD(wordRDD -> {
        //  javaFunctions(wordTempRDD).writerBuilder("vocabulary", "words", mapToRow(String.class))
        //                  .saveToCassandra();
            log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
            wordRDD.coalesce(1, true)
                    .saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
        });
        streamingContext.start();
        try {
            streamingContext.awaitTerminationOrTimeout(-1);
        } catch (InterruptedException e) {
            log.error("Terminated streaming context {}", e);
        }
    }

标签: spring-bootapache-sparkspark-streamingspark-streaming-kafka

解决方案


为其他可能被困在这一点上的人添加他的答案。乍一看它应该可以工作,但是在这里阅读 Spark 的文档就是结论。
“streamingContext.textFileStream(..)” API 不会从任何目录读取静态内容。因此,它无法从目录中读取文件,或者更确切地说无法处理它。它旨在读取流数据,因此必须在监视目录中添加或更新数据。因此,我在网上阅读的内容中的一个快速破解方法是在程序执行开始后(即 StreamingContext.start 已执行)将文件或更新文件到 windows 目录(我使用 windows 10)。
请注意,即使在我尝试了所有这些 hack 之后,我也无法让它执行,但考虑到这不是流式传输的正确用例(从文件夹读取和处理可以通过 Spark 作业轻松实现,这就是我的代码演示)我必须保留它。


推荐阅读