c# - RabbitMQ c# 客户端:利用所有 cpu 内核
问题描述
我正在使用 .net core 和 rabbitMQ 客户端测试一些东西。一项服务将发布数百万行数据,另一项服务将接收并解析这些数据。每行可能需要 4-10 秒的解析时间。所以重点是
1)能够利用更多的机器来运行接收服务,并且
2)利用运行服务的机器的所有cpu核心。
所以 1 似乎很明显。使用更多服务器来解析来自队列的更多消息。但我不知道如何做第二个。
假设我想使用 3 个内核,我编写了一个简单的 .net 内核程序:
static async Task Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello1",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var tasks = new List<Task>() {
ReceiveEvent(channel, 1),
ReceiveEvent(channel, 2),
ReceiveEvent(channel, 3)
};
await Task.WhenAll(tasks);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
//
channel.Close();
connection.Close();
}
}
private static async Task ReceiveEvent(IModel channel, int consumerNo)
{
var consumer = new AsyncEventingBasicConsumer(channel);
int i = 1;
consumer.Received += async (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received {i++} lines in consumer {consumerNo} --- {message}");
Thread.Sleep(500 * consumerNo);
//
channel.BasicAck(ea.DeliveryTag, false);
await Task.Yield();
};
channel.BasicConsume(queue: "hello1",
autoAck: false,
consumer: consumer);
}
无论我运行多少次,只有 1 号消费者在捕捉事件。我是否需要在 Task.WhenAll 上拨打此类电话?或者如果事件被异步触发,.net 核心运行时是否会利用所有 cpu 核心?
非常感谢
解决方案
根据我对 RabbitMQ 的经验,当收到一条消息时,会创建一个消息侦听器线程。在处理此消息时,队列不会将另一条消息推送到侦听器。在您的情况下,您必须首先收到消息,然后才能在多个线程中处理。
推荐阅读
- python - 如何解决 jupyter 中内核工作的问题?
- google-maps - 网站通过 SDK 动态限制 Google Maps API 密钥
- python - 有没有办法让一个对象跟随另一个对象?
- matlab - 如何在matlab中将三角数作为幂
- r - 如何制作一个数值变量和一个分类变量的散点图
- python - 使用 Python 分析 http MP3 音频流
- python - 解析和分析csv文件
- javascript - 如何调用javascript并将其写在html上?
- c++ - 如何使用 MPI_Sendrecv 进行矩阵转置?
- python - python中五对角矩阵的Cholesky优化