Flink Kafka Producer throws UnsupportedOperationException for AVRO messages

Problem description

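I defined a Flink pipeline that:

  1. Consumes colon-delimited strings from a Kafka topic
  2. Converts each string into an AVRO record using the map API
  3. Produces the record to a different Kafka topic

The code is as follows: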

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>("topic-input", new SimpleStringSchema(), prop));
SingleOutputStreamOperator<GenericRecord> transformedData = messageStream.map(new RichMapFunction<String, GenericRecord>() {
    ...
});
transformedData.addSink(new FlinkKafkaProducer<GenericRecord>("topic-output", new KafkaSerializationSchema<GenericRecord>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(GenericRecord genericRecord, @Nullable Long timestamp) {
        ...
        return new ProducerRecord<>(...);
    }
}, prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
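
The body of the map function is elided in the post. As a rough, dependency-free sketch of the parsing half of step 2, the colon-delimited input could be split into named fields as shown here. The class name `ColonRecordParser` and the field names `id`, `name`, and `city` are hypothetical; the post does not show the real schema or the real mapping logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ColonRecordParser {
    // Hypothetical field names; the original post does not show the real schema.
    private static final String[] FIELD_NAMES = {"id", "name", "city"};

    /** Splits a colon-delimited line into named fields, preserving field order. */
    public static Map<String, String> parse(String line) {
        // A limit of -1 keeps trailing empty fields, e.g. "1:bob:" yields three parts.
        String[] parts = line.split(":", -1);
        if (parts.length != FIELD_NAMES.length) {
            throw new IllegalArgumentException(
                "expected " + FIELD_NAMES.length + " fields but got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("42:alice:berlin"));
    }
}
```

Inside the actual map function, each entry would presumably be copied into a `GenericRecord` via `put(name, value)`; that part depends on the Avro schema, which the post does not include.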

            

When a message is produced to the input topic (topic-input), I get the error below. Any clues?

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    ...
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
    .....
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) 

Tags: apache-flink, avro

Solution

