c# - Kafka 非常高的延迟 C#
问题描述
我正在对 Apache Kafka 进行一些性能测试,以将其与 RabbitMQ 和 ActiveMQ 等其他产品进行比较。这个想法是在消息传递系统上使用它来进行代理通信。
我正在使用不同数量的发布者和订阅者以及不同的负载测试多个场景(一对一、广播和多对一)。即使在一对一的最低负载情况下,10 对代理发送 500 条消息,发送之间有 1 毫秒的延迟,我也遇到了非常高的延迟(平均约为 200 毫秒)。如果我们去 100 对,数字上升到 ~1500 毫秒。同样的事情发生在广播和多对一上。
我将 Windows 与 Kafka 2.12-2.5.0 和 zookeeper 3.6.1 与 C# .Net 客户端 Confluent.Kafka 1.4.2 一起使用。根据我发现的一些帖子,我已经尝试了一些像 LingerMs = 0 这样的属性。我有默认设置的 Kafka 和 zookeeper。
我做了一个简单的测试代码,其中发生了问题:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaSetupAgain
{
class Program
{
static void Main(string[] args)
{
int numberOfMessages = 500;
int numberOfPublishers = 10;
int numberOfSubscribers = 10;
int timeOfRun = 30000;
List<MCVESubscriber> Subscribers = new List<MCVESubscriber>();
for (int i = 0; i < numberOfSubscribers; i++)
{
MCVESubscriber ZeroMqSubscriber = new MCVESubscriber();
new Thread(() =>
{
ZeroMqSubscriber.read(i.ToString());
}).Start();
Subscribers.Add(ZeroMqSubscriber);
}
Thread.Sleep(10000);//to make sure all subscribers started
for (int i = 0; i < numberOfPublishers; i++)
{
MCVEPublisher ZeroMqPublisherBroadcast = new MCVEPublisher();
new Thread(() =>
{
ZeroMqPublisherBroadcast.publish(numberOfMessages, i.ToString());
}).Start();
}
Thread.Sleep(timeOfRun);
foreach (MCVESubscriber Subscriber in Subscribers)
{
Subscriber.PrintMessages("file.csv");
}
}
public class MCVEPublisher
{
public void publish(int numberOfMessages, string topic)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
LingerMs = 0,
Acks = 0,
};
var producer = new ProducerBuilder<Null, string>(config).Build();
int success = 0;
int failure = 0;
Thread.Sleep(3500);
for (int i = 0; i < numberOfMessages; i++)
{
Thread.Sleep(1);
long milliseconds = System.Diagnostics.Stopwatch.GetTimestamp() / TimeSpan.TicksPerMillisecond;
var t = producer.ProduceAsync(topic, new Message<Null, string> { Value = milliseconds.ToString() });
t.ContinueWith(task => {
if (task.IsFaulted)
{
failure++;
}
else
{
success++;
}
});
}
Console.WriteLine("Success: " + success + " Failure:" + failure);
}
}
public class MCVESubscriber
{
private List<string> prints = new List<string>();
public void read(string topic)
{
var config = new ConsumerConfig()
{
BootstrapServers = "localhost:9092",
EnableAutoCommit = false,
FetchErrorBackoffMs = 1,
};
var consumerConfig = new ConsumerConfig(config);
consumerConfig.GroupId = Guid.NewGuid().ToString();
consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
consumerConfig.EnableAutoCommit = false;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(new[] { topic });
while (true)
{
var consumeResult = consumer.Consume();
long milliseconds = System.Diagnostics.Stopwatch.GetTimestamp() / TimeSpan.TicksPerMillisecond;
prints.Add(consumeResult.Message.Value + ";" + milliseconds.ToString());
}
consumer.Close();
}
}
public void PrintMessages(string path)
{
Console.WriteLine("printing " + prints.Count);
File.AppendAllLines(path, prints);
}
}
}
}
有人可能是什么问题?我可以更改哪些配置来改善延迟?
谢谢,
大卫科斯塔
解决方案
Kafka 并不是真正为低延迟消息分发而构建的,而是为高可用性而构建的。它可以配置为具有较低的延迟,但您开始失去 Kafka 提供的许多优势。
下面的一些提示/评论:
另一方面
KafkaProducer
,一般来说,您希望等到有足够的消息发送,以便更有效地批处理消息。那就是linger.ms
你已经提到的属性。通常将其设置为 50 毫秒,因此通过将其设置为零,您实际上是在告诉生产者尽可能快地发送数据。这可能会使生产者更“健谈”,但您可以保证它会在收到数据后立即将数据发送到集群。然而,一旦一条消息被“生产”到 Kafka 中,它会一直等待,直到它从较低层得到一个 ACK,即 broker 已成功接收到该消息。这里有多种选择:
- 一旦消息由生产者发送,就将消息视为“已收到”。也就是说,在本地,一旦网络层完成发送,生产者就会认为它“已发送并确认”
- 等待来自您将消息发送到的领导代理的 ACK,具体取决于分配的分区,因此您至少知道一个代理拥有它。 这是默认设置。
- 等待来自您将消息发送到的领导代理的 ACK,以及来自其他代理上每个分区副本的 ACK。这意味着,如果您的集群的复制因子为 3,例如,该消息将发送到代理 1,然后它会将其复制到具有相同分区副本的代理 2 和 3,等待这些代理回复说他们收到了消息,然后才回复生产者说消息已被确认。这通常用于您永远不希望丢失一条消息的环境中,因此您始终保证在生产者继续之前会有三个消息副本。
Kafka 文档的官方acks
解释:
https://kafka.apache.org/25/documentation.html#acks
还有其他设置需要考虑,例如 kafka 生产者压缩和代理压缩设置,可能会增加更多延迟/开销,但如果您使用默认值(没有生产者压缩和producer
代理压缩中的选项),那么这些设置中应该没有额外的延迟脚步。
说了这么多,我建议你尝试将acks
生产者中的选项设置为 0,看看你的延迟是如何变化的。我的猜测是你会得到更好的延迟,但也明白不能保证你的消息实际上被正确接收和存储。不稳定的网络、网络分区等可能会导致您丢失数据。这对于您的用例可能没问题,但请确保您了解它。
推荐阅读
- prettier - 如何使更漂亮不拆分此代码?
- mysql - 如何从另一个表 MySQL 更新表中的值
- coq - Coq destruct 带有排除的前提条件
- single-sign-on - 使用 Windows 身份验证的 ADFS SSO
- javascript - 如何为付款服务员重新调整我的帐户条带 ID
- php - 我们可以为两个项目的公共文件夹使用 httpdocs 托管两个 PHP 项目吗?
- reactjs - 反应图像一次加载
- python - Pandas - 检查列中的设置值是否是另一列中设置值的子集
- python - AttributeError:“numpy.ndarray”对象没有属性“imshow”
- django - 如何让 Django `manage.py test` 识别没有 unittest 基类的 pytest 测试方法?