死信,就是无法被消费的消息,一般来说生产者将消息投递 到broker或者直接到队列里了,消费者从队列取出消息进行消费。
但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列。
死信队列还是队列---只是用来接受特殊的消息----没有被正常消费的消息。 没有被确认的消息;
应用场景
为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
用户在商城下单成功并点击支付后,在指定时间内未支付将自动失效死信队列的形成场景
1、消息TTL过期
2、队列达到最大长度(队列满了,无法再添加数据到mq中)
3、消息被拒绝(basic.reject或basic.nack)并且requeue=false.
描述:
Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-deadletter-routing-key(死信路由key)指向Q2(队列2)
P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发 特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息 Q2的消息。
案例:
//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "queue_a";
using (IConnection connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//创建死信队列和路由
{
//创建死信交换机
channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct,
durable: true, autoDelete: false);
//创建死信队列
channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false,
autoDelete: false);
//死信队列绑定死信交换机
channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
}
// 创建消息交换机
channel.ExchangeDeclare(exchange, type: ExchangeType.Direct,
durable: true, autoDelete: false);
Dictionary<string, object> arguments = new Dictionary<string, object>
{
//设置当前队列的DLX(死信交换机)
{ "x-dead-letter-exchange",dlxexChange},
//设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
{ "x-dead-letter-routing-key",dlxQueueName},
};
//创建消息队列,并指定死信队列
channel.QueueDeclare(queueName, durable: true, exclusive: false,
autoDelete: false, arguments);
//消息队列绑定消息交换机
channel.QueueBind(queueName, exchange, routingKey: queueName);
string message = "hello rabbitmq message";
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Expiration = "10000"; //消息的有效期为10秒
//发布消息
channel.BasicPublish(exchange: exchange, routingKey: queueName,
basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine($"向队列:{queueName}发送消息:{message}");
}
}