首页 > 解决方案 > 使用 Node.js + Kafka 为消费者组创建分布式消费者

问题描述

我正在使用 Node.js 10 + Apache Kafka 2.3,以及 no-kafka npm 包。

目前,我创建了一个复制因子为 3 且分区为 3 的主题。我在 3 个不同的端口上有 3 个 kafka 代理。

使用 no-kafka,我可以看到根据分区计数创建的 3 个消费者并且都驻留在同一台机器上。下面是代码和运行模型的快照。

代码:

var Kafka = require('no-kafka');
var Promise = require('bluebird');
// Group consumer bootstrapped from the three brokers running on one host.
// No groupId is passed, so the library's default consumer group is used.
const consumer = new Kafka.GroupConsumer({
    connectionString: 'kafka://192.168.1.172:9092, kafka://192.168.1.172:9093, kafka://192.168.1.172:9094'
});

/**
 * Handles one fetched batch: logs every message and commits its offset.
 *
 * Promise.each walks the batch in order and waits for the promise returned
 * for one message before moving on to the next one.
 *
 * @param {Array} messageSet - messages fetched for a single topic partition
 * @param {string} topic - topic the batch was read from
 * @param {number} partition - partition the batch was read from
 * @returns {Promise} settles once every message has been logged and committed
 */
const dataHandler = function (messageSet, topic, partition) {
    return Promise.each(messageSet, function (m) {
        console.log("Topic: " + topic, ", Partition: " + partition, ", Offset: " + m.offset,
            ", Message: " + m.message.value.toString('utf8'));
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

// Subscribe to the 'test' topic and route its batches to dataHandler.
const strategies = [{
    subscriptions: ['test'],
    handler: dataHandler
}];

// init() returns a promise; report a failed start-up instead of leaving the
// rejection unhandled.
consumer.init(strategies).catch(function (err) {
    console.error(err);
});

当我创建一个生产者并运行它时,我在控制台上得到以下输出。

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092 --topic test
>hey
>there
>how are you
>I am
>fine
>and
>how
>about
>you

下面是消费者的输出。

PS D:\checkout\javascript\sample projects\kafka> node .\consumer.js
2019-12-23T15:43:07.822Z INFO no-kafka-client Joined group no-kafka-group-v0.9 generationId 45 as no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7
2019-12-23T15:43:07.822Z INFO no-kafka-client Elected as group leader
2019-12-23T15:43:07.839Z DEBUG no-kafka-client Subscribed to test:0 offset 57 leader 192.168.1.172:9094
2019-12-23T15:43:07.840Z DEBUG no-kafka-client Subscribed to test:1 offset 56 leader 192.168.1.172:9094
2019-12-23T15:43:07.841Z DEBUG no-kafka-client Subscribed to test:2 offset 58 leader 192.168.1.172:9094
Topic: test , Partition: 2 , Offset: 58 , Message: hey
Topic: test , Partition: 1 , Offset: 56 , Message: there
Topic: test , Partition: 0 , Offset: 57 , Message: how are you
Topic: test , Partition: 1 , Offset: 57 , Message: fine
Topic: test , Partition: 2 , Offset: 59 , Message: I am
Topic: test , Partition: 0 , Offset: 58 , Message: and
Topic: test , Partition: 2 , Offset: 60 , Message: how
Topic: test , Partition: 0 , Offset: 59 , Message: you
Topic: test , Partition: 1 , Offset: 58 , Message: about

对于 1 个消费者的情况,一切都按预期正常工作,例如:

1.自动创建1个消费者

2.消息以循环方式分配到主题分区中

3.消费者均匀分布在 3 个分区上以实现负载均衡,但在同一台机器上。

当我使用 kafka 提供的脚本调查消费者组状态时,控制台上会输出以下内容。

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group  no-kafka-group-v0.9 --bootstrap-server 192.168.1.172:9093

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
no-kafka-group-v0.9 test            0          60              60              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client
no-kafka-group-v0.9 test            1          59              59              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client
no-kafka-group-v0.9 test            2          61              61              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client

问题:

  1. 唯一的问题是消费者在同一台机器上。我希望将其分布在不同的机器上,以实现负载平衡和跨硬件利用适当的资源。
  2. 有没有办法做到这一点?

注意:我只能使用 Node.js

标签: apache-kafka, kafka-consumer-api

解决方案


我使用kafkajs npm 包解决了这个问题。

注意:请参阅上面的问题以通过控制台连接生产者。

代码:

const { Kafka } = require('kafkajs')

// Client configured with all three brokers as seed addresses.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.1.172:9092', '192.168.1.172:9093', '192.168.1.172:9094'],
});

// Every process started from this script joins the same consumer group.
const consumer = kafka.consumer({ groupId: 'test-group' });

/**
 * Prints the partition, offset and string-decoded value of one message.
 *
 * @param {object} payload - per-message payload passed in by consumer.run
 */
async function logMessage({ partition, message }) {
  const { offset, value } = message;
  console.log({
    partition,
    offset,
    value: value.toString(),
  });
}

/**
 * Connects the consumer, subscribes to the 'test' topic from the beginning
 * and starts handing each message to logMessage.
 */
async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test', fromBeginning: true });
  await consumer.run({ eachMessage: logMessage });
}

