首页 > 解决方案 > 使用 RDKAFKA 将多个组 id 分配给多个消费者

问题描述

我在 php 中使用 RDKAFKA 来实现并行运行消费者。

但是第一个消费者消费了来自主题的所有消息，所以第二个消费者没有得到主题的消息。

所以我对不同的消费者使用了不同的组 ID，但同样的问题仍然存在。

请帮我。

$conf->set('group.id', 'myConsumerGroup');
$conf->set('metadata.broker.list', '127.0.0.2');
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer1 = new RdKafka\KafkaConsumer($conf);
$consumer1->subscribe(['dbtest']);
while (true) {
$j = 0;
$message = $consumer1->consume(120*1000);

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
    $dataEx = json_decode($message->payload, true);
    var_dump($data);
    $sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','". $dataEx['email']."')";

    $servername = "localhost";
    $username = "A";
    $password = "ASD";
    $dbname = "test";
    $conn = new mysqli($servername, $username, $password, $dbname);
    if ($conn->connect_error) {
        die("Connection failed: " . $conn->connect_error);
    }

    if ($conn->query($sql) === TRUE) {
        echo "New record created successfully to datbase ".$dbname."/n";
    } else {
        echo "Error: " . $sql . "<br>" . $conn->error;
    }
    $conn->close();
    echo "produced $j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception($message->errstr(), $message->err);
    break;
}
$j++;

}
echo "here";
$conf->set('group.id', 'myConsumerGroup1');
$conf->set('metadata.broker.list', '127.0.0.2');
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);

$consumer2 = new RdKafka\KafkaConsumer($conf);
echo "sonata";
$consumer2->subscribe(['dbtest']);
while (true) {
$message2 = $consumer2->consume(120*1000);
switch ($message2->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
    $dataEx = json_decode($message2->payload, true);
    $sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','". $dataEx['email']."')";
    $servername1 = "localhost";
    $username1 = "A";
    $password1 = "ASD";
    $dbname1 = "test1";
    $conn1 = new mysqli($servername1, $username1, $password1, $dbname1);
    if ($conn1->connect_error) {
        die("Connection failed: " . $conn1->connect_error);
    }

    if ($conn1->query($sql) === TRUE) {
        echo "New record created successfully ".$dbname1;
    } else {
        echo "Error: " . $sql . "<br>" . $conn1->error;
    }
    $conn1->close();
    echo "produced $j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception($message->errstr(), $message->err);
    break;
}
$j++;

}

标签: php apache-kafka

解决方案


推荐阅读