java - 如何使用kafka流处理块/批处理数据?
问题描述
对于大数据中的许多情况,最好一次使用少量记录缓冲区,而不是一次处理一条记录。
最自然的例子是调用一些支持批处理以提高效率的外部 API。
我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。
到目前为止,我有:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
在 Scala 和 Akka Streams 中,该函数被称为grouped
or batch
。在 Spark Structured Streaming 中,我们可以做到mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))
。
解决方案
似乎还不存在。观看此空间https://issues.apache.org/jira/browse/KAFKA-7432
推荐阅读
- excel - 如何使用 Powershell 脚本导出与特定 IIS 站点相关的证书详细信息
- javascript - 如何从 javascript 函数调用 vuejs 函数?
- javascript - node.js 无法从 ionic 4 发出 post 请求
- html - VBA。从站点解析数据
- ruby - ruby MS windows gem:win32/daemon:Service_Main线程异常退出
- nginx - Nginx 的 sub_filter 不会替换 CSS 和 JS 中的路径
- android - 棒棒糖设备上未安装 INSTALL_FAILED_SHARED_USER_INCOMPATIBLE 应用程序
- c# - ASP Net Core / Entity Framework 表单保存错误
- regex - 正则表达式完全匹配 - 数据表
- mongodb - 根据文档中存储的参数查找匹配的文档