scala - 当第二个流的数据可能尚不可用时,如何比较 Spark 中的两个 RDD?
问题描述
我正在开发一个 Spark 应用程序,该应用程序从两个不同的主题topic_a
和topic_b
一个 Kafka 服务器流式传输数据。我想使用两个流并检查来自两个主题的数据是否相等。
val streamingContext = new StreamingContext(sparkContext, Seconds(batchDuration))
val eventStream = KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent, Subscribe[String, String](topics, consumerConfig))
def start(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = {
// ...
def cmp(rddA: RDD[ConsumerRecord[String, String]], rddB: RDD[ConsumerRecord[String, String]]): Unit = {
// Do compare...
// but rddA or rddB may be empty! :-(
}
val rddTopicA = rdd.filter(_.topic == 'topic_a')
val rddTopicB = rdd.filter(_.topic == 'topic_b')
cmp(rddTopicA, rddTopicB)
}
eventStream.foreachRDD((x, y) => start((x, y)))
streamingContext.start()
streamingContext.awaitTermination()
问题是,在比较 中的两个 RDD 时cmp
,其中一个 RDD 可能为空,因为数据流在 Kafka 中可能尚不可用。是否有可能以某种方式等到两个 RDD 具有相同数量的行然后开始比较?还是先把有数据的RDD转成DataSet,然后临时存起来,方便以后比较?
解决方案
推荐阅读
- python - 删除/编辑数据框中条目不符合条件的行
- confidence-interval - 结合置信区间和优势比 + 在 Gtsummary 中为 P 值添加开始
- php - 通过 AJAX 函数将数据传递给 PHP 脚本给我错误?
- excel - 如何插入参数
- windows - Windows 命令提示符,搜索点
- python - TensorFlow:“UnimplementedError:不支持将字符串转换为浮点数”
- javascript - Javascript新手,有人可以帮我解决这个子字符串问题吗?
- r - R - 仅在 ggplot2 的图例中更改形状
- c# - 无法使用种子数据初始化 DbContext
- javascript - 为什么 forEach 不适用于新数组?