1 交换机
1 工作生活中的交换机/路由器-------转发网络信号,且只是转发 网络信号。
2 可以控制黑名单,隐藏网络,权限控制---除了信号不是他提 供的,他只是转发信号服务外,其他的事儿,他是可以控制 的。
RabbitMq中的交换机--------转发数据消息
2 交换机核心设计
有了交换机的设计之后---交换机就决定了消息究竟往哪儿走. 程序设计的角度:
结构型设计模式: 代理模式, 适配器模式。
为什么要有这个设计呢?
消息如何入队, 进入那个队列, 就可以有交换机来决定了.
就可以在交换机这里定义一些规则;
就可以根据不同的业务场景, 来直接选用各自的规则. 更好的满 足业务需求;
业务需求是很多的。 规则定义,当然是为了更好的满足业务需 求了; 规则也可以复用;
3 RabbiMq交换机分类
交换机是负责消息入队列的,决定消息如何进入队列
1 Direct类型交换机
2 Topic类型交换机
3 Fanout类型交换机
4 Headers类型交换机
各自的类型就是各自一个入队规则. 为了更好满足业务的需求.
4 Direct交换机
Direct类型交换机
定点发送
规则:
1 交换机和队列必须建立联系,联系—绑定关系---routingkey(唯一标识)
2 生产者在发送消息的时候,也需要给定一个标识
3 交换机收到消息后,用生产者传递过来的routingkey去和交换机与 队列绑定标识做匹配,如果匹配到了;就把消息转发到匹配到的队列中去;
Direct案例实操
写日志
1 日志分为四种类型 info error warning debug
2 异常---敏感日志----特殊处理
3 全日制也需要记录
记录两部分日志;---日志记录.高并发执行; 思路:
1 定义两个队列,一个队列接受异常的日志,一个队列接受全部日志 1 定义一个Direct类型的交换机,把交换机和队列绑定
2 绑定的时候:接受异常消息的队列和交换机绑定一个routingkey-error
3 接受全部日志消息的队列,绑定四个routingkey
4 分别把info、 error、 warning、 debug都绑定建立
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
#region 声明路由和队列
//声明一个队列用来接受所有的日志
//DirectExchangeLogAllQueue:记录所有的日志
channel.QueueDeclare(queue: "DirectExchangeLogAllQueue", durable: true,
exclusive: false, autoDelete: false, arguments: null);
//声明一个队列用来接受Error类型的日志
//DirectExchangeErrorQueue:专门用来记录敏感的日志消息 ---Error
channel.QueueDeclare(queue: "DirectExchangeErrorQueue", durable: true,
exclusive: false, autoDelete: false, arguments: null);
//声明一个路由 type: ExchangeType.Direct Direct类型的路由
channel.ExchangeDeclare(type: ExchangeType.Direct, exchange: "DirectExChange",
durable: true, autoDelete: false, arguments: null);
#endregion
//日志四种分类
string[] logtypes = new string[] { "debug", "info", "warn", "error" };
foreach (string logtype in logtypes)
{
//把交换机和队列 DirectExchangeLogAllQueue 绑定,
//把四种类型的日志 字符串都作为routingkey
channel.QueueBind(queue: "DirectExchangeLogAllQueue",
exchange: "DirectExChange", routingKey: logtype);
}
channel.QueueBind(queue: "DirectExchangeErrorQueue",
exchange: "DirectExChange", routingKey: "error");
//模拟一堆日志消息
List<LogMsgModel> logList = new List<LogMsgModel>();
for (int i = 1; i <= 100; i++)
{
if (i % 4 == 0)
{
logList.Add(new LogMsgModel()
{ LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}条信息") });
}
if (i % 4 == 1)
{
logList.Add(new LogMsgModel()
{ LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}条信息") });
}
if (i % 4 == 2)
{
logList.Add(new LogMsgModel()
{ LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}条信息") });
}
if (i % 4 == 3)
{
logList.Add(new LogMsgModel()
{ LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}条信息") });
}
}
//logList 日志消息一共应该是100条 四种类型的消息 各自25条
Console.WriteLine("生产者发送100条日志信息");
//发送日志信息
foreach (var log in logList)
{
channel.BasicPublish(exchange: "DirectExChange",routingKey: log.LogType,
basicProperties: null, body: log.Msg);
Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} 已发送~~");
}
}
}
5 Fanout交换机
Fanout类型交换机
阳光普照,所有和当前交换机建立联系的队列都可以收到消息;
规则:
1 交换机和队列必然要建立联系---不需要标识了,只要建立联 系即可
2 只要是建立了联系以后,只要是经过这个交换机的消息,任 何和这个交换机建立联系的队列都可以收到消息
适合啥场景? 通知、 广播、发布订阅等, 一个点如果想要发到多个点;
Fanout案例分析
总公司向分公司发送通知,总公司武汉,分公司北京、上海、成都、杭州。
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "FanoutExchange_ShangHai", durable: true,
exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "FanoutExchange_BeiJing", durable: true,
exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "FanoutExchange_Hangzhou", durable: true,
exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "FanoutExchange_ChengDu", durable: true,
exclusive: false, autoDelete: false, arguments: null);
//声明一个Fanout 类型的交换机
channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout,
durable: true, autoDelete: false, arguments: null);
channel.QueueBind(queue: "FanoutExchange_ShangHai", exchange: "FanoutExchange",
routingKey: string.Empty, arguments: null);
channel.QueueBind(queue: "FanoutExchange_BeiJing", exchange: "FanoutExchange",
routingKey: string.Empty, arguments: null);
channel.QueueBind(queue: "FanoutExchange_Hangzhou", exchange: "FanoutExchange",
routingKey: string.Empty, arguments: null);
channel.QueueBind(queue: "FanoutExchange_ChengDu", exchange: "FanoutExchange",
routingKey: string.Empty, arguments: null);
//在控制台输入消息,按enter键发送消息
int i = 1;
while (true)
{
//Console.WriteLine("请输入通知~~");
//var message = Console.ReadLine();
var message = $"这里是一条测试消息。。通知{i}";
var body = Encoding.UTF8.GetBytes(message);
//基本发布
channel.BasicPublish(exchange: "FanoutExchange", routingKey: "",
basicProperties: null,body: body);
Console.WriteLine($"通知【{message}】已发送到队列");
Thread.Sleep(800);
i++;
}
}
}
6 Topic交换机
Topic类型交换机
模糊匹配;绑定关系的时候,支持了通配符;
发送消息的时候,带的routingkey 可以和通配符模糊匹配;
1 Topic交换机和Derect交换机很像,交换机和队列都必须绑定 且必须给定一个标识
2 标识可以支持通配符
3 “#”号和“*”号的通配符
4 “#”号可以匹配任何一串字符串
5 “*”号可以一串字符串
Topic案例
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
//声明一个ExchangeType.Topic类型的路由
channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic,
durable: true, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false,
autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false,
autoDelete: false, arguments: null);
channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange",
routingKey: "China.#", arguments: null);
channel.QueueBind(queue: "newsQueue", exchange: "TopicExchange",
routingKey: "#.news", arguments: null);
{
string message = "来自中国的新闻消息。。。。";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.news",
basicProperties: null, body: body);
Console.WriteLine($"消息【{message}】已发送到队列");
//ChinaQueue :应该是几消息
//newsQueue:应该是几消息
}
{
string message = "来自中国的天气消息。。。。";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.weather",
basicProperties: null, body: body);
Console.WriteLine($"消息【{message}】已发送到队列");
}
{
string message = "来自美国的新闻消息。。。。";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.news",
basicProperties: null, body: body);
Console.WriteLine($"消息【{message}】已发送到队列");
}
{
string message = "来自美国的天气消息。。。。";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "usa.weather",
basicProperties: null, body: body);
Console.WriteLine($"消息【{message}】已发送到队列");
}
}
}
7 Headers交换机
消息转发,不依赖于routingkey, 依赖于配置的header信息 "x-match","all" :header 信息中如果配置是的是 all---发送,配置的 头信息必须全部吻合。才会转发到对应的队列中去;---多个信息并且的关系"x-match","any":多个头信息只要是有一个满足就可以转发到绑定的队 列中去; 多个信息之间 或者的关系
Headers案例
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "HeaderExchange", type: ExchangeType.Headers,
durable: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "HeaderExchangeAllqueue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine("生产者准备就绪....");
channel.QueueBind(queue: "HeaderExchangeAllqueue", exchange: "HeaderExchange",
routingKey: string.Empty,
arguments: new Dictionary<string, object>
{
{ "x-match","all"},
{ "teacher","guest"},
{ "pass","123"},
});
{
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>
{
{ "teacher","guest"},
{ "pass","123"}
};
string message = "teacher和pass都相同时发送的消息";
var body = Encoding.UTF8.GetBytes(message);
//基本发布
channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty,
basicProperties: props, body: body);
Console.WriteLine($"消息【{message}】已发送");
}
{
string message = "teacher和pass有一个不相同时发送的消息";
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>()
{
{ "teacher","aaa"},
{ "pass","234"}
};
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "HeaderExchange",
routingKey: string.Empty,
basicProperties: props,
body: body);
Console.WriteLine($"消息【{message}】已发送");
}
Console.WriteLine("************************************************");
{
channel.QueueDeclare(queue: "HeaderExchangeAnyqueue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: "HeaderExchangeAnyqueue", exchange: "HeaderExchange",
routingKey: string.Empty,
arguments: new Dictionary<string, object>
{
{ "x-match","any"},
{ "teacher","guest2"},
{ "pass","123"},
});
string msg = "teacher和pass完全相同时发送的消息";
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>()
{
{ "teacher","guest2"},
{ "pass","123"}
};
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty,
basicProperties: props, body: body);
Console.WriteLine($"消息【{msg}】已发送");
}
{
string msg = "teacher和pass有一个不相同时发送的消息";
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>()
{
{ "teacher","guest2"},
{ "pass","234"}
};
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "HeaderExchange", routingKey: string.Empty,
basicProperties: props, body: body);
Console.WriteLine($"消息【{msg}】已发送");
}
}
}