apache-spark - Java 中的 Spark Streaming:使用 JavaInputDStream 使用一个消费者从两个 Kafka 主题中读取
问题描述
我有一个 spark 应用程序,需要使用一个使用 Spark Java 的消费者从两个不同的主题中读取。两个主题的 kafka 消息键和值模式相同。
以下是工作流程:
1. Read messages from both the topics, same groupID, using JavaInputDStream<ConsumerRecord<String, String>> and iterate using foreachRDD
2. Inside the loop, Read offsets, filter messages based on the message key and create JavaRDD<String>
3. Iterate on JavaRDD<String> using mapPartitions
4. Inside mapPartitions loop, iterate over them using forEachRemaining.
5. Perform data enrichment, transformation, etc on the rows inside forEachRemaining loop.
6. commit
我想了解以下问题。请提供您的答案或分享任何可以帮助我找到答案的文档。
1. How the messages are received/consumed from two topics(one common group id, same schema both key/value) in one consumer.
Let say the consumer reads data every second. Producer1 produces 50 messages to Topic1 and Producer 2 produces 1000 messages to Topic2.
2. Is it going to read all msgs(1000+50) in one batch and process together in the workflow, OR is it going to read 50 msgs first, process them and then read 1000 msgs and process them.
3. What parameter should i use to control the number of messages being read in one batch per second.
4. Will same group id create any issue while consuming.
解决方案
Spark Streaming 中的官方文档已经解释了如何为每个组 id 消费多个主题。 https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
- 一个组 id 并遵循两个主题的相同架构。
- 对此不确定,但据我了解,它会根据批量大小消耗所有消息。
- “spark.streaming.backpressure.enabled”将此设置为true,“spark.streaming.kafka.maxRatePerPartition”将其设置为数值,基于此spark限制每批从kafka消耗的消息数量。还相应地设置批处理持续时间。https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html
- 这完全取决于您的应用程序使用情况。
推荐阅读
- azure-batch - 如何构建弹性 Azure Batch 应用程序?
- php - 未找到 Guzzle。客户端令牌授予的 API 请求
- azure - Azure AD 链接角色到组
- reactjs - 如何使用 jest 和酶在 reactJs 中构建测试用例以使用 fetch 调用进行测试
- d3.js - 如何显示 .tickValues() 中指定的确切数字
- ruby-on-rails - 带有 GCS 文件的 ActiveStorage 不上传(签名不是 base64 编码的)|| (签名不匹配)
- java - 返回 ?扩展类型
- c# - 升级到 3.0 后 ASP.NET Core 身份验证中断
- javascript - 使用多种方法实现事件管理器
- javascript - 检查接收到的参数是否与对象的名称相同,并从这些对象中获取属性的随机值