redis消息队列
一 、Redis可以作为消息队列使用,支持几种常见的模式,包括:
发布/订阅模式(Pub/Sub):消息发布者将消息发送到一个频道,所有订阅了该频道的订阅者都会收到消息。这种模式比较简单,但没有持久化机制,消息发布后如果订阅者不在线则会丢失消息。
列表(List)模式:使用Redis的列表(List)结构,通常使用LPUSH和BRPOP命令。生产者将消息推入列表,消费者从列表中弹出消息。这种模式可以通过阻塞操作实现可靠的消息传递。
Redis的列表(List)数据结构本身是持久化存储的,Redis默认会将数据保存在内存中并周期性地将数据同步到磁盘上(持久化机制包括RDB快照和AOF日志)。但是,Redis并不是一个专门设计用于消息队列的系统,因此在一些情况下,可能会面临数据丢失的风险。例如,如果Redis服务器崩溃或重启,可能会丢失最近未持久化的数据。
使用 Redisson 的 RBlockingQueue 实现消息队列的消费者模式时,如果有多个消费者,消息不会被重复消费。每个消息只会被一个消费者处理。这是因为 RBlockingQueue 的 take 方法是阻塞的,每次只会有一个消费者成功获取消息,从而实现消息的负载均衡。
流(Stream)模式:这是Redis 5.0引入的新功能,类似于Kafka。消息会被记录到一个流中,可以有多个消费者组,每个组都有自己的消费位置。消息可以持久化存储,并支持复杂的消费模式。
二、依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
三 、列表(List)模式代码示例
生产者和消费者的兼容性
生产者:
使用RQueue来发送消息,这种方式适用于简单的非阻塞操作。
RQueue和RBlockingQueue共享相同的底层数据结构,所以使用RQueue将消息发送到队列中,RBlockingQueue也能够读取这些消息。
消费者:
使用RBlockingQueue来接收消息,这样可以利用阻塞操作来提高性能,避免空轮询。
RBlockingQueue继承自RQueue,所以它能够兼容RQueue的所有操作,并且增加了阻塞方法,如take。
创建消息对象
public class MessageVO {
private String id;
private String content;
private String type;
// Constructors, getters, and setters
public MessageVO() {
}
public MessageVO(String id, String content, String type) {
this.id = id;
this.content = content;
this.type = type;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "MessageVO{id='" + id + "', content='" + content + "', type='" + type + "'}";
}
}
生产者类
import com.alibaba.fastjson.JSON;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
@Autowired
private RedissonClient redissonClient;
private static final String QUEUE_NAME = "messageQueue";
public void sendMessage(MessageVO messageVO) {
String message = JSON.toJSONString(messageVO);
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(QUEUE_NAME);
queue.add(message);
System.out.println("Message sent: " + message);
}
}
消费者类
import com.alibaba.fastjson.JSON;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class MessageConsumer {
@Autowired
private RedissonClient redissonClient;
private static final String QUEUE_NAME = "messageQueue";
@PostConstruct
public void consumeMessages() {
new Thread(() -> {
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(QUEUE_NAME);
while (true) {
try {
String message = queue.take(); // This will block until a message is available
MessageVO messageVO = JSON.parseObject(message, MessageVO.class);
System.out.println("Message consumed: " + messageVO);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
控制器类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/send")
public String sendMessage(@RequestParam String id, @RequestParam String content, @RequestParam String type) {
MessageVO messageVO = new MessageVO(id, content, type);
messageProducer.sendMessage(messageVO);
return "Message sent: " + messageVO;
}
}
四、Redis Streams的示例
修改生产者类
import com.alibaba.fastjson.JSON;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamAddArgs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class MessageProducer {
private static final String STREAM_NAME = "messageStream";
@Autowired
private RedissonClient redissonClient;
public void sendMessage(MessageVO messageVO) {
String message = JSON.toJSONString(messageVO);
RStream<String, String> stream = redissonClient.getStream(STREAM_NAME);
Map<String, String> messageMap = new HashMap<>();
messageMap.put("message", message);
messageMap.put("type", messageVO.getType());
// Use StreamAddArgs.Builder to create an instance
StreamAddArgs<String, String> args = StreamAddArgs.<String, String>entries(messageMap);
stream.add(args);
System.out.println("Message sent: " + message);
}
}
修改消费者类
import com.alibaba.fastjson.JSON;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
@Service
public class MessageConsumer {
@Autowired
private RedissonClient redissonClient;
private static final String STREAM_NAME = "messageStream";
private static final String GROUP_NAME = "group1";
private static final String CONSUMER_NAME = "consumer1";
@PostConstruct
public void consumeMessages() {
new Thread(() -> {
RStream<String, String> stream = redissonClient.getStream(STREAM_NAME);
// 创建消费者组,如果消费者组已经存在,则忽略异常
try {
stream.createGroup(GROUP_NAME);
} catch (Exception e) {
// Group already exists
}
while (true) {
try {
Map<StreamMessageId, Map<String, String>> entries =
stream.readGroup(GROUP_NAME, CONSUMER_NAME, StreamReadGroupArgs.neverDelivered());
if (entries.isEmpty()) {
logger.info("No new messages.");
} else {
for (Map.Entry<StreamMessageId, Map<String, String>> entry : entries.entrySet()) {
Map<String, String> message = entry.getValue();
logger.info("Consumed message: " + message);
// Acknowledge the message
stream.ack(GROUP_NAME, entry.getKey());
logger.info("Message acknowledged: " + entry.getKey());
}
}
Thread.sleep(1000); // Adjust sleep time as needed
} catch (Exception e) {
logger.severe("Error consuming messages: " + e.getMessage());
try {
Thread.sleep(5000); // Sleep before retrying in case of an error
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}).start();
}
}
修改控制器类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/send")
public String sendMessage(@RequestParam String id, @RequestParam String content, @RequestParam String type) {
MessageVO messageVO = new MessageVO(id, content, type);
messageProducer.sendMessage(messageVO);
return "Message sent: " + messageVO;
}
}
List<Map.Entry<StreamMessageId, Map<String, String>>> entries =
stream.readGroup(GROUP_NAME, CONSUMER_NAME, StreamMessageId.NEVER_DELIVERED, 1, 0);
stream.readGroup方法的参数如下:
具体参数解释如下:
GROUP_NAME:
类型:String
说明:消费者组的名称。在这里是group1。
用途:指定从哪个消费者组读取消息。
CONSUMER_NAME:
类型:String
说明:消费者的名称。在这里是consumer1。
用途:指定哪个消费者读取消息。
StreamMessageId.NEVER_DELIVERED:
类型:StreamMessageId
说明:起始的消息ID。StreamMessageId.NEVER_DELIVERED用于指定从未被传递的消息开始读取,即读取所有新消息。
用途:指定从哪个ID开始读取消息。
1:
类型:int
说明:每次读取的消息数。在这里是1。
用途:指定一次读取多少条消息。在这个例子中,每次读取1条消息。
0:
类型:long
说明:阻塞等待时间,单位是毫秒。在这里是0。
用途:指定最大阻塞等待时间。如果设为0,表示无限期阻塞,直到有新消息到来。如果设为一个正整数,则表示阻塞等待指定的毫秒数。
原文地址:https://blog.csdn.net/jifgnie/article/details/140546942
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!