自学内容网 自学内容网

RabbitMQ-交换机

1 交换机

1 工作生活中的交换机/路由器-------转发网络信号,且只是转发 网络信号。

2 可以控制黑名单,隐藏网络,权限控制---除了信号不是他提 供的,他只是转发信号服务外,其他的事儿,他是可以控制 的。

RabbitMq中的交换机--------转发数据消息

2 交换机核心设计

有了交换机的设计之后---交换机就决定了消息究竟往哪儿走. 程序设计的角度:

结构型设计模式: 代理模式, 适配器模式。

为什么要有这个设计呢?

消息如何入队, 进入那个队列, 就可以有交换机来决定了.

就可以在交换机这里定义一些规则;

就可以根据不同的业务场景, 来直接选用各自的规则. 更好的满 足业务需求;

业务需求是很多的。 规则定义,当然是为了更好的满足业务需 求了; 规则也可以复用;

3 RabbiMq交换机分类

交换机是负责消息入队列的,决定消息如何进入队列

1 Direct类型交换机

2 Topic类型交换机

3 Fanout类型交换机

4 Headers类型交换机

各自的类型就是各自一个入队规则. 为了更好满足业务的需求.

4 Direct交换机

Direct类型交换机

定点发送

规则:

1 交换机和队列必须建立联系,联系—绑定关系---routingkey(唯一标识)

2 生产者在发送消息的时候,也需要给定一个标识

3 交换机收到消息后,用生产者传递过来的routingkey去和交换机与 队列绑定标识做匹配,如果匹配到了;就把消息转发到匹配到的队列中去;

Direct案例实操

写日志

1 日志分为四种类型 info error warning debug

2 异常---敏感日志----特殊处理

3 全日制也需要记录

记录两部分日志;---日志记录.高并发执行; 思路:

1 定义两个队列,一个队列接受异常的日志,一个队列接受全部日志 1 定义一个Direct类型的交换机,把交换机和队列绑定

2 绑定的时候:接受异常消息的队列和交换机绑定一个routingkey-error

3 接受全部日志消息的队列,绑定四个routingkey

4 分别把info、 error、 warning、 debug都绑定建立

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列
        //声明一个队列用来接受所有的日志
        //DirectExchangeLogAllQueue:记录所有的日志
        channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        //声明一个队列用来接受Error类型的日志
        //DirectExchangeErrorQueue:专门用来记录敏感的日志消息 ---Error
        channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        //声明一个路由  type: ExchangeType.Direct  Direct类型的路由
        channel.ExchangeDeclare(type: ExchangeType.Direct, exchange: "DirectExChange",
            durable: true, autoDelete: false, arguments: null);

        #endregion 
        //日志四种分类
        string[] logtypes = new string[] { "debug", "info", "warn", "error" };

        foreach (string logtype in logtypes)
        {
            //把交换机和队列 DirectExchangeLogAllQueue 绑定,
            //把四种类型的日志 字符串都作为routingkey
            channel.QueueBind(queue: "DirectExchangeLogAllQueue",
                exchange: "DirectExChange", routingKey: logtype);
        }
        channel.QueueBind(queue: "DirectExchangeErrorQueue",
            exchange: "DirectExChange", routingKey: "error");

        //模拟一堆日志消息
        List<LogMsgModel> logList = new List<LogMsgModel>();
        for (int i = 1; i <= 100; i++)
        {
            if (i % 4 == 0)
            {
                logList.Add(new LogMsgModel()
                { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") });
            }
            if (i % 4 == 1)
            {
                logList.Add(new LogMsgModel() 
                { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") });
            }
            if (i % 4 == 2)
            {
                logList.Add(new LogMsgModel()
                { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") });
            }
            if (i % 4 == 3)
            {
                logList.Add(new LogMsgModel()
                { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") });
            }
        }

        //logList 日志消息一共应该是100条   四种类型的消息  各自25条

        Console.WriteLine("生产者发送100条日志信息");
        //发送日志信息
        foreach (var log in logList)
        {
            channel.BasicPublish(exchange: "DirectExChange",routingKey: log.LogType,
                basicProperties: null, body: log.Msg);
            Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)}  已发送~~");
        }
    }
}

5 Fanout交换机

Fanout类型交换机

阳光普照,所有和当前交换机建立联系的队列都可以收到消息;

规则:

1 交换机和队列必然要建立联系---不需要标识了,只要建立联 系即可

2 只要是建立了联系以后,只要是经过这个交换机的消息,任 何和这个交换机建立联系的队列都可以收到消息

适合啥场景? 通知、 广播、发布订阅等, 一个点如果想要发到多个点;

Fanout案例分析

总公司向分公司发送通知,总公司武汉,分公司北京、上海、成都、杭州。

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "FanoutExchange_ShangHai", durable: true, 
            exclusive: false, autoDelete: false, arguments: null);

        channel.QueueDeclare(queue: "FanoutExchange_BeiJing", durable: true, 
            exclusive: false, autoDelete: false, arguments: null);

        channel.QueueDeclare(queue: "FanoutExchange_Hangzhou", durable: true, 
            exclusive: false, autoDelete: false, arguments: null);

        channel.QueueDeclare(queue: "FanoutExchange_ChengDu", durable: true, 
            exclusive: false, autoDelete: false, arguments: null);

        //声明一个Fanout 类型的交换机
        channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, 
            durable: true, autoDelete: false, arguments: null);


        channel.QueueBind(queue: "FanoutExchange_ShangHai", exchange: "FanoutExchange",
            routingKey: string.Empty, arguments: null);

        channel.QueueBind(queue: "FanoutExchange_BeiJing", exchange: "FanoutExchange",
            routingKey: string.Empty, arguments: null);

        channel.QueueBind(queue: "FanoutExchange_Hangzhou", exchange: "FanoutExchange",
            routingKey: string.Empty, arguments: null);

        channel.QueueBind(queue: "FanoutExchange_ChengDu", exchange: "FanoutExchange",
            routingKey: string.Empty, arguments: null);

        //在控制台输入消息,按enter键发送消息
        int i = 1;
        while (true)
        {
            //Console.WriteLine("请输入通知~~");
            //var message = Console.ReadLine();
            var message = $"这里是一条测试消息。。通知{i}";
            var body = Encoding.UTF8.GetBytes(message);
            //基本发布
            channel.BasicPublish(exchange: "FanoutExchange", routingKey: "",
                basicProperties: null,body: body);
            Console.WriteLine($"通知【{message}】已发送到队列");
            Thread.Sleep(800);
            i++;
        }

    }
}

