自学内容网 自学内容网

RabbitMQ-死信队列

死信,就是无法被消费的消息,一般来说生产者将消息投递 到broker或者直接到队列里了,消费者从队列取出消息进行消费。

但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列。

死信队列还是队列---只是用来接受特殊的消息----没有被正常消费的消息。 没有被确认的消息;

应用场景

为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。

用户在商城下单成功并点击支付后,在指定时间内未支付将自动失效死信队列的形成场景

1、消息TTL过期

2、队列达到最大长度(队列满了,无法再添加数据到mq中)

3、消息被拒绝(basic.reject或basic.nack)并且requeue=false.

描述:

Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-deadletter-routing-key(死信路由key)指向Q2(队列2)

P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发 特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息 Q2的消息。

案例:

//死信交换机
string dlxexChange = "dlx.exchange";
//死信队列
string dlxQueueName = "dlx.queue";
//消息交换机
string exchange = "direct-exchange";
//消息队列
string queueName = "queue_a";

using (IConnection connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {

        //创建死信队列和路由
        {
            //创建死信交换机
            channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct,
                durable: true, autoDelete: false);
            //创建死信队列
            channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false,
                autoDelete: false);
            //死信队列绑定死信交换机
            channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);
        }

        // 创建消息交换机
        channel.ExchangeDeclare(exchange, type: ExchangeType.Direct,
            durable: true, autoDelete: false);
        Dictionary<string, object> arguments = new Dictionary<string, object>
        {
            //设置当前队列的DLX(死信交换机)
            { "x-dead-letter-exchange",dlxexChange}, 
            //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
            { "x-dead-letter-routing-key",dlxQueueName},
        };
        //创建消息队列,并指定死信队列
        channel.QueueDeclare(queueName, durable: true, exclusive: false,
            autoDelete: false, arguments);
        //消息队列绑定消息交换机
        channel.QueueBind(queueName, exchange, routingKey: queueName);

        string message = "hello rabbitmq message";
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.Expiration = "10000"; //消息的有效期为10秒

        //发布消息
        channel.BasicPublish(exchange: exchange, routingKey: queueName,
            basicProperties: properties, body: Encoding.UTF8.GetBytes(message));
        Console.WriteLine($"向队列:{queueName}发送消息:{message}");
    }
}

原文地址:https://blog.csdn.net/nullcodeworld/article/details/145127708

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!