首页 > 解决方案 > Kafka Spark Streaming 缓存

问题描述

我一直以JavaPairInputDStream(使用 twitter 流 api)的形式从一个 kafka 主题获取数据,计划从两个主题获取数据,检查是否使用 tweet_id 重复,如果它不在包中(包用于发送回 kafka),添加它。我也想缓存数据 x 分钟然后处理它。

我可以从 kafka 主题中获取数据并将其输出

stream.foreachRDD(rdd -> {
    System.out.println("--- New RDD with " + rdd.partitions().size()
     + " partitions and " + rdd.count() + " records");
     rdd.foreach(record -> System.out.println(record._2));});

但我无法设法缓存它。尝试rdd.cache()并坚持使用count(). 但它似乎并没有起到什么作用,或者我只是无法理解它。

任何人都可以指导我如何做这些事情?

标签: javaapache-spark

解决方案


好的,所以它似乎不可能像这样缓存 rdd。我创建了另一个 rdd,每当流创建新的 rdd 并以这种方式缓存时,我都在使用 union()。


推荐阅读