javascript - Kafka.JS 拒绝连接 <<[BrokerPool] 无法连接到种子代理,正在尝试列表中的另一个代理>>
问题描述
考虑 Kafka Producer:
const { Kafka, logLevel } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
logLevel: logLevel.ERROR,
});
const run = async () => {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'test-topic',
messages: [{ value: 'Hello KafkaJS user!' }],
});
await producer.disconnect();
};
run();
每当我点击: node producer.js 时,响应是:
C:\Development-T410\Kafka>node producer
{"level":"ERROR","timestamp":"2020-10-09T08:24:07.646Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka1:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:07.674Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":325}
{"level":"ERROR","timestamp":"2020-10-09T08:24:09.004Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka2:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:09.006Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":630}
{"level":"ERROR","timestamp":"2020-10-09T08:24:10.639Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka1:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:10.640Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1228}
{"level":"ERROR","timestamp":"2020-10-09T08:24:12.871Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka2:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:12.872Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2104}
{"level":"ERROR","timestamp":"2020-10-09T08:24:13.464Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka1","broker":"kafka1:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka1\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26)"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:14.789Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka2","broker":"kafka2:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka2\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26)"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:15.722Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka1","broker":"kafka1:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka1\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26)"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:15.978Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka1:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:15.979Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4610}
{"level":"ERROR","timestamp":"2020-10-09T08:24:17.059Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka2","broker":"kafka2:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka2\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26)"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:17.980Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka1","broker":"kafka1:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka1\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26)"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:21.593Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka2:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2020-10-09T08:24:21.594Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":5,"retryTime":8310}
(node:9624) UnhandledPromiseRejectionWarning: KafkaJSNonRetriableError
Caused by: KafkaJSConnectionError: Connection timeout
at Timeout.onTimeout [as _onTimeout] (C:\Development-T410\Kafka\node_modules\kafkajs\src\network\connection.js:165:23)
at listOnTimeout (internal/timers.js:549:17)
at processTimers (internal/timers.js:492:7)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:9624) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:9624) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
{"level":"ERROR","timestamp":"2020-10-09T08:24:22.857Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo ENOTFOUND kafka2","broker":"kafka2:9092","clientId":"my-app","stack":"Error: getaddrinfo ENOTFOUND kafka2\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:66:26)"}
知道如何解决这个问题吗?
解决方案
我试图重现同样的问题。但是这段代码在brokers: ['localhost:9092']
. 此代码没有问题,您应该检查您的 Kafka 服务器。您的 Kafka 服务器可能配置不正确。
尝试从终端创建主题。您可以在 Kafka 安装文件夹的 bin 目录中找到所有脚本。
跑
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 1 --replication-factor 1
检查主题是否创建成功。如果不是,您的 Kafka 代理有问题。
推荐阅读
- node.js - 为什么 func 不是 node.js 中的函数?
- php - 如何使表格页眉和表格页脚出现在每一页上?
- serialization - 什么时候在 Django Rest Framework 中使用序列化器?
- java-8 - 将 Java 流映射到以对象为键的对象映射
- python-3.x - 接受可迭代序列并以相反的顺序返回它们
- bash - 过滤来自“git diff”命令的响应以仅获取 Shell 中的差异 - 动态解决方案
- spring - Spring boot starter web 项目:eclipse 无法解析注解 @RestController 和 @RequestMapping
- angularjs - Angularjs ng-disabled 不能使用自动填充
- apache-spark - Cassandra:身份验证错误...在集群配置中找不到身份验证器
- python - 已安装 `zlib` 但仍然出现错误。RuntimeError:压缩需要(缺少)zlib 模块