24人参与 • 2025-06-26 • Redis
在redis5.0之前,如果想使用它作为简单的消息队列,最好的选择就是自身提供的pub/sub模式.它支持简单的发布/订阅模式,发布一个channel绑定一条消息,然后可以有多个消费者监听这个channel,每个消费者都能收到相同的消息。不支持持久化,不支持查询,不支持分组,不支持分片消费,也没有提供很好的监控手段(有简单的pubsub容器命令,可以看有哪些channel,订阅者数量等)。但是5.0之后,倘若我们人仍选择redis作为简单消息队列,就可以使用新的数据类型stream
xadd 向指定的 stream 添加一条新消息。xadd key [maxlen [~] count] * field1 value1 [field2 value2 ...]
参数说明:
key:stream 的名称。
maxlen [~] count:可选,限制 stream 最大长度,超出自动裁剪最老消息。~ 表示近似修剪,性能更优。【实际上使用要注意,超过最大值直接丢弃,也就是“消失了“】
*:让 redis 自动生成消息id,也可自定义id。
field value:消息体的键值对。
用法举例:xadd mystream * name alice age 20
xrange 按id范围读取 stream 中的消息xrange key start end [count count]
参数说明:
start、end:起止id,- 表示最小id,+ 表示最大id。
count:可选,限制返回条数。
用法举例:xrange mystream - + # 读取所有消息
xread 从一个或多个 stream 读取新消息,可阻塞等待xread [block milliseconds] streams key [key ...] id [id ...]
参数说明:
block:可选,阻塞等待新消息的毫秒数。
streams:后面跟 stream 名称和起始id。
用法举例:xread block 5000 streams mystream $
$ 表示只读新消息
xgroup 创建、删除、管理 stream 的消费者组。xgroup create mystream mygroup 0-0 mkstream
常用子命令:
xgroup create mystream mygroup 0-0 mkstream
0-0:从头消费;$:只消费新消息。xgroup destroy mystream mygroup
xgroup createconsumer mystream mygroup consumer-1
xgroup createconsumer mystream mygroup consumer-1
xreadgroup 以消费者组身份读取消息,实现分布式并发消费xreadgroup group group consumer [block milliseconds] streams key [key ...] id [id ...]
参数说明:
group group consumer:指定组名和消费者名。
id:> 表示只读未分配的新消息,其他id(如0)可用于补偿pending。
举例:xreadgroup group mygroup consumer-1 block 5000 streams mystream >
xpending 查看某个组下所有未ack的消息(即已分配但未确认)注意这里不是消息的快照,它只是存储消息的id列表,并不会复制一份消息内容xpending key group [start end count [consumer]]
举例:xpending mystream mygroup - + 10
xpending mystream mygroup - + 10 consumer-1
xack 用于确认消息已被消费,也就是从pending状态pel中移除
举例:xack mystream mygroup 1680000000000-0
xclaim/xautoclaim 将长时间未ack的pending消息转移到其他消费者/实现自动补偿。
举例:xclaim mystream mygroup consumer-2 60000 1680000000000-0
xautoclaim mystream mygroup consumer-2 60000 0-0 count 10
xtrim 限制流的最大长度,自动删除最老的消息。无论是否被ack的消息,都会被裁减。
语法:xtrim key maxlen [~] count
举例:xtrim mystream maxlen ~ 1000
xdel 从stream中删除指定id的消息,可以一次删除多个,用空格隔开即可
xdel mystream 1680000000000-0
results obtained: 结果: processed between 0 and 1 ms -> 74.11% processed between 1 and 2 ms -> 25.80% processed between 2 and 3 ms -> 0.06% processed between 3 and 4 ms -> 0.01% processed between 4 and 5 ms -> 0.02% 因此,99.9%的请求的延迟<= 2毫秒,异常值仍然非常接近平均值。
特殊的 > id,表示消费者只想接收从未发送给其他消费者的信息。它的意思是,给我新邮件。
任何其他 id,即 0 或任何其他有效 id 或不完整 id(仅毫秒时间部分),都将导致返回发送命令的用户的待处理条目,且 id 大于所提供的 id。因此,基本上如果 id 不大于,那么命令将只允许客户访问其待处理条目:已向其发送但尚未确认的信息。请注意,在这种情况下,block 和 noack 都会被忽略。
pending状态的消息是可以被删除的,redis并没有设计未确认的消息不允许删除。如果采用xdel删除消息后,pending列表将仍然保留待消费消息的id,但是消息内容没有了。因此,在读取此类pel条目时,redis会返回一个空值。
x. 如果要“广播”效果(每个消费者都收到同一条消息),需要每个消费者用不同的 group。或者都广播了,就使用pub/sub吧,,~~
更加详细stream的细节介绍,可以参考官网:https://redis.io/docs/latest/develop/data-types/streams
稍后我将具体介绍如何在代码中使用stream来作为消息队列。
到此这篇关于使用redis的stream数据类型做消息队列的文章就介绍到这了,更多相关redis消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论