首页 > 解决方案 > 如何将多个主题的数据集中到一个地方进行处理?

问题描述

我有一个要求,我必须从 3 个 kafka 主题获取消息作为流数据,然后根据这 3 个主题数据之间的连接生成结果。请建议我使用 Direct Stream for Scala 的好方法。谢谢

标签: scalaapache-kafkaspark-streaming

解决方案


如果不同主题的数据相同,并且消费数据时处理逻辑相同,则可以从同一流中的不同主题消费并进行聚合。如果不同主题的处理逻辑不同,则指定concurrentThreads为4,然后在4个流之间进行聚合。您可以查看spark 结构化流式文档以从多个主题中消费。

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

<--- your aggregation logic here --->

推荐阅读