首页 > 解决方案 > Kafka consumer.poll 与 bitnami 容器一起挂起

问题描述

我在远程服务器上安装了最新的 bitnami kafka 容器。

[2021-04-07 18:05:38,263] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
[2021-04-07 18:05:40,137] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)

我的 kafka 已配置为可以进行外部连接。

kafka:
image: 'bitnami/kafka:latest'
container_name: kafka
ports:
  - '9092:9092'
  - '9093:9093'
environment:
  - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
  - ALLOW_PLAINTEXT_LISTENER=yes
  - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
  - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
  - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
  - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT

ping 和 telnet 到 IP 地址都可以。

我能够运行生产者并在 python 中发送数据。

import kafka
from time import sleep
from json import dumps
from kafka import KafkaProducer
from kafka import KafkaConsumer

#Producer---------------------------------------------------------------
producer = KafkaProducer(bootstrap_servers=['192.xxx.xx.xx:9093'],
                     value_serializer=lambda x: 
                     dumps(x).encode('utf-8'))

producer.send('TestTopic1', value='MyTest')

但是,我无法使用数据。该脚本挂在 consumer.poll 处,并且从不更改行。

import kafka
from time import sleep
from json import dumps
from kafka import KafkaConsumer

# Consumer---------------------------------------------------------------
consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='testgroup',
    value_deserializer=lambda x : loads(x.decode('utf-8')))

#I've tried both with group_id to None or with a group_id.

print('BEFORE subscribe: ')
consumer.subscribe(['TestTopic1'])

print('BEFORE poll: ')
# HANGS HERE!! Never gets to the print after
consumer.poll(timeout_ms=500)

print('AFTER POLL: ')
consumer.seek_to_beginning()

print('partitions of the topic: ', consumer.partitions_for_topic('TestTopic1'))

for msg in consumer:
    print(type(msg))

在 Kafka 日志中,我看到创建了主题以及其他我不太确定它们的意思的行。

[2021-04-07 18:05:40,234] INFO [broker-1001-to-controller-send-thread]: Recorded new controller, from now on will use broker 1001 (kafka.server.BrokerToControllerRequestThread)
[2021-04-07 18:06:37,509] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment Map(23 -> ArrayBuffer(1001), 32 -> ArrayBuffer(1001), 41 -> ArrayBuffer(1001), 17 -> ArrayBuffer(1001), 8 -> ArrayBuffer(1001), 35 -> ArrayBuffer(1001), 44 -> ArrayBuffer(1001), 26 -> ArrayBuffer(1001), 11 -> ArrayBuffer(1001), 29 -> ArrayBuffer(1001), 38 -> ArrayBuffer(1001), 47 -> ArrayBuffer(1001), 20 -> ArrayBuffer(1001), 2 -> ArrayBuffer(1001), 5 -> ArrayBuffer(1001), 14 -> ArrayBuffer(1001), 46 -> ArrayBuffer(1001), 49 -> ArrayBuffer(1001), 40 -> ArrayBuffer(1001), 13 -> ArrayBuffer(1001), 4 -> ArrayBuffer(1001), 22 -> ArrayBuffer(1001), 31 -> ArrayBuffer(1001), 16 -> ArrayBuffer(1001), 7 -> ArrayBuffer(1001), 43 -> ArrayBuffer(1001), 25 -> ArrayBuffer(1001), 34 -> ArrayBuffer(1001), 10 -> ArrayBuffer(1001), 37 -> ArrayBuffer(1001), 1 -> ArrayBuffer(1001), 19 -> ArrayBuffer(1001), 28 -> ArrayBuffer(1001), 45 -> ArrayBuffer(1001), 27 -> ArrayBuffer(1001), 36 -> ArrayBuffer(1001), 18 -> ArrayBuffer(1001), 9 -> ArrayBuffer(1001), 21 -> ArrayBuffer(1001), 48 -> ArrayBuffer(1001), 3 -> ArrayBuffer(1001), 12 -> ArrayBuffer(1001), 30 -> ArrayBuffer(1001), 39 -> ArrayBuffer(1001), 15 -> ArrayBuffer(1001), 42 -> ArrayBuffer(1001), 24 -> ArrayBuffer(1001), 6 -> ArrayBuffer(1001), 33 -> ArrayBuffer(1001), 0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,534] INFO [KafkaApi-1001] Auto creation of topic __consumer_offsets with 50 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,547] INFO Creating topic TestTopic1 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,557] INFO [KafkaApi-1001] Auto creation of topic TestTopic1 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,906] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:37,979] INFO [Log partition=__consumer_offsets-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:37,991] INFO Created log for partition __consumer_offsets-0 in /bitnami/kafka/data/__consumer_offsets-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:37,992] INFO [Partition __consumer_offsets-0 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-0 (kafka.cluster.Partition)
[2021-04-07 18:06:37,994] INFO [Partition __consumer_offsets-0 broker=1001] Log loaded for partition __consumer_offsets-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,011] INFO [Log partition=__consumer_offsets-29, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)

然后我的日志中有另一系列此类消息。

[2021-04-07 18:06:38,563] INFO Created log for partition __consumer_offsets-13 in /bitnami/kafka/data/__consumer_offsets-13 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-13 (kafka.cluster.Partition)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] Log loaded for partition __consumer_offsets-13 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,577] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-22 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-25 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-28 (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,589] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-22 in 12 milliseconds, of which 2 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,596] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-25 in 17 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,597] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-28 in 18 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,638] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(TestTopic1-0) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:38,643] INFO [Log partition=TestTopic1-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:38,644] INFO Created log for partition TestTopic1-0 in /bitnami/kafka/data/TestTopic1-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] No checkpointed highwatermark is found for partition TestTopic1-0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] Log loaded for partition TestTopic1-0 with initial high watermark 0 (kafka.cluster.Partition)

我在这里看不到任何与消费者有关的东西。

请注意,这只是一个开发服务器。我们应该用它作为概念证明,看看 Kafka 是否适合我们,看看我们是否会在 prod 中使用它。

任何帮助将不胜感激,因为我们真的希望能够使其工作并在生产中使用它。

标签: pythondockerapache-kafkakafka-consumer-api

解决方案


安装在远程服务器上。

然后您需要在 中宣传该服务器的地址KAFKA_CFG_ADVERTISED_LISTENERS仅端口映射是不够的

超时是因为引导协议返回了广告地址,所以您的远程消费者正试图从localhost:9093

您的生产者也会有类似的问题,但您并没有刷新生产者以实际发送数据


如果您使用 Docker 编排平台进行生产,则需要解决其他网络配置


推荐阅读