Reading Flink MapState keys/values from multiple threads inside processElement throws ConcurrentModificationException

Problem description

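In my Flink job, I keep data in a MapState for a few minutes for further processing. In processElement I need to perform a set of operations based on the MapState values, so I use multiple threads to do this processing using the MapState contents. The threads do not modify the state; they only get values by key, and this causes a ConcurrentModificationException. In addition, this TTL exception appears even before the configured TTL has elapsed. The log is below.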

`java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: Failed to incrementally clean up state with TTL
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.XXXXXX.processResults(DataProcessor.java:198)
    at com.XXXXXX.processElement(DataProcessor.java:151)
    at com.XXXXXX.processElement(DataProcessor.java:1)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$753/1478413652.accept(Unknown Source)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$706/2073221180.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to incrementally clean up state with TTL
    at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.stateAccessed(TtlIncrementalCleanup.java:60)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory$$Lambda$789/1622849283.run(Unknown Source)
    at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:59)
    at org.apache.flink.runtime.state.ttl.TtlMapState.get(TtlMapState.java:54)
    at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
    at com.XXXXXX.treamFunctions.findPreviousData(DATAExecutor.java:216)
    at com.XXXXXX.previousVal(DataExecutor.java:175)
    at com.XXXXXX.$FlinkStreamFunctions.execute(DATAExecutor.java:107)
    at com.XXXXXX.evaluateFunction(ExpressionEvaluator.java:89)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.runCleanup(TtlIncrementalCleanup.java:78)
    at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.stateAccessed(TtlIncrementalCleanup.java:58)
    ... 17 more`

Tags: apache-flink

Solution


The short answer is that Flink's state abstractions are not designed for concurrent access and should not be shared between multiple threads.

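The long answer is that Flink's state abstractions are more than just a Map or a value container that holds some values. When you access Flink state through these abstractions, several things may happen behind the scenes. For example, if you have configured TTL, the state makes sure that the TTL has not expired. If you are using a keyed stream, it makes sure that you get the correct state value for the current key. And depending on the StateBackend, the abstraction may have to read the state from disk and deserialize it for you. Your stack trace shows the TTL case: with incremental cleanup, TtlIncrementalCleanup runs a cleanup step on every state access (here, on MapState.get), whether or not any entry has expired yet. That is why the exception appears before the configured TTL, and why even read-only get calls from several threads can fail.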

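That being said, under certain conditions it is still possible to share the actual state values between threads. If your threads only consume the state and never write to it, you can read the state out on the task thread and pass the actual values to the other threads.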
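A minimal sketch of the read-only case, assuming the worker threads only need the map contents: the task thread copies the entries out of the state into a plain map, the workers read only that copy, and the task thread joins on the futures before processElement returns. ReadOnlyMapState, SnapshotFanOut and sumOfLookups are hypothetical names; ReadOnlyMapState mirrors the entries() method of Flink's MapState so that the sketch compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for Flink's MapState<String, Long>.entries(), so the sketch
// compiles without Flink. Treat it like real Flink state: call it only from the task thread.
interface ReadOnlyMapState {
    Iterable<Map.Entry<String, Long>> entries() throws Exception;
}

class SnapshotFanOut {

    // Meant to run on the task thread, e.g. from inside processElement.
    static long sumOfLookups(ReadOnlyMapState state, List<String> keys,
                             ExecutorService pool) throws Exception {
        // 1. Read everything the workers need, on the task thread.
        Map<String, Long> copy = new HashMap<>();
        for (Map.Entry<String, Long> e : state.entries()) {
            copy.put(e.getKey(), e.getValue());
        }
        Map<String, Long> snapshot = Collections.unmodifiableMap(copy);

        // 2. Workers only ever see the immutable copy, never the state object.
        List<Future<Long>> futures = new ArrayList<>();
        for (String key : keys) {
            Callable<Long> lookup = () -> snapshot.getOrDefault(key, 0L);
            futures.add(pool.submit(lookup));
        }

        // 3. Join on the task thread before returning from processElement.
        long total = 0;
        for (Future<Long> f : futures) {
            total += f.get();
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> backing = new HashMap<>();
        backing.put("a", 1L);
        backing.put("b", 2L);
        ReadOnlyMapState state = () -> backing.entrySet();

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(sumOfLookups(state, Arrays.asList("a", "b", "missing"), pool)); // 3
        } finally {
            pool.shutdown();
        }
    }
}
```

Copying the entries costs one full read of the state per element; if the map is large, copying only the keys the workers actually need is probably cheaper. Finishing the copy before submitting the tasks is what makes it safely visible to the pool threads, because actions before ExecutorService.submit happen-before the submitted task runs.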

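If you need to write updates from the concurrent threads back into the state, things get a bit more complicated. You have to make sure that the state is only accessed from Flink's task thread. Furthermore, when you want to write a result from a FutureTask back into Flink state, you have to make sure that the correct key is being processed at that moment. Hence, you need some bookkeeping to map keys to the concurrent tasks. Be aware, however, that this can become quite tricky, especially if you want to guarantee correct failover behavior.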
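A sketch of the write-back case under a simplifying assumption: the task thread waits for all workers before processElement returns, so the current key cannot change in the meantime. Workers only compute and return results tagged with the key they were computed for (a simple form of the bookkeeping mentioned above), and the task thread checks that tag and performs every put itself. WritableMapState, KeyedResult and computeAndStore are hypothetical names, the doubling stands in for real work, and the sketch does not address results that outlive the current element or the failover concerns raised above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the write side of Flink's MapState<String, Long>.
// Treat it like real Flink state: call it only from the task thread.
interface WritableMapState {
    void put(String userKey, Long value) throws Exception;
}

// A worker's result, tagged with the stream key it was computed for (the bookkeeping).
final class KeyedResult {
    final String streamKey;
    final String userKey;
    final long value;

    KeyedResult(String streamKey, String userKey, long value) {
        this.streamKey = streamKey;
        this.userKey = userKey;
        this.value = value;
    }
}

class WriteBackOnTaskThread {

    // Meant to run on the task thread while currentKey is the key of the element being processed.
    static void computeAndStore(String currentKey, Map<String, Long> inputs,
                                WritableMapState state, ExecutorService pool) throws Exception {
        List<Future<KeyedResult>> futures = new ArrayList<>();
        for (Map.Entry<String, Long> in : inputs.entrySet()) {
            String userKey = in.getKey();
            long input = in.getValue();
            // Workers only compute (placeholder: doubling); they never see the state object.
            futures.add(pool.submit(() -> new KeyedResult(currentKey, userKey, input * 2)));
        }

        // Join and write back on the task thread, after checking the key tag.
        for (Future<KeyedResult> f : futures) {
            KeyedResult r = f.get();
            if (!r.streamKey.equals(currentKey)) {
                throw new IllegalStateException(
                        "result for key " + r.streamKey + " while processing " + currentKey);
            }
            state.put(r.userKey, r.value);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> stored = new HashMap<>();
        WritableMapState state = stored::put;

        Map<String, Long> inputs = new HashMap<>();
        inputs.put("a", 1L);
        inputs.put("b", 5L);

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            computeAndStore("sensor-1", inputs, state, pool);
        } finally {
            pool.shutdown();
        }
        System.out.println(stored); // {a=2, b=10}
    }
}
```

Blocking on every future keeps the key handling trivial but gives up some of the parallelism's latency benefit; letting results arrive after processElement returns would require real per-key bookkeeping and checkpoint-aware handling of in-flight work.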

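Alternatively, you could take a look at Flink's async I/O operator. With the async I/O operator you cannot access state from within the operator; instead, you have to emit the results and keep the state in a downstream operator.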

