自学内容网 自学内容网

SpringBoot集成RocketMQ实现六种消息

1. 简介

        RocketMQ 支持多种消息类型以满足不同的业务需求

  1. 普通消息(Standard Message)

    1. 这是最常用的消息类型,适用于大多数场景。

    2. 可以设置延迟级别(Delay Levels),但不支持消息轨迹。

  2. 顺序消息(Ordered Message)

    1. 用于需要保证消息顺序的场景,例如订单状态更新。

    2. 可以按照消息的发送顺序或者按照消息键(Message Key)的顺序来发送和消费消息。

    3. 顺序消息可以是单播或组播。

  3. 延时消息(Delayed Message)

    1. 支持不同级别的延时,例如 1s、5s、10s、1m、2m、3m 等。

    2. 消息实际发送后并不会立即被消费,而是在指定的延时时间后才可用。

    3. 可以用于需要延时处理的场景,如支付超时未支付自动取消订单。

  4. 事务消息(Transaction Message)

    1. 用于需要保证事务性的场景,确保本地事务和消息发送的原子性。

    2. 发送消息包含两个阶段:preparecommitrollback

    3. 需要用户实现本地事务逻辑,并在 prepare 阶段提交消息,在本地事务完成后再调用 commitrollback

  5. 批量消息(Batch Message)

    1. 允许一次发送多条消息,减少网络请求次数,提高吞吐量。

    2. 批量消息内部是多条独立的消息,消费时也是逐条消费。

  6. 过滤消息(Filter Message)

    1. 允许消费者订阅主题时指定标签(Tag),只消费带有指定标签的消息。

    2. 可以提高消费的效率,只关注感兴趣的消息内容。

2. 环境搭建

2.1 开发环境

JDK

1.8

Maven

3.6

rocketmq-client

4.9.6

springboot

2.7.12

docker

27.1.1

docker-compose

1.29.2

2.2 rocketmq组件(4.x架构只需要启动server和broker)

云服务器配置最好选用2核4G,如果是2核2G得话只能启动这两个容器(server和broker一共需要约1.3G内存)

在云服务器中创建一个rocketmq文件夹,进入文件夹(余下操作在文件夹中执行,隔离)

mkdir rocketmq cd rocketmq

配置config

# Configure the broker's IP address 
# 1,云服务器中,ip为服务器的地址 
# 2,在本地,ip为127.0.0.1 

echo "brokerIP1=127.0.0.1" > broker.conf

docker-compose.yml

version: '3.8'

services:
  namesrv:
    image: apache/rocketmq:4.9.6
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:4.9.6
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf

networks:
  rocketmq:
    driver: bridge

服务器需要开放端口--->server使用9876端口,broker使用10911端口

执行文件

docker-compose up -d

docker ps(查看启动容器)

2.3 引入依赖

没有使用springboot框架

<!--         https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
                <dependency>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                    <version>4.9.6</version>
                </dependency>

使用springboot集成rocketmq

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

3. 简单示例

3.1 生产者

  • DefaultMQProducer:RocketMQ提供的一个生产者类,用于发送消息。

  • SendResult:发送消息后返回的结果对象,包含发送状态和消息队列等信息。

  • Message:代表要发送的消息对象。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        // Create producer instance and set the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // Set the Name Server address (replace with actual Name Server address)
        producer.setNamesrvAddr("110.41.55.242:9876");
        producer.setSendMsgTimeout(30000);
        producer.start();

        try {
            // Create a message instance, specifying the topic, tag, and message body
            Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ unique").getBytes());
            // Send the message and get the send result
            SendResult sendResult = producer.send(msg);
            System.out.println("Message sent: " + new String(msg.getBody()));
            System.out.println("Send result: " + sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Message sending failed.");
        } finally {
            // Shutdown the producer
            producer.shutdown();
        }
    }
}
  • 创建一个DefaultMQProducer实例,并设置生产者组名称为producer_group。生产者组是逻辑上的分组,可以一个或多个生产者属于同一个组。

  • 设置NameServer的地址和端口,这是RocketMQ用于服务发现的组件。

  • 设置发送消息的超时时间为30秒(30000毫秒)。这是生产者在发送消息时等待响应的最长时间。

  • 启动生产者实例,使其能够发送消息。

  • 创建一个Message实例,指定主题为TestTopic,标签为TagA,消息体为字符串"Hello RocketMQ unique"

  • 调用send方法发送消息,并获取发送结果SendResult

