首页 > 解决方案 > 如何使用kafka流处理块/批处理数据?

问题描述

对于大数据中的许多情况,最好一次使用少量记录缓冲区,而不是一次处理一条记录。

最自然的例子是调用一些支持批处理以提高效率的外部 API。

我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。

到目前为止,我有:

builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")

我想要的是:

builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")

在 Scala 和 Akka Streams 中,该函数被称为groupedor batch。在 Spark Structured Streaming 中,我们可以做到mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))

标签: javascalaapache-sparkapache-kafkaapache-kafka-streams

解决方案


似乎还不存在。观看此空间https://issues.apache.org/jira/browse/KAFKA-7432


推荐阅读