java - Kafka Spark Streaming 缓存
问题描述
我一直以JavaPairInputDStream
(使用 twitter 流 api)的形式从一个 kafka 主题获取数据,计划从两个主题获取数据,检查是否使用 tweet_id 重复,如果它不在包中(包用于发送回 kafka),添加它。我也想缓存数据 x 分钟然后处理它。
我可以从 kafka 主题中获取数据并将其输出
stream.foreachRDD(rdd -> {
System.out.println("--- New RDD with " + rdd.partitions().size()
+ " partitions and " + rdd.count() + " records");
rdd.foreach(record -> System.out.println(record._2));});
但我无法设法缓存它。尝试rdd.cache()
并坚持使用count()
. 但它似乎并没有起到什么作用,或者我只是无法理解它。
任何人都可以指导我如何做这些事情?
解决方案
好的,所以它似乎不可能像这样缓存 rdd。我创建了另一个 rdd,每当流创建新的 rdd 并以这种方式缓存时,我都在使用 union()。
推荐阅读
- loops - 有人可以解释一下这段代码在即将到来的测试中是如何工作的吗
- python - 在 matplotlib 时间序列“意大利面条”图中使线条变粗
- r - 使用 ggplot 为绘图中的线条子集着色
- javascript - 忽略聚合管道中 $match 阶段的字段
- python - 使用字符串变量名称作为 mongodb 集合名称
- java - 检查2个链接列表是否在java中包含相同值的问题
- office-addins - 如何向需要使用 OAuth 2.0 进行身份验证的 AppSource 提交加载项?
- java - Java - 如何在特定条件下获得最佳元素列表
- vba - 如何修复“438 - 对象不支持属性或方法错误”
- c - Lex - 添加新行后未检测到令牌