3.2 消费者

  • DefaultMQPushConsumer:RocketMQ提供的一个消费者类,用于订阅并消费消息。

  • ConsumeConcurrentlyContext:并发消费消息时的上下文对象。

  • ConsumeConcurrentlyStatus:并发消费消息后的返回状态。

  • MessageListenerConcurrently:并发消息监听器接口。

  • MessageExt:代表单个消息的扩展对象。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerExample {
    public static void main(String[] args) throws Exception {
        // Create consumer instance and set the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // Set the Name Server address (replace with actual Name Server address)
        consumer.setNamesrvAddr("110.41.55.242:9876");
        // Subscribe to the specified topic and tag (* means all tags)
        consumer.subscribe("TestTopic", "*");

        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Start the consumer
        consumer.start();
        System.out.println("Consumer started.");
    }
}
  • 创建一个DefaultMQPushConsumer实例,并设置消费者组名称为consumer_group。消费者组是逻辑上的分组,可以一个或多个消费者属于同一个组。

  • 设置NameServer的地址和端口,这是RocketMQ用于服务发现的组件。

  • 订阅TestTopic主题下所有标签(*表示所有标签)的消息(对于过滤消息,需要指定TAG而不是全匹配)。

  • 实现MessageListenerConcurrently接口:创建一个新的匿名内部类实例,实现MessageListenerConcurrently接口。

  • 消费消息:重写consumeMessage方法,该方法包含业务逻辑,用于处理接收到的消息。

    • 遍历消息:遍历批量消息msgs,打印每条消息的内容。

    • 返回消费状态:返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消息消费成功。

3.3 测试结果

启动消费者

Consumer started.

启动生产者

Message sent: Hello RocketMQ unique
Send result: SendResult [sendStatus=SEND_OK, msgId=7F0000018CB018B4AAC2229057A00000, offsetMsgId=6E2937F200002A9F0000000000000242, messageQueue=MessageQueue [topic=TestTopic, brokerName=95d68505f648, queueId=1], queueOffset=2]

此时消费者控制台输出

Consumer started. Received message: Hello RocketMQ unique

4. SpringBoot集成Rocket MQ发送消息

示例代码结合项目实际运用rocketmq去做封装。

4.1 工程结构

main
├── java
│   ├── com
│   │   └── xiaokai
│   │       ├── config--->一些配置类
│   │       │   └── LogMQConfig.java--->打印mq相关信息
│   │       ├── entity--->实体类
│   │       │   ├── orderEntity.java
│   │       │   └── UserEntity.java
│   │       ├── event--->事件
│   │       │   ├── BaseEvent.java--->基础事件
│   │       │   ├── commonMessage.java--->普通消息
│   │       │   └── EventPublisher.java--->事件发布
│   │       ├── listen--->事件监听
│   │       │   └── UserCommonConsumer.java--->消费消息
│   │       ├── service--->服务
│   │       │   └── UserService.java--->调用事件发布
│   │       └── RocketMQApplication.java--->启动程序
├── resources
│   └── application.yml--->配置文件
└── test
    ├── java
    │   ├── com
    │   │   └── xiaokai
    │           ├── RocketMQTest.java--->测试

4.2 基础代码

LogMQConfig.java

package com.xiaokai.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.util.Arrays;

/**
 * Author:yang
 * Date:2024-10-06 17:04
 */
