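apache-flink - How to prevent a Flink job from canceling itself
Problem description
Environment
My Flink job runs on a standalone cluster in session mode. The version is 1.13 ( https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode )
Problem
The job reads messages from Kafka and sinks them to MySQL and HBase. I noticed that Flink canceled the job after it had been running for about two days, while the Flink cluster itself was still healthy.
My observations
In the Flink log I see that nearly 100,000 SQL exceptions had been thrown from the MySQL sink before the job was finally canceled.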
2021-07-15 09:26:17,455 WARN com.exceeddata.vcloud.mysql.sink.StagingDataSliceSink [] - insert slice already exists with packet id: 370913 and slice_num: 8
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '370913-8' for key 'packet_id_slice_num'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:370) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.exceeddata.vcloud.mysql.sink.StagingDataSliceSink.invoke(StagingDataSliceSink.java:153) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.exceeddata.vcloud.mysql.sink.StagingDataSliceSink.invoke(StagingDataSliceSink.java:16) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:82) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103) [flink-dist_2.11-1.13.1.jar:1.13.1]
at com.exceeddata.vcloud.streaming.vdata.VDataStreamingJob$6.processElement(VDataStreamingJob.java:112) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at com.exceeddata.vcloud.streaming.vdata.VDataStreamingJob$6.processElement(VDataStreamingJob.java:86) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) [flink-dist_2.11-1.13.1.jar:1.13.1]
However, this exception is supposed to be caught in our code, and the warning message in the log shows that it was indeed caught:
// Inside the catch block of the sink; ex is the caught exception.
// Guard against a null message before matching on its text.
String msg = ex.getMessage() == null ? "" : ex.getMessage().toLowerCase();
if (msg.contains("unique key") || msg.contains("duplicate entry")) {
    // The Flink job does print this warning message.
    LOG.warn("insert slice already exists with packet id: " + packetId + " and slice_num: " + dataSlice.getSliceNum(), ex);
} else {
    LOG.error("insert staging_data_slice error, packet id: " + packetId + " and slice_num: " + dataSlice.getSliceNum(), ex);
    throw ex;
}
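The check above depends on the wording of the exception message. As an alternative, here is a minimal sketch that decides on the vendor error code instead, assuming the driver reports MySQL's ER_DUP_ENTRY code (1062) for duplicate-key inserts. The class and method names (DuplicateKeyCheck, isDuplicateKey) are hypothetical and not part of the original sink.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

// Hypothetical helper, not part of the original StagingDataSliceSink.
final class DuplicateKeyCheck {
    // MySQL server error code ER_DUP_ENTRY (assumed to be what the driver reports).
    static final int MYSQL_ER_DUP_ENTRY = 1062;

    // Returns true if ex, or any exception chained to it via getNextException(),
    // carries the duplicate-entry vendor code.
    static boolean isDuplicateKey(SQLException ex) {
        for (SQLException e = ex; e != null; e = e.getNextException()) {
            if (e.getErrorCode() == MYSQL_ER_DUP_ENTRY) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Exceptions built by hand for illustration; a real one would come from the driver.
        SQLException dup = new SQLIntegrityConstraintViolationException(
                "Duplicate entry '370913-8' for key 'packet_id_slice_num'", "23000", 1062);
        SQLException other = new SQLException("Communications link failure", "08S01", 0);
        System.out.println(isDuplicateKey(dup));   // true
        System.out.println(isDuplicateKey(other)); // false
    }
}
```

In the sink, the condition of the if statement could then become DuplicateKeyCheck.isDuplicateKey(ex), with the warn and error branches left unchanged. This only changes how the duplicate is recognized; it does not by itself explain why the job was canceled.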
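Other observations
I pulled the same messages from Kafka and tried to reproduce the problem in my local environment, but both the Flink cluster and the job ran fine there.
What I want
I want my job to keep running.
[Update, July 20] JobManager log
// Before this part there are many lines of "Completed checkpoint xxxx for job xxxx"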
2021-07-16 17:01:43,878 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 19094 for job ad2b1cc19dbc5fc782e67bcea8a483ed (1061 bytes in 3 ms).
2021-07-16 17:01:44,498 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job vData streaming (ad2b1cc19dbc5fc782e67bcea8a483ed) switched from state RUNNING to CANCELLING.
2021-07-16 17:01:44,498 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ingress_kafka -> Process -> (Sink: HBase: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice_error, <Mapping>: StagingDataSliceModel to StagingDataSliceError -> Sink: HBase: sink to staging_data_slice_error, Sink: MySQL: sink to staging_message_error, Sink: MySQL: sink to staging_data_slice_out_of_scope_error, Sink: MySQL: sink to staging_temp_vcompute) (1/1) (776d7910e418d672ffaedda4d237dbb5) switched from RUNNING to CANCELING.
2021-07-16 17:01:44,513 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ingress_kafka -> Process -> (Sink: HBase: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice_error, <Mapping>: StagingDataSliceModel to StagingDataSliceError -> Sink: HBase: sink to staging_data_slice_error, Sink: MySQL: sink to staging_message_error, Sink: MySQL: sink to staging_data_slice_out_of_scope_error, Sink: MySQL: sink to staging_temp_vcompute) (1/1) (776d7910e418d672ffaedda4d237dbb5) switched from CANCELING to CANCELED.
2021-07-16 17:01:44,514 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job vData streaming (ad2b1cc19dbc5fc782e67bcea8a483ed) switched from state CANCELLING to CANCELED.
2021-07-16 17:01:44,514 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job ad2b1cc19dbc5fc782e67bcea8a483ed
2021-07-16 17:01:44,514 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job ad2b1cc19dbc5fc782e67bcea8a483ed.
2021-07-16 17:01:44,514 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-07-16 17:01:44,514 INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint with ID 19094 at 'file:/home/edgeuser/flinkData/checkpoints/ad2b1cc19dbc5fc782e67bcea8a483ed/chk-19094' not discarded.
2021-07-16 17:01:44,514 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job ad2b1cc19dbc5fc782e67bcea8a483ed reached terminal state CANCELED.
2021-07-16 17:01:44,523 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job vData streaming(ad2b1cc19dbc5fc782e67bcea8a483ed).
2021-07-16 17:01:44,523 INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [ef791c2d1fd5461b6a15306f4dcd682f].
2021-07-16 17:01:44,525 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job ad2b1cc19dbc5fc782e67bcea8a483ed
2021-07-16 17:01:44,525 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection efe52ea6c6b1862bf96eab2b63616f43: Stopping JobMaster for job vData streaming(ad2b1cc19dbc5fc782e67bcea8a483ed)..
2021-07-16 17:01:44,525 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_8 for job ad2b1cc19dbc5fc782e67bcea8a483ed from the resource manager.
2021-07-16 17:04:56,790 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@10.131.133.189:41366] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2021-07-16 17:04:56,790 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink-metrics@10.131.133.189:37250] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2021-07-16 17:04:57,456 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2021-07-16 17:04:57,457 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:34942
2021-07-16 17:04:57,457 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2021-07-16 17:04:57,458 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Shutting down rest endpoint.
2021-07-16 17:04:57,488 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2021-07-16 17:04:57,488 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2021-07-16 17:04:57,502 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-07-16 17:04:57,502 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-07-16 17:04:57,520 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2021-07-16 17:04:57,520 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
Solution