apache-kafka-streams - 有没有办法根据大小刷新 Kafka Streams WindowStore?
问题描述
我正在使用 DSL API 编写 Kafka Streams 应用程序,该 API 将从 kafka 主题中读取元组。在拓扑中,我想批量处理元组。然后,如果(1)30 秒过去或(2)批处理大小超过 1 GB,我想将浴写入磁盘上的文件。
我使用 TimeWindowedKStream 编写了组元组的拓扑。然后调用聚合并传递一个窗口存储。
我的问题是,当状态存储尝试写入 Kafka ChangeLog 时,我得到一个
org.apache.kafka.common.errors.RecordTooLargeException
例外。
尤其是:
原因:org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sent since an error catched with a previous record (key \x00\x00\x00\x06\x00\x00\x01h$\xE7\x88 \x00\x00\x00\x00 值 [B@419761c 时间戳 1546807396524)到主题 ibv2-capt-consumer-group-3-record-store-changelog 由于 org.apache.kafka.common.errors.RecordTooLargeException:请求包括大于服务器将接受的最大消息大小的消息..
我尝试设置CACHE_MAX_BYTES_BUFFERING_CONFIG
为 1MB,但正如文档所述,此配置适用于整个拓扑。
这是我的拓扑
这是我一直在使用的 Scala 代码。注意我在这里使用的是 kafka-streams-scala。
val builder = new StreamsBuilderS()
import com.lightbend.kafka.scala.streams.DefaultSerdes._
implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]
val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)
val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10))
val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))
import org.apache.kafka.common.utils.Bytes
val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
.as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
.withKeySerde(integerSerde)
.withValueSerde(recordSeqSerde)
.withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava) // increased max.request.size to 10 x default
val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
() => RecordSeq(Seq()),
(randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
windowedStore
)
val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")
recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))
注:案例类 RecordSeq(records: Seq[Record])
解决方案
主题可以具有属性中定义的最大大小的记录message.max.bytes
。这是代理可以接收并附加到主题中的最大消息大小。您的记录大小可能超过该限制。因此,您需要更改此属性的配置以允许更大的记录大小。
它可以设置在代理级别以及主题级别。您可以在此处参考更多详细信息:
推荐阅读
- flutter - 未找到 Flutter 网络图像图像
- android - ItemTouchHelper.Callback.getDefaultUIUtil().onDraw() 改变 z-index
- c# - 如何使用启动表单应用程序打开 API 控制器?
- node.js - 如何在自定义 Jest 节点环境中使用 ES6?
- python - 如何查找用户在服务器 Discord.py 中发送的消息数
- c# - Entity Framework Core 迁移问题与动态连接字符串取决于标头值和另一个数据库
- apache-nifi - 如何将 HDF NiFi 版本升级到最新版本?
- java - 自定义警报对话框背景颜色
- python - 使用列表导航 selenium web 驱动程序
- api - Jmeter 中的 x-appiyo-key 和 x-appiyo-hash 是什么?