@Component
@Slf4j
public class LogMQConfig {

    @Autowired
    private ApplicationContext applicationContext;

    public void printLog() {
        RocketMQProperties properties = applicationContext.getBean(RocketMQProperties.class);
        String nameServer = Arrays.toString(properties.getNameServer().split(","));
        String producerGroup = properties.getProducer().getGroup();
        // 打印配置信息
        log.info("RocketMQ NameServer: {}", nameServer);
        log.info("RocketMQ Producer Group: {}", producerGroup);
    }
}

UserEntity.java

package com.xiaokai.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;


/**
 * Author:yang
 * Date:2024-09-26 10:55
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserEntity {

    private int id;

    private String name;

    private String password;

    private String iphone;

    private String address;

    private int money;

}

Order.java

package com.xiaokai.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Author:yang
 * Date:2024-09-26 10:52
 * Description:订单实体类
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderEntity {

    private Integer userId;

    private String name;

    private Integer number;
}

BaseEvent<T>.java

package com.xiaokai.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

/**
 * @description 基础事件
 */
@Data
public abstract class BaseEvent<T> {

    public abstract EventMessage<T> buildEventMessage(T data);

    public abstract String topic();

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class EventMessage<T> {
        private String id;
        private Date timestamp;
        private T data;
    }

}

CommonMessage.java

package com.xiaokai.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;

/**
 * Author:yang
 * Date:2024-10-06 16:08
 * Description:普通消息
 */
@Service
public class CommonMessage extends BaseEvent<CommonMessage.SendUserMessage> {

    @Value("${rocketmq.topic.common}")
    private String topic;

    // 构建消息
    @Override
    public EventMessage<SendUserMessage> buildEventMessage(SendUserMessage data) {
        return EventMessage.<SendUserMessage>builder()
                .data(data)
                .timestamp(new Date())
                .id(RandomStringUtils.randomNumeric(10))
                .build();
    }

    @Override
    public String topic() {
        return topic;
    }

    @Builder
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class SendUserMessage{
        private int id;

        private String name;

        private String password;
    }
}

EventPublisher.java

package com.xiaokai.event;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 15:58
 * Description:事件发布
 */
@Service
@Slf4j
public class EventPublisher {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void publish(String topic, BaseEvent.EventMessage<?> message) {
        try {
            // 转换消息
            String payload = JSON.toJSONString(message);
            // 发送消息
            rocketMQTemplate.convertAndSend(topic, payload);
            log.info("publish event success, topic:{}, message:{}", topic, payload);
        }catch (Exception e){
            log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
            throw e;
        }
    }

    public void publish(String topic, String message) {
        try {
            // 发送消息
            rocketMQTemplate.convertAndSend(topic, message);
            log.info("publish event success, topic:{}, message:{}", topic, message);
        }catch (Exception e){
            log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
            throw e;
        }
    }
}

UserService.java

package com.xiaokai.service;

import com.xiaokai.entity.UserEntity;
import com.xiaokai.event.BaseEvent;
import com.xiaokai.event.CommonMessage;
import com.xiaokai.event.EventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 16:20
 */
@Service
@Slf4j
public class UserService {

    @Autowired
    private CommonMessage commonMessage;
    @Autowired
    private EventPublisher eventPublisher;

    public void sendMsg(UserEntity user){
        log.info("构建普通发送消息");
        CommonMessage.SendUserMessage message = CommonMessage.SendUserMessage.builder()
                .id(user.getId())
                .name(user.getName())
                .password(user.getPassword()).build();
        BaseEvent.EventMessage<CommonMessage.SendUserMessage> sendUserMessageEventMessage = commonMessage.buildEventMessage(message);

        eventPublisher.publish(commonMessage.topic(),sendUserMessageEventMessage);
        log.info("发送普通消息完成");
    }
}

4.3 普通消息

添加监听器

UserCommonConsumer.java

