apache-flink - Flink Kafka Producer 用于 AVRO 消息抛出 UnsupportedOperationException
问题描述
我将 flink 管道定义为
- 使用来自 kafka 主题的冒号分隔字符串
- 使用地图 API 将其转换为 AVRO 记录
- 将其生成回不同的 kafka 主题
代码如下
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>("topic-input", new SimpleStringSchema(), prop));
SingleOutputStreamOperator<GenericRecord> transformedData = messageStream.map(new RichMapFunction<String, GenericRecord>() {
...
});
transformedData.addSink(new FlinkKafkaProducer<GenericRecord>("topic-output", new KafkaSerializationSchema<GenericRecord>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(GenericRecord genericRecord, @Nullable Long aLong) {
...
return new ProducerRecord(...);
}
}, prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
当生成主题(主题输入)的消息时,我遇到了错误,有什么线索吗?
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
...
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
.....
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
解决方案
推荐阅读
- java - 工厂返回带有 EJB 注入字段 Null 的 Bean
- ios - 如何在 MapKit 中限制平移和缩小苹果地图的某个区域?
- java - Freemarker 宏中的 Hashmap 不起作用
- python - 根据匹配索引合并两个数据帧以更新数据帧中的其他列
- android - 使用 espresso 为 React-native 应用程序编写 UI 测试
- python - 如何将包含短语的 str 列表转换为 int 列表?
- selenium - Chrome 开发者工具中的 Selenium IDE 在哪里?
- typescript - 将复杂类型分配给另一个复杂类型,而不是抛出错误
- xml - 将 XML 转换为 CSV tensorflow 对象检测 api
- php - 通过将变量名作为字符串传递来调用变量