SpringBoot集成RockerMQ
1.引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
2.配置服务器地址
#Rocketmq配置
rocketmq.name-server=192.168.11.99:9876
# 必须指定生产者组
rocketmq.producer.group=group01
# 消息发送超时时长,默认3s
rocketmq.producer.send-message-timeout=3000
# 同步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-failed=3
# 异步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-async-failed=3
3.创建生产者
package com.by.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_01",consumerGroup = "group_205")
public class RocketMqConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String massage) {
log.info("消费者1:{}"+massage);
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 最大重试次数
consumer.setMaxReconsumeTimes(2);
// 设置消费者的属性
consumer.setPullBatchSize(250);
// 一次抓取的数量
}
}
4.创建消费者
package com.by.rocketmq.provider;
import com.by.moder.RegisterOk;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RocketMqProvider {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void send(String msg){
Message<String> msg1 = MessageBuilder.withPayload(msg).build();
//rocketMQTemplate.send("topic_01",msg1);
//rocketMQTemplate.convertAndSend("topic_01",msg);
//String destination, Message<?> message, long timeout 延迟消费
rocketMQTemplate.syncSend("topic_01", msg1,3000,3);
}
}
测试
package com.by;
import com.by.moder.RegisterOk;
import com.by.rocketmq.provider.RocketMqProvider;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
@Slf4j
@SpringBootTest
class RocketMqTests {
@Autowired
RocketMqProvider rocketMqProvider;
@Test
void Test1() throws IOException {
//RegisterOk build = RegisterOk.builder().id(1).build();
//String s = JSONUtil.toJsonStr(build);
for (int i = 1; i <=10 ; i++) {
rocketMqProvider.send("你好++"+i+"++");
log.info("发送成功:"+i);
}
System.in.read();
}
// 延迟消费
@Test
void Test2() throws IOException {
rocketMqProvider.send("你好++++");
log.info("发送成功:"+1);
System.in.read();
}
}
死信队列
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "topic_01")
public class Consumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
log.info("Received message: " + message);;
throw new RuntimeException("test");
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 设置最大重试次数
consumer.setMaxReconsumeTimes(2);
// 如下,设置其它consumer相关属性
consumer.setPullBatchSize(16);
}
}
原文地址:https://blog.csdn.net/w2144217940/article/details/137890594
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!