docker - NodeJS:KafkaJSProtocolError:组成员支持的协议与现有成员的协议不兼容
问题描述
我正在尝试使用 MongoDB debezium 连接器从 Kafka 捕获数据,但是当我尝试使用 KafkaJS 读取数据时出现错误:
KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
我正在使用 docker 图像来捕获数据。
以下是步骤,我正在关注:
启动 Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
启动卡夫卡
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
我已经在复制模式下运行了 MongoDB
启动 debezium Kafka 连接
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka debezium/connect:latest
然后发布 MongoDB 连接器配置
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
有了这个,如果我运行一个观察者 docker 容器,我可以在控制台中以 Json 格式数据
docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
但我想在应用程序中捕获这些数据,以便我可以对其进行操作、处理并推送到 ElasticSearch。为此我正在使用
https://github.com/tulios/kafkajs
但是当我运行消费者代码时,我得到了错误。这是代码示例
//'use strict';
// clientId=connect-1, groupId=1
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'connect-1',
brokers: ['localhost:9092', 'localhost:9093']
})
// Consuming
const consumer = kafka.consumer({ groupId: '1' })
var consumeMessage = async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}
consumeMessage();
KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
解决方案
您不应在 Connect 和 KafkaJS 使用者中使用相同的 groupId。如果你这样做,它们将成为同一个消费者组的一部分,这意味着消息只会被其中一个或另一个消费,如果它甚至可以工作的话。
如果您将 KafkaJS 消费者的 groupId 更改为独特的,它应该可以工作。
请注意,默认情况下,新的 KafkaJS 消费者组将从最新的偏移量开始消费,因此它不会消费已经产生的消息。您可以使用调用fromBeginning
中的标志覆盖此行为。consumer.subscribe
见https://kafka.js.org/docs/sumption#from-beginning
推荐阅读
- java - 需要帮助,使用 java 以结构化方式解析 PDF 文件
- python - 计算所选元素为最大值的子数组的数量
- java - 如何在 Spring Boot 项目中显示 Thymeleaf 中的对象列表?
- node.js - 如何使用 exec 函数处理 mysql 命令行密码提示
- node.js - NodeJS & MongoDB - 无法使用稍后在 mongoDB 搜索中使用的表单发送参数
- python - 有没有办法在另一个图上绘制普通最小二乘类型的线?
- c - C 语言和数据结构
- css - 如何使用输入类型提交按钮设置文本区域的样式
- html - url中的'#'元素不重定向
- python - For 循环为用户输入的列表与硬编码列表提供不同的结果