apache-flink - 从进程元素内的多个线程读取 flink MapState 键值会引发 ConcurrentModification 异常
问题描述
在我的 flink 工作中,我将数据保存在 mapstate 中几分钟以进行进一步处理,在流程元素中,我需要借助 mapstate 值执行一组操作,因此我使用多个线程在 mapstate 内容的帮助下进行处理。线程没有修改统计信息,它只获取导致 CocurrentModification 异常的键值,而且这个 TTL 异常也出现在配置的 TTL 时间之前,日志如下。
`java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: Failed to incrementally clean up state with TTL
at java.util.concurrent.FutureTask.report(Futur`enter code here`eTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.XXXXXX.processResults(DataProcessor.java:198)
at com.XXXXXX.processElement(DataProcessor.java:151)
at com.XXXXXX.processElement(DataProcessor.java:1)
at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$753/1478413652.accept(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$706/2073221180.runDefaultAction(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to incrementally clean up state with TTL
at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.stateAccessed(TtlIncrementalCleanup.java:60)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$$Lambda$789/1622849283.run(Unknown Source)
at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:59)
at org.apache.flink.runtime.state.ttl.TtlMapState.get(TtlMapState.java:54)
at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
at com.XXXXXX.treamFunctions.findPreviousData(DATAExecutor.java:216)
at com.XXXXXX.previousVal(DataExecutor.java:175)
at com.XXXXXX.$FlinkStreamFunctions.execute(DATAExecutor.java:107)
at com.XXXXXX.evaluateFunction(ExpressionEvaluator.java:89)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.runCleanup(TtlIncrementalCleanup.java:78)
at org.apache.flink.runtime.state.ttl.TtlIncrementalCleanup.stateAccessed(TtlIncrementalCleanup.java:58)
... 17 more
java apache-flink`
解决方案
简短的回答是 Flink 的状态抽象不是为并发访问而设计的,不应该在多个线程之间共享。
长答案是 Flink 的状态抽象不仅仅是一个Map
或一个只存储一些值的值容器。当用户通过这些抽象访问 Flink 状态时,在后台可能会发生多种事情。例如,如果您配置了 TTL,它将确保 TTL 没有过期。此外,如果您使用键控流,它将确保您根据当前键获得正确的状态值。根据StateBackend
操作,抽象可能必须从磁盘读取状态并为您反序列化。
话虽如此,在某些条件下,仍然可以在线程之间共享实际状态值。如果您的线程只使用状态而不写入状态,那么您可以读出状态并将实际值传递给其他线程。
如果您需要将并发线程的更新写回状态,那么事情会变得有点复杂。您必须确保从 Flink 的Task
线程访问状态。FutureTask
此外,当您想将状态从 a 写回Flink 状态时,您必须确保正在处理正确的键。因此,您需要一些簿记来建立键与并发任务之间的映射。但是请注意,这可能会变得非常棘手,特别是如果您想确保正确的故障转移行为。
或者,您可以查看 Flink 的异步 I/O 运算符。使用异步 I/O 运算符时,您无法从运算符访问状态。您必须输出结果并将状态保存在下游运算符中。
推荐阅读
- python-3.x - 按钮没有ID也没有名称,python Selenium
- python-3.x - 在递归打印 1 到 N 个连续数字后打印“换行符”
- python - 创建堆叠的垂直子菜单,在单击时显示/隐藏小部件
- yugabyte-db - 如何在 YugabyteDB 中的时间戳列上缩放范围分片索引?
- java - 更改 doInBackground 中的通用返回类型
- javascript - 有没有办法在不知道变量是什么的情况下知道 if 语句中变量的值?
- regex - 使用 .htaccess 重写根文件夹以外的 URL
- flutter - 单击抽屉项目列表时尝试替换脚手架主体时出现黑屏
- python - 以下代码的时间复杂度是多少?O(nlogn) 还是 O(N)?复杂度 O(nlogn) 是什么时候?
- .net - XSLT - 用逗号对数字求和