package com.xiaokai.listen;

import apache.rocketmq.v2.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 16:31
 * Description:普通消息消费
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.common}", consumerGroup = "${rocketmq.consumer.group}")
public class UserCommonConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("Received message: {}", s);
    }
}

@RocketMQMessageListener不能放在方法上。

测试

package com.xiaokai;

import com.xiaokai.config.LogMQConfig;
import com.xiaokai.entity.UserEntity;
import com.xiaokai.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.PreDestroy;

/**
 * Author:yang
 * Date:2024-10-06 16:32
 */
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class RocketMQTest {

    @Autowired
    private UserService userService;
    @Autowired
    private LogMQConfig logMQConfig;


    @Test
    public void test_sendCommonMsg() {
        logMQConfig.printLog();
        UserEntity user = UserEntity.builder().id(100)
                .name("xiaokai")
                .password("111111")
                .build();
        userService.sendMsg(user);
    }
}
  • 启动RocketMQApplication.java(启动监听器)

  • 启动测试方法

测试结果

// test
 : RocketMQ NameServer: [110.41.55.242:9876]
 : RocketMQ Producer Group: producer-group
 : 构建普通发送消息
 : publish event success, topic:common-message, message:{"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"8401537837","timestamp":1728295047107}
 : 发送普通消息完成
 
 // application
  Received message: {"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"8401537837","timestamp":1728295047107}

4.4 过滤消息

过滤消息是在普通消息的topic后面跟上:tag

添加监听器:tag为xiaokai

package com.xiaokai.listen;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-06 16:31
 * Description:过滤消息消费
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.filter}", consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "xiaokai")
public class UserFilterConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("Received filter message: {}", s);
    }
}

在事件发布中添加方法

/**
 * 发布事件
 *
 * @param topic   主题
 * @param message 消息
 * @param tag     标签
 */
public void publish(String topic, BaseEvent.EventMessage<?> message, String tag) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);
        // 发送过滤消息--->在主题后面加上":",然后加上标签:destination = topic:tag
        rocketMQTemplate.convertAndSend(topic + ":" + tag, payload);
        log.info("publish event success, topic:{}, message:{}", topic, payload);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;

    }
}

服务中添加方法

public void sendFilterMsg(UserEntity user,String tag){
    log.info("构建过滤发送消息");
    FilterMessage.SendUserMessage message = FilterMessage.SendUserMessage.builder()
            .id(user.getId())
            .name(user.getName())
            .password(user.getPassword()).build();
    BaseEvent.EventMessage<FilterMessage.SendUserMessage> sendUserMessageEventMessage = filterMessage.buildEventMessage(message);

    eventPublisher.publish(filterMessage.topic(),sendUserMessageEventMessage,tag);
    log.info("发送过滤消息完成");
}

启动application:启动了两个监听器

running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', namespace='', nameServer='110.41.55.242:9876', topic='common-message', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', 
Register the listener to container, listenerBeanName:userCommonConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
running container: DefaultRocketMQListenerContainer{consumerGroup='consumer-group', namespace='', nameServer='110.41.55.242:9876', topic='filter-message', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='xiao
Register the listener to container, listenerBeanName:userFilterConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2

测试

/**
 * 发送带tag消息
 */
@Test
public void test_sendFilterMsg() {
    logMQConfig.printLog();
    UserEntity user = UserEntity.builder().id(100)
            .name("xiaokai")
            .password("111111")
            .build();
    userService.sendFilterMsg(user,"xiaokai");
}

: 构建过滤发送消息
: publish event success, topic:filter-message, message:{"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"4163167680","timestamp":1728303830159}
: 发送过滤消息完成
2024-10-07 20:24:03.610  INFO 43732 --- [onsumer-group_1] com.xiaokai.listen.UserFilterConsumer    : Received filter message: {"data":{"id":100,"name":"xiaokai","password":"111111"},"id":"4163167680","timestamp":1728303830159}

4.5 延迟消息

Apache RocketMQ 一共支持18种级别的延迟投递,具体如下:

延迟级别延迟时间延迟级别延迟时间
11秒106分钟
25秒117分钟
310 秒128分钟
430 秒139分钟
51分钟1410分钟
62分钟1520分钟
73分钟1630分钟
84分钟171小时
95分钟182小时

添加延迟方法

/**
 * 发布事件
 *
 * @param topic   主题
 * @param message 消息
 * @param level   delay级别
 */
public void publishDelay(String topic, BaseEvent.EventMessage<?> message, Integer level) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);

        Message<String> msg = MessageBuilder.withPayload(payload)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, level)
                .build();
        // 设置延时级别
        rocketMQTemplate.syncSend(topic, msg, 3000, level);
        log.info("publish event success, topic:{}, message:{}", topic, payload);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

