自学内容网 自学内容网

尚硅谷rabbitmq2024 工作模式路由篇 第11节 答疑

String exchangeName ="test_direct";
/! 创建交换机
人图全
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIREcT, b: true, b1: false, b2: false, map: null);
/1 创建队列
String queue1Name ="test_direct_queue1";
String queue2Name ="test direct queve2";
1惠明(创)&列
channel.queveDeclare(queue1Name,b: true, b1: false, b2: false, map: null);channel.queueDeclare(queue2Name,b: true,b1: false,b2: false, map: null);
/!队列绑定交换机//队列1绑定error
channel.queueBind(queue1Name,exchangeName,s2:"error");
//队列2锦定info error warning
channel.queveBind(queue2Name,exchangeName,s2:"info");
channel.gveveBind(aveve2Name,exchangeName,s2:"error"):channel.queueBind(queue2Name,exchangeName,s2:"warning");
String message ="日志信息:张三调用了delete方法,错误了,目志级别warning";
11 发送消息
channel.basicPublish(exchangeName, s1: "warning", basicProperties: null,message.getBytes());System.out.println(message);
// 释放资濛笮咸
----------发布订阅模式的时候绑定队列和交换机没有指定,error warn ifno等等,为什么这里工作队列模式(就是direct交换机)要指定这些,还有direct交换机为什么要绑定队列,不是这个关系应该有路由键决定的吗?

在 RabbitMQ 中,交换机(Exchange)和队列(Queue)的绑定关系和消息路由机制是消息传递的核心。不同类型的交换机(如 `direct`、`fanout`、`topic` 等)有不同的消息路由规则。让我们详细解释一下你的代码以及 `direct` 交换机的工作原理。

### 代码解析

首先,让我们修正并解释你的代码:

```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DirectExchangeExample {
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_direct";

            // 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

            // 创建队列
            String queue1Name = "test_direct_queue1";
            String queue2Name = "test_direct_queue2";

            channel.queueDeclare(queue1Name, true, false, false, null);
            channel.queueDeclare(queue2Name, true, false, false, null);

            // 队列绑定交换机
            // 队列1绑定"error"
            channel.queueBind(queue1Name, exchangeName, "error");

            // 队列2绑定"info", "error", "warning"
            channel.queueBind(queue2Name, exchangeName, "info");
            channel.queueBind(queue2Name, exchangeName, "error");
            channel.queueBind(queue2Name, exchangeName, "warning");

            String message = "日志信息:张三调用了delete方法,错误了,日志级别warning";

            // 发送消息
            channel.basicPublish(exchangeName, "warning", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
```

### `direct` 交换机的工作原理


`direct` 交换机通过路由键(routing key)将消息精确地路由到绑定了相同路由键的队列中。每个绑定(binding)包含一个队列和一个路由键。消息只有在路由键匹配绑定的路由键时,才会被路由到相应的队列。

### 为什么需要绑定队列

在 `direct` 交换机中,绑定队列和路由键的关系定义了消息的路由规则。你需要明确地将队列绑定到交换机,并指定路由键,这样交换机才能根据路由键将消息路由到正确的队列。

### 示例解释

1. **创建交换机和队列**:
   - 交换机 `test_direct` 是一个 `direct` 类型的交换机。
   - 创建了两个队列 `test_direct_queue1` 和 `test_direct_queue2`。

2. **绑定队列到交换机**:
   - `test_direct_queue1` 绑定了路由键 `error`。
   - `test_direct_queue2` 绑定了路由键 `info`、`error` 和 `warning`。

3. **发送消息**:
   - 发送了一条消息,路由键为 `warning`。
   - 根据绑定关系,消息会被路由到 `test_direct_queue2`,因为它绑定了 `warning` 路由键。

### 为什么需要指定路由键

在 `direct` 交换机中,路由键是用来精确匹配的。每个绑定需要明确的路由键,这样交换机才能将消息路由到相应的队列。不同的路由键可以用于区分不同类型的消息。例如,日志系统中可以用不同的路由键来区分不同级别的日志消息(如 `info`、`warning`、`error`)。

