Confluent Kafka Python client: Avro producer.produce() executes without error, but there is no data in the topic

Problem description

My producer doesn't throw any errors, but the data never arrives in the target topic. Can you recommend any techniques for debugging this situation?

I call the Confluent Python AvroProducer in a synchronous loop to send data to a topic, like this:

self.producer.produce(topic="test2", value=msg_dict)

After this call, I have a block of code like this to flush the queue:

# note: flush() returns the number of messages STILL in the queue after the
# timeout, not the number that were flushed
num_messages_in_queue = self.producer.flush(timeout=2.0)
print(f"flushed producer queue in iteration {num_iterations}; {num_messages_in_queue} message(s) remain queued")

This executes without any errors. However, no callback fires after this code runs either. My producer is initialized as follows:

# imports assumed by this snippet
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def __init__(self, broker_url=None, topic=None, schema_registry_url=None, schema_path=None):
    try:
        with open(schema_path, 'r') as content_file:
            schema = avro.loads(content_file.read())
    except Exception:
        print(f"Error when trying to read avro schema file: {schema_path}")
        raise  # without this, `schema` is undefined below and the failure is silent

    self.conf = {
        'bootstrap.servers': broker_url,
        'on_delivery': self.delivery_report,
        'schema.registry.url': schema_registry_url,
        # acks=-1 (all): the record is not lost as long as at least one
        # in-sync replica remains alive
        'acks': -1,
        'enable.idempotence': False,
        'error_cb': self.error_cb,
    }
    self.topic = topic
    self.schema_path = schema_path
    self.producer = AvroProducer(self.conf, default_key_schema=schema, default_value_schema=schema)
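One way to rule out connectivity problems early (and one that would have caught the eventual answer below) is to force a metadata round-trip right after constructing the producer. A minimal sketch, assuming a confluent-kafka version that exposes Producer.list_topics():

from confluent_kafka import KafkaException

try:
    # list_topics() forces a metadata request to the broker(s); it raises
    # if bootstrap.servers is wrong or unreachable, instead of failing
    # silently on a later produce()
    metadata = self.producer.list_topics(timeout=5.0)
    print(f"connected; known topics: {list(metadata.topics)}")
except KafkaException as e:
    print(f"cannot reach broker(s) at {broker_url}: {e}")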

My callback method is as follows:

def delivery_report(self, err, msg):
    print("began delivery_report")
    if err is None:
        # note: offset() is a method, not an attribute
        print(f"delivery_report --> Delivered msg.value = {msg.value()} to topic = {msg.topic()} offset = {msg.offset()} without err.")
    else:
        print(f"conf_worker AvroProducer failed to deliver message {msg.value()} to topic {self.topic}. got error = {err}")

After running this code, I look at my topic on the schema registry container, like this:

docker exec schema_registry_container kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic test2 --from-beginning

and I see this output:

[2020-04-03 15:48:38,064] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-04-03 15:48:38,742] INFO ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka:29092]
    check.crcs = true
    client.dns.lookup = default
    client.id =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = console-consumer-49056
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-04-03 15:48:38,887] INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-03 15:48:38,887] INFO Kafka commitId : bda8715f42a1a3db (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-03 15:48:39,221] INFO Cluster ID: KHKziPBvRKiozobbwvP1Fw (org.apache.kafka.clients.Metadata)
[2020-04-03 15:48:39,224] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Discovered group coordinator kafka:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:39,231] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-04-03 15:48:39,231] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:42,264] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:42,267] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Setting newly assigned partitions [test2-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-04-03 15:48:42,293] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Resetting offset for partition test2-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
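A quick way to check whether the topic contains any records at all, independent of Avro deserialization, is to ask for the end offset of each partition. A sketch, assuming the standard Kafka broker tools are on the container's PATH (--time -1 requests the latest offset):

docker exec schema_registry_container kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka:29092 --topic test2 --time -1

An output of test2:0:0 would confirm that nothing was ever written to partition 0.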

Tags: kafka-producer-api, confluent-schema-registry, confluent-platform

Solution


So the answer is so trivial that it's embarrassing! But it does point to the fact that, in a multi-layered infrastructure, a single incorrectly set value can lead to a silent failure that is extremely tedious to track down.

The problem came from an incorrectly set parameter in my docker-compose.yml file: the env variable for broker_url was never set. The application code needs this variable to reference the Kafka broker, yet the missing parameter did not throw an exception; it simply failed silently.
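A fail-fast guard at startup makes this class of mistake loud instead of silent. A minimal sketch (BROKER_URL is a hypothetical variable name; adapt it to the name used in docker-compose.yml):

import os

def require_env(name):
    """Return the value of an environment variable, failing loudly if unset."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"required environment variable {name} is not set")
    return value

broker_url = require_env("BROKER_URL")  # crashes at startup instead of failing silently later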

