apache-spark - Spark Streaming: batch time and submission time differ by 50 minutes
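Problem description
The Spark version is 2.2.0. Pseudocode:
Read data1 from Kafka with a 5-minute window
Read data2 from Kafka with a 10-minute window and a 5-minute slide duration
Join data1 with data2 under some condition
Do some aggregation and write the result to MySQL
Problem: the batch time is 15:00 but the submission time is 15:50, even though the processing time is under 1 minute. What happened?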
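import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
val shareDs = KafkaUtils.createDirectStream[String, String](streamContext, LocationStrategies.PreferBrokers, shareReqConsumer)
val shareResDs = KafkaUtils.createDirectStream[String, String](streamContext, LocationStrategies.PreferBrokers, shareResConsumer).window(Minutes(WindowTime), Minutes(StreamTime))
// Pseudocode: doSomeMap stands for a mapping to (key, value) pairs, which join requires
shareDs.map(doSomeMap).join(shareResDs.map(doSomeMap)).foreachRDD { rdd => /* do some things, then write to MySQL */ }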
Some logs:
19/07/22 11:20:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:20:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:20:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:20:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_afp_com_input_result-2, topic_wh_sparkstream_afp_com_input_result-1, topic_wh_sparkstream_afp_com_input_result-0]
19/07/22 11:20:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] (Re-)joining group
19/07/22 11:25:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] Successfully joined group with generation 820
19/07/22 11:25:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-6, groupId=dashboard] Setting newly assigned partitions [topic_wh_sparkstream_afp_com_input_result-2, topic_wh_sparkstream_afp_com_input_result-1, topic_wh_sparkstream_afp_com_input_result-0]
19/07/22 11:25:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:25:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:25:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_decision_report_result-1, topic_wh_sparkstream_decision_report_result-2, topic_wh_sparkstream_decision_report_result-0]
19/07/22 11:25:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] (Re-)joining group
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] Successfully joined group with generation 821
19/07/22 11:30:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-5, groupId=dashboard] Setting newly assigned partitions [topic_wh_sparkstream_decision_report_result-1, topic_wh_sparkstream_decision_report_result-2, topic_wh_sparkstream_decision_report_result-0]
19/07/22 11:30:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_echo_mixed_risk_record-1, topic_wh_sparkstream_echo_mixed_risk_record-2, topic_wh_sparkstream_echo_mixed_risk_record-0]
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] (Re-)joining group
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Marking the coordinator 10.124.35.112:9092 (id: 2147483534 rack: null) dead
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Discovered group coordinator 10.124.35.112:9092 (id: 2147483534 rack: null)
19/07/22 11:30:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] (Re-)joining group
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Successfully joined group with generation 822
19/07/22 11:35:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-4, groupId=dashboard] Setting newly assigned partitions [topic_wh_sparkstream_echo_mixed_risk_record-1, topic_wh_sparkstream_echo_mixed_risk_record-2, topic_wh_sparkstream_echo_mixed_risk_record-0]
19/07/22 11:35:00 INFO dstream.MappedDStream: Slicing from 1563765000000 ms to 1563765600000 ms (aligned to 1563765000000 ms and 1563765600000 ms)
19/07/22 11:35:00 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] Revoking previously assigned partitions [topic_wh_sparkstream_echo_mixed_risk_result_detail-2, topic_wh_sparkstream_echo_mixed_risk_result_detail-1, topic_wh_sparkstream_echo_mixed_risk_result_detail-0, topic_wh_sparkstream_echo_behavior_features_result-0, topic_wh_sparkstream_echo_behavior_features_result-1, topic_wh_sparkstream_echo_behavior_features_result-2]
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] (Re-)joining group
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] Marking the coordinator 10.124.35.112:9092 (id: 2147483534 rack: null) dead
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] Discovered group coordinator 10.124.35.112:9092 (id: 2147483534 rack: null)
19/07/22 11:35:00 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-3, groupId=dashboard] (Re-)joining group
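At each window timestamp, the driver only goes through a Kafka partition reassignment (the consumers rejoin the group) instead of adding a job.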
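To make the log easier to read, the two epoch values in the "Slicing from ... to ..." lines can be converted to timestamps. The sketch below only uses the numbers that appear in the log; the object name `SliceTimes` is made up for illustration. It shows that the slice being requested stays the same 10-minute window while the wall-clock time of the log lines moves from 11:20 to 11:35. The log timestamps look like local time at UTC+8, which is an inference from the values and not something the post states.

```scala
import java.time.{Duration, Instant}

object SliceTimes {
  def main(args: Array[String]): Unit = {
    // Epoch milliseconds copied from the "Slicing from ... to ..." log lines
    val from = Instant.ofEpochMilli(1563765000000L)
    val to   = Instant.ofEpochMilli(1563765600000L)

    println(from) // 2019-07-22T03:10:00Z
    println(to)   // 2019-07-22T03:20:00Z

    // Length of the slice in minutes: matches the 10-minute window
    println(Duration.between(from, to).toMinutes) // 10
  }
}
```

Each "(Re-)joining group" line in the log is followed by its "Successfully joined group" line exactly five minutes later, so the same slice is still being requested 15 minutes after it was first logged.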
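Solution
I solved this problem. When using Spark Streaming with Kafka, configure each stream with its own group.id, disable auto commit, and set the Kafka consumer parameters appropriately, in particular the heartbeat interval, session timeout, request timeout, and max poll interval.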
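A minimal sketch of what such a configuration might look like, assuming the Kafka 0.10+ consumer property names. The helper `kafkaParamsFor`, the group names, the broker address, and all timeout values are placeholders chosen for illustration; the post does not say which values were actually used.

```scala
object KafkaParamsSketch {
  // Hypothetical helper: builds the consumer settings for one stream.
  // All numeric values are illustrative placeholders, not recommendations.
  def kafkaParamsFor(groupId: String, brokers: String): Map[String, Object] =
    Map[String, Object](
      "bootstrap.servers"     -> brokers,
      "key.deserializer"      -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"    -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id"              -> groupId,                       // one group per stream
      "enable.auto.commit"    -> (false: java.lang.Boolean),    // auto commit disabled
      "heartbeat.interval.ms" -> "10000",                       // kept below session.timeout.ms
      "session.timeout.ms"    -> "30000",
      "request.timeout.ms"    -> "40000",                       // kept above session.timeout.ms
      "max.poll.interval.ms"  -> "600000"                       // longer than the 5-minute batch
    )

  def main(args: Array[String]): Unit = {
    val brokers = "broker1:9092" // placeholder address

    // Each stream gets its own consumer group instead of sharing "dashboard"
    val shareReqParams = kafkaParamsFor("dashboard-share-req", brokers)
    val shareResParams = kafkaParamsFor("dashboard-share-res", brokers)

    require(shareReqParams("group.id") != shareResParams("group.id"))
    println(shareReqParams("group.id"))
    println(shareResParams("group.id"))
  }
}
```

Each map could then be handed to `ConsumerStrategies.Subscribe[String, String](topics, params)` from spark-streaming-kafka-0-10 to build the consumer strategy for its stream. In the log above, all consumers share groupId=dashboard, so a rejoin by one consumer affects the whole group; separate groups avoid that coupling.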