```java
// 绑定队列到交换机
// 队列1绑定"error"
channel.queueBind(queue


==========
为什么kafka的代码的java代码好像没这么多步骤?direct交换机的绑定工作要放到生产端吗?为什么?不同的类型的交换机分别应该把绑定的代码放到那一端?


在消息队列系统中,RabbitMQ 和 Kafka 的设计哲学和使用场景有所不同,这导致了它们在配置和使用上的差异。让我们详细讨论一下这些差异以及为什么在 RabbitMQ 中需要进行绑定,并讨论不同类型的交换机绑定代码应该放在哪一端。

### RabbitMQ vs. Kafka

#### RabbitMQ
RabbitMQ 是一个基于 AMQP 协议的消息代理,它的设计更关注消息的路由和灵活的消息传递模式。RabbitMQ 提供了多种类型的交换机(如 `direct`、`fanout`、`topic` 等),每种交换机都有不同的消息路由规则。使用 RabbitMQ 时,需要显式地声明交换机、队列,并将它们绑定在一起。

#### Kafka
Kafka 是一个分布式流处理平台,设计上更侧重于高吞吐量和持久化消息。Kafka 中没有交换机的概念,消息是按主题(topic)进行发布和订阅的。生产者将消息发送到主题,消费者从主题中读取消息。Kafka 的配置相对简单,因为它的设计更倾向于日志式存储和流处理。

### 交换机绑定在 RabbitMQ 中的重要性

在 RabbitMQ 中,交换机和队列的绑定定义了消息的路由规则。不同类型的交换机有不同的绑定和路由逻辑:

- **Direct Exchange**: 消息通过完全匹配的路由键发送到绑定的队列。
- **Fanout Exchange**: 消息广播到所有绑定的队列,忽略路由键。
- **Topic Exchange**: 消息通过模式匹配的路由键发送到绑定的队列。
- **Headers Exchange**: 通过匹配消息头属性来路由消息。

### 绑定代码应该放在哪一端

绑定代码可以放在生产端或消费端,具体取决于你的应用场景和架构设计。以下是一些建议:

#### Direct Exchange
对于 `direct` 交换机,绑定代码通常放在消费端,因为消费端知道需要接收哪些类型的消息。例如,日志系统中的不同服务可能会订阅不同级别的日志消息(如 `info`、`error`、`warning`)。

```java
// 消费端绑定代码示例
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "warning");
```

#### Fanout Exchange
对于 `fanout` 交换机,绑定代码也通常放在消费端,因为所有绑定的队列都会接收到消息,不需要路由键。

```java
// 消费端绑定代码示例
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
```

#### Topic Exchange
对于 `topic` 交换机,绑定代码通常放在消费端,因为消费端知道需要接收哪种模式的消息。例如,某个服务可能会订阅 `*.error` 或 `app.*` 这样的模式。

```java
// 消费端绑定代码示例
channel.queueBind(queue1Name, exchangeName, "*.error");
channel.queueBind(queue2Name, exchangeName, "app.*");
```

#### Headers Exchange
对于 `headers` 交换机,绑定代码通常放在消费端,因为消费端知道需要匹配哪些消息头。

```java
// 消费端绑定代码示例
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
channel.queueBind(queue1Name, exchangeName, "", headers);
```

### 为什么 Kafka 的代码步骤较少?

Kafka 的设计简洁且高效,因为它没有交换机和队列的概念,消息是直接发布到主题的。生产者和消费者只需要知道主题名称即可,不需要显式地进行绑定。Kafka 的这种设计减少了配置步骤,但也意味着它在消息路由和灵活性上不如 RabbitMQ 强大。

### 总结

- 在 RabbitMQ 中,绑定交换机和队列是必要的,因为这定义了消息的路由规则。
- 绑定代码通常放在消费端,因为消费端更清楚需要接收哪些类型的消息。
- 不同类型的交换机有不同的绑定逻辑,具体绑定代码应根据交换机类型和应用需求来放置。
- Kafka 的设计更简洁,减少了配置步骤,但也减少了路由灵活性。

希望这些解释能帮助你更好地理解 RabbitMQ 和 Kafka 的差异以及如何配置它们的绑定关系。

===========
direct的放到消费端,那如果有多个消费者呢?一个消费info,一个消费warn,那不是这些绑定交换机 队列和路由键的代码     分散到各处,如果写掉了一个、消息还会丢失吗?最佳实践是什么


你提到的场景确实是一个需要仔细考虑的问题。在 RabbitMQ 中,如果有多个消费者,每个消费者关注不同的路由键,那么绑定代码分散在各个消费者中是常见的做法。然而,这也带来了一些潜在的问题,比如代码重复、配置不一致等。

### 消息丢失问题

如果某个消费者的绑定代码遗漏了某个路由键,那么发送到该路由键的消息确实可能会找不到相应的队列,从而导致消息丢失。为了避免这种情况,以下是一些最佳实践:

### 最佳实践

1. **集中管理绑定配置**:
   - 将所有的绑定配置集中管理,例如在一个配置文件或集中管理的服务中。这样可以确保所有的绑定关系都在一个地方定义和维护,减少遗漏的风险。
   - 可以使用配置管理工具(如 Spring Boot 配置文件)来集中管理这些绑定关系。

2. **启动时自动绑定**:
   - 在应用启动时,自动执行绑定操作。这样可以确保每次启动时都会重新绑定,防止手动配置遗漏。
   - 例如,在 Spring Boot 应用中,可以使用 `@Bean` 注解来定义绑定关系。

3. **配置自动化脚本**:
   - 使用自动化脚本或工具来管理和执行绑定操作。例如,可以编写一个脚本来自动声明交换机、队列,并执行绑定操作。
   - 这种方法适用于需要频繁变更绑定关系的场景。

4. **监控和告警**:
   - 实时监控 RabbitMQ 的队列和交换机状态,确保所有绑定关系都正常工作。
   - 配置告警机制,如果发现某个队列没有绑定到交换机,或者消息积压严重,及时发送告警。

### 示例代码

以下是一个简单的示例,展示如何在 Spring Boot 中集中管理和自动绑定:

#### application.yml

```yaml
rabbitmq:
  exchanges:
    direct: test_direct
  queues:
    - name: test_direct_queue1
      bindings:
        - routingKey: error
    - name: test_direct_queue2
      bindings:
        - routingKey: info
        - routingKey: error
        - routingKey: warning
