自学内容网 自学内容网

Redis数据结构(四)— 流

 流(stream)是Redis 5.0 新增的数据结构。 它本质是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的id和其对应的内容。

它主要用于实现消息队列,与Redis之前的发布订阅功能相比,流提供了消息的持久化、主备复制功能,支持ack确认消息的模式以及支持消费组等特性,使得消息队列更加稳定可靠。

1 基础功能

添加删除

XADD、XDEL

队列

修剪:XTRIM

长度:XLEN

访问元素:XRANGE、XREVRANGE

读取

XREAD,可以以阻塞或非阻塞方式获取流元素。

表 流的基础功能

流的ID格式一般为:毫秒级时间戳 + ‘-’ + 序号。 要插入的id必须大于已有的id。

2 消费者组

流的消费者组允许用户将一个流从逻辑上划分为不同的流,并让消费者组属下的消费者去处理组中的消息。

消费者组

是一种将流从逻辑上分成多个不同部分,并让组下的消费者去处理这些部分中的消息的机制。

消费者

是负责处理消息的客户端,是消费者组内的成员。消费流中的消息。

表 流的消费者组及消费者概念

一个消费者组只能有一个流,一个流可以被多个消费者组读取。不同流的消费者组名可以一样。

不同消费者组的消费者可以同名。

XGROUP

创建消费者组:XGROUP CREATE stream group id

修改消费者组的最后递送id: XGROUP SETID stream group id

删除消费者:XGROUP DELCONSUMER stream group consumer

删除消费者组:XGROUP DESTORY stream group

XREADGROUP

读取消费者组中的消息。

XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS stream [stream ...] id [id ...]

XPENDING

获取待处理消息的相关信息

XPENDING stream group [start stop count] [consumer]

XACK

将消息标记为“已处理”,标记后消息会从待处理队列删除。但是如果用户手动修改了消费者组的最后递送id,则这条消息可能会被重新读取到带处理的消息队列中。

XACK stream group id [id id ...]

XCLAIM

转移消息的归属权。

XCLAIIM stream group other_consumer max_pending_time id [id ...]

XINFO

查看消费者信息:XINFO CONSUMERS stream group

查看消费者组信息:XINFO GROUPS stream

查看流信息:XINFO STREAM stream

表 消费者组相关命令

2.1 注意事项

2.1.1 修改消费者组最后递送消息id

修改值大于当前值:可能漏掉信息。

修改值小于当前值:可能会重复读,及被XACK处理的消息可能被再次读取到。

2.1.2 删除消费者

其待处理的消息也会从消费者组的待处理队列删除。

2.1.3 消息递送次数与XREADGROUP

指这条消息被消费者读取的次数。 当这条消息被新的消费者获取后,它的递送次数会被重置为1. 如果通过XCLAIM转移的,递送次数会+1。

XREADGROUP 读取消息时,不能直接读取未递送过的新消息。

假设,消费者组的最后递送消息id未0,流中有3条数据还未被递送过。

XREADGROUP GROUP g1 c1 STREAMS s1 0 // 这条命令不不能读取任何的信息。

图 消费者读取信息

这个命令只能读取已被递送到消费者c1,且待处理的信息。

将id参数的值设置为特殊符号“>”,命令会自动地向消费者返回尚未被递送过的新消息,且将这些消息读取到指定的消费者。

图 消费者读取未被递送的消息

3 消息队列的对比

列表

非有序结构,如果需要查找指定数据的元素,需要遍历整个列表。

有序集合

性能瓶颈,其需要维护一个有序的结构,插入和删除操作都需要对元素进行重新排序。

发布与订阅

其“发送即忘”的策略会导致离线的客户端丢失信息。不可靠。

图 Redis的其他三个数据结构实现消息队列的缺陷

这三种结构还有个共同的问题:它们的元素只能是单个值。如果用户想要用用它们实现消息队列来传递多项消息,那么必须使用JSON之类的序列化格式。

3.1 与Kafka等消息队列框架的对比

redis 的流

Kafka

数据持久化

Redis的主要设计初衷是基于内存的。流虽然支持数据的持久性,但是机制相对简单,可能无法满足高可靠性需求。

将消息持久化存储到磁盘上,即使服务器重启,消息也不会丢失。其可靠性和容错性更好。

消息顺序

消息是按照插入顺序进行存储的,这保证了消费者可以按照消息的顺序进行处理。

能保证同一分区内消息的顺序性,但不保证全局消息的顺序性。

集群扩展性

Redis的集群模式在扩展性方面存在一定的限制。

支持集群扩展,具有良好的水平扩展能力。可以通过添加更多的节点来增强系统的吞吐量和容错性。

表 Redis 的流实现消息队列与Kafka的对比


原文地址:https://blog.csdn.net/qq_25308331/article/details/144457293

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!