python - Kafka consumer.poll hangs with the bitnami container
Problem description
I installed the latest bitnami kafka container on a remote server.
[2021-04-07 18:05:38,263] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
[2021-04-07 18:05:40,137] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
My kafka is configured to allow external connections.
kafka:
  image: 'bitnami/kafka:latest'
  container_name: kafka
  ports:
    - '9092:9092'
    - '9093:9093'
  environment:
    - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
    - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
    - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
Both ping and telnet to the IP address work.
I am able to run a producer and send data from python.
import kafka
from time import sleep
from json import dumps
from kafka import KafkaProducer
from kafka import KafkaConsumer

# Producer ---------------------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))
producer.send('TestTopic1', value='MyTest')
However, I cannot consume the data. The script hangs at consumer.poll and never moves on to the next line.
import kafka
from time import sleep
from json import loads  # the deserializer below calls loads, not dumps
from kafka import KafkaConsumer

# Consumer ---------------------------------------------------------------
consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='testgroup',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
# I've tried both with group_id set to None and with a group_id.
print('BEFORE subscribe: ')
consumer.subscribe(['TestTopic1'])
print('BEFORE poll: ')
# HANGS HERE!! Never gets to the print after
consumer.poll(timeout_ms=500)
print('AFTER POLL: ')
consumer.seek_to_beginning()
print('partitions of the topic: ', consumer.partitions_for_topic('TestTopic1'))
for msg in consumer:
    print(type(msg))
In the Kafka logs I can see that the topic was created, along with other lines whose meaning I am not sure about.
[2021-04-07 18:05:40,234] INFO [broker-1001-to-controller-send-thread]: Recorded new controller, from now on will use broker 1001 (kafka.server.BrokerToControllerRequestThread)
[2021-04-07 18:06:37,509] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment Map(23 -> ArrayBuffer(1001), 32 -> ArrayBuffer(1001), 41 -> ArrayBuffer(1001), 17 -> ArrayBuffer(1001), 8 -> ArrayBuffer(1001), 35 -> ArrayBuffer(1001), 44 -> ArrayBuffer(1001), 26 -> ArrayBuffer(1001), 11 -> ArrayBuffer(1001), 29 -> ArrayBuffer(1001), 38 -> ArrayBuffer(1001), 47 -> ArrayBuffer(1001), 20 -> ArrayBuffer(1001), 2 -> ArrayBuffer(1001), 5 -> ArrayBuffer(1001), 14 -> ArrayBuffer(1001), 46 -> ArrayBuffer(1001), 49 -> ArrayBuffer(1001), 40 -> ArrayBuffer(1001), 13 -> ArrayBuffer(1001), 4 -> ArrayBuffer(1001), 22 -> ArrayBuffer(1001), 31 -> ArrayBuffer(1001), 16 -> ArrayBuffer(1001), 7 -> ArrayBuffer(1001), 43 -> ArrayBuffer(1001), 25 -> ArrayBuffer(1001), 34 -> ArrayBuffer(1001), 10 -> ArrayBuffer(1001), 37 -> ArrayBuffer(1001), 1 -> ArrayBuffer(1001), 19 -> ArrayBuffer(1001), 28 -> ArrayBuffer(1001), 45 -> ArrayBuffer(1001), 27 -> ArrayBuffer(1001), 36 -> ArrayBuffer(1001), 18 -> ArrayBuffer(1001), 9 -> ArrayBuffer(1001), 21 -> ArrayBuffer(1001), 48 -> ArrayBuffer(1001), 3 -> ArrayBuffer(1001), 12 -> ArrayBuffer(1001), 30 -> ArrayBuffer(1001), 39 -> ArrayBuffer(1001), 15 -> ArrayBuffer(1001), 42 -> ArrayBuffer(1001), 24 -> ArrayBuffer(1001), 6 -> ArrayBuffer(1001), 33 -> ArrayBuffer(1001), 0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,534] INFO [KafkaApi-1001] Auto creation of topic __consumer_offsets with 50 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,547] INFO Creating topic TestTopic1 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,557] INFO [KafkaApi-1001] Auto creation of topic TestTopic1 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,906] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:37,979] INFO [Log partition=__consumer_offsets-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:37,991] INFO Created log for partition __consumer_offsets-0 in /bitnami/kafka/data/__consumer_offsets-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:37,992] INFO [Partition __consumer_offsets-0 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-0 (kafka.cluster.Partition)
[2021-04-07 18:06:37,994] INFO [Partition __consumer_offsets-0 broker=1001] Log loaded for partition __consumer_offsets-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,011] INFO [Log partition=__consumer_offsets-29, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
Then my log contains another series of messages like these.
[2021-04-07 18:06:38,563] INFO Created log for partition __consumer_offsets-13 in /bitnami/kafka/data/__consumer_offsets-13 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-13 (kafka.cluster.Partition)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] Log loaded for partition __consumer_offsets-13 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,577] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-22 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-25 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-28 (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,589] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-22 in 12 milliseconds, of which 2 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,596] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-25 in 17 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,597] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-28 in 18 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,638] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(TestTopic1-0) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:38,643] INFO [Log partition=TestTopic1-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:38,644] INFO Created log for partition TestTopic1-0 in /bitnami/kafka/data/TestTopic1-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] No checkpointed highwatermark is found for partition TestTopic1-0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] Log loaded for partition TestTopic1-0 with initial high watermark 0 (kafka.cluster.Partition)
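I don't see anything related to the consumer here.
Note that this is only a dev server. We are using it as a proof of concept to see whether Kafka suits us and whether we would use it in prod.
Any help would be greatly appreciated, as we really hope to get this working and use it in production.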
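Solution
You wrote that the container is "installed on a remote server".
Then you need to advertise that server's address in KAFKA_CFG_ADVERTISED_LISTENERS, for example EXTERNAL://192.xxx.xx.xx:9093 instead of EXTERNAL://localhost:9093.
Port mapping alone is not enough.
The timeout happens because the bootstrap protocol returns the advertised address, so your remote consumer is trying to read from localhost:9093 on its own machine.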
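To make the point about advertised addresses concrete, here is a minimal sketch that parses a KAFKA_CFG_ADVERTISED_LISTENERS value and reports which listeners advertise a loopback host, which a remote client cannot reach. The helper names parse_listeners and loopback_listeners are hypothetical and not part of Kafka or kafka-python. The sketch assumes simple NAME://host:port entries.

import doctest  # not required; stdlib only

```python
# Hypothetical helpers for illustration; not part of Kafka or kafka-python.
LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}


def parse_listeners(value):
    """Parse 'NAME://host:port,NAME://host:port' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        # rpartition splits on the last colon, so the port is always the tail.
        host, _, port = address.rpartition(":")
        listeners[name] = (host.strip("[]"), int(port))
    return listeners


def loopback_listeners(value):
    """Return the names of listeners that advertise a loopback host."""
    return [
        name
        for name, (host, _port) in parse_listeners(value).items()
        if host in LOOPBACK_HOSTS
    ]


# The value from the question: EXTERNAL points every client at itself.
advertised = "CLIENT://kafka:9092,EXTERNAL://localhost:9093"
print(loopback_listeners(advertised))  # ['EXTERNAL']

# With the server's address advertised, nothing is flagged.
fixed = "CLIENT://kafka:9092,EXTERNAL://192.xxx.xx.xx:9093"
print(loopback_listeners(fixed))  # []
```

A check like this only catches the loopback case. An advertised hostname that the client cannot resolve (such as kafka outside the Docker network) would cause the same kind of hang.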
Your producer would have a similar problem, but you never flush the producer, so it does not actually send the data.
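A minimal sketch of what flushing looks like, assuming kafka-python's KafkaProducer, whose send() returns a future and whose flush() blocks until buffered records are sent. The helper send_and_flush and the main() wrapper are hypothetical names for illustration. main() is not called automatically because it needs a reachable broker.

```python
def send_and_flush(producer, topic, value, timeout_s=10):
    """Send one record and block until the broker acknowledges it.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future, flush() blocks until buffered records are sent.
    """
    future = producer.send(topic, value=value)
    # Without flush(), a short script can exit before the background
    # sender thread has delivered anything.
    producer.flush(timeout=timeout_s)
    # get() raises if delivery failed, so errors surface here instead of
    # being silently dropped.
    return future.get(timeout=timeout_s)


def main():
    # Imported here so the helper above stays usable without kafka-python.
    from json import dumps
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=["192.xxx.xx.xx:9093"],  # placeholder from the question
        value_serializer=lambda x: dumps(x).encode("utf-8"),
    )
    metadata = send_and_flush(producer, "TestTopic1", "MyTest")
    print(metadata.topic, metadata.partition, metadata.offset)
```

With the advertised listener still set to localhost:9093, a call like this from a remote machine would be expected to fail with a timeout rather than return, which makes the misconfiguration visible on the producer side as well.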
If you use a Docker orchestration platform in production, you will need to work through additional network configuration.