首页 > 解决方案 > 重启后 Kafka 消费者不阅读消息。我正在使用 weiboad / kafka-php

问题描述

以下是我的消费者和生产者脚本: 消费者脚本

$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.1.1');
$config->setTopics(['Request']);
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});

制片人脚本

$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setBrokerVersion('1.1.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'Request',
'value' => 'My Test Data',
],
];
}
);
$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode) {
var_dump($errorCode);
});
$producer->send(true);

它工作正常。当消费者正在运行并且生产者正在发布数据时,消费者能够读取它。但是一旦我停止消费者然后发布数据然后启动消费者,消费者就不会读取该数据。我究竟做错了什么?有人可以帮忙吗?

标签: apache-kafka

解决方案


如果您设置auto.offset.resetearliest它并不意味着消费者将始终从一开始就读取事件。该设置用于定义没有提交偏移量时的消费者行为。或者,它可以配置为latestnone

enable.auto.commit默认为true这意味着 Kafka 消费者将定期提交其当前偏移量。

此外,您有一个已配置的消费者组 ID,这意味着您的应用程序将在重启后继续消费。

如果您总是想从一开始就处理消息,您可以设置enable.auto.commit=false或删除组 ID。

请看消费者配置

影响偏移管理的两个主要设置是是否启用自动提交和偏移重置策略。首先,如果您设置 enable.auto.commit(这是默认设置),那么消费者将按照 auto.commit.interval.ms 设置的时间间隔定期自动提交偏移量。默认值为 5 秒。

其次,使用 auto.offset.reset 来定义消费者在没有提交位置时(第一次初始化组时就是这种情况)或偏移量超出范围时的行为。您可以选择将位置重置为“最早”偏移或“最新”偏移(默认值)。如果您希望自己设置初始偏移量并且愿意手动处理超出范围的错误,也可以选择“无”。


推荐阅读