apache-kafka - flink 检查点和 kafka 生产者完全一次
问题描述
当我只使用一次创建 kafka 生产者时,如果我还使用检查点,则会导致这样的问题:
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1155)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1132)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1111)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:150)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1093)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1031)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
我该如何解决?
解决方案
推荐阅读
- python-3.x - 带有输入、while 循环和 if/else 语句的 Python 字符串比较函数
- java - 从 Jframe 创建图像
- r - 根据特定列中的值将值分配给特定单元格
- c++ - 在赋值运算符中调用复制构造函数,需要左值作为赋值的左操作数
- office-js - Office.js Outlook 加载项 InternetHeaders Prefer ImmutableId
- html - 我的 foreach 循环不像以前那样工作
- python - 如何在数据框 Pandas 中插入数据框
- matlab - 用于组合文件夹中的 .fig 文件以生成视频/动画的 MATLAB 代码?
- mysql - 为每个成员返回顶部字段的 SQL 查询
- c# - Dev Express Extreme Grid - 通过 IF 条件添加列属性