apache-kafka - 用于数据摄取的 Kafka 生产者配置
问题描述
我目前正在使用我们基于 Kafka 的数据分析管道从源系统中摄取大量日志文件。在摄取期间,我不在生产者中使用任何类型的延迟/暂停,我只是从日志文件(包含 JSON)中读取并将其发送到 Kafka。我的生产者有以下配置。
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [1.2.3.184:9092, 1.2.3.185:9092, 1.2.3.186:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 1
使用此配置,我丢失了几乎 1/3 的消息,并出现以下错误
7:27:14.053 [kafka-producer-network-thread | producer-1] ERROR com.abc.telemetry.service.KafkaService - Batch containing 39 record(s) expired due to timeout while requesting metadata from brokers for
我计划为我的用例更新 linger.ms、batch.size。还有什么我可以微调以启用此摄取管道而不会丢失任何数据的东西吗?
干杯!
解决方案
我见过多个关于通过 Kafka 发送大量数据的问题。因此,我想尝试一下。
Kafka 不适合发送大量有效载荷/消息。您应该将其视为分布式消息总线,它为您提供分布式系统的所有权限。
由于以下原因,Kafka 限制了可以发送的消息的大小
- 巨大的消息增加了代理的内存压力。
- 大消息会减慢代理的速度,并且处理它们非常昂贵。
解决方案:
- 您可以很好地使用
Reference Based Messaging
将大量消息的位置发送给消费者的位置,而不是按原样发送大量数据。这将允许您使用外部数据存储的功能,并减少 Kafka 代理的压力。 - 您还可以
chunk
将数据在线发送re-assemble
到接收器。
推荐阅读
- c# - 在使用 V4 C# 开发的聊天机器人的自适应卡中单击提交后,如何解决与网络聊天频道中的 500 错误代码相关的问题?
- thingsboard - 如何将现有的 MQTT 代理与 Thingsboard 连接
- java - 如何使用 Apache Commons 解压缩 BZIP(不是 BZIP2)
- xml - 如何在lxml中获取范围内的元素
- azure-functions - Azure 函数/WebJobs 扩展如何注册 http 端点?
- angular - 何时使用@ViewChild、@Input 和@Output?
- powershell - 用于“监视”新文件的目录并移动到新位置的 Powershell 脚本
- java - 调用私有静态 void 方法
- iis - IIS Rewrite 模块不会在第一次加载页面时重定向
- c# - 关闭对话框后无法关注任何控件