首页 > 解决方案 > 有没有办法根据大小刷新 Kafka Streams WindowStore?

问题描述

我正在使用 DSL API 编写 Kafka Streams 应用程序,该 API 将从 kafka 主题中读取元组。在拓扑中,我想批量处理元组。然后,如果(1)30 秒过去或(2)批处理大小超过 1 GB,我想将浴写入磁盘上的文件。

我使用 TimeWindowedKStream 编写了组元组的拓扑。然后调用聚合并传递一个窗口存储。

我的问题是,当状态存储尝试写入 Kafka ChangeLog 时,我得到一个

org.apache.kafka.common.errors.RecordTooLargeException

例外。

尤其是:

原因:org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sent since an error catched with a previous record (key \x00\x00\x00\x06\x00\x00\x01h$\xE7\x88 \x00\x00\x00\x00 值 [B@419761c 时间戳 1546807396524)到主题 ibv2-capt-consumer-group-3-record-store-changelog 由于 org.apache.kafka.common.errors.RecordTooLargeException:请求包括大于服务器将接受的最大消息大小的消息..

我尝试设置CACHE_MAX_BYTES_BUFFERING_CONFIG为 1MB,但正如文档所述,此配置适用于整个拓扑。

这是我的拓扑

这是我一直在使用的 Scala 代码。注意我在这里使用的是 kafka-streams-scala。

val builder = new StreamsBuilderS()

import com.lightbend.kafka.scala.streams.DefaultSerdes._

implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]

val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)

val keyed = inputStream.selectKey[Int]((k,r) => random.nextInt(10)) 

val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))

import org.apache.kafka.common.utils.Bytes

val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
  .as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava)  // increased max.request.size to 10 x default

val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
  () => RecordSeq(Seq()),
  (randon: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
  windowedStore
)

val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")

recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))

注:案例类 RecordSeq(records: Seq[Record])

标签: apache-kafka-streams

解决方案


主题可以具有属性中定义的最大大小的记录message.max.bytes 。这是代理可以接收并附加到主题中的最大消息大小。您的记录大小可能超过该限制。因此,您需要更改此属性的配置以允许更大的记录大小。

它可以设置在代理级别以及主题级别。您可以在此处参考更多详细信息:

http://kafka.apache.org/documentation/#brokerconfigs

http://kafka.apache.org/documentation/#topicconfigs


推荐阅读