4.6 批量消息

添加发送消息方法

/**
 * 发布事件
 * @param topic   主题
 * @param message 消息集合
 */
public void publishBatch(String topic, List<BaseEvent.EventMessage<?>> message) {
    try {
        // 消息集合
        List<Message<String>> messages = new ArrayList<>();
        // 转换消息
        for (BaseEvent.EventMessage<?> eventMessage : message) {
            String payload = JSON.toJSONString(eventMessage);
            Message<String> msg = MessageBuilder.withPayload(payload)
                    .build();
            messages.add(msg);
        }
        rocketMQTemplate.syncSend(topic, messages);
        log.info("publish event success, topic:{}, message:{}", topic, message);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

添加服务方法

public void sendBatchMsg(UserEntity user){
    log.info("构建批量发送消息");
    List<BaseEvent.EventMessage<?>> messages = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        CommonMessage.SendUserMessage message = CommonMessage.SendUserMessage.builder()
                .id(user.getId())
                .name(user.getName())
                .password(user.getPassword()).build();
        BaseEvent.EventMessage<CommonMessage.SendUserMessage> sendUserMessageEventMessage = commonMessage.buildEventMessage(message);
        messages.add(sendUserMessageEventMessage);
    }
    eventPublisher.publishBatch(commonMessage.topic(),messages);
    log.info("发送批量消息完成");
}

测试:往common-message的主题中发送批量消息

 /**
 * 发送批量消息
 */
@Test
public void test_sendBatchMsg() {
    logMQConfig.printLog();
    UserEntity user = UserEntity.builder().id(100)
            .name("xiaokai")
            .password("111111")
            .build();
    userService.sendBatchMsg(user);
}

测试结果:批量发送,单独消费

4.7 顺序消息

发送消息方法
/**
 * 发布顺序事件
 *
 * @param topic   主题
 * @param message 消息
 */
public void publishOrder(String topic, BaseEvent.EventMessage<?> message , String orderId) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            /**
             * 选择队列
             * @param list 默认的消息队列列表
             * @param message 传输的消息
             * @param o 传输消息时额外的参数 - send(topic, message, o)
             * @return
             */
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
                // 全部的消息发送到第一个队列
                return list.get(0);
            }
        });
        // orderId 相同的消息会被顺序消费
        // orderId在选择器中是 o
        // payload在选择器中是 message.getPayload()
        rocketMQTemplate.syncSendOrderly(topic, payload, orderId);
        log.info("publish event success, topic:{}, message:{}", topic, message);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

接口中list为队列,message为消息,o 为传入的对象,返回消息发送到哪个队列。上面的例子中以orderId为分片标准,所有队列的余数,即把orderId相同的消息发送到同一个队列。

4.8 事务消息

事务性消息的发送分为两个阶段,首先会有一个半消息被投递,即一条消息成功发送到MQ服务器,但是服务器没有收到Producer对该消息的第二次确认,此时该消息会被标记为“暂时无法投递”状态。

