Flink gives me "Too many open files" on checkpoints

Problem description

I run the same streaming job several times, but with different parameters. The job uses keyed state to compute the difference between the current event and the last event received for each key, and writes the result back to the same Kafka topic (I know this is neither logical nor common, but it was not my decision). After several tests everything worked fine, until about an hour later I got this exception:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_306d8342cb5b2ad8b53f1be57f65bee8_(1/3) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
... 5 more
Caused by: java.io.FileNotFoundException: /home/quantion/flink-checkpoints/PeriodDailyAvrgValuePeak/1cb1374a0ea7dc9d74f86a8de9be3bec/chk-1/3274bb7c-0352-4367-87bc-9f85939f00b3 (Too many open files)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more

And also:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:129)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:156)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:127)
... 14 more

As suggested in similar questions, I checked that the leader for the topic is correct, but everything seems fine there.

I have already tried increasing the number of open files the system allows with ulimit -n, but the error persists.

I have also manually deleted several checkpoint files, since I retain them on cancellation, but so far that has had no effect.

My code so far:

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // set up checkpoint
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend((StateBackend) new FsStateBackend("file:///home/quantion/flink-checkpoints/diferential-checkpoints"));
    env.enableCheckpointing(3600000); // checkpoint every 3600000 ms (1 hour)

    if (args.length != 3) throw new Exception("Needs some arguments:\n" +
                "file.jar <dev_type> <sensor_type> <new_label>");

    // get arguments
    Arguments arguments = new Arguments(args);
    String TYPE = arguments.getTYPE();
    String TARGET = arguments.getTARGET();
    String NEW_LABEL = arguments.getNEW_LABEL();

    // Propiedades Kafka
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", HOST+":9092");
    properties.setProperty("zookeeper.connect", HOST+":2181");
    properties.setProperty("group.id", TYPE); //TYPE
    Logger LOG = LoggerFactory.getLogger(DifferentialConsumption.class);

    DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), properties))
            .name("Read Kafka source --topic "+TOPIC);

    DataStream<JSONObject> jsonObjectDataStream = inputStream.map(x -> {
        try {
            return new JSONObject(x);
        } catch (Exception e) {
            LOG.error("An {} occurred.", "error", e);
            return new JSONObject();
        }
    }).name("Parse events to JSONObjects");

    DataStream<JSONObject> filterStream = jsonObjectDataStream.filter(x -> x.has("type") && x.get("type").equals(TYPE) && !x.has(NEW_LABEL))
            .name("Filter events by type: "+TYPE+" and don't have "+NEW_LABEL+" already");

    DataStream<JSONObject> saveSate = filterStream.keyBy(x -> x.getString("id")).flatMap(new Diferential(TARGET, NEW_LABEL))
            .name("KeyedState for differential");
    DataStream<String> streamToString = saveSate.map(JSONObject::toString)
            .name("Parse JSON to String");

    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(HOST+":9092", TOPIC, new SimpleStringSchema());
    streamToString.addSink(myProducer).name("Write Kafka sink --topic "+TOPIC);
    //streamToString.print();

    // execute program
    env.execute("Get differential for "+TYPE);
}
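
The Diferential function itself is not shown above. For context, a minimal sketch of what such a keyed-state flatMap could look like, assuming the org.json JSONObject used above and that TARGET holds a numeric value (a hypothetical implementation, not the original one):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.json.JSONObject;

public class Diferential extends RichFlatMapFunction<JSONObject, JSONObject> {

    private final String target;    // field holding the numeric value to compare
    private final String newLabel;  // field where the difference is written
    private transient ValueState<Double> lastValue;

    public Diferential(String target, String newLabel) {
        this.target = target;
        this.newLabel = newLabel;
    }

    @Override
    public void open(Configuration parameters) {
        // one ValueState entry per key (the "id" field used in keyBy)
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastValue", Double.class));
    }

    @Override
    public void flatMap(JSONObject event, Collector<JSONObject> out) throws Exception {
        double current = event.getDouble(target); // throws if the field is missing
        Double previous = lastValue.value();
        if (previous != null) {
            event.put(newLabel, current - previous);
            out.collect(event);
        }
        lastValue.update(current);
    }
}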

Tags: java flink-streaming checkpoint

Solution


A malformed message was keeping every job stuck restoring from its checkpoint and re-opening files over and over, which is why the error persisted even after I restarted Flink.

For this case, I added a JSONObject.has() check to verify that the field exists before reading it, and after restarting the Flink node the jobs were able to recover from their checkpoints.
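
A minimal sketch of that check, assuming it goes inside the keyed flatMap where the field is read (hypothetical placement, not the exact change):

@Override
public void flatMap(JSONObject event, Collector<JSONObject> out) throws Exception {
    // Skip malformed messages instead of throwing: an uncaught exception here fails
    // the task, which then restores from the checkpoint and hits the same message again.
    if (!event.has(target)) {
        // malformed event: ignore it (optionally log it) rather than failing the task
        return;
    }
    double current = event.getDouble(target);
    // ... rest of the differential logic ...
}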

A possible alternative would have been to reset the Kafka offsets, but that could mean losing some events.
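
If going down that route, one option with the Flink Kafka consumer used above would be to ignore the committed group offsets when the job is started fresh (a restored job takes its offsets from the checkpoint instead). A sketch:

// Start reading from the latest offsets instead of the committed group offsets;
// everything between the last committed offset and "latest" is skipped (lost).
FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), properties);
consumer.setStartFromLatest();

DataStream<String> inputStream = env.addSource(consumer)
        .name("Read Kafka source --topic " + TOPIC);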

