Spark Streaming in Java: reading from two Kafka topics with one consumer using JavaInputDStream

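Problem description

I have a Spark application, written with the Spark Java API, that needs to read from two different topics using a single consumer. The Kafka message key and value schemas are the same for both topics.

The workflow is as follows: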

1. Read messages from both topics, with the same group ID, using JavaInputDStream<ConsumerRecord<String, String>>, and iterate using foreachRDD.
2. Inside the loop, read the offsets, filter messages based on the message key, and create a JavaRDD<String>.
3. Iterate over the JavaRDD<String> using mapPartitions.
4. Inside the mapPartitions function, iterate over the rows using forEachRemaining.
5. Perform data enrichment, transformation, etc. on the rows inside the forEachRemaining loop.
6. Commit the offsets.
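
Steps 4 and 5 can be sketched in plain Java, without any Spark dependency. This is only a minimal illustration: the class name PartitionEnricher and the enrich logic (trimming the row and adding a prefix) are hypothetical placeholders for the real enrichment. The method takes and returns an Iterator<String>, which is the shape that mapPartitions expects in Spark 2.x and later, so it could presumably be passed as rdd.mapPartitions(PartitionEnricher::enrichPartition).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class PartitionEnricher {

    /** Hypothetical enrichment step: trims the row and adds a prefix. */
    static String enrich(String row) {
        return "enriched:" + row.trim();
    }

    /**
     * Processes all rows of one partition. The rows are consumed with
     * forEachRemaining and the results are buffered in a list.
     */
    static Iterator<String> enrichPartition(Iterator<String> rows) {
        List<String> out = new ArrayList<>();
        rows.forEachRemaining(row -> out.add(enrich(row)));
        return out.iterator();
    }

    public static void main(String[] args) {
        // Stand-in for the rows of a single partition.
        Iterator<String> partition = Arrays.asList(" a ", "b").iterator();
        enrichPartition(partition).forEachRemaining(System.out::println);
    }
}
```

Buffering the results in a list keeps the sketch short, but it holds a whole partition in memory; for large partitions a lazily evaluated iterator may be a better fit.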

I would like to understand the following. Please provide your answers or share any documentation that could help me find them.

1. How are messages received/consumed from two topics (one common group ID, same key/value schema) by one consumer?
Let's say the consumer reads data every second, Producer 1 produces 50 messages to Topic1, and Producer 2 produces 1000 messages to Topic2.
2. Will it read all the messages (1000 + 50) in one batch and process them together in the workflow, or will it read the 50 messages first, process them, and then read and process the 1000 messages?
3. What parameter should I use to control the number of messages read in one batch per second?
4. Will using the same group ID cause any issues while consuming?

Tags: apache-spark, apache-kafka, spark-streaming, kafka-consumer-api

Solution


The official Spark Streaming documentation already explains how to consume multiple topics with a single group ID: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

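import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// streamingContext and kafkaParams (including group.id) are assumed to be defined already.
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );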
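
  1. Use one group ID and subscribe to both topics with a single Subscribe call; this works as long as both topics follow the same schema.
  2. I am not sure about this, but as I understand it, each batch consumes all messages available in both topics for that batch interval (subject to any rate limit), so they are processed together rather than one topic after the other.
  3. Set "spark.streaming.backpressure.enabled" to true and "spark.streaming.kafka.maxRatePerPartition" to a numeric value. maxRatePerPartition is the maximum number of records per second read from each Kafka partition, and Spark uses it to limit the number of messages consumed from Kafka per batch. Also set the batch duration accordingly: https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html
  4. That depends entirely on how your application is used.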
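
To make answers 2 and 3 more concrete, here is a small plain-Java model of how the per-partition rate limit bounds a batch. It is not Spark code and not Spark's actual implementation. The partition layout (one partition for Topic1, two for Topic2), the rate of 100 and the class name BatchCapSketch are assumptions chosen for illustration. It models only the static cap from maxRatePerPartition and ignores the dynamic rate that backpressure estimates at runtime.

```java
final class BatchCapSketch {

    /** Upper bound on records read from one Kafka partition in one batch. */
    static long perPartitionCap(long maxRatePerPartition, long batchSeconds) {
        return maxRatePerPartition * batchSeconds;
    }

    /** Records read in one batch, given the backlog waiting in each partition. */
    static long recordsInBatch(long[] backlogPerPartition,
                               long maxRatePerPartition,
                               long batchSeconds) {
        long cap = perPartitionCap(maxRatePerPartition, batchSeconds);
        long total = 0;
        for (long backlog : backlogPerPartition) {
            // Each partition contributes at most `cap` records to the batch.
            total += Math.min(backlog, cap);
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed layout: Topic1 has 1 partition holding 50 records,
        // Topic2 has 2 partitions holding 500 records each.
        long[] backlog = {50, 500, 500};

        // Assumed maxRatePerPartition = 100 with a 1-second batch: 50 + 100 + 100.
        System.out.println(recordsInBatch(backlog, 100, 1));    // prints 250

        // A limit far above the backlog: the whole 1050 records fit in one batch.
        System.out.println(recordsInBatch(backlog, 10_000, 1)); // prints 1050
    }
}
```

Under these assumptions, records from both topics land in the same batch, and the limit applies to each partition separately rather than to the batch as a whole.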
