How to fix a Kafka Streams issue where the group coordinator is unavailable or invalid, will attempt rediscovery

Problem description

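I run into a problem when I run my Kafka Streams application with PROCESSING_GUARANTEE_CONFIG set to exactly-once semantics. With other settings, such as at-least-once semantics, it works fine.

I noticed in the logs that something is going wrong. I found some advice here on how to fix it, but unfortunately it didn't help me :(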

03:35:28.627 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Discovered group coordinator kafka:9093 (id: 2147483646 rack: null)
03:35:28.627 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Group coordinator kafka:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
03:35:28.628 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=to-transform] Discovered group coordinator kafka:9093 (id: 2147483646 rack: null)
03:35:28.628 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Group coordinator kafka:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
03:35:48.628 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Discovered group coordinator kafka:9093 (id: 2147483646 rack: null)
03:35:48.630 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Found no committed offset for partition topic-0
03:35:48.631 INFO  o.a.k.c.c.KafkaConsumer - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
03:35:48.631 INFO  o.a.k.s.p.i.StreamThread - stream-thread [transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
03:35:48.631 INFO  o.a.k.s.KafkaStreams - stream-client [transform-f8268b2b-4673-49ac-9396-6a2b86d45697] State transition from REBALANCING to RUNNING
03:35:48.632 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Attempt to heartbeat failed for since member id transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer-6aacbde6-4553-43ee-bc2f-2b5718e55acf is not valid.
03:35:48.632 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Found no committed offset for partition topic-0
03:35:48.633 INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Resetting offset for partition topic-0 to offset 0.
03:35:48.634 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
03:35:48.634 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Lost previously assigned partitions topic-0
03:35:48.634 INFO  o.a.k.s.p.i.StreamThread - stream-thread [transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1] at state RUNNING: partitions [topic-0] lost due to missed rebalance.

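For example, one suggestion was that if I run only a single Kafka broker node, I have to set the partition and replication configs to 1. The second suggestion was to restart the Kafka broker, which also had no effect.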

  kafka:
    image: wurstmeister/kafka:2.12-2.4.1
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper 
    links:
      - zookeeper:zk
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: OUTSIDE://kafka:9092,INSIDE://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: OUTSIDE://localhost:9092,INSIDE://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_RETENTION_HOURS: 1
      KAFKA_MESSAGE_MAX_BYTES: 1048576
      KAFKA_REPLICA_FETCH_MAX_BYTES: 1048576
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 30000
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_DELETE_RETENTION_MS: 86400000
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: topic:1:1, transform:1:1

Thanks for your help, Victor

Tags: apache-kafka, apache-kafka-streams

Solution


The observed problem could have many causes. In general, exactly-once is more expensive and puts a higher load on both the brokers and the Kafka Streams application.

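Also note that if you really want exactly-once processing, you should run at least 3 brokers (and the topics should be configured with a replication factor of 3 and min.insync.replicas of 2). Otherwise, EOS cannot really be guaranteed.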
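For the internal topics that Kafka Streams creates itself (changelog and repartition topics), the replication settings mentioned here can be requested from the application side. The following is a minimal sketch, assuming a 3-broker cluster. The class and method names are hypothetical, and the keys are written as plain strings (`replication.factor`, plus the `topic.` prefix for topic-level overrides) so the sketch compiles without the Kafka client on the classpath. Source and sink topics still have to be created with a replication factor of 3 separately.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original question or answer.
final class EosReplicationSketch {

    // Returns Streams settings that apply to internal topics only.
    static Properties internalTopicSettings() {
        Properties props = new Properties();
        // Replication factor for changelog and repartition topics.
        props.setProperty("replication.factor", "3");
        // Topic-level override passed through with the "topic." prefix.
        props.setProperty("topic.min.insync.replicas", "2");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(internalTopicSettings());
    }
}
```

These properties would be merged into the rest of the application's Streams configuration before the `KafkaStreams` instance is created.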

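Increasing commit.interval.ms may help mitigate the issue. Note that with EOS this can lead to higher processing latency (which is why the default commit interval is reduced to 100 ms when EOS is enabled). If you can accept higher latency, you may want to increase it to, for example, 1 second.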
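As a rough illustration of the commit-interval change, the sketch below builds a Streams configuration with exactly-once enabled and a caller-chosen commit interval. It is only a sketch under assumptions: the class name is hypothetical, the application id and bootstrap server are taken from the logs in the question (`groupId=transform`, `kafka:9093`), and the keys are plain strings that correspond to `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG`.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original question or answer.
final class EosCommitIntervalSketch {

    static Properties build(long commitIntervalMs) {
        Properties props = new Properties();
        // Values assumed from the question's logs.
        props.setProperty("application.id", "transform");
        props.setProperty("bootstrap.servers", "kafka:9093");
        // Enable exactly-once semantics.
        props.setProperty("processing.guarantee", "exactly_once");
        // Override the 100 ms default that applies when EOS is enabled.
        props.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 1 second, the example value suggested in the answer.
        System.out.println(build(1000L));
    }
}
```

A longer interval means fewer transaction commits and less load on the broker, at the cost of results becoming visible downstream later.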

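In addition, a lot of work has gone into EOS, and newer releases contain many improvements. If you can, you may want to upgrade to the upcoming 2.6 release and test the new "eos_beta" processing mode (requires brokers 2.5 or newer).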
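The broker-version requirement can be made explicit in code. The sketch below is a hypothetical helper (its name and signature are not from any Kafka API) that picks a `processing.guarantee` value from the broker version. It assumes the 2.6 client, where the beta mode is selected with the value `exactly_once_beta`. The image in the question, `wurstmeister/kafka:2.12-2.4.1`, appears to be a 2.4.1 broker, so the broker would need upgrading as well.

```java
// Hypothetical helper, not part of the original question or answer.
final class EosModeSketch {

    // Chooses the processing.guarantee value for a given broker version.
    static String processingGuarantee(int brokerMajor, int brokerMinor) {
        boolean supportsBeta = brokerMajor > 2 || (brokerMajor == 2 && brokerMinor >= 5);
        // Fall back to the original EOS mode on brokers older than 2.5.
        return supportsBeta ? "exactly_once_beta" : "exactly_once";
    }

    public static void main(String[] args) {
        System.out.println(processingGuarantee(2, 4));
        System.out.println(processingGuarantee(2, 5));
    }
}
```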