```

#### RabbitMQConfig.java

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("test_direct", true, false);
    }

    @Bean
    public Queue queue1() {
        return new Queue("test_direct_queue1", true);
    }

    @Bean
    public Queue queue2() {
        return new Queue("test_direct_queue2", true);
    }

    @Bean
    public Binding binding1(Queue queue1, DirectExchange directExchange) {
        return BindingBuilder.bind(queue1).to(directExchange).with("error");
    }

    @Bean
    public Binding binding2(Queue queue2, DirectExchange directExchange) {
        return BindingBuilder.bind(queue2).to(directExchange).with("info");
    }

    @Bean
    public Binding binding3(Queue queue2, DirectExchange directExchange) {
        return BindingBuilder.bind(queue2).to(directExchange).with("error");
    }

    @Bean
    public Binding binding4(Queue queue2, DirectExchange directExchange) {
        return BindingBuilder.bind(queue2).to(directExchange).with("warning");
    }
}
```

#### 消费者代码

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQConsumer {

    @RabbitListener(queues = "test_direct_queue1")
    public void handleErrorMessages(String message) {
        System.out.println("Received error message: " + message);
    }

    @RabbitListener(queues = "test_direct_queue2")
    public void handleInfoMessages(String message) {
        System.out.println("Received info/warning message: " + message);
    }
}
```

### 总结

- **集中管理**:将绑定关系集中管理,避免分散在各个消费者中。
- **自动绑定**:在应用启动时自动执行绑定操作,确保配置一致。
- **自动化脚本**:使用自动化脚本管理和执行绑定操作,适用于频繁变更的场景。
- **监控和告警**:实时监控 RabbitMQ 状态,配置告警机制,防止遗漏。

通过这些最佳实践,可以有效降低消息丢失的风险,并确保系统的健壮性和可维护性。

=====topic类型交换机里面#和*怎么用的?通配符怎么使用?

在 RabbitMQ 中,`topic` 类型的交换机允许使用通配符来实现更复杂和灵活的消息路由。通配符主要有两个:`*` 和 `#`。

