首页 > 解决方案 > flink 检查点和 kafka 生产者完全一次

问题描述

当我只使用一次创建 kafka 生产者时,如果我还使用检查点,则会导致这样的问题:

java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1155)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1132)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1111)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1093)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1031)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

我该如何解决?

标签: apache-kafkaapache-flinkflink-streaming

解决方案


推荐阅读