31人参与 • 2025-09-30 • Redis
轻量级部署:无需单独部署 rabbitmq、kafka 等消息队列服务,可以直接复用现有 redis 集群。例如一个电商系统可能已经使用 redis 做缓存,现在只需增加消息队列功能,无需额外维护其他中间件,显著降低运维成本;
高性能:基于内存操作,单节点 qps 可达 10 万级,满足高吞吐场景。实测表明,在标准服务器配置下,redis 处理简单消息的延迟可低至 0.1ms,远优于传统磁盘存储的消息队列;
api 简洁:依托 redis 原生命令即可实现完整队列功能:
支持多语言:所有主流语言的 redis 客户端(java/jedis、python/redis-py、go/redigo 等)均原生支持消息队列相关命令。例如 java 开发者可以直接使用 jedis 的 lpush() 方法发送消息,无需额外依赖;
可扩展性:通过 redis cluster 可以轻松实现消息队列的横向扩展。例如可以将不同业务的消息分配到不同分片,同时利用 redis sentinel 实现高可用,确保消息服务不间断。
redis 的 list 数据结构是一个双向链表,具有以下特性使其非常适合实现消息队列:
| 角色 | 核心命令 | 作用说明 | 时间复杂度 |
|---|---|---|---|
| 生产者 | lpush key value1 value2 | 从 list 左侧插入消息(头部插入),支持批量插入,返回插入后 list 的长度 | o(1) |
| 生产者 | rpush key value1 value2 | 从 list 右侧插入消息(尾部插入),支持批量插入 | o(1) |
| 消费者 | blpop key timeout | 从 list 左侧阻塞获取消息(头部取出),若 list 为空则等待timeout秒 | o(1) |
| 消费者 | brpop key timeout | 从 list 右侧阻塞获取消息(尾部取出),若 list 为空则等待timeout秒 | o(1) |
| 监控 | llen key | 获取当前队列的消息数量 | o(1) |
| 监控 | lrange key start end | 查看队列中从start到end的消息(如lrange queue 0 9查看前10条) | o(s+n) |
首先引入 jedis 依赖(maven):
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>4.4.3</version> <!-- 建议使用最新稳定版 -->
</dependency>import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;
public class listmqproducer {
// 队列key命名规范:业务域:组件类型:数据结构:具体业务
private static final string queue_key = "redis:mq:list:order";
// 使用连接池提高性能
private static final jedispool jedispool = new jedispool(
new jedispoolconfig(),
"localhost",
6379,
2000, // 连接超时时间
null // 密码
);
public static void main(string[] args) throws interruptedexception {
try (jedis jedis = jedispool.getresource()) {
// 模拟发送10条订单消息
for (int i = 1; i <= 10; i++) {
// 消息内容格式:业务标识_序号_时间戳
string message = string.format("order_%d_%d", i, system.currenttimemillis());
// lpush命令将消息放入队列头部
long queuelength = jedis.lpush(queue_key, message);
system.out.printf("生产者发送消息:%s,当前队列长度:%d%n", message, queuelength);
// 模拟业务处理间隔
thread.sleep(500);
}
} catch (exception e) {
system.err.println("生产者异常:" + e.getmessage());
} finally {
jedispool.close();
}
}
}import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;
import java.util.list;
public class listmqconsumer {
private static final string queue_key = "redis:mq:list:order";
private static final jedispool jedispool = new jedispool(
new jedispoolconfig(),
"localhost",
6379
);
public static void main(string[] args) {
system.out.println("消费者启动,等待接收消息...");
while (true) {
try (jedis jedis = jedispool.getresource()) {
// brpop命令参数:
// 1. 超时时间3秒(避免空轮询消耗cpu)
// 2. 可以监听多个队列
list<string> messages = jedis.brpop(3, queue_key);
if (messages != null) {
// brpop返回结果格式:
// 第一个元素是队列key
// 第二个元素是消息内容
string message = messages.get(1);
system.out.println("消费者接收消息:" + message);
// 业务处理逻辑示例
processmessage(message);
} else {
system.out.println("队列暂无消息,继续等待...");
}
} catch (exception e) {
system.err.println("消费者处理消息异常:" + e.getmessage());
// 异常处理策略:
// 1. 记录错误日志
// 2. 重试机制
// 3. 告警通知
try {
thread.sleep(5000); // 出错后暂停5秒
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
}
}
}
}
private static void processmessage(string message) throws interruptedexception {
// 模拟业务处理
system.out.println("处理消息:" + message);
// 解析消息内容
string[] parts = message.split("_");
string orderid = parts[1];
// 模拟业务处理耗时
thread.sleep(1000);
system.out.println("订单" + orderid + "处理完成");
}
}3.2.1 消息确认机制实现
private static final string confirm_queue_key = "redis:mq:list:order:confirm";
private static final string dead_queue_key = "redis:mq:list:order:dead";
private static final int max_retry = 3;
// 优化后的消费者处理逻辑
list<string> messages = jedis.brpop(3, queue_key);
if (messages != null) {
string message = messages.get(1);
// 1. 将消息移到待确认队列(使用rpush保持顺序)
jedis.rpush(confirm_queue_key, message);
try {
// 2. 处理业务逻辑
processmessage(message);
// 3. 处理成功,从待确认队列删除
jedis.lrem(confirm_queue_key, 1, message);
} catch (exception e) {
system.err.println("处理消息失败:" + message);
// 4. 检查重试次数
long retrycount = jedis.incr("retry:" + message);
if (retrycount <= max_retry) {
// 放回主队列重试
jedis.lpush(queue_key, message);
} else {
// 超过重试次数,放入死信队列
jedis.rpush(dead_queue_key, message);
}
// 无论重试还是加入死信队列,都要从待确认队列删除
jedis.lrem(confirm_queue_key, 1, message);
}
}3.2.2 定时补偿任务
// 定时检查待确认队列(每分钟执行)
public void checkconfirmqueue() {
try (jedis jedis = jedispool.getresource()) {
// 获取待确认队列所有消息
list<string> pendingmessages = jedis.lrange(confirm_queue_key, 0, -1);
for (string message : pendingmessages) {
// 检查消息滞留时间
long createtime = long.parselong(message.split("_")[2]);
long currenttime = system.currenttimemillis();
long delay = currenttime - createtime;
// 超过30秒未处理则重试
if (delay > 30000) {
jedis.lrem(confirm_queue_key, 1, message);
jedis.lpush(queue_key, message);
system.out.println("消息超时重试:" + message);
}
}
}
}// 生产者批量发送 jedis.lpush(queue_key, "msg1", "msg2", "msg3"); // 消费者批量获取(非阻塞) list<string> batch = jedis.rpop(queue_key, 10); // 获取最多10条
管道(pipeline)优化:
try (pipeline p = jedis.pipelined()) {
p.lpush(queue_key, "msg1");
p.lpush(queue_key, "msg2");
p.sync(); // 批量提交
}监控指标:
队列长度监控:llen key
消费者积压:比较生产和消费速率
异常告警:死信队列增长监控
appendonly yesmaxmemory-policy volatile-lrutimeout 300(秒)# 监控队列长度 redis-cli llen redis:mq:list:order # 监控redis内存 redis-cli info memory
示例:payment:mq:list:refund
redis 的 pub/sub(发布 - 订阅)模型是一种高效的"一对多"消息通信机制,它允许生产者将消息发布到特定的频道(channel),而所有订阅该频道的消费者都能即时接收到这些消息。这种模式特别适合需要实时广播的场景,如新闻推送、实时聊天系统等。
| 角色 | 核心命令 | 作用说明 |
|---|---|---|
| 生产者 | publish channel message | 向指定频道发布消息,返回接收消息的消费者数量 |
| 消费者 | subscribe channel1 channel2 | 订阅一个或多个频道,阻塞等待消息(订阅状态下只能接收消息,无法执行其他命令) |
| 消费者 | psubscribe pattern | 使用模式匹配订阅频道(如psubscribe redis:mq:pubsub:*订阅所有匹配前缀的频道) |
import redis.clients.jedis.jedis;
public class pubsubproducer {
// 定义频道名称,采用命名空间方式避免冲突
private static final string channel_key = "redis:mq:pubsub:news";
// 创建redis连接实例
private static final jedis jedis = new jedis("localhost", 6379);
public static void main(string[] args) throws interruptedexception {
// 模拟发布3条新闻消息,实际应用中可接入实时数据源
string[] news = {
"redis 7.2版本发布,新增stream增强功能",
"基于redis的消息队列在电商场景的实践",
"redis cluster集群部署最佳实践"
};
// 循环发布消息
for (string msg : news) {
// 发布消息并获取接收者数量
long receivercount = jedis.publish(channel_key, msg);
system.out.println(string.format(
"【生产者】发布消息:%s,当前订阅者数量:%d",
msg, receivercount));
// 模拟消息间隔
thread.sleep(1000);
}
// 关闭连接
jedis.close();
}
}
import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispubsub;
public class pubsubconsumer {
private static final string channel_key = "redis:mq:pubsub:news";
private static final jedis jedis = new jedis("localhost", 6379);
public static void main(string[] args) {
// 创建自定义的消息处理器
jedispubsub pubsub = new jedispubsub() {
// 接收到消息时的回调方法
@override
public void onmessage(string channel, string message) {
system.out.println(string.format(
"【消费者1】接收到新消息(频道:%s):%s",
channel, message));
// 此处可添加业务处理逻辑
// 例如:解析消息内容、写入数据库、触发其他操作等
}
// 成功订阅频道时的回调
@override
public void onsubscribe(string channel, int subscribedchannels) {
system.out.println(string.format(
"【消费者1】成功订阅频道:%s,当前订阅总数:%d",
channel, subscribedchannels));
}
// 可添加其他回调方法如onunsubscribe、onpsubscribe等
};
system.out.println("【消费者1】启动并开始监听...");
// 开始订阅(该方法会阻塞当前线程)
jedis.subscribe(pubsub, channel_key);
// 注意:在实际应用中,通常会将订阅逻辑放在独立线程中
// 以避免阻塞主线程
}
}
业务域:子系统:消息类型的层次结构,如trade:order:createredis 5.0 推出的 stream 数据结构是专门为消息队列场景设计的,它完美解决了传统 list 和 pub/sub 模式的诸多缺陷。stream 支持消息持久化存储、消息确认机制、消费者组管理、死信队列等企业级特性,是目前 redis 实现可靠消息队列的最佳方案。在实际应用中,如电商订单处理、支付流水记录、日志收集等场景都能发挥重要作用。
stream:消息队列的主体,每个 stream 有唯一的 key(如"order:stream")。消息以"条目(entry)"形式存储,每个条目包含:
消费者组(consumer group):通过将多个消费者归为一组,实现:
消息确认(ack)机制:
pending 列表:
死信队列:
| 操作类型 | 命令格式 | 说明 |
|---|---|---|
| 添加消息 | xadd key * field1 value1 [field2 value2...] | *表示自动生成id,可指定id保证顺序 |
| 创建消费者组 | xgroup create key groupname id [mkstream] | mkstream选项在stream不存在时自动创建 |
| 消费消息 | xreadgroup group group consumer [count n] [block ms] streams key [id] | id通常为>表示新消息,0表示pending消息 |
| 消息确认 | xack key groupname id [id...] | 支持批量确认多个消息 |
| 查看pending消息 | xpending key groupname [start end count] [consumer] | 可查看指定消费者的未确认消息 |
| 消息所有权转移 | xclaim key groupname consumer min-idle-time id [id...] | 将空闲超时的消息转给其他消费者处理 |
xread streams key 0-0从最早消息开始读取xrange key start end [count n]按id范围查询xinfo groups key查看消费者组信息// maven依赖
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>4.3.1</version>
</dependency>
// 连接配置
jedispoolconfig config = new jedispoolconfig();
config.setmaxtotal(10);
try (jedispool pool = new jedispool(config, "localhost", 6379)) {
jedis jedis = pool.getresource();
// 业务代码...
}生产者增强版:
public class enhancedproducer {
private static final string[] order_status = {"pending", "paid", "shipped", "completed"};
public void sendorderevent(order order) {
try (jedis jedis = pool.getresource()) {
map<string, string> fields = new hashmap<>();
fields.put("order_id", order.getid());
fields.put("user_id", order.getuserid());
fields.put("amount", order.getamount().tostring());
fields.put("status", order_status[0]);
fields.put("create_time", instant.now().tostring());
// 使用事务保证原子性
transaction t = jedis.multi();
t.xadd(stream_key, streamentryid.new_entry, fields);
t.sadd("order:ids", order.getid()); // 记录订单id集合
t.exec();
// 添加监控埋点
metrics.counter("mq.produce.count").increment();
}
}
}消费者增强版:
public class reliableconsumer implements runnable {
private static final int max_retry = 3;
@override
public void run() {
while (!thread.currentthread().isinterrupted()) {
try {
map<string, list<streamentry>> messages = jedis.xreadgroup(
group_name, consumername,
new streamparams().count(1).block(2000),
new streamoffset(stream_key, ">")
);
if (messages != null) {
messages.foreach((stream, entries) -> {
entries.foreach(entry -> {
processwithretry(entry);
});
});
}
} catch (exception e) {
logger.error("消费异常", e);
sleep(1000);
}
}
}
private void processwithretry(streamentry entry) {
int retrycount = getretrycount(entry.getid());
if (retrycount >= max_retry) {
movetodeadletter(entry);
return;
}
try {
order order = parseorder(entry.getfields());
orderservice.process(order);
jedis.xack(stream_key, group_name, entry.getid());
} catch (exception e) {
logger.warn("处理失败准备重试", e);
sleep(1000 * (retrycount + 1));
}
}
}public class deadlettermonitor {
public void checkpendingmessages() {
// 获取所有超时未确认的消息
list<streamentry> pending = getpendingmessages(timeout_ms);
pending.foreach(entry -> {
// 检查重试次数
if (getretrycount(entry.getid()) > max_retry) {
// 转移到死信队列
jedis.xadd(dead_stream_key, streamentryid.new_entry, entry.getfields());
jedis.xack(stream_key, group_name, entry.getid());
logger.warn("消息转入死信队列: {}", entry.getid());
// 发送告警通知
alertservice.notifyadmin(entry);
}
});
}
public void reprocessdeadletters() {
// 从死信队列重新处理
list<streamentry> deadmessages = jedis.xrange(dead_stream_key, "-", "+");
deadmessages.foreach(entry -> {
try {
manualprocess(entry.getfields());
jedis.xdel(dead_stream_key, entry.getid());
} catch (exception e) {
logger.error("死信处理失败", e);
}
});
}
}// 批量消费提高吞吐量
jedis.xreadgroup(group_name, consumername,
new streamparams().count(100).block(1000),
new streamoffset(stream_key, ">"));
// 批量确认减少网络开销
jedis.xack(stream_key, group_name, id1, id2, id3);
// 消费者崩溃后的恢复处理
public void recoverconsumer(string failedconsumer) {
list<pendingentry> pendings = jedis.xpending(
stream_key, group_name, "-", "+", 100, failedconsumer);
pendings.foreach(pending -> {
jedis.xclaim(stream_key, group_name, currentconsumer,
timeout_ms, pending.getidasstring());
});
}通过以上实现,基于redis stream的消息队列可以达到:
| 对比维度 | list 方案 | pub/sub 方案 | stream 方案(推荐) |
|---|---|---|---|
| 消息持久化 | 支持(需手动处理) | 不支持 | 原生支持 |
| 消息确认 | 需自定义(如rpoplpush) | 不支持 | 原生支持(ack机制) |
| 广播能力 | 不支持 | 原生支持(全量广播) | 支持(通过多消费者组实现) |
| 消费者负载均衡 | 支持(竞争消费模式) | 不支持(全量推送) | 支持(消费者组内自动均衡) |
| 死信队列 | 需自定义(备份list) | 不支持 | 支持(通过xclaim命令) |
| 实现复杂度 | 低(基础命令即可) | 低(订阅/发布模式) | 中(需理解消费者组概念) |
| 内存占用 | 线性增长 | 瞬时内存 | 可控制(支持消息修剪) |
| 历史消息回溯 | 有限支持(需保存完整list) | 不支持 | 完整支持(消息id时间序列) |
| 适用场景 | 简单异步通信 | 实时广播通知 | 可靠消息、企业级场景 |
选型决策树:
stream方案实施细节:
xgroup create mystream mygroup $ mkstream
list方案优化建议:
rpoplpush source_list backup_list # 原子操作 # 处理成功后再lrem备份列表
集群环境特别注意事项:
运维管理建议:
电商平台的订单处理采用异步消息队列模式,通过redis stream实现可靠的消息传递和消费。整个流程包含以下关键环节:
订单创建阶段
{
"order_id": "ord20231125001",
"user_id": "u10086",
"product_id": "p8808",
"quantity": "2"
}并行消费阶段
通知服务(消费者1):专门处理用户通知
库存服务(消费者2):负责库存扣减
update inventory set stock = stock - ? where product_id = ? and stock >= ?异常处理机制
// 订单服务(生产者)发送消息 - 增强版
public void createorder(order order) {
// 1. 数据库事务确保数据一致性
transactionstatus status = transactionmanager.gettransaction(new defaulttransactiondefinition());
try {
// 1.1 保存主订单
ordermapper.insert(order);
// 1.2 保存订单明细
order.getitems().foreach(item -> {
item.setorderid(order.getid());
orderitemmapper.insert(item);
});
// 2. 构建消息体(添加时间戳和业务标识)
map<string, string> message = new hashmap<>();
message.put("order_id", order.getid());
message.put("user_id", order.getuserid());
message.put("product_id", order.getproductid());
message.put("quantity", order.getquantity() + "");
message.put("create_time", system.currenttimemillis() + "");
message.put("biz_type", "normal_order");
// 3. 发送消息(添加重试机制)
int retrytimes = 0;
while (retrytimes < 3) {
try {
jedis.xadd("redis:mq:stream:order_create", null, message);
break;
} catch (exception e) {
retrytimes++;
if (retrytimes == 3) {
throw new runtimeexception("消息发送失败", e);
}
thread.sleep(1000 * retrytimes);
}
}
transactionmanager.commit(status);
} catch (exception e) {
transactionmanager.rollback(status);
throw new businessexception("订单创建失败", e);
}
}// 通知服务(消费者)完整实现
public void handlenotification() {
// 初始化消费者组(幂等操作)
initconsumergroup("redis:mq:stream:order_create", "order_group");
while (!thread.currentthread().isinterrupted()) {
try {
map<string, list<streamentry>> messages = jedis.xreadgroup(
"order_group",
"notify_consumer_" + instanceid, // 使用实例id区分消费者
1,
5000,
false,
map.of("redis:mq:stream:order_create", streamentryid.unreceived_entry)
);
if (messages != null && !messages.isempty()) {
for (streamentry entry : messages.get("redis:mq:stream:order_create")) {
map<string, string> content = entry.getfields();
string userid = content.get("user_id");
string orderid = content.get("order_id");
// 1. 发送短信(带熔断机制)
boolean smssent = circuitbreaker.execute(() ->
smsservice.send(userid, "订单通知", "您的订单" + orderid + "已创建成功"));
// 2. 发送app推送
boolean pushsent = pushservice.send(userid, "订单创建通知",
map.of("orderid", orderid, "type", "order_created"));
if (smssent || pushsent) {
// 至少一个通知发送成功才确认消息
jedis.xack("redis:mq:stream:order_create", "order_group", entry.getid());
monitorservice.recordsuccess("order_notify");
} else {
monitorservice.recordfailure("order_notify");
}
}
}
} catch (exception e) {
log.error("通知处理异常", e);
monitorservice.recorderror("order_notify", e);
thread.sleep(5000); // 异常休眠避免循环异常
}
}
}
private void initconsumergroup(string streamkey, string groupname) {
try {
jedis.xgroupcreate(streamkey, groupname, streamentryid.last_entry, true);
} catch (redisbusyexception e) {
log.info("消费者组已存在: {}", groupname);
}
}// 库存服务(消费者)完整实现
public void handleinventory() {
// 初始化消费者组
initconsumergroup("redis:mq:stream:order_create", "order_group");
while (!thread.currentthread().isinterrupted()) {
try {
map<string, list<streamentry>> messages = jedis.xreadgroup(
"order_group",
"inventory_consumer_" + instanceid,
1,
5000,
false,
map.of("redis:mq:stream:order_create", streamentryid.unreceived_entry)
);
if (messages != null && !messages.isempty()) {
for (streamentry entry : messages.get("redis:mq:stream:order_create")) {
map<string, string> content = entry.getfields();
string productid = content.get("product_id");
int quantity = integer.parseint(content.get("quantity"));
string orderid = content.get("order_id");
// 1. 扣减库存(带事务)
boolean success = inventoryservice.deductwithlog(
productid,
quantity,
orderid,
"order_deduction"
);
if (success) {
// 2. 确认消息
jedis.xack("redis:mq:stream:order_create", "order_group", entry.getid());
monitorservice.recordsuccess("inventory_deduct");
} else {
// 3. 记录失败日志
log.warn("库存扣减失败 orderid={}, productid={}", orderid, productid);
monitorservice.recordfailure("inventory_deduct");
// 不确认消息,让其进入pending状态
}
}
}
} catch (exception e) {
log.error("库存处理异常", e);
monitorservice.recorderror("inventory_deduct", e);
thread.sleep(5000);
}
}
}
// 库存扣减服务方法
@transactional
public boolean deductwithlog(string productid, int quantity, string bizid, string biztype) {
// 1. 扣减库存
int affected = inventorymapper.deductwithversion(
productid,
quantity,
getcurrentversion(productid)
);
if (affected == 0) {
return false;
}
// 2. 记录操作流水
inventorylog log = new inventorylog();
log.setlogid(uuid.randomuuid().tostring());
log.setproductid(productid);
log.setchangedamount(-quantity);
log.setbizid(bizid);
log.setbiztype(biztype);
log.setremarks("订单扣减");
inventorylogmapper.insert(log);
return true;
}监控指标
xlen redis:mq:stream:order_createxpending redis:mq:stream:order_create order_group运维命令示例
# 查看消费者组信息 xinfo groups redis:mq:stream:order_create # 处理死信消息 xrange dlq:order_create - + count 10 xack dlq:order_create manual_group <entry_id>
自动恢复方案
到此这篇关于redis 是如何实现消息队列的?的文章就介绍到这了,更多相关redis消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论