Redis数据结构(四)— 流
流(stream)是Redis 5.0 新增的数据结构。 它本质是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的id和其对应的内容。
它主要用于实现消息队列,与Redis之前的发布订阅功能相比,流提供了消息的持久化、主备复制功能,支持ack确认消息的模式以及支持消费组等特性,使得消息队列更加稳定可靠。
1 基础功能
添加删除 | XADD、XDEL |
队列 | 修剪:XTRIM 长度:XLEN 访问元素:XRANGE、XREVRANGE |
读取 | XREAD,可以以阻塞或非阻塞方式获取流元素。 |
表 流的基础功能
流的ID格式一般为:毫秒级时间戳 + ‘-’ + 序号。 要插入的id必须大于已有的id。
2 消费者组
流的消费者组允许用户将一个流从逻辑上划分为不同的流,并让消费者组属下的消费者去处理组中的消息。
消费者组 | 是一种将流从逻辑上分成多个不同部分,并让组下的消费者去处理这些部分中的消息的机制。 |
消费者 | 是负责处理消息的客户端,是消费者组内的成员。消费流中的消息。 |
表 流的消费者组及消费者概念
一个消费者组只能有一个流,一个流可以被多个消费者组读取。不同流的消费者组名可以一样。
不同消费者组的消费者可以同名。
XGROUP | 创建消费者组:XGROUP CREATE stream group id 修改消费者组的最后递送id: XGROUP SETID stream group id 删除消费者:XGROUP DELCONSUMER stream group consumer 删除消费者组:XGROUP DESTORY stream group |
XREADGROUP | 读取消费者组中的消息。 XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS stream [stream ...] id [id ...] |
XPENDING | 获取待处理消息的相关信息 XPENDING stream group [start stop count] [consumer] |
XACK | 将消息标记为“已处理”,标记后消息会从待处理队列删除。但是如果用户手动修改了消费者组的最后递送id,则这条消息可能会被重新读取到带处理的消息队列中。 XACK stream group id [id id ...] |
XCLAIM | 转移消息的归属权。 XCLAIIM stream group other_consumer max_pending_time id [id ...] |
XINFO | 查看消费者信息:XINFO CONSUMERS stream group 查看消费者组信息:XINFO GROUPS stream 查看流信息:XINFO STREAM stream |
表 消费者组相关命令
2.1 注意事项
2.1.1 修改消费者组最后递送消息id
修改值大于当前值:可能漏掉信息。
修改值小于当前值:可能会重复读,及被XACK处理的消息可能被再次读取到。
2.1.2 删除消费者
其待处理的消息也会从消费者组的待处理队列删除。
2.1.3 消息递送次数与XREADGROUP
指这条消息被消费者读取的次数。 当这条消息被新的消费者获取后,它的递送次数会被重置为1. 如果通过XCLAIM转移的,递送次数会+1。
XREADGROUP 读取消息时,不能直接读取未递送过的新消息。
假设,消费者组的最后递送消息id未0,流中有3条数据还未被递送过。
XREADGROUP GROUP g1 c1 STREAMS s1 0 // 这条命令不不能读取任何的信息。
图 消费者读取信息
这个命令只能读取已被递送到消费者c1,且待处理的信息。
将id参数的值设置为特殊符号“>”,命令会自动地向消费者返回尚未被递送过的新消息,且将这些消息读取到指定的消费者。
图 消费者读取未被递送的消息
3 消息队列的对比
列表 | 非有序结构,如果需要查找指定数据的元素,需要遍历整个列表。 |
有序集合 | 性能瓶颈,其需要维护一个有序的结构,插入和删除操作都需要对元素进行重新排序。 |
发布与订阅 | 其“发送即忘”的策略会导致离线的客户端丢失信息。不可靠。 |
图 Redis的其他三个数据结构实现消息队列的缺陷
这三种结构还有个共同的问题:它们的元素只能是单个值。如果用户想要用用它们实现消息队列来传递多项消息,那么必须使用JSON之类的序列化格式。
3.1 与Kafka等消息队列框架的对比
redis 的流 | Kafka | |
数据持久化 | Redis的主要设计初衷是基于内存的。流虽然支持数据的持久性,但是机制相对简单,可能无法满足高可靠性需求。 | 将消息持久化存储到磁盘上,即使服务器重启,消息也不会丢失。其可靠性和容错性更好。 |
消息顺序 | 消息是按照插入顺序进行存储的,这保证了消费者可以按照消息的顺序进行处理。 | 能保证同一分区内消息的顺序性,但不保证全局消息的顺序性。 |
集群扩展性 | Redis的集群模式在扩展性方面存在一定的限制。 | 支持集群扩展,具有良好的水平扩展能力。可以通过添加更多的节点来增强系统的吞吐量和容错性。 |
表 Redis 的流实现消息队列与Kafka的对比
原文地址:https://blog.csdn.net/qq_25308331/article/details/144457293
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!