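java - KafkaStream EXACTLY_ONCE in a stream application fails to rebalance if one broker is down
Problem description
I have a Kafka Streams application using kafka-streams and kafka-clients, both version 2.4.0, with the following configuration: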
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- brokers = ip1:port1, ip2:port2, ip3:port3
- topic partitions: 3
- topic replication factor: 3
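For reference, the configuration above can be written out as a small self-contained sketch. The `application.id` value `my-app1` is an assumption inferred from `transactionalId=my-app1-0_0` in the Scenario 1 log. The keys are written as string literals equal to the 2.4.0 `StreamsConfig` constants (`APPLICATION_ID_CONFIG` is `application.id`, `BOOTSTRAP_SERVERS_CONFIG` is `bootstrap.servers`, `PROCESSING_GUARANTEE_CONFIG` is `processing.guarantee`, `EXACTLY_ONCE` is `exactly_once`) so the snippet compiles without the Kafka jars; `buildProperties` is a hypothetical helper.

```java
import java.util.List;
import java.util.Properties;

public class StreamsConfigSketch {

    // Hypothetical helper that builds the Streams properties described in the question.
    // Keys are string literals equal to the StreamsConfig constants in 2.4.0,
    // so this compiles without kafka-streams on the classpath.
    public static Properties buildProperties(String applicationId, List<String> brokers, boolean exactlyOnce) {
        Properties properties = new Properties();
        properties.put("application.id", applicationId);                // StreamsConfig.APPLICATION_ID_CONFIG
        properties.put("bootstrap.servers", String.join(",", brokers)); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.PROCESSING_GUARANTEE_CONFIG: "exactly_once" vs the default "at_least_once"
        properties.put("processing.guarantee", exactlyOnce ? "exactly_once" : "at_least_once");
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder host:port values, as in the question (Scenario 1 lists all three brokers).
        List<String> brokers = List.of("ip1:port1", "ip2:port2", "ip3:port3");
        Properties p = buildProperties("my-app1", brokers, true);
        System.out.println(p.getProperty("bootstrap.servers")); // ip1:port1,ip2:port2,ip3:port3
    }
}
```

For Scenario 2, the only difference would be passing just the two live brokers' addresses in the list. Note that `bootstrap.servers` is used only for the initial connection; the client then discovers the full cluster from metadata. So shortening the list would not be expected to change which brokers the transactional producer ends up depending on.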
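Scenario 1: I start only 2 brokers (the stream application's broker setting still contains all three broker IPs). When I start my stream application, the following error occurs: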
2020-02-13 13:28:19.711 WARN 18756 --- [-1-0_0-producer] org.apache.kafka.clients.NetworkClient : [Producer clientId=my-app1-a4c8867f-b914-49bb-bc58-203349700828-StreamThread-1-0_0-producer, transactionalId=my-app1-0_0] Connection to node -2 (/ip2:port2) could not be established. Broker may not be available.
After 1 minute:
org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app1-a4c8867f-b914-49bb-bc58-203349700828-StreamThread-1] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app1-a4c8867f-b914-49bb-bc58-203349700828-StreamThread-1] task [0_0] Failed to initialize task 0_0 due to timeout.
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:966)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:254)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
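I am testing a high-availability scenario. I think Kafka should still work, because the data is properly replicated on the two running brokers (I checked this with a Kafka GUI tool).
Scenario 2: Today I noticed the following when I start only 2 brokers and give the stream application only those two brokers' IPs (i.e., the stream application only has the IPs of the two working brokers):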
2020-02-16 16:18:24.818 INFO 5741 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1-consumer, groupId=my-app] Group coordinator ip2:port2 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-02-16 16:18:24.818 ERROR 5741 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [my-app-0a357371-525b-46cf-9fe1-34ee94fa4158-StreamThread-1] task [0_0] Failed to initialize task 0_0 due to timeout.
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTransactions(StreamTask.java:966)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:254)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
... 3 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
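Note: this does not happen if I don't set EXACTLY_ONCE in the properties; then everything works as expected. I tried increasing the retries and the backoff/max ms settings, but it didn't help. Can anyone explain what I'm missing?
Log from broker 2 while broker 1 is down: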
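The retry tuning mentioned in the note above can be passed to the producers that Streams manages by prefixing producer keys with `producer.` (the same string `StreamsConfig.producerPrefix(...)` produces). This is only an illustrative sketch: the question does not say exactly which settings or values were used, so the values below are assumptions, and `withProducerOverrides` is a hypothetical helper. `max.block.ms` is included because its default of 60000 ms matches the `Timeout expired after 60000milliseconds while awaiting InitProducerId` message; if the transaction coordinator stays unreachable, raising it would likely only delay the same error.

```java
import java.util.Map;
import java.util.Properties;

public class ProducerOverridesSketch {

    // Mirrors what StreamsConfig.producerPrefix(key) returns: "producer." + key.
    static String producerPrefix(String key) {
        return "producer." + key;
    }

    // Hypothetical helper: returns a copy of the Streams properties with prefixed producer overrides added.
    public static Properties withProducerOverrides(Properties base, Map<String, String> overrides) {
        Properties result = new Properties();
        result.putAll(base); // leave the caller's Properties untouched
        for (Map.Entry<String, String> e : overrides.entrySet()) {
            result.put(producerPrefix(e.getKey()), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("processing.guarantee", "exactly_once");
        // Example values only; they are assumptions, not the asker's actual settings.
        Properties tuned = withProducerOverrides(base, Map.of(
                "retries", "10",
                "retry.backoff.ms", "1000",
                "max.block.ms", "120000"));
        tuned.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + tuned.getProperty(k)));
    }
}
```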
[2020-02-17 02:29:00,302] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Retrying leaderEpoch request for partition __consumer_offsets-36 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
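The Kafka log is flooded with the line above.
One key observation: when I shut down broker 2 (i.e., brokers 1 and 3 are running), my stream application works fine. My application goes down only when broker 1 is down. My guess is that some critical information that should be distributed across all brokers is kept only on broker 1.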
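One way to test the guess in the last paragraph is to check where the replicas of the internal topics live. Transactions (including the `InitProducerId` request in the stack traces) are coordinated through the internal `__transaction_state` topic, and the broker log above mentions `__consumer_offsets`. The sketch below assumes each partition's replica list has already been collected, for example from the output of `kafka-topics.sh --describe --topic __transaction_state`. The helper name and the replica layout are hypothetical, and it approximates availability by counting replicas on live brokers rather than reading the actual ISR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ReplicaPlacementCheck {

    // Returns the partitions (in ascending order) that would have fewer than minLive replicas
    // on brokers that are still up. minLive = 1 finds partitions with no surviving replica;
    // minLive = 2 roughly mirrors the broker default transaction.state.log.min.isr=2.
    public static List<Integer> atRiskIfDown(Map<Integer, List<Integer>> replicasByPartition,
                                             Set<Integer> downBrokers, int minLive) {
        List<Integer> atRisk = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(replicasByPartition).entrySet()) {
            long live = e.getValue().stream().filter(b -> !downBrokers.contains(b)).count();
            if (live < minLive) {
                atRisk.add(e.getKey());
            }
        }
        return atRisk;
    }

    public static void main(String[] args) {
        // Hypothetical layout in which every partition has a single replica, on broker 1.
        Map<Integer, List<Integer>> layout = Map.of(
                0, List.of(1),
                1, List.of(1),
                2, List.of(1));
        System.out.println(atRiskIfDown(layout, Set.of(1), 1)); // [0, 1, 2]
        System.out.println(atRiskIfDown(layout, Set.of(2), 1)); // []
    }
}
```

A layout like the hypothetical one above would reproduce the asymmetry described in the question: stopping broker 1 leaves those partitions without any replica, while stopping broker 2 does not affect them.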