php - php \RdKafka\Consumer 批量消息
问题描述
您好,我在 php 中使用 Kafka。特别是这个库:\RdKafka。这是我的脚本,它工作正常,但我一个接一个地接收消息,但我希望得到 10 个 10 个。需要一些帮助来实现这一点,因为我是使用 Kafka 的新手。
消费者连接到 Kafka 并正确获取消息。
<?php
include_once("config.php");
const REQUEST_SLEEP_TIME = 12*1000; //12 seg
$conf = new RdKafka\Conf();
$conf->set("bootstrap.servers", $KAFKA_SOCKET);
$conf->set("group.id", "test-consumer-group");
$rk = new RdKafka\Consumer($conf);
$topicConf = new RdKafka\TopicConf();
$topicConf->set("request.required.acks", 1);
$topicConf->set("auto.commit.enable", 0);
$topicConf->set("auto.commit.interval.ms", 100);
$topicConf->set("offset.store.method", "broker");
$topic = $rk->newTopic(KAFKA_TOPIC, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);
$i = 0;
while (true) {
echo "start $i \n";
$message = $topic->consume(0, REQUEST_SLEEP_TIME);
if (is_null($message)) {
sleep(1);
echo "No more messages: ".date("H:i:s")."\n";
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "RD_KAFKA_RESP_ERR_NO_ERROR\n";
print_r($message->payload."\n");
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
$i++;
echo "end $i\n";
}
解决方案
推荐阅读
- memory - FPGA Stratix 3 存储器能否处理大量数据?
- javascript - Javascript函数计算自以来经过的年数
- python - Pandas 中的条件累积和
- c++ - 如何调用存储在数组中的数组的值
- javascript - 赛普拉斯测试:如何比较 AJAX 调用前后的元素数量
- c++ - 如何使用 DirectX 11 渲染多个网格
- swift - 使用 ARKit 在 AR 中放置的 3D 对象中存储“标题”等数据
- java - 使用 Netty 限制 HTTP/2 中的流读取
- python - 使用 python 客户端从 yaml 创建 Kubernetes CronJob
- c# - 如何使用循环更改名称相似的内部类?