apache-kafka - 重启后 Kafka 消费者不阅读消息。我正在使用 weiboad / kafka-php
问题描述
以下是我的消费者和生产者脚本: 消费者脚本
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.1.1');
$config->setTopics(['Request']);
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
制片人脚本
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('localhost:9092');
$config->setBrokerVersion('1.1.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
function() {
return [
[
'topic' => 'Request',
'value' => 'My Test Data',
],
];
}
);
$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode) {
var_dump($errorCode);
});
$producer->send(true);
它工作正常。当消费者正在运行并且生产者正在发布数据时,消费者能够读取它。但是一旦我停止消费者然后发布数据然后启动消费者,消费者就不会读取该数据。我究竟做错了什么?有人可以帮忙吗?
解决方案
如果您设置auto.offset.reset
为earliest
它并不意味着消费者将始终从一开始就读取事件。该设置用于定义没有提交偏移量时的消费者行为。或者,它可以配置为latest
或none
。
enable.auto.commit
默认为true
这意味着 Kafka 消费者将定期提交其当前偏移量。
此外,您有一个已配置的消费者组 ID,这意味着您的应用程序将在重启后继续消费。
如果您总是想从一开始就处理消息,您可以设置enable.auto.commit=false
或删除组 ID。
请看消费者配置
影响偏移管理的两个主要设置是是否启用自动提交和偏移重置策略。首先,如果您设置 enable.auto.commit(这是默认设置),那么消费者将按照 auto.commit.interval.ms 设置的时间间隔定期自动提交偏移量。默认值为 5 秒。
其次,使用 auto.offset.reset 来定义消费者在没有提交位置时(第一次初始化组时就是这种情况)或偏移量超出范围时的行为。您可以选择将位置重置为“最早”偏移或“最新”偏移(默认值)。如果您希望自己设置初始偏移量并且愿意手动处理超出范围的错误,也可以选择“无”。
推荐阅读
- java - 切换字符串中多个单词的首尾字母
- python - 如何检查是否使用 SQLAlchemy ORM 更新了条目?
- vue.js - Vue.js 在选项之间切换时将 previuos 值更改回默认值
- python-3.x - 在不移动任何按钮的情况下更新 tkinter
- google-chrome - Google Chrome/MS Edge 版本 87 会影响 iFrame 的可见性吗?
- javascript - 如何在 axios 请求后修复 vuejs 中的值更新延迟?
- airflow - 在 Airflow 中,他们希望您做什么而不是在任务之间传递数据
- php - laravel 7所有路由都指向实时服务器上的主页,但路由在本地工作
- python - pytesseract.image_to_string 似乎无法从图像中提取文本
- php - Joomla 组件“高级模块管理器”不适用于 j2store。怎么做?