消息发送成功后会执行本地事务,并根据本地事务的结果向Broker传递半消息状态(提交或者回滚)。

如果由于网络闪退、Producer重启等原因导致某条事务消息的二次确认丢失,Broker 会发现这条长时间处于“半消息”状态的消息,并主动向 Producer 检查该消息的事务状态(Commit 或 Rollback)。因此,如果本地事务执行成功,下游就会收到该消息,否则不会收到。这样最终保证了上下游数据的一致性。

事务消息的详细执行流程如下图所示:

交易消息发送流程

  1. 生产者将半条消息发送给RocketMQ Broker

  2. 消息持久化成功后RocketMQ Broker,返回Ack给Producer,确认消息发送成功,并且是一条半消息。

  3. 生产者开始执行本地事务。

  4. Producer根据本地事务的结果,向服务端提交第二次确认(Commit或者Rollback),服务端收到确认后,处理如下逻辑。

    1. 如果第二次确认结果为Commit:服务端将该半消息标记为可交付,并交付给消费者。

    2. 如果第二次确认结果为Rollback,则服务端将回滚该事务,不会再向Consumer投递该半条消息。

  5. 在网络断线或者Producer重启等特殊情况下,如果Server没有收到Producer的第二次确认结果,或者Server收到的第二次确认结果为Unknown,则Server会在固定的时间之后向某个Producer发起回滚消息。

demo:

实现事务监听器 TransactionListener

package com.xiaokai.listen;

import com.xiaokai.transactional.TransactionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Author:yang
 * Date:2024-10-09 17:33
 * Description:事务监听器 - 用于实现事务消息
 */

@Slf4j
@Component
public class TransactionListenerImpl implements TransactionListener {


    @Autowired
    private TransactionService transactionService;

    /**
     * 作用 - 用于执行本地事务,返回事务状态,如果返回提交状态,则消息会被提交,如果返回回滚状态,则消息会被丢弃。
     * 执行时间 - 在发送半消息之后调用
     * @param message 这个参数代表了需要进行事务状态检查的消息对象。它包含了消息的主题(topic)、消息体(body)、消息标签(tags)、属性(properties)等信息。
     *                这个对象是之前在调用sendMessageInTransaction方法时创建并发送的半消息。
     * @param o 这是一个用户自定义的对象,它在调用sendMessageInTransaction发送半消息时传递给TransactionListener的。
     *          可以利用这个对象来传递任何需要在执行本地事务检查时使用的上下文信息或状态数据。
     * @return 事务状态
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 调用事务服务,执行本地事务
        boolean success = transactionService.localTransaction(message);
        if (success) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }



    /**
     * 作用 - 定时(默认5s,可以设置)检查事务状态,如果返回提交状态,则消息会被提交,如果返回回滚状态,则消息会被丢弃。
     * 执行时间 - 在检查半消息状态之前调用
     * @param messageExt 消息
     * @return 事务状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 调用事务服务,检查本地事务状态
        boolean success = transactionService.checkLocalTransaction(messageExt);
        if (success) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

触发时机:

  • executeLocalTransaction(Message msg, Object arg)的触发时机

    • 当你调用sendMessageInTransaction方法发送事务性消息时,RocketMQ客户端库首先会发送一个半消息到消息服务器。

    • 然后,客户端库会调用executeLocalTransaction方法,传入消息msg和之前通过sendMessageInTransaction发送时附加的对象arg,以执行本地事务。

  • checkLocalTransaction(Message msg, Object arg)的触发时机

    • 这个方法会在半消息的回查周期内被调用。消息服务器会定时向消息生产者查询半消息对应的本地事务执行情况。

    • 客户端库会调用checkLocalTransaction方法,传入消息msg和之前附加的对象arg,以检查本地事务是否成功执行。

参数含义:

  1. Message msg

    1. 这个参数代表了需要进行事务状态检查的消息对象。它包含了消息的主题(topic)、消息体(body)、消息标签(tags)、属性(properties)等信息。这个对象是之前在调用sendMessageInTransaction方法时创建并发送的半消息。

  2. Object arg

    1. 这是一个用户自定义的对象,它在调用sendMessageInTransaction发送半消息时传递给TransactionListener的。你可以利用这个对象来传递任何需要在执行本地事务检查时使用的上下文信息或状态数据。

参数的使用场景:

  • Message msg

    • 你可以使用msg对象来获取与消息相关的信息,例如,可以通过msg.getMsgId()获取消息的唯一标识符,或者使用msg.getProperty方法获取消息的自定义属性,这些信息可能对检查本地事务状态有帮助。

  • Object arg

    • 在发送半消息之前,你可能会执行一些本地事务逻辑,比如数据库操作。在这些操作中,可能需要一些额外的上下文信息来完成事务的提交或回滚。这些信息可以通过arg参数在发送半消息时传递给executeLocalTransaction方法,然后executeLocalTransaction方法可以再将其传递给checkLocalTransaction方法。

本地事务方法

package com.xiaokai.transactional;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

/**
 * Author:yang
 * Date:2024-10-09 18:30
 */
@Slf4j
@Service
public class TransactionService {

