自学内容网 自学内容网

RabbitMQ-基本使用

1 概述

RabbitMQ中的几个基本概念:

(1)信道(channel):信道是消息的生产者、消费者和服务器之间进行通信的虚拟连接。为什么叫“虚拟连接”呢?因为TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟信道。我们尽量重复使用TCP连接,而信道是可以用完就关闭的。

(2)队列(queue):队列是用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取消息。

(3)交换机(exchange):交换机用于把消息路由到一个或者多个队列中。

RabbitMQ介绍:

1 基于erlang 语言开发--- 高并发语言

2 索尼 爱立信 开发的;

3 依赖于erlang 环境;

4 功能全面

5 可视化的插件

2 环境准备

1 VS2022

2 .NET7平台

3 Docker

4 RabbitMQ

5 Linux的云服务器

3 Docker下安装RabbitMQ

docker search rabbitmq:management
docker pull rabbitmq:management
docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

拉取时建议使用management,同时会安装相应环境插件。

Rabbitmq默认端口是:15672—可视化界面,5672----操作数据的时候---通过这个端口去链接。

容器运行成功就可以访问RabbitMQ页面了,地址:http://192.168.xxx.xxx:15672,默认账号密码guest。

4 C#.Net如何使用RabbitMQ

程序驱动

1 Nuget引入

2 连接接RabbitMQ

3 写入消息

4 定义接受方,消费消息

5 构建生产者消费者模型

1 生产者写入消息到队列

2 消费者获取消息消费掉

6 消息入队全过程

1生产者打开RabbitMq链接

2声明队列

3声明交换机

4绑定队列和交换机

5消息进入到交换机---交换机转发给队列

6消息成功入队

7 落地生产者消费者模型

.NET程序连接RabbitMq

Nuget引入:RabbitMq连接帮助类库 RabbitMQ.Client 6.5.0。

生产者:

{
    //.NET程序连接RabbitMq
    //1.Nuget引入 RabbitMq连接帮助类库--RabbitMQ Client 
    string dir = AppDomain.CurrentDomain.BaseDirectory;
    //当前生产者名称
    string? producerName = Path.GetFileName(Path.GetDirectoryName(dir));

    //重点:
    //链接工厂;帮助配置一个完整的链接
    ConnectionFactory factory = new ConnectionFactory(); //创建链接的工厂 
    //RabbitMQ服务所在地址
    factory.HostName = UrlConfig.Rabbitmq_Url;
    //用户名    
    factory.UserName = UrlConfig.User;
    //密码  
    factory.Password = UrlConfig.Password;

    //程序调用默认的端口是:5672   
    factory.Port = 5672;

    //创建链接
    using (IConnection connection = factory.CreateConnection())
    {
        //创建信道  --高速公路---单车道
        using (IModel channel = connection.CreateModel())
        {

            //创建一个队列---根据名称来判断,如果队列名称存在,就不操作,
            //如果不存在,就根据名字创建一个队列出来。
            channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true,
                                 exclusive: false, autoDelete: false, arguments: null);

            //创建一个交换机--后面说
            channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange",
                                    type: ExchangeType.Direct, durable: true, 
                                    autoDelete: false, arguments: null);

            //交换机绑定队列
            channel.QueueBind(queue: "OnlyProducerMessage",
                              exchange: "OnlyProducerMessageExChange",
                              routingKey: string.Empty, arguments: null);


            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine($"=========生产者:{producerName} 已准备就绪=========");
            int i = 1;
            {
                while (true)
                {
                    string message = $"{producerName} : 学习.NET技术_{i}";
                    //smpq协议,需要传递数据格式微byte数组
                    byte[] body = Encoding.UTF8.GetBytes(message);

                    //用指定的频道去发送消息
                    channel.BasicPublish(
                        exchange: "OnlyProducerMessageExChange",
                        routingKey: string.Empty,
                        basicProperties: null,
                        body: body);
                    Console.WriteLine($"=========消息:{message} 已发送=========");
                    i++;
                    Thread.Sleep(500);
                }
            }
        }
    }
}

消费者:

string dir = AppDomain.CurrentDomain.BaseDirectory;
//当前生产者名称
string? consumerName = Path.GetFileName(Path.GetDirectoryName(dir));

ConnectionFactory factory = new ConnectionFactory(); //创建链接的工厂
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码  
factory.Port = 5672;

using (IConnection connection = factory.CreateConnection()) //创建链接
{
    using (var channel = connection.CreateModel())
    {
        Console.ForegroundColor = ConsoleColor.Green;

        //消费者准备就绪
        try
        {
            //事件驱动模式
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //接受到消息之后,触发事件--就会触发给事件注册的动作;
            consumer.Received += (model, ea) =>   //接受消息处理的事件;
            {
                ReadOnlyMemory<byte> body = ea.Body;
                string message = Encoding.UTF8.GetString(body.ToArray());

                Console.WriteLine($"{consumerName}: 接受消息: {message}");
            };

            //消息确认----自动确认---同时也明确我这里是接收哪个队列的消息
            channel.BasicConsume(queue: "OnlyProducerMessage", autoAck: true,
                consumer: consumer);


            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

原文地址:https://blog.csdn.net/nullcodeworld/article/details/145127702

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!