首页 > 解决方案 > 当第二个流的数据可能尚不可用时,如何比较 Spark 中的两个 RDD?

问题描述

我正在开发一个 Spark 应用程序,该应用程序从两个不同的主题topic_atopic_b一个 Kafka 服务器流式传输数据。我想使用两个流并检查来自两个主题的数据是否相等。

val streamingContext = new StreamingContext(sparkContext, Seconds(batchDuration))
val eventStream = KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent, Subscribe[String, String](topics, consumerConfig))

def start(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = {
    // ...

    def cmp(rddA: RDD[ConsumerRecord[String, String]], rddB: RDD[ConsumerRecord[String, String]]): Unit = {
        // Do compare...
        // but rddA or rddB may be empty! :-(
    }

    val rddTopicA = rdd.filter(_.topic == 'topic_a')
    val rddTopicB = rdd.filter(_.topic == 'topic_b')
    cmp(rddTopicA, rddTopicB)

}


eventStream.foreachRDD((x, y) => start((x, y)))
streamingContext.start()
streamingContext.awaitTermination()

问题是,在比较 中的两个 RDD 时cmp,其中一个 RDD 可能为空,因为数据流在 Kafka 中可能尚不可用。是否有可能以某种方式等到两个 RDD 具有相同数量的行然后开始比较?还是先把有数据的RDD转成DataSet,然后临时存起来,方便以后比较?

标签: scalaapache-sparkapache-kafka

解决方案


推荐阅读