### 通配符的使用

- `*` (星号) 匹配一个单词。
- `#` (井号) 匹配零个或多个单词。

#### 示例解释

假设我们有一个 `topic` 交换机,路由键(routing key)使用点号(.)分隔单词。以下是一些示例:

1. **使用 `*` 通配符**:
   - 路由键:`log.*.error`
   - 绑定队列:`queue1`
   - 这个绑定会匹配以下路由键:
     - `log.user.error`
     - `log.system.error`
   - 不匹配以下路由键:
     - `log.error`
     - `log.user.system.error`

2. **使用 `#` 通配符**:
   - 路由键:`log.#`
   - 绑定队列:`queue2`
   - 这个绑定会匹配以下路由键:
     - `log.user.error`
     - `log.system.info`
     - `log.user.system.error`
     - `log.error`
   - 匹配所有以 `log.` 开头的路由键。

3. **组合使用**:
   - 路由键:`*.orange.*`
   - 绑定队列:`queue3`
   - 这个绑定会匹配以下路由键:
     - `quick.orange.rabbit`
     - `lazy.orange.fox`
   - 不匹配以下路由键:
     - `quick.orange`
     - `quick.brown.fox`
     - `lazy.orange.male.rabbit`

### 示例代码

以下是一个完整的示例,展示如何使用 `topic` 交换机和通配符进行消息路由。

#### RabbitMQConfig.java

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_logs", true, false);
    }

    @Bean
    public Queue queue1() {
        return new Queue("queue1", true);
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2", true);
    }

    @Bean
    public Queue queue3() {
        return new Queue("queue3", true);
    }

    @Bean
    public Binding binding1(Queue queue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue1).to(topicExchange).with("log.*.error");
    }

    @Bean
    public Binding binding2(Queue queue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue2).to(topicExchange).with("log.#");
    }

    @Bean
    public Binding binding3(Queue queue3, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue3).to(topicExchange).with("*.orange.*");
    }
}
```

#### 消费者代码

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQConsumer {

    @RabbitListener(queues = "queue1")
    public void handleQueue1Messages(String message) {
        System.out.println("Queue1 received: " + message);
    }

    @RabbitListener(queues = "queue2")
    public void handleQueue2Messages(String message) {
        System.out.println("Queue2 received: " + message);
    }

    @RabbitListener(queues = "queue3")
    public void handleQueue3Messages(String message) {
        System.out.println("Queue3 received: " + message);
    }
}
```

#### 生产者代码

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/send")
public class RabbitMQProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/log/error")
    public String sendErrorLog() {
        String message = "This is an error log";
        rabbitTemplate.convertAndSend("topic_logs", "log.user.error", message);
        return "Sent: " + message;
    }

    @GetMapping("/log/info")
    public String sendInfoLog() {
        String message = "This is an info log";
        rabbitTemplate.convertAndSend("topic_logs", "log.system.info", message);
        return "Sent: " + message;
    }

    @GetMapping("/log/orange")
    public String sendOrangeLog() {
        String message = "This is an orange log";
        rabbitTemplate.convertAndSend("topic_logs", "quick.orange.rabbit", message);
        return "Sent: " + message;
    }
}
```

### 总结

- **`*` 通配符**:匹配一个单词。
- **`#` 通配符**:匹配零个或多个单词。
- 通过使用通配符,可以实现灵活的消息路由,满足不同的业务需求。
- 绑定配置可以集中管理,确保配置一致,减少维护成本。

希望这些示例和解释能帮助你更好地理解和使用 `topic` 交换机及其通配符。


原文地址:https://blog.csdn.net/2401_86733530/article/details/142931053

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!