java - Flink gives me "Too many open files" on checkpoints
Problem description
I run the same streaming job several times with different parameters. The job uses keyed state to compute the difference between the current event and the last one received, and sends the result back to the same Kafka topic (I know this is neither logical nor common, but it was not my decision). After a few tests everything worked fine, until about an hour later I got this exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_306d8342cb5b2ad8b53f1be57f65bee8_(1/3) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
... 5 more
Caused by: java.io.FileNotFoundException: /home/quantion/flink-checkpoints/PeriodDailyAvrgValuePeak/1cb1374a0ea7dc9d74f86a8de9be3bec/chk-1/3274bb7c-0352-4367-87bc-9f85939f00b3 (Too many open files)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more
Followed by:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:129)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:156)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:127)
... 14 more
As I have seen in similar questions, I checked that the leader of the topic is correct, and everything seems fine there.
I have already tried increasing the number of files the system can handle with ulimit -n, but the error persists.
I have also manually deleted several checkpoint files, since I retain them on cancellation, but so far that has had no effect.
My code so far:
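To tell whether descriptors are actually leaking (rather than the limit simply being too low), one option is to count the file descriptors held by the TaskManager JVM itself; a minimal sketch for Linux, where /proc/self/fd lists the current process's descriptors (the class name is made up for illustration):

```java
import java.io.File;

public class FdCount {
    public static void main(String[] args) {
        // On Linux, /proc/self/fd holds one entry per open file descriptor
        // of the current process (the JVM running this code).
        String[] fds = new File("/proc/self/fd").list();
        if (fds == null) {
            System.out.println("/proc/self/fd not available (non-Linux system?)");
        } else {
            System.out.println("Open file descriptors: " + fds.length);
        }
    }
}
```

Sampling this periodically (or running `ls /proc/<taskmanager-pid>/fd | wc -l` from a shell) shows whether the count keeps growing across restore attempts, which would point to a leak rather than a limit that is merely too small.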
public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // set up checkpointing
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend((StateBackend) new FsStateBackend("file:///home/quantion/flink-checkpoints/diferential-checkpoints"));
    env.enableCheckpointing(3600000);

    if (args.length != 3) throw new Exception("Needs some arguments:\n" +
            "file.jar <dev_type> <sensor_type> <new_label>");

    // get arguments
    Arguments arguments = new Arguments(args);
    String TYPE = arguments.getTYPE();
    String TARGET = arguments.getTARGET();
    String NEW_LABEL = arguments.getNEW_LABEL();

    // Kafka properties
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", HOST + ":9092");
    properties.setProperty("zookeeper.connect", HOST + ":2181");
    properties.setProperty("group.id", TYPE);

    Logger LOG = LoggerFactory.getLogger(DifferentialConsumption.class);

    DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), properties))
            .name("Read Kafka source --topic " + TOPIC);

    DataStream<JSONObject> jsonObjectDataStream = inputStream.map(x -> {
        try {
            return new JSONObject(x);
        } catch (Exception e) {
            LOG.error("An {} occurred.", "error", e);
            return new JSONObject();
        }
    }).name("Parse events to JSONObjects");

    DataStream<JSONObject> filterStream = jsonObjectDataStream
            .filter(x -> x.has("type") && x.get("type").equals(TYPE) && !x.has(NEW_LABEL))
            .name("Filter events by type: " + TYPE + " and don't have " + NEW_LABEL + " already");

    DataStream<JSONObject> saveState = filterStream.keyBy(x -> x.getString("id"))
            .flatMap(new Diferential(TARGET, NEW_LABEL))
            .name("KeyedState for differential");

    DataStream<String> streamToString = saveState.map(JSONObject::toString)
            .name("Parse JSON to String");

    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(HOST + ":9092", TOPIC, new SimpleStringSchema());
    streamToString.addSink(myProducer).name("Write Kafka sink --topic " + TOPIC);

    //streamToString.print();

    // execute program
    env.execute("Get differential for " + TYPE);
}
Solution
A malformed message kept the job failing on restore from the checkpoint and reopening files on every attempt, so the error persisted even after I restarted Flink.
For this case I added a JSONObject.has() check to verify that the field exists before reading it, and after restarting the Flink node the job was able to recover from the checkpoint.
A possible alternative would have been to reset the offsets in Kafka, but that could lose some events.
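The guard described above can be sketched as follows; the real job reads the field from a JSONObject inside the Diferential flatMap, but the same pattern is shown here with a plain Map so the snippet is self-contained (the field name "power", the helper, and the class name are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class DiferentialGuard {
    // Mimics the defensive check added to the Diferential flatMap:
    // only read the TARGET field when the event actually carries it,
    // so one malformed message cannot fail the job on every restore.
    static Double differential(Map<String, Object> event, String target, Double last) {
        if (!event.containsKey(target)) {   // JSONObject.has(TARGET) in the real job
            return null;                    // skip the malformed event instead of throwing
        }
        double value = ((Number) event.get(target)).doubleValue();
        return last == null ? 0.0 : value - last;
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("power", 10.5);
        Map<String, Object> malformed = new HashMap<>(); // field missing

        System.out.println(differential(ok, "power", 4.5));        // 6.0
        System.out.println(differential(malformed, "power", 4.5)); // null (skipped)
    }
}
```

Skipping (or tagging) the bad event keeps the checkpointed state intact, whereas resetting Kafka offsets would discard events that were never processed.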