php - 使用 RDKAFKA 将多个组 id 分配给多个消费者
问题描述
我在 php 中使用 RDKAFKA 来实现并行运行消费者。
但是第一个消费者消费了来自主题的所有消息,所以第二个消费者没有得到主题的消息。
所以我对不同的消费者使用了不同的组 ID,但同样的问题仍然存在。
请帮我。
$
conf->set('group.id', 'myConsumerGroup');
$
conf->set('metadata.broker.list', '127.0.0.2');
$
topicConf = new RdKafka\TopicConf();
$
topicConf->set('auto.offset.reset', 'smallest');
$
conf->setDefaultTopicConf( $
topicConf);
$
consumer1 = new RdKafka\KafkaConsumer( $
conf);
$
消费者1->订阅(['dbtest']);而(真){
$
j = 0;
$
消息=$
消费者1->消费(120*1000);
switch ( $
message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR:
$
dataEx = json_decode( $
message->payload,true); var_dump($
数据);
$
sql = "INSERT INTO emp (name, email) VALUES ('" $
.dataEx['name']."','". $
dataEx['email']."')";
`$`servername = "localhost";
`$`username = "A";
`$`password = "ASD";
`$`dbname = "test";
`$`conn = new mysqli(`$`servername, `$`username, `$`password, `$`dbname);
if (`$`conn->connect_error) {
die("Connection failed: " . `$`conn->connect_error);
}
if (`$`conn->query(`$`sql) === TRUE) {
echo "New record created successfully to datbase ".`$`dbname."/n";
} else {
echo "Error: " . `$`sql . "<br>" . `$`conn->error;
}
`$`conn->close();
echo "produced `$`j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception(`$`message->errstr(), `$`message->err);
break;
}
$j++;
} 回声“这里”;
$
conf->set('group.id', 'myConsumerGroup1');
$
conf->set('metadata.broker.list', '127.0.0.2');
$
topicConf = new RdKafka\TopicConf();
$
topicConf->set('auto.offset.reset', 'smallest');
$
conf->setDefaultTopicConf( $
topicConf);
$
consumer2 = new RdKafka\KafkaConsumer( $
conf); 回声“奏鸣曲”;
$
消费者2->订阅(['dbtest']);while (true) {
$
message2 = $
consumer2->consume(120*1000); switch ( $
message2->err) { case RD_KAFKA_RESP_ERR_NO_ERROR:
$
dataEx = json_decode(message2- $
>payload,true); ``$sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','". $
dataEx['email']."')"; $servername1 = "本地主机"; $用户名1 = "A"; $password1 = "ASD"; $dbname1 = "test1";
$
conn1 = new mysqli( $
servername1, $
username1, $
password1, $
dbname1);$
$
conn1->connect_error); }
if (`$`conn1->query(`$`sql) === TRUE) {
echo "New record created successfully ".`$`dbname1;
} else {
echo "Error: " . `$`sql . "<br>" . `$`conn1->error;
}
`$`conn1->close();
echo "produced `$`j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception(`$`message->errstr(), `$`message->err);
break;
}
`$`j++;
}
解决方案
推荐阅读
- matplotlib-basemap - cartopy:NOAA APT 图像上的地图叠加层
- c++ - 在课堂组合中建立数据链接的最佳方式
- spring - Run and close child context from parent app
- awk - awk / gawk printf 可变格式字符串时,将零更改为破折号
- html - 日期选择器和时间选择器应该分开
- swift - 从 Swift 中的 Firebase 实时数据库获取当前服务器时间
- c# - 如何转换为传递给函数的类型?
- c# - 警告:源文档中没有元素匹配 '/configuration/connectionStrings/add[@name='MyDB']'
- javascript - Angular - 如何访问注释类方法
- ruby-on-rails - 无法将 Ruby 应用程序部署到 Heroku