apache-kafka - 使用 Node.js + Kafka 为消费者组创建分布式消费者
问题描述
我正在使用Node.js 10+、Apache Kafka 2.3和no-kafka npm 包。
目前,我创建了一个复制因子为 3 且分区为 3 的主题。我在 3 个不同的端口上有 3 个 kafka 代理。
使用 no-kafka,我可以看到根据分区计数创建的 3 个消费者并且都驻留在同一台机器上。下面是代码和运行模型的快照。
代码:
var Kafka = require('no-kafka');
var Promise = require('bluebird');
// Seed brokers: three listeners (ports 9092-9094) on the same host.
const brokerUrls = [
  'kafka://192.168.1.172:9092',
  'kafka://192.168.1.172:9093',
  'kafka://192.168.1.172:9094',
];

// Group consumer used by the rest of this script; the seed list is handed
// over as a single comma-separated connection string.
const consumer = new Kafka.GroupConsumer({
  connectionString: brokerUrls.join(', '),
});
/**
 * Handles one batch of messages delivered for a topic partition.
 *
 * Each message is printed to the console and its offset is then committed
 * through the module-level `consumer`. The batch is walked with
 * `Promise.each` (`Promise` is bluebird in this script).
 *
 * @param {Array} messageSet - Messages of the batch; each entry exposes
 *   `offset` and `message.value`.
 * @param {string} topic - Topic the batch belongs to.
 * @param {number} partition - Partition the batch was read from.
 * @returns {Promise} The promise returned by `Promise.each`.
 */
function dataHandler(messageSet, topic, partition) {
  // Log a single record, then commit its offset.
  function logAndCommit(record) {
    const fields = [
      "Topic: " + topic,
      ", Partition: " + partition,
      ", Offset: " + record.offset,
      ", Message: " + record.message.value.toString('utf8'),
    ];
    console.log(...fields);

    return consumer.commitOffset({
      topic: topic,
      partition: partition,
      offset: record.offset,
      metadata: 'optional',
    });
  }

  return Promise.each(messageSet, logAndCommit);
}
// Subscription strategy: deliver batches from topic 'test' to dataHandler.
const strategies = [{
  subscriptions: ['test'],
  handler: dataHandler,
}];

// init() is asynchronous; handle a rejected start-up explicitly instead of
// leaving the promise floating, and make the failure visible in the exit code.
consumer.init(strategies).catch(function (err) {
  console.error('Failed to start the Kafka group consumer:', err);
  process.exitCode = 1;
});
当我创建一个生产者并运行它时,我在控制台上得到以下输出。
emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092 --topic test
>hey
>there
>how are you
>I am
>fine
>and
>how
>about
>you
下面是消费者的输出。
PS D:\checkout\javascript\sample projects\kafka> node .\consumer.js
2019-12-23T15:43:07.822Z INFO no-kafka-client Joined group no-kafka-group-v0.9 generationId 45 as no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7
2019-12-23T15:43:07.822Z INFO no-kafka-client Elected as group leader
2019-12-23T15:43:07.839Z DEBUG no-kafka-client Subscribed to test:0 offset 57 leader 192.168.1.172:9094
2019-12-23T15:43:07.840Z DEBUG no-kafka-client Subscribed to test:1 offset 56 leader 192.168.1.172:9094
2019-12-23T15:43:07.841Z DEBUG no-kafka-client Subscribed to test:2 offset 58 leader 192.168.1.172:9094
Topic: test , Partition: 2 , Offset: 58 , Message: hey
Topic: test , Partition: 1 , Offset: 56 , Message: there
Topic: test , Partition: 0 , Offset: 57 , Message: how are you
Topic: test , Partition: 1 , Offset: 57 , Message: fine
Topic: test , Partition: 2 , Offset: 59 , Message: I am
Topic: test , Partition: 0 , Offset: 58 , Message: and
Topic: test , Partition: 2 , Offset: 60 , Message: how
Topic: test , Partition: 0 , Offset: 59 , Message: you
Topic: test , Partition: 1 , Offset: 58 , Message: about
一切正常,就像是为 1 个消费者服务一样,
1.自动创建1个消费者
2.消息以循环方式分配到主题分区中
3.消费者均匀分布在 3 个分区上以实现负载均衡,但在同一台机器上。
当我使用 kafka 提供的脚本调查消费者组状态时,控制台上会输出以下内容。
emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group no-kafka-group-v0.9 --bootstrap-server 192.168.1.172:9093
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
no-kafka-group-v0.9 test 0 60 60 0 no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48 no-kafka-client
no-kafka-group-v0.9 test 1 59 59 0 no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48 no-kafka-client
no-kafka-group-v0.9 test 2 61 61 0 no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48 no-kafka-client
问题:
- 唯一的问题是消费者在同一台机器上。我希望将其分布在不同的机器上,以实现负载平衡和跨硬件利用适当的资源。
- 有没有办法做到这一点?
注意:我只能使用 Node.js
解决方案
我使用kafkajs npm 包解决了这个问题。
注意:请参阅上面的问题以通过控制台连接生产者。
代码:
const { Kafka } = require('kafkajs')
// Seed brokers: three listeners (ports 9092-9094) on the same host.
const brokers = ['192.168.1.172:9092', '192.168.1.172:9093', '192.168.1.172:9094'];

