首页 > 解决方案 > ProducerFencedException 处理 Kafka 流

问题描述

我正在使用卡夫卡 1.1.0。kafka 流始终抛出此异常(尽管消息不同)

WARN o.a.k.s.p.i.RecordCollectorImpl@onCompletion:166 - task [0_0] Error sending record (key KEY value VALUE timestamp TIMESTAMP) to topic OUTPUT_TOPIC due to Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.; No more records will be sent and no more offsets will be recorded for this task.
WARN o.a.k.s.p.i.AssignedStreamsTasks@closeZombieTask:202 - stream-thread [90556797-3a33-4e35-9754-8a63200dc20e-StreamThread-1] stream task 0_0 got migrated to another thread already. Closing it as zombie.
WARN o.a.k.s.p.internals.StreamThread@runLoop:752 - stream-thread [90556797-3a33-4e35-9754-8a63200dc20e-StreamThread-1] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
    ProcessorTopology:
        KSTREAM-SOURCE-0000000000:
            topics:
        [INPUT_TOPIC]
            children:   [KSTREAM-PEEK-0000000001]
        KSTREAM-PEEK-0000000001:
            children:   [KSTREAM-MAP-0000000002]
        KSTREAM-MAP-0000000002:
            children:   [KSTREAM-SINK-0000000003]
        KSTREAM-SINK-0000000003:
            topic:
        OUTPUT_TOPIC
        Partitions [INPUT_TOPIC-0]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:238)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: task [0_0] Abort sending since producer got fenced with a previous record 

我不确定是什么导致了这个异常。当我重新启动应用程序时,它似乎在失败之前成功处理了一些记录并出现相同的异常。奇怪的是,即使流设置为只处理一次,记录也会成功处理多次。这是流配置:

Properties streamProperties = new Properties();
streamProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, service.getName());
streamProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
//Should be DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG - but that field is private.
streamProperties.put("default.production.exception.handler", ErrorHandler.class);
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
streamProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
streamProperties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

在三台服务器中,只有两台在重新启动流应用程序时生成相关日志。以下是来自第一台服务器的日志:

[2018-05-09 14:42:14,635] INFO [GroupCoordinator 1]: Member INPUT_TOPIC-09dd8ac8-2cd6-4dd1-b963-63ea804c8fcc-StreamThread-1-consumer-3fedb398-91fe-480a-b5ee-1b5879d0956c in group INPUT_TOPIC has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-05-09 14:42:14,636] INFO [GroupCoordinator 1]: Preparing to rebalance group INPUT_TOPIC with old generation 1 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator)
[2018-05-09 14:42:14,636] INFO [GroupCoordinator 1]: Group INPUT_TOPIC with generation 2 is now empty (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator)
[2018-05-09 14:42:15,848] INFO [GroupCoordinator 1]: Preparing to rebalance group INPUT_TOPIC with old generation 2 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator)
[2018-05-09 14:42:15,848] INFO [GroupCoordinator 1]: Stabilized group INPUT_TOPIC generation 3 (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator)
[2018-05-09 14:42:15,871] INFO [GroupCoordinator 1]: Assignment received from leader for group INPUT_TOPIC for generation 3 (kafka.coordinator.group.GroupCoordinator)

从第二台服务器:

[2018-05-09 14:42:16,228] INFO [TransactionCoordinator id=0] Initialized transactionalId INPUT_TOPIC-0_0 with producerId 2010 and producer epoch 37 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2018-05-09 14:44:22,121] INFO [TransactionCoordinator id=0] Completed rollback ongoing transaction of transactionalId: INPUT_TOPIC-0_0 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
[2018-05-09 14:44:42,263] ERROR [ReplicaManager broker=0] Error processing append operation on partition OUTPUT_TOPIC-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 37 (request epoch), 38 (server epoch)

看起来第一台服务器看到消费者失败并在向第二台服务器注册之前将其从消费者组中删除。任何想法可能导致消费者失败?或者,有什么想法可以优雅地处理这个失败?可能是这个错误,有人知道可能的解决方法吗?

标签: apache-kafkaapache-kafka-streams

解决方案


我不确定是什么导致了问题,但将 减少max.poll.records到 1 解决了问题。


推荐阅读