c# - 使用 Kafka 的消息滞后
问题描述
我使用 Kafka 作为两个 API 之间的消息代理。当我在进行一些负载测试后开始注意到消息中的一些延迟时,我尝试调整配置,然后切换回 Confluent 自己的 .NET 库。目前我将代码修剪为简单地接收和发送消息而没有其他任何内容,但是当我运行负载测试时我仍然遇到延迟,而且我目前对如何解决这个问题一无所知。谁能给我一些帮助?
这就是我设置我的消费者的方式。
public abstract class BenchmarkConsumer<T> : BackgroundService where T : IntegrationEvent
{
private readonly IConsumer<Null, string> _consumer;
private readonly IMediator _mediator;
private readonly ILogger _logger;
private readonly string _topic;
private readonly string _host;
protected BenchmarkConsumer(string host, string group, string topic, ILogger logger, IMediator mediator)
{
_host = host;
_topic = topic;
_logger = logger;
_mediator = mediator;
var consumerConfig = new ConsumerConfig
{
GroupId = group,
BootstrapServers = host,
SessionTimeoutMs = 6000,
ConsumeResultFields = "none",
QueuedMinMessages = 1000000,
SecurityProtocol = SecurityProtocol.Plaintext,
AutoOffsetReset = AutoOffsetReset.Earliest,
Acks = Acks.Leader
};
_consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
}
private async Task CreateTopic()
{
using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = _host }).Build();
try
{
await adminClient.CreateTopicsAsync(new[]
{
new TopicSpecification {Name = _topic, ReplicationFactor = 3, NumPartitions = 6}
}
);
}
catch (CreateTopicsException e)
{
_logger.LogError($"Topic creation failed: ({_topic}): {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
}
}
public async Task StartConsumingAsync(CancellationToken cancellationToken)
{
await CreateTopic();
_consumer.Subscribe(_topic);
while (!cancellationToken.IsCancellationRequested)
{
var consumedResult = _consumer.Consume(cancellationToken);
if (consumedResult == null) continue;
var messageAsEvent = JsonSerializer.Deserialize<T>(consumedResult.Message.Value);
_logger.LogInformation($"Message deserialized: {JsonSerializer.Serialize(messageAsEvent)}");
await _mediator.Publish(messageAsEvent, CancellationToken.None);
}
}
}
这就是我设置制作人的方式:
public override void Produce(InitialOrderCreatedIntegrationEvent order)
{
var message = JsonSerializer.Serialize(order);
Producer.Produce(_kafkaConfiguration.TopicOrderProducer, new Message<Null, string> {Value = message});
_logger.LogInformation($"Message sent, OrderId: {order.OrderId}");
}
解决方案
推荐阅读
- r - 如何在地图上覆盖具有给定坐标的矩形?
- google-cloud-storage - 如何将 GCS 存储桶与实际的 GCS 文件系统资源相关联?
- c# - 字符串之间的 '+=' 是否有时会转换为 StringBuilder 调用?
- image-processing - 如何随机化图像大小
- python - Discord.py 彩虹嵌入
- powershell - Powershell - 获取 WmiObject 过滤器
- r - 如何通过匹配参考值来替换子字符串值
- sql - 创建一个看起来像书页的表格/视图
- postgresql - 如何在 Postgresql 查询中存储一个临时值,然后多次使用它?
- javascript - 用JavaScript(客户端)逐字分离句子