// Client shared by this script.
const kafka = new Kafka({ clientId: 'my-app', brokers });

// Every process started from this script uses the same groupId, so all of
// them join the consumer group 'test-group'.
const consumer = kafka.consumer({ groupId: 'test-group' });
/**
 * Connects the consumer, subscribes to topic 'test' from the beginning and
 * logs the partition, offset and decoded value of every message received.
 *
 * @returns {Promise<void>} Settles once `consumer.run` has returned.
 */
const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      // A record's value can be null (e.g. a tombstone); calling toString()
      // on it would throw and make the handler fail, so guard it.
      const value = message.value == null ? null : message.value.toString();
      console.log({
        partition,
        offset: message.offset,
        value,
      });
    },
  });
};

// Surface any start-up failure on stderr.
run().catch(console.error);
单个终端:
PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.613Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.625Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:19:02.964Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2019-12-24T03:19:03.015Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[0,1,2]},"groupProtocol":"RoundRobinAssigner","duration":48}
{"level":"ERROR","timestamp":"2019-12-24T03:19:03.620Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{ partition: 0, offset: '107', value: 'fgh' }
{ partition: 2, offset: '109', value: '' }
{ partition: 1, offset: '108', value: 'asdsa' }
同时运行 2 个终端:
当我打开另一个终端并在新打开的终端上运行相同的命令时,我得到如下控制台输出
PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.229Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.236Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":257}
{"level":"INFO","timestamp":"2019-12-24T03:22:21.530Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:22.236Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}
现在,由于新消费者是通过第二个终端添加的,所以第一个消费者会在控制台上通过以下日志进行通知。
{"level":"INFO","timestamp":"2019-12-24T03:22:26.023Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":39}
同时运行 3 个终端:
在保持前两个终端打开的同时,我现在打开第三个终端,下面是它的控制台输出。
PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.516Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.528Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":273}
{"level":"INFO","timestamp":"2019-12-24T03:28:07.865Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:08.523Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.803Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-499da929-d351-4e59-94c9-88a18e97999d","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0]},"groupProtocol":"RoundRobinAssigner","duration":3937}
第 2 个终端上新增的重新平衡日志如下:
{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.720Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"192.168.1.172:9093","clientId":"my-app","error":"The group is rebalancing, so a rejoin is needed","correlationId":144,"size":10}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.725Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.801Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":70}
第 1 个终端上新增的重新平衡日志如下:
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.750Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":337}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.797Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[2]},"groupProtocol":"RoundRobinAssigner","duration":40}
所有消费者启动并运行:
通过生产者连续发送大量消息后,下面是各个消费者在各自分区上监听的快照。现在每个消费者都在监听该主题的 1 个特定分区,这很棒,这些消费者现在可以部署到不同的机器上以实现并行处理。
下面是运行命令 ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093 得到的消费者与分区映射状态:
emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test 2 126 126 0 my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48 my-app
test-group test 1 124 124 0 my-app-945d6f38-bcda-4f02-b1a2-325957db5846 /192.168.1.48 my-app
test-group test 0 124 124 0 my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48 my-app
关闭 1 个消费者和重新平衡效果:
现在,如果我关闭 1 个消费者,例如在第 3 个终端上,那么下面是在 2 个消费者之间为主题的 3 个分区发生的重新平衡快照:
下面是运行命令 ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093 得到的消费者与分区映射状态:
emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test 0 123 123 0 my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48 my-app
test-group test 2 125 125 0 my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48 my-app
test-group test 1 123 123 0 my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48 my-app
推荐阅读
- wifi - Rollerblind 项目问题:Wifi tmr.alarm
- c# - 正则表达式匹配组的值保持为空
- java - 即使路径正确也无法获取路径
- wordpress - 用 WooCommerce 产品搜索替换本机搜索
- python - pd.read_csv 问题,两个不同的表在 .csv 中相互重叠
- python - PyQt5 使用所需字体打开“QFontDialog”
- javascript - 如何使用 requireJs 来包含 electronJs 模块?
- r - 在带有 matplotlib 的 rstudio 中使用带有网状包的 knitter 时出现问题
- python - 更改在 python 的装饰器中用作默认参数的全局变量
- ios - Decode JSON file starting with "[" in SWIFT