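apache-kafka - Serialization error in merged Kafka streams
Problem description
I have N Kafka topics that all carry the same value type. I want to merge them into one stream and throttle the events per key.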
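The intended "throttled per key" behaviour of the topology below can be illustrated with a plain-Java sketch that has no Kafka dependency. It models what groupByKey(), 10-second tumbling windows with zero grace, reduce((value1, value2) -> value2) and suppress(untilWindowCloses(...)) compute together. The result is one output per key per window, carrying the last value seen in that window, and it is emitted only after stream time has passed the window end. This is a simplified model under stated assumptions, not Kafka's implementation: it assumes a single partition with one stream time, and the names ThrottleSketch, Event and Emitted are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-JDK model (no Kafka) of: groupByKey -> 10 s tumbling windows, grace 0
// -> reduce((v1, v2) -> v2) -> suppress(untilWindowCloses). Assumes one partition, one stream time.
final class ThrottleSketch {

    record Event(long key, long timestampMs, String value) {}

    record Emitted(long key, long windowStartMs, String value) {}

    static List<Emitted> throttle(List<Event> merged, long windowSizeMs) {
        // Latest value per (key, window start); List.of(key, start) serves as a compound map key.
        Map<List<Long>, String> buffered = new LinkedHashMap<>();
        List<Emitted> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (Event e : merged) {
            streamTime = Math.max(streamTime, e.timestampMs());
            long windowStart = e.timestampMs() - Math.floorMod(e.timestampMs(), windowSizeMs);
            // With grace 0 a window closes once stream time reaches its end; late records are dropped.
            if (windowStart + windowSizeMs <= streamTime) {
                continue;
            }
            // reduce((v1, v2) -> v2): the newest value replaces the previous one for this key and window.
            buffered.put(List.of(e.key(), windowStart), e.value());

            // suppress(untilWindowCloses): release a window's final value only after the window has closed.
            Iterator<Map.Entry<List<Long>, String>> it = buffered.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<List<Long>, String> entry = it.next();
                long start = entry.getKey().get(1);
                if (start + windowSizeMs <= streamTime) {
                    out.add(new Emitted(entry.getKey().get(0), start, entry.getValue()));
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> merged = List.of(
                new Event(1L, 1_000, "a"),
                new Event(1L, 4_000, "b"),
                new Event(2L, 5_000, "x"),
                new Event(1L, 12_000, "c"));
        throttle(merged, 10_000).forEach(System.out::println);
    }
}
```

In the sample run, key 1 gets "b" and key 2 gets "x" for the first window. The window that starts at 10000 for key 1 (value "c") is still buffered when the input ends. With untilWindowCloses, a window's result is only released once a later record moves stream time past the window end.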
Here is my current code:
KStream<Long, Event> allEvents = null;
for (String topic : EventsTopics.split(",")) {
    KStream<Long, Event> events = builder.stream(topic,
            Consumed.with(longAvroSerde, EventsAvroSerde));
    if (allEvents == null) {
        allEvents = events;
    } else {
        allEvents = allEvents.merge(events);
    }
}
allEvents
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofMillis(0)))
        .reduce((value1, value2) -> value2)
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .peek((key, value) -> System.out.printf("key=%s, value=%s\n", key, value.toString()))
        .to(mergeTopic);
This works when there is only one topic. Here is a snippet of the KStreamImpl in that case:
allEvents = {KStreamImpl@1519}
repartitionRequired = false
repartitionNode = null
name = "KSTREAM-SOURCE-0000000000"
keySerde = {PrimitiveAvroSerde@1514}
valSerde = {SpecificAvroSerde@1515}
subTopologySourceNodes = {Collections$SingletonSet@1521} size = 1
streamsGraphNode = {StreamSourceNode@1522} "StreamSourceNode{topicNames=[global_events_uat], topicPattern=null, consumedInternal=org.apache.kafka.streams.kstream.internals.ConsumedInternal@d721838b} StreamsGraphNode{nodeName='KSTREAM-SOURCE-0000000000', buildPriority=0, hasWrittenToTopology=false, keyChangingOperation=false, valueChangingOperation=false, mergeNode=false, parentNodes=[root]}"
builder = {InternalStreamsBuilder@1523}
However, when more than one topic is included, a serialization error occurs:
Exception in thread "merge-c30bd85c-2b6e-4460-ae3d-b7a5ffa117c5-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [merge-c30bd85c-2b6e-4460-ae3d-b7a5ffa117c5-StreamThread-1] task [0_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000003
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:453)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:357)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:955)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:714)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:121)
Here is what the merged KStreamImpl looks like in the debugger:
allEvents = {KStreamImpl@1525}
repartitionRequired = false
repartitionNode = null
name = "KSTREAM-MERGE-0000000002"
keySerde = null
valSerde = null
subTopologySourceNodes = {HashSet@1527} size = 2
streamsGraphNode = {ProcessorGraphNode@1528} "ProcessorNode{processorParameters=ProcessorParameters{processor class=class org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor, processor name='KSTREAM-MERGE-0000000002'}} StreamsGraphNode{nodeName='KSTREAM-MERGE-0000000002', buildPriority=3, hasWrittenToTopology=false, keyChangingOperation=false, valueChangingOperation=false, mergeNode=true, parentNodes=[KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000001]}"
builder = {InternalStreamsBuilder@1529}
I'm new to Kafka Streams, so I'm not sure how to investigate this. Any hints are much appreciated.
Solution
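You could consider using cogroup instead. You group each stream and then build one cogrouped stream from all of the grouped streams. You can then window and aggregate that cogrouped stream. This is also more efficient than merging many streams and then grouping the result.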
KTable<Windowed<K>, CG> cogrouped =
    grouped1
        .cogroup(aggregator1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofMillis(0)))
        .aggregate(initializer1, materialized1);
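Applied to the question, the outline above might look like the following sketch. It assumes Kafka Streams 2.5 or later, where cogroup was added, and the same TimeWindows.of(...).grace(...) call that the question uses. CogroupedThrottle and latestPerWindow are hypothetical names. The serdes are passed in as parameters rather than taken from the question's longAvroSerde and EventsAvroSerde fields. Each topic gets its own groupByKey with explicit serdes. A single "keep the newest value" aggregator stands in for reduce((value1, value2) -> value2), and the window store serdes go through Materialized.with.

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical helper: the cogroup outline applied to N topics sharing one key and value type.
final class CogroupedThrottle {

    static <V> KTable<Windowed<Long>, V> latestPerWindow(StreamsBuilder builder, List<String> topics,
                                                          Serde<Long> keySerde, Serde<V> valueSerde) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("at least one input topic is required");
        }
        // Same effect as reduce((value1, value2) -> value2): the newest value replaces the aggregate.
        Aggregator<Long, V, V> keepLatest = (key, value, aggregate) -> value;

        CogroupedKStream<Long, V> cogrouped = null;
        for (String topic : topics) {
            // Each input is grouped with explicit serdes instead of relying on the configured defaults.
            KGroupedStream<Long, V> grouped = builder
                .stream(topic, Consumed.with(keySerde, valueSerde))
                .groupByKey(Grouped.with(keySerde, valueSerde));
            cogrouped = (cogrouped == null)
                ? grouped.cogroup(keepLatest)
                : cogrouped.cogroup(grouped, keepLatest);
        }

        return cogrouped
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
            // The initializer starts each window at null; keepLatest overwrites it with the first value.
            .aggregate(() -> null, Materialized.with(keySerde, valueSerde));
    }
}
```

The returned table can then be suppressed and written out as in the question. Returning null from the initializer is a shortcut: because the aggregator ignores the previous aggregate, no "empty" value is ever needed.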
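As for the serde error: after merge(), the resulting stream no longer carries the source serdes (keySerde and valSerde are null in the debugger output above). The downstream groupByKey(), reduce() and suppress() therefore fall back to the default.key.serde and default.value.serde configured in StreamsConfig. Make sure those defaults are set correctly, or pass the serdes explicitly, for example groupByKey(Grouped.with(longAvroSerde, EventsAvroSerde)). This is why the serdes are null when you have more than one stream.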
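A minimal sketch of the explicit-serde fix, keeping the question's merge-based topology. MergedThrottleTopology and build are hypothetical names. The serdes are parameters so that the question's longAvroSerde and EventsAvroSerde can be passed in. The sketch uses the same TimeWindows.of(...).grace(...) API as the question; newer Kafka releases deprecate it in favour of TimeWindows.ofSizeWithNoGrace.

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical helper: the question's merge-based topology with serdes passed explicitly.
final class MergedThrottleTopology {

    static <V> Topology build(List<String> topics, Serde<Long> keySerde, Serde<V> valueSerde,
                              String outputTopic) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("at least one input topic is required");
        }
        StreamsBuilder builder = new StreamsBuilder();

        KStream<Long, V> allEvents = null;
        for (String topic : topics) {
            KStream<Long, V> events = builder.stream(topic, Consumed.with(keySerde, valueSerde));
            allEvents = (allEvents == null) ? events : allEvents.merge(events);
        }

        allEvents
            // The merged stream has no serdes of its own, so give them to the grouping explicitly
            // instead of letting it fall back to default.key.serde / default.value.serde.
            .groupByKey(Grouped.with(keySerde, valueSerde))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
            // Serdes for the window store behind reduce(); the resulting KTable and suppress() reuse them.
            .reduce((value1, value2) -> value2, Materialized.with(keySerde, valueSerde))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to(outputTopic);

        return builder.build();
    }
}
```

Passing the serdes at the grouping step means the window store and the suppression buffer no longer depend on the default.key.serde and default.value.serde settings.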