1 概述
RabbitMQ中的几个基本概念:
(1)信道(channel):信道是消息的生产者、消费者和服务器之间进行通信的虚拟连接。为什么叫“虚拟连接”呢?因为TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟信道。我们尽量重复使用TCP连接,而信道是可以用完就关闭的。
(2)队列(queue):队列是用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取消息。
(3)交换机(exchange):交换机用于把消息路由到一个或者多个队列中。
RabbitMQ介绍:
1 基于erlang 语言开发--- 高并发语言
2 索尼 爱立信 开发的;
3 依赖于erlang 环境;
4 功能全面
5 可视化的插件
2 环境准备
1 VS2022
2 .NET7平台
3 Docker
4 RabbitMQ
5 Linux的云服务器
3 Docker下安装RabbitMQ
docker search rabbitmq:management
docker pull rabbitmq:management
docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
拉取时建议使用management,同时会安装相应环境插件。
Rabbitmq默认端口是:15672—可视化界面,5672----操作数据的时候---通过这个端口去链接。
容器运行成功就可以访问RabbitMQ页面了,地址:http://192.168.xxx.xxx:15672,默认账号密码guest。
4 C#.Net如何使用RabbitMQ
程序驱动
1 Nuget引入
2 连接接RabbitMQ
3 写入消息
4 定义接受方,消费消息
5 构建生产者消费者模型
1 生产者写入消息到队列
2 消费者获取消息消费掉
6 消息入队全过程
1生产者打开RabbitMq链接
2声明队列
3声明交换机
4绑定队列和交换机
5消息进入到交换机---交换机转发给队列
6消息成功入队
7 落地生产者消费者模型
.NET程序连接RabbitMq
Nuget引入:RabbitMq连接帮助类库 RabbitMQ.Client 6.5.0。
生产者:
{
//.NET程序连接RabbitMq
//1.Nuget引入 RabbitMq连接帮助类库--RabbitMQ Client
string dir = AppDomain.CurrentDomain.BaseDirectory;
//当前生产者名称
string? producerName = Path.GetFileName(Path.GetDirectoryName(dir));
//重点:
//链接工厂;帮助配置一个完整的链接
ConnectionFactory factory = new ConnectionFactory(); //创建链接的工厂
//RabbitMQ服务所在地址
factory.HostName = UrlConfig.Rabbitmq_Url;
//用户名
factory.UserName = UrlConfig.User;
//密码
factory.Password = UrlConfig.Password;
//程序调用默认的端口是:5672
factory.Port = 5672;
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道 --高速公路---单车道
using (IModel channel = connection.CreateModel())
{
//创建一个队列---根据名称来判断,如果队列名称存在,就不操作,
//如果不存在,就根据名字创建一个队列出来。
channel.QueueDeclare(queue: "OnlyProducerMessage", durable: true,
exclusive: false, autoDelete: false, arguments: null);
//创建一个交换机--后面说
channel.ExchangeDeclare(exchange: "OnlyProducerMessageExChange",
type: ExchangeType.Direct, durable: true,
autoDelete: false, arguments: null);
//交换机绑定队列
channel.QueueBind(queue: "OnlyProducerMessage",
exchange: "OnlyProducerMessageExChange",
routingKey: string.Empty, arguments: null);
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"=========生产者:{producerName} 已准备就绪=========");
int i = 1;
{
while (true)
{
string message = $"{producerName} : 学习.NET技术_{i}";
//smpq协议,需要传递数据格式微byte数组
byte[] body = Encoding.UTF8.GetBytes(message);
//用指定的频道去发送消息
channel.BasicPublish(
exchange: "OnlyProducerMessageExChange",
routingKey: string.Empty,
basicProperties: null,
body: body);
Console.WriteLine($"=========消息:{message} 已发送=========");
i++;
Thread.Sleep(500);
}
}
}
}
}
消费者:
string dir = AppDomain.CurrentDomain.BaseDirectory;
//当前生产者名称
string? consumerName = Path.GetFileName(Path.GetDirectoryName(dir));
ConnectionFactory factory = new ConnectionFactory(); //创建链接的工厂
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (IConnection connection = factory.CreateConnection()) //创建链接
{
using (var channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
//消费者准备就绪
try
{
//事件驱动模式
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接受到消息之后,触发事件--就会触发给事件注册的动作;
consumer.Received += (model, ea) => //接受消息处理的事件;
{
ReadOnlyMemory<byte> body = ea.Body;
string message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"{consumerName}: 接受消息: {message}");
};
//消息确认----自动确认---同时也明确我这里是接收哪个队列的消息
channel.BasicConsume(queue: "OnlyProducerMessage", autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}