首页 > 解决方案 > Kafka Consumer 仅在两条消息堆叠时才读取消息

问题描述

我们有一个 kafka 生产者,它偶尔会产生一些消息。

我写了一个消费者来消费这些消息。问题是,消息仅在其中 2 个堆叠时才被使用。例如,如果在 13:00 生成消息,则消费者不做任何事情。如果在 13:01 产生了另一条消息,则消费者会消费这两条消息。在 kafkaTool 中,在消费者属性中,它存在一个名为 LAG 的列,当消息未被使用时为 1。我缺少这个东西的任何配置吗?

消费者配置:

16:43:04,472 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (http--0.0.0.0-8180-1) ConsumerConfig values:
        request.timeout.ms = 180001
        check.crcs = true
        retry.backoff.ms = 100
        ssl.truststore.password = null
        ssl.keymanager.algorithm = SunX509
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.key.password = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.provider = null
        sasl.kerberos.service.name = null
        session.timeout.ms = 180000
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [mtxbuctra22.prod.orange.intra:9092]
        client.id =
        fetch.max.wait.ms = 180000
        fetch.min.bytes = 1024
        key.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        auto.offset.reset = earliest
        value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        ssl.endpoint.identification.algorithm = null
        max.partition.fetch.bytes = 1048576
        ssl.keystore.location = null
        ssl.truststore.location = null
        ssl.keystore.password = null
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        security.protocol = PLAINTEXT
        auto.commit.interval.ms = 1000
        ssl.protocol = TLS
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.trustmanager.algorithm = PKIX
        group.id = ifd_006
        enable.auto.commit = true
        metric.reporters = []
        ssl.truststore.type = JKS
        send.buffer.bytes = 131072
        reconnect.backoff.ms = 50
        metrics.num.samples = 2
        ssl.keystore.type = JKS
        heartbeat.interval.ms = 3000

16:43:04,493 INFO  [io.confluent.kafka.serializers.KafkaAvroDeserializerConfig] (http--0.0.0.0-8180-1) KafkaAvroDeserializerConfig values:
        max.schemas.per.subject = 1000
        specific.avro.reader = true
        schema.registry.url = [http://mtxbuctra22.prod.orange.intra:8081]

16:43:04,498 INFO  [io.confluent.kafka.serializers.KafkaAvroDeserializerConfig] (http--0.0.0.0-8180-1) KafkaAvroDeserializerConfig values:
        max.schemas.per.subject = 1000
        specific.avro.reader = true
        schema.registry.url = [http://mtxbuctra22.prod.orange.intra:8081]

卡夫卡工具: 在此处输入图像描述

标签: javakafka-consumer-api

解决方案


弄清楚了。在 kafka 0.9.0.1 的文档中指出 fetch.min.bytes 为 1。但我有 kafka 0.9.0.0。默认值为 1024。因此,只有在 2 条消息之后,该值才被传递。将 fetch.min.bytes 更改为 1,现在可以正常工作了。


推荐阅读