    /**
     * 本地事务执行
     * @param msg
     * @return
     */
    public boolean localTransaction(Message msg) {
        // 1. 执行1方法

        // 2. 执行2方法

        // 3. 本地事务提交
        return true;
    }

    /**
     * 事务回查
     * @param messageExt
     * @return
     */
    public boolean checkLocalTransaction(MessageExt messageExt) {
        // 用消息中的部分信息,入库查询事务执行结果 - 使用orderId查询订单表是否有这个这个记录
        // 1. 查询事务执行结果

        // 2. 根据查询结果决定是否提交事务

        return true;
    }
}

监听器配置

package com.xiaokai.config;

import com.xiaokai.listen.TransactionListenerImpl;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Author:yang
 * Date:2024-10-09 18:54
 * Description:发送事务消息,需要配置监听器
 */
@Component
public class MQTemplateConfig {

    @Autowired
    private TransactionListenerImpl transactionListener;

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;
    
    @Bean
    public TransactionMQProducer transactionMQProducer() {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
        producer.setTransactionListener(transactionListener);
        // 其他配置,如setNamesrvAddr等
        producer.setNamesrvAddr(nameServer);
        producer.setProducerGroup(producerGroup);
        return producer;
    }

    @Bean
    public RocketMQTemplate rocketTransactionMQTemplate(TransactionMQProducer transactionMQProducer) {
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(transactionMQProducer);
        // 其他配置
        return template;
    }
}

发送事务消息

/**
 * 发布事务事件
 *
 * @param topic   主题
 * @param message 消息
 */
public void publishTransaction(String topic, BaseEvent.EventMessage<?> message , String orderId) {
    try {
        // 转换消息
        String payload = JSON.toJSONString(message);
        Message<String> msg = MessageBuilder.withPayload(payload)
                .build();

        rocketMQTemplate.sendMessageInTransaction(topic, msg , orderId);
        log.info("publish event success, topic:{}, message:{}", topic, message);
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

注:

       1.  文章全篇使用了消息同步发送方式,建议采用消息异步发送方式(发送消息时,添加回调函数监听消息)。

/**
 * 异步发布事件
 * @param topic   主题
 * @param message 消息
 */
public void publish(String topic, String message) {
    try {
        // 发送消息
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("publish event success, topic:{}, message:{}", topic, message);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, throwable.getMessage());
            }
        });
    } catch (Exception e) {
        log.error("publish event error, topic:{}, message:{}, error:{}", topic, message, e.getMessage());
        throw e;
    }
}

        2. 刚开始启动出错误,优先检查rmqbroker容器是不是处于运行状态。


原文地址:https://blog.csdn.net/weixin_44929475/article/details/142795579

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!