消费者就是接受消息的那一端
代码逻辑:
//创建链接
private static readonly ConnectionFactory factory = new ConnectionFactory()
{
HostName = "localhost",// "127.0.0.1",
UserName = "guest",
Password = "guest1",
Port = 5679,
};
//定义消息队列交换机名称
const string ExchangeName = "Jent.Exchange1";
//定义消息队列名称
const string QueueName = "Jent.Queue1";
private static void DirectAcceptExchange()
{
//创建一个rabbitmq链接
var conn = factory.CreateConnection();
var channel= conn.CreateModel();
//创建一个路由交换器
channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//创建一个队列
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//将交换器和消息队列绑定到一起,并且指定一个routingKey,生产者就根据这个routingKey和路由名称作为推送信息的依据
channel.QueueBind(QueueName, ExchangeName, routingKey: "cusumer_key");
//一条一条的从队列中打过来(下面会确认一条打印一条过来),防止把所有信息一次打印过来后客户的突然崩溃造成信息丢失的情况
channel.BasicQos(0, 1, true);
//接收消息的消费者事件
var consumer = new EventingBasicConsumer(channel);
//使用事件机制获取消息(长链接的方式接收,一旦生产者推送消息过来以后,就会进入下面的事件)
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//在一条一条的确认打印
//返回消息确认,没确认就不会消费掉消息
channel.BasicAck(ea.DeliveryTag, false);
};
//这里的autoAck最好要为false很重要,为true当再次推送的时候有时候会推送不过来(呢里是一个大坑)
channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
}
解释:
1:EventingBasicConsumer【订阅式】
2:【QOS + Ack】服务质量 + 消息确认(如果当前消费者挂掉,那么未处理的消息将会丢失),确认一条打印一条过来可以解决呢个问题
3:BasicQos:
prefetchSize, 限制每次取的长度,如果为0就不限制
prefetchCount,每次取几条
global:是否对connection通用
4:durable,是否开启持久化exchange
5:autoDelete, 当已经没有消费者时,服务器是否可以删除该exchange