6 Topic交换机

Topic类型交换机

模糊匹配;绑定关系的时候,支持了通配符;

发送消息的时候,带的routingkey 可以和通配符模糊匹配;

1 Topic交换机和Derect交换机很像,交换机和队列都必须绑定 且必须给定一个标识

2 标识可以支持通配符

3 “#”号和“*”号的通配符

4 “#”号可以匹配任何一串字符串

5 “*”号可以一串字符串

Topic案例

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {

        //声明一个ExchangeType.Topic类型的路由
        channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic,
            durable: true, autoDelete: false, arguments: null);

        channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false,
            autoDelete: false, arguments: null);

        channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false,
            autoDelete: false, arguments: null);

        channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange",
            routingKey: "China.#", arguments: null);

        channel.QueueBind(queue: "newsQueue", exchange: "TopicExchange",
            routingKey: "#.news", arguments: null);

        {
            string message = "来自中国的新闻消息。。。。";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.news",
                basicProperties: null, body: body);
            Console.WriteLine($"消息【{message}】已发送到队列");
            //ChinaQueue :应该是几消息
            //newsQueue:应该是几消息
        }

        {
            string message = "来自中国的天气消息。。。。";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.weather",
                basicProperties: null, body: body);
            Console.WriteLine($"消息【{message}】已发送到队列");
        }
        {
            string message = "来自美国的新闻消息。。。。";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.news",
                basicProperties: null, body: body);
            Console.WriteLine($"消息【{message}】已发送到队列");
        }
        {
            string message = "来自美国的天气消息。。。。";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.weather",
                basicProperties: null, body: body);
            Console.WriteLine($"消息【{message}】已发送到队列");
        }

    }
}

7 Headers交换机

消息转发,不依赖于routingkey, 依赖于配置的header信息 "x-match","all" :header 信息中如果配置是的是 all---发送,配置的 头信息必须全部吻合。才会转发到对应的队列中去;---多个信息并且的关系"x-match","any":多个头信息只要是有一个满足就可以转发到绑定的队 列中去; 多个信息之间 或者的关系

Headers案例

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "HeaderExchange", type: ExchangeType.Headers,
            durable: false, autoDelete: false, arguments: null);
        channel.QueueDeclare(queue: "HeaderExchangeAllqueue", durable: false,
            exclusive: false, autoDelete: false, arguments: null);

        Console.WriteLine("生产者准备就绪....");

        channel.QueueBind(queue: "HeaderExchangeAllqueue", exchange: "HeaderExchange",
            routingKey: string.Empty,
            arguments: new Dictionary<string, object>
            {
                { "x-match","all"},
                { "teacher","guest"},
                { "pass","123"},
            });

        {
            var props = channel.CreateBasicProperties();
            props.Headers = new Dictionary<string, object>
            {
                { "teacher","guest"},
                { "pass","123"}
            };
            string message = "teacher和pass都相同时发送的消息";
            var body = Encoding.UTF8.GetBytes(message);
            //基本发布
            channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty,
                basicProperties: props, body: body);
            Console.WriteLine($"消息【{message}】已发送");
        }
        {
            string message = "teacher和pass有一个不相同时发送的消息";
            var props = channel.CreateBasicProperties();
            props.Headers = new Dictionary<string, object>()
            {
                  { "teacher","aaa"},
                  { "pass","234"}
            };
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "HeaderExchange",
                                 routingKey: string.Empty,
                                 basicProperties: props,
                                 body: body);
            Console.WriteLine($"消息【{message}】已发送");
        }
        Console.WriteLine("************************************************");
        {

            channel.QueueDeclare(queue: "HeaderExchangeAnyqueue", durable: false,
                exclusive: false, autoDelete: false, arguments: null);
            channel.QueueBind(queue: "HeaderExchangeAnyqueue", exchange: "HeaderExchange",
                routingKey: string.Empty,
                arguments: new Dictionary<string, object>
                {
                    { "x-match","any"},
                    { "teacher","guest2"},
                    { "pass","123"},
                });

            string msg = "teacher和pass完全相同时发送的消息";
            var props = channel.CreateBasicProperties();
            props.Headers = new Dictionary<string, object>()
            {
                { "teacher","guest2"},
                { "pass","123"}
            };
            var body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty,
                basicProperties: props, body: body);
            Console.WriteLine($"消息【{msg}】已发送");
        }

        {
            string msg = "teacher和pass有一个不相同时发送的消息";
            var props = channel.CreateBasicProperties();
            props.Headers = new Dictionary<string, object>()
            {
                { "teacher","guest2"},
                { "pass","234"}
            };
            var body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty,
                basicProperties: props, body: body);
            Console.WriteLine($"消息【{msg}】已发送");
        }

    }
}

原文地址:https://blog.csdn.net/nullcodeworld/article/details/145127704

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!