Serialization error on a merged Kafka stream

Problem description

I have N Kafka topics that all carry the same value type. I want to merge them into a single topic, with events throttled per key.

Here is my code so far:

KStream<Long, Event> allEvents = null;
for (String topic : EventsTopics.split(",")) {
    KStream<Long, Event> events = builder.stream(topic,
            Consumed.with(longAvroSerde, EventsAvroSerde));

    if (allEvents == null) {
        allEvents = events;
    } else {
        allEvents = allEvents.merge(events);
    }
}

allEvents
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofMillis(0)))
    .reduce((value1, value2) -> value2)
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
    .peek((key, value) -> System.out.printf("key=%s, value=%s\n", key, value.toString()))
    .to(mergeTopic);

This works as long as there is only one topic. Here is a snippet of the KStreamImpl:

allEvents = {KStreamImpl@1519} 
 repartitionRequired = false
 repartitionNode = null
 name = "KSTREAM-SOURCE-0000000000"
 keySerde = {PrimitiveAvroSerde@1514} 
 valSerde = {SpecificAvroSerde@1515} 
 subTopologySourceNodes = {Collections$SingletonSet@1521}  size = 1
 streamsGraphNode = {StreamSourceNode@1522} "StreamSourceNode{topicNames=[global_events_uat], topicPattern=null, consumedInternal=org.apache.kafka.streams.kstream.internals.ConsumedInternal@d721838b} StreamsGraphNode{nodeName='KSTREAM-SOURCE-0000000000', buildPriority=0, hasWrittenToTopology=false, keyChangingOperation=false, valueChangingOperation=false, mergeNode=false, parentNodes=[root]}"
 builder = {InternalStreamsBuilder@1523} 

However, as soon as more than one topic is included, I get a serialization error:

Exception in thread "merge-c30bd85c-2b6e-4460-ae3d-b7a5ffa117c5-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [merge-c30bd85c-2b6e-4460-ae3d-b7a5ffa117c5-StreamThread-1] task [0_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000003
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:453)
    at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:357)
    at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:955)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:851)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:714)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
    at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:121)                

Here is what the merged KStreamImpl looks like in the debugger:

allEvents = {KStreamImpl@1525} 
 repartitionRequired = false
 repartitionNode = null
 name = "KSTREAM-MERGE-0000000002"
 keySerde = null
 valSerde = null
 subTopologySourceNodes = {HashSet@1527}  size = 2
 streamsGraphNode = {ProcessorGraphNode@1528} "ProcessorNode{processorParameters=ProcessorParameters{processor class=class org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor, processor name='KSTREAM-MERGE-0000000002'}} StreamsGraphNode{nodeName='KSTREAM-MERGE-0000000002', buildPriority=3, hasWrittenToTopology=false, keyChangingOperation=false, valueChangingOperation=false, mergeNode=true, parentNodes=[KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000001]}"
 builder = {InternalStreamsBuilder@1529} 

I am new to Kafka Streams, so I don't know how to investigate this. Any hints are much appreciated.

Tags: apache-kafka, apache-kafka-streams

Solution


You could consider using cogroup. You group each stream individually, then create a single co-grouped stream from all of the grouped streams, which you can then window and aggregate. This is also more efficient than merging many streams and grouping the result.

KTable<Windowed<K>, CG> cogrouped =
  grouped1
    .cogroup(aggregator1)
    .cogroup(grouped2, aggregator2)
    .cogroup(grouped3, aggregator3)
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofMillis(0)))
    .aggregate(initializer1, materialized1);
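For context, the `grouped1`, `grouped2`, … inputs above would come from grouping each source stream with its serdes stated explicitly via `Grouped.with`. A minimal runnable sketch of that shape, using `Long`/`String` serdes and topic names `events-a`/`events-b` as stand-ins for the Avro serdes and topics in the question:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class CogroupSketch {

    static String describeTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // One stream per input topic, serdes declared at the source.
        KStream<Long, String> s1 = builder.stream("events-a",
                Consumed.with(Serdes.Long(), Serdes.String()));
        KStream<Long, String> s2 = builder.stream("events-b",
                Consumed.with(Serdes.Long(), Serdes.String()));

        // Group each stream, restating the serdes explicitly so nothing
        // downstream falls back to the configured defaults.
        KGroupedStream<Long, String> grouped1 =
                s1.groupByKey(Grouped.with(Serdes.Long(), Serdes.String()));
        KGroupedStream<Long, String> grouped2 =
                s2.groupByKey(Grouped.with(Serdes.Long(), Serdes.String()));

        // "Last value wins" aggregator, mirroring the reduce in the question.
        Aggregator<Long, String, String> lastWins = (key, value, agg) -> value;

        KTable<Windowed<Long>, String> cogrouped = grouped1
                .cogroup(lastWins)
                .cogroup(grouped2, lastWins)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
                .aggregate(() -> null,
                        Materialized.with(Serdes.Long(), Serdes.String()));

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeTopology());
    }
}
```

Printing the topology description is just a cheap way to confirm the graph wires up; in the real application you would replace the stand-in serdes with `longAvroSerde` and the Avro value serde.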

As for the serdes error: sometimes, when multiple streams feed into an operation, Streams cannot determine which serdes to use and falls back to the configured defaults. I would make sure those defaults are set correctly. That is likely why the serdes are null once you have more than one stream.
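If you prefer to keep the merge-style pipeline, the error typically goes away once the serdes are restated at the first stateful step, because a merged stream loses its serdes (matching the `keySerde = null` / `valSerde = null` in the debugger output above). A sketch of that fix, again with `Long`/`String` serdes and illustrative topic names standing in for the question's Avro setup; note that `builder.stream` also accepts a collection of topics, which removes the manual merge loop entirely:

```java
import java.time.Duration;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

public class MergeWithSerdes {

    static String describeTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Subscribing to all topics in one stream() call replaces the
        // for-loop of merges and keeps the serdes attached to the stream.
        KStream<Long, String> allEvents = builder.stream(
                List.of("events-a", "events-b"),
                Consumed.with(Serdes.Long(), Serdes.String()));

        allEvents
                // The key fix: restate the serdes here so the windowed state
                // store does not fall back to the configured defaults.
                .groupByKey(Grouped.with(Serdes.Long(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
                .reduce((v1, v2) -> v2)
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                // Unwrap the windowed key so the output topic is keyed by Long,
                // as in the question's mergeTopic.
                .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
                .to("merged-events", Produced.with(Serdes.Long(), Serdes.String()));

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeTopology());
    }
}
```

With Avro, the equivalent change is passing `Grouped.with(longAvroSerde, EventsAvroSerde)` to `groupByKey` in the original code.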