run().catch(console.error);

单个终端:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.613Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.625Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:19:02.964Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2019-12-24T03:19:03.015Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[0,1,2]},"groupProtocol":"RoundRobinAssigner","duration":48}
{"level":"ERROR","timestamp":"2019-12-24T03:19:03.620Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{ partition: 0, offset: '107', value: 'fgh' }
{ partition: 2, offset: '109', value: '' }
{ partition: 1, offset: '108', value: 'asdsa' }

同时运行 2 个终端:

当我打开另一个终端并在新打开的终端上运行相同的命令时,我得到如下控制台输出

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.229Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.236Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":257}
{"level":"INFO","timestamp":"2019-12-24T03:22:21.530Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:22.236Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}

现在,由于新消费者是通过第二个终端添加的,所以第一个消费者会在控制台上通过以下日志进行通知。

{"level":"INFO","timestamp":"2019-12-24T03:22:26.023Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":39}

同时运行 3 个终端:

在保持前两个终端打开的同时,我现在打开第三个终端,下面是控制台输出。

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.516Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.528Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":273}
{"level":"INFO","timestamp":"2019-12-24T03:28:07.865Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:08.523Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.803Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-499da929-d351-4e59-94c9-88a18e97999d","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0]},"groupProtocol":"RoundRobinAssigner","duration":3937}

第 2 个终端上追加的重新平衡日志如下:

{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.720Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"192.168.1.172:9093","clientId":"my-app","error":"The group is rebalancing, so a rejoin is needed","correlationId":144,"size":10}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.725Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.801Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":70}

第 1 个终端上追加的重新平衡日志如下:

{"level":"ERROR","timestamp":"2019-12-24T03:28:11.750Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":337}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.797Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[2]},"groupProtocol":"RoundRobinAssigner","duration":40}

所有消费者启动并运行:

通过生产者连续发送大量消息之后,下面是各个消费者在各自分区上监听的快照。现在每个消费者都在监听主题的 1 个特定分区,这很棒,现在可以把它们部署到不同的机器上以实现并行处理。

在此处输入图像描述

下面是运行命令 ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093 后得到的消费者与分区的映射状态:

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group  --bootstrap-server 192.168.1.172:9093

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
test-group      test            2          126             126             0               my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48   my-app
test-group      test            1          124             124             0               my-app-945d6f38-bcda-4f02-b1a2-325957db5846 /192.168.1.48   my-app
test-group      test            0          124             124             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app

关闭 1 个消费者和重新平衡效果:

现在,如果我关闭 1 个消费者,例如在第 3 个终端上,那么下面是在 2 个消费者之间为主题的 3 个分区发生的重新平衡快照:

在此处输入图像描述

下面是运行命令 ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093 后得到的消费者与分区的映射状态:

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group  --bootstrap-server 192.168.1.172:9093

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
test-group      test            0          123             123             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
test-group      test            2          125             125             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
test-group      test            1          123             123             0               my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48   my-app

推荐阅读