docker - Confluent 平台无法正确地将消息发送/消费到 Kafka 主题中
问题描述
目前,我使用 Maven 和 io.fabric8 docker-maven-plugin 来自动启动 Kafka 和 ZooKeeper。这是我当前的配置,效果很好:
<properties>
<zookeeper.port>2181</zookeeper.port>
<kafka.host>127.0.0.1</kafka.host>
<kafka.port>9092</kafka.port>
</properties>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>${docker-maven-plugin.version}</version>
<configuration>
<showLogs>true</showLogs>
<images>
<image>
<name>wurstmeister/zookeeper:${zookeeper.version}</name>
<alias>zookeeper</alias>
<run>
<ports>
<port>${zookeeper.port}:2181</port>
</ports>
</run>
</image>
<image>
<name>wurstmeister/kafka:${kafka.version}</name>
<alias>kafka</alias>
<run>
<ports>
<port>${kafka.port}:9092</port>
</ports>
<links>
<link>zookeeper:zookeeper</link>
</links>
<env>
<KAFKA_ADVERTISED_HOST_NAME>${local.ip}
</KAFKA_ADVERTISED_HOST_NAME>
<KAFKA_ADVERTISED_PORT>${kafka.port}</KAFKA_ADVERTISED_PORT>
<KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
<KAFKA_MESSAGE_MAX_BYTES>15000000</KAFKA_MESSAGE_MAX_BYTES>
</env>
</run>
</image>
<image>
<name>confluentinc/cp-ksql-server:5.0.0</name>
<alias>cp-ksql-server</alias>
<run>
<ports>
<port>8088:8088</port>
</ports>
<links>
<link>kafka:kafka</link>
</links>
<env>
<KSQL_BOOTSTRAP_SERVERS>${local.ip}:9092</KSQL_BOOTSTRAP_SERVERS>
<KSQL_LISTENERS>http://0.0.0.0:8088/</KSQL_LISTENERS>
<KSQL_KSQL_SERVICE_ID>confluent_test_2</KSQL_KSQL_SERVICE_ID>
</env>
</run>
</image>
</images>
</configuration>
</plugin>
现在我需要更多的灵活性和监控工具,以便能够正确分析我的本地 Kafka 实例,这就是为什么我决定开始使用 All-In-One Confluent Platform https://docs.confluent.io/current/quickstart /ce-docker-quickstart.html
我执行了上述文档中描述的步骤,并且能够通过docker-compose up -d
以下docker-compose.yml
https://github.com/confluentinc/cp-docker-images/tree/master/examples/cp-all-in-one上的命令运行 Confluent Platform . 之后,我可以在http://localhost:9021上访问 Confluent 控制中心
我的应用程序也已启动,但消息未正确发送或使用。这是日志:
2018-08-25 20:54:33.786 INFO 11304 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.1.0
2018-08-25 20:54:33.786 INFO 11304 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fdcf75ea326b8e07
2018-08-25 20:54:33.794 INFO 11304 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: NCbZ7abTTdu6SQHhthMbIw
2018-08-25 20:54:33.827 INFO 11304 --- [nio-8080-exec-1] c.p.domain.service.post.PostServiceImpl : Message sent to the post creation queue: Post [id=5b8197d9e67db62c28ebe8d5, status=PENDING, externalPostId=2bdbd597-cc30-4478-b946-e873c8d53eec, chatName= ....]
2018-08-25 20:55:03.875 ERROR 11304 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='Post [id=5b8197d9e67db62c28ebe8d5, status=PENDING, externalPostId=2bdbd597-cc30-4478-b946-e873c8d53e...' to topic post.send:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for post.send-0: 30047 ms has passed since batch creation plus linger time
我可以在那里看到我的主题:
那里可能有什么问题以及如何解决?
更新
我在我的应用程序中启用了 Kafka 上的调试日志,现在可以看到以下错误:
java.io.IOException: Can't resolve address: broker:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-1.1.0.jar:na]
at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:792) [kafka-clients-1.1.0.jar:na]
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1002) [kafka-clients-1.1.0.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_171]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_171]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-1.1.0.jar:na]
... 6 common frames omitted
解决方案
无法解析地址:broker:9092
卡夫卡正在向客户返回它所知道的听众
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
基本上,bootstrap.servers
只需要至少一个地址,但其余的 Kafka 集群地址将返回给客户端,以便与各个代理建立连接。
当您在 Docker 网络之外连接到 时localhost:9092
,Docker 端口转发将起作用,但随后 Kafka 返回到您的客户端broker:9092
,并且客户端断开连接,因为您尚未在主机和 Docker 网络之间设置 DNS(而且您不应该需要,除非使用 Docker Swarm)。
您的解决方案有两种选择
- 在您的代码中使用
localhost:29092
,事情可能会开始变得更好 - 通过 Compose 文件在同一网络设置上的 Docker 容器中运行您的 Java 代码并留下
kafka:9092
参考(这类似于您的 KSQL 和控制中心容器的工作方式)
推荐阅读
- processing - 处理 - 将填充颜色与鼠标移动混合
- c++ - 将向量(或其他任何东西)从外部移动到类成员的正确方法是什么?
- javascript - 当我在页面上滚动时,我的图像会跳动
- node.js - Nodemail - sendMail 无法读取未定义的属性“sendMail”
- vue.js - e.$OneSignal.on 不是 Nuxtjs PWA 的函数 - OneSignal
- django - django 的 AWS redis 服务器配置
- node.js - 如何在 mongodb (mongoose) 中实现食谱
- php - 问题理解 PHP 中的二维数组搜索
- python - 如何在 django 中覆盖密码表单的 help_text 和标签
- flutter - firestore queryBuilder中的多个where查询