0人参与 • 2026-03-19 • Java
在现代分布式系统中,消息队列是实现系统解耦、异步处理和流量削峰的重要组件。redis stream作为redis 5.0引入的新数据类型,提供了完整的消息队列功能,成为轻量级消息中间件的优秀选择。
早期我使用 redis pubsub 的方式实现了消息订阅,但是在使用过程,发现如果部署多实例将会重复处理消息事件,导致业务重复处理的情况。
根本原因是 redis pubsub 都是直接广播,无法控制多实例重复订阅消息的情况。
为了解决这个问题,则需要改动 redis stream 的方式来构建消息队列。
以下我分别三个案例逐步实践,达到一个可以适应生产环境要求的成熟方案:
redis stream是redis 5.0版本专门为消息队列场景设计的数据结构,它借鉴了kafka的设计理念,提供了消息持久化、消费者组和消息确认机制等核心功能。
# 自动生成消息id xadd mystream * name "订单创建" order_id "1001" amount "299.99" # 限制stream最大长度(保留最新1000条消息) xadd mystream maxlen 1000 * name "订单支付" order_id "1001"
# 查询所有消息 xrange mystream - + # 分页查询,每次返回10条 xrange mystream - + count 10 # 反向查询(从新到旧) xrevrange mystream + - count 5
# 阻塞监听新消息(0表示不超时) xread block 0 streams mystream $
<parent>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-parent</artifactid>
<version>2.7.18</version>
<relativepath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
<groupid>org.apache.commons</groupid>
<artifactid>commons-pool2</artifactid>
</dependency>
</dependencies># redis 配置
spring:
redis:
host: localhost
port: 6379
database: 0
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
# redis stream 消费组配置
app:
stream:
key: "order_stream"
group: "order_group"package com.lijw.mp.event;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.data.redis.connection.stream.recordid;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.stereotype.service;
import java.util.map;
import java.util.uuid;
@service
@slf4j
public class messageproducerservice {
@autowired
private stringredistemplate stringredistemplate;
@value("${app.stream.key}")
private string streamkey;
/**
* 发送消息到redis stream
*/
public string sendmessage(string messagetype, map<string, string> data) {
try {
// 添加消息类型和时间戳
data.put("messagetype", messagetype);
data.put("timestamp", string.valueof(system.currenttimemillis()));
data.put("messageid", uuid.randomuuid().tostring());
recordid messageid = stringredistemplate.opsforstream()
.add(streamkey, data);
log.info("消息发送成功: {}", messageid);
return messageid.tostring();
} catch (exception e) {
log.error("消息发送失败: {}", e.getmessage());
throw new runtimeexception("消息发送失败", e);
}
}
}package com.lijw.mp.event;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.data.redis.stream.streamlistener;
import org.springframework.stereotype.component;
import java.time.duration;
import java.util.map;
@component
@slf4j
public class messageconsumerservice implements streamlistener<string, maprecord<string, string, string>> {
@autowired
private stringredistemplate stringredistemplate;
@value("${app.stream.group}")
private string groupname;
@value("${app.stream.key}")
private string streamkey;
@override
public void onmessage(maprecord<string, string, string> message) {
string messageid = message.getid().tostring();
map<string, string> messagebody = message.getvalue();
// 幂等性检查:防止重复处理[1](@ref)
if (ismessageprocessed(messageid)) {
log.info("消息已处理,跳过: {}", messageid);
acknowledgemessage(message);
return;
}
try {
log.info("消费者收到消息 - id: {}, 内容: {}", messageid, messagebody);
// 处理业务逻辑
boolean processsuccess = processbusiness(messagebody);
if (processsuccess) {
// 标记消息已处理
markmessageprocessed(messageid);
// 清除重试次数
clearretrycount(messageid);
// 手动确认消息
acknowledgemessage(message);
log.info("消息处理完成: {}", messageid);
} else {
log.error("业务处理失败,消息将重试: {}", messageid);
handleretry(messageid, messagebody, message, "业务处理失败");
}
} catch (exception e) {
log.error("消息处理异常: {}", messageid, e);
handleretry(messageid, messagebody, message, "消息处理异常");
}
}
/**
* 幂等性检查
*/
private boolean ismessageprocessed(string messageid) {
// 使用redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
return stringredistemplate.opsforvalue().get("processed:" + messageid) != null;
}
/**
* 标记消息已处理
*/
private void markmessageprocessed(string messageid) {
// 使用redis存储事件处理id,设置过期时间
stringredistemplate.opsforvalue().set("processed:" + messageid, "1",
duration.ofhours(24));
}
/**
* 业务处理逻辑
*/
private boolean processbusiness(map<string, string> messagebody) {
try {
string messagetype = messagebody.get("messagetype");
string orderid = messagebody.get("orderid");
log.info("处理{}消息,订单id: {}", messagetype, orderid);
// 模拟业务处理
thread.sleep(5000);
return true;
} catch (exception e) {
log.error("业务处理异常", e);
return false;
}
}
/**
* 手动确认消息
*/
private void acknowledgemessage(maprecord<string, string, string> message) {
try {
stringredistemplate.opsforstream()
.acknowledge(groupname, message);
} catch (exception e) {
log.error("消息确认失败: {}", message.getid(), e);
}
}
/**
* 增加重试次数
*/
private int incrementretrycount(string messageid) {
string retrykey = "retry:count:" + messageid;
string countstr = stringredistemplate.opsforvalue().get(retrykey);
int retrycount = countstr == null ? 0 : integer.parseint(countstr);
retrycount++;
// 设置重试次数,过期时间为24小时
stringredistemplate.opsforvalue().set(retrykey, string.valueof(retrycount), duration.ofhours(24));
return retrycount;
}
/**
* 清除重试次数
*/
private void clearretrycount(string messageid) {
string retrykey = "retry:count:" + messageid;
stringredistemplate.delete(retrykey);
}
/**
* 处理消息重试逻辑
*/
private void handleretry(string messageid, map<string, string> messagebody,
maprecord<string, string, string> message, string errordescription) {
// 记录重试次数
int retrycount = incrementretrycount(messageid);
// 检查是否超过最大重试次数
int maxretrycount = 3; // 最大重试次数,可根据业务需求调整
if (retrycount >= maxretrycount) {
log.error("消息{}重试次数已达上限({}),将停止重试并记录: {}", errordescription, maxretrycount, messageid);
// 记录失败消息到死信队列或告警(可根据业务需求实现)
handlemaxretryexceeded(messageid, messagebody, retrycount);
// 确认消息,避免无限重试
acknowledgemessage(message);
} else {
log.warn("消息{},当前重试次数: {}/{}, 消息id: {}", errordescription, retrycount, maxretrycount, messageid);
// 不确认消息,等待重试
}
}
/**
* 处理超过最大重试次数的消息
*/
private void handlemaxretryexceeded(string messageid, map<string, string> messagebody, int retrycount) {
try {
// 记录失败消息详情(可根据业务需求实现,如存储到数据库、发送告警等)
string failedkey = "failed:message:" + messageid;
string failedinfo = string.format("消息id: %s, 重试次数: %d, 消息内容: %s, 失败时间: %s",
messageid, retrycount, messagebody, system.currenttimemillis());
stringredistemplate.opsforvalue().set(failedkey, failedinfo, duration.ofdays(7));
log.error("失败消息已记录: {}", failedinfo);
// todo: 可根据业务需求添加其他处理逻辑,如:
// 1. 发送告警通知
// 2. 存储到数据库死信表
// 3. 发送到死信队列
} catch (exception e) {
log.error("处理超过最大重试次数消息异常: {}", messageid, e);
}
}
}package com.lijw.mp.config.redisstream;
import com.lijw.mp.event.messageconsumerservice;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.boot.context.event.applicationreadyevent;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.context.event.eventlistener;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.connection.stream.consumer;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.connection.stream.readoffset;
import org.springframework.data.redis.connection.stream.streamoffset;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.data.redis.stream.streammessagelistenercontainer;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
import java.net.inetaddress;
import java.time.duration;
import java.lang.management.managementfactory;
import java.util.uuid;
import java.util.concurrent.executorservice;
@configuration
@slf4j
public class redisstreamconfig {
@autowired
private redisconnectionfactory redisconnectionfactory;
@autowired
private stringredistemplate stringredistemplate;
@value("${app.stream.key}")
private string streamkey;
@value("${app.stream.group}")
private string groupname;
/**
* 创建消费者组(如果不存在)
*/
@eventlistener(applicationreadyevent.class)
public void createconsumergroup() {
try {
stringredistemplate.opsforstream()
.creategroup(streamkey, groupname);
log.info("创建消费者组成功: {}", groupname);
} catch (exception e) {
log.info("消费者组已存在: {}", e.getmessage());
}
}
/**
* 配置stream消息监听容器
*/
@bean
public streammessagelistenercontainer<string, maprecord<string, string, string>>
streammessagelistenercontainer(messageconsumerservice messageconsumerservice) {
// 容器配置
streammessagelistenercontainer<string, maprecord<string, string, string>> container =
streammessagelistenercontainer.create(redisconnectionfactory,
streammessagelistenercontainer.streammessagelistenercontaineroptions.builder()
.polltimeout(duration.ofseconds(2))
.batchsize(10) // 批量处理提高性能
.executor(createthreadpool()) // 自定义线程池
.build());
// 为每个实例生成唯一消费者名称
string consumername = generateuniqueconsumername();
// 配置消费偏移量
streamoffset<string> offset = streamoffset.create(streamkey, readoffset.lastconsumed());
// 创建消费者
consumer consumer = consumer.from(groupname, consumername);
// 构建读取请求(手动确认模式)
streammessagelistenercontainer.streamreadrequest<string> request =
streammessagelistenercontainer.streamreadrequest.builder(offset)
.consumer(consumer)
.autoacknowledge(false) // 手动确认确保可靠性[2](@ref)
.cancelonerror(e -> false) // 错误时不停止消费
.build();
container.register(request, messageconsumerservice);
container.start();
log.info("redis stream消费者启动成功 - 消费者名称: {}", consumername);
return container;
}
/**
* 生成唯一消费者名称(支持多实例部署的关键)
* 使用ip+进程id确保集群环境下唯一性
*/
private string generateuniqueconsumername() {
try {
string hostaddress = inetaddress.getlocalhost().gethostaddress();
string processid = managementfactory.getruntimemxbean().getname().split("@")[0];
return hostaddress + "_" + processid + "_" + system.currenttimemillis();
} catch (exception e) {
// fallback:使用uuid
return "consumer_" + uuid.randomuuid().tostring().substring(0, 8);
}
}
/**
* 创建专用线程池
*/
private executorservice createthreadpool() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
executor.setcorepoolsize(2);
executor.setmaxpoolsize(5);
executor.setqueuecapacity(100);
executor.setthreadnameprefix("redis-stream-");
executor.setdaemon(true);
executor.initialize();
return executor.getthreadpoolexecutor();
}
}package com.lijw.mp.config.redisstream;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.data.redis.connection.stream.pendingmessage;
import org.springframework.data.redis.connection.stream.pendingmessages;
import org.springframework.data.redis.connection.stream.pendingmessagessummary;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.scheduling.annotation.scheduled;
import org.springframework.stereotype.component;
import java.time.duration;
@component
@slf4j
public class pendingmessageretryservice {
@autowired
private stringredistemplate stringredistemplate;
@value("${app.stream.key}")
private string streamkey;
@value("${app.stream.group}")
private string groupname;
/**
* 定时处理未确认的消息
* 执行时机:每30秒执行一次(通过@scheduled注解配置)
*/
@scheduled(fixeddelay = 30000) // 每30秒执行一次
public void retrypendingmessages() {
try {
// 获取待处理消息摘要
pendingmessagessummary pendingsummary = stringredistemplate.opsforstream()
.pending(streamkey, groupname);
if (pendingsummary != null) {
log.info("检查待处理消息,消费者组: {}", groupname);
// todo: 根据spring data redis版本,使用正确的api获取详细的pending消息列表
// 例如:使用pendingrange方法或其他方法获取pendingmessages
// 获取到pendingmessages后,调用processpendingmessages方法进行处理
//
// 示例调用(需要根据实际api调整):
// pendingmessages pendingmessages = stringredistemplate.opsforstream()
// .pendingrange(streamkey, groupname, ...);
// if (pendingmessages != null && pendingmessages.size() > 0) {
// processpendingmessages(pendingmessages);
// }
}
} catch (exception e) {
log.error("处理待处理消息异常", e);
}
}
/**
* 处理待处理消息列表
*
* 执行时机:
* 1. 由 retrypendingmessages() 定时任务调用(每30秒执行一次)
* 2. 当 retrypendingmessages() 获取到 pendingmessages 列表后调用
* 3. 用于处理redis stream中未被确认(ack)的消息
*
* 处理逻辑:
* - 遍历每条pending消息
* - 记录消息id和消费者名称
* - 检查消息空闲时间,如果超过阈值则重新分配
*/
private void processpendingmessages(pendingmessages pendingmessages) {
pendingmessages.foreach(message -> {
string messageid = message.getid().tostring();
string consumername = message.getconsumername();
// 注意:根据spring data redis版本,pendingmessage的api可能不同
// 获取空闲时间的方法名可能是getidletimems()、getelapsedtimems()等
// 这里提供基础框架,需要根据实际api调整
log.info("处理待处理消息: {}, 消费者: {}", messageid, consumername);
// 如果消息空闲时间超过阈值(如5分钟),重新分配
// 示例逻辑(需要根据实际api调整):
// duration idletime = duration.ofmillis(message.getidletimems());
// if (idletime.tominutes() > 5) {
// log.info("重新分配超时消息: {}, 原消费者: {}, 空闲时间: {}分钟",
// messageid, consumername, idletime.tominutes());
// // 可以使用xclaim命令将消息重新分配给其他消费者
// // stringredistemplate.opsforstream().claim(...);
// }
});
}
}package com.lijw.mp.controller;
import com.lijw.mp.event.messageproducerservice;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.http.responseentity;
import org.springframework.web.bind.annotation.*;
import java.util.hashmap;
import java.util.map;
@restcontroller
@requestmapping("/api/message")
@slf4j
public class messagecontroller {
@autowired
private messageproducerservice messageproducerservice;
@postmapping("/send-order")
public responseentity<map<string, object>> sendordermessage(
@requestparam string orderid,
@requestparam string amount) {
map<string, string> message = new hashmap<>();
message.put("orderid", orderid);
message.put("amount", amount);
message.put("messagetype", "order_created");
try {
string messageid = messageproducerservice.sendmessage("order", message);
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid);
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("发送消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(500).body(result);
}
}
@postmapping("/send-custom")
public responseentity<map<string, object>> sendcustommessage(
@requestbody map<string, string> messagedata) {
try {
string messageid = messageproducerservice.sendmessage("custom", messagedata);
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid);
return responseentity.ok(result);
} catch (exception e) {
log.error("发送自定义消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(500).body(result);
}
}
}
post http://localhost:8083/api/message/send-order orderid=order123456 amount=500

post http://localhost:8083/api/message/send-custom
{
"key1": "value1",
"key2": "valuie2"
}实例1
server: port: 8083
实例2
server: port: 8084

发送多条事件消息
实例1

实例2

可以看到 1764271408044-0 只在实例2处理,并没有在实例1处理。说明多个实例并不会重复消费同一事件。
配置多个消费组key
# redis 配置
spring:
redis:
host: localhost
port: 6379
database: 0
lettuce:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
# redis stream 配置多个消费组key
app:
stream:
groups:
- key: "order_stream"
group: "order_group"
consumer-prefix: "consumer_"
- key: "payment_stream"
group: "payment_group"
consumer-prefix: "consumer_"
- key: "notification_stream"
group: "notification_group"
consumer-prefix: "consumer_"package com.lijw.mp.config.redisstream;
import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.stereotype.component;
import java.util.list;
/**
* redis stream消费组配置属性
* - 读取 app.stream 配置前缀下的所有配置项
*
* @author aron.li
* @date 2025/11/30 11:21
*/
@data
@component
@configurationproperties(prefix = "app.stream")
public class streamgroupproperties {
/**
* 消费组列表
*/
private list<streamgroupconfig> groups;
/**
* 单个消费组配置
*/
@data
public static class streamgroupconfig {
/**
* stream键名
*/
private string key;
/**
* 消费组名称
*/
private string group;
/**
* 消费者名称前缀
*/
private string consumerprefix;
}
}package com.lijw.mp.event;
import com.lijw.mp.config.redisstream.streamgroupproperties;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.connection.stream.recordid;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.stereotype.service;
import java.util.map;
import java.util.uuid;
/**
* redis stream消息生产者服务
* 负责向redis stream发送消息
*/
@service
@slf4j
public class messageproducerservice {
/**
* redis字符串模板
*/
@autowired
private stringredistemplate stringredistemplate;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 发送消息到redis stream(根据消息类型自动选择stream)
*
* @param messagetype 消息类型
* @param data 消息数据
* @return 消息id
*/
public string sendmessage(string messagetype, map<string, string> data) {
// 根据消息类型自动选择stream key
string streamkey = getstreamkeybymessagetype(messagetype);
return sendmessage(streamkey, messagetype, data);
}
/**
* 发送消息到指定的redis stream
*
* @param streamkey stream键名
* @param messagetype 消息类型
* @param data 消息数据
* @return 消息id
*/
public string sendmessage(string streamkey, string messagetype, map<string, string> data) {
try {
// 添加消息类型和时间戳
data.put("messagetype", messagetype);
data.put("timestamp", string.valueof(system.currenttimemillis()));
data.put("messageid", uuid.randomuuid().tostring());
recordid messageid = stringredistemplate.opsforstream()
.add(streamkey, data);
log.info("消息发送成功 - stream: {}, messageid: {}", streamkey, messageid);
return messageid.tostring();
} catch (exception e) {
log.error("消息发送失败 - stream: {}", streamkey, e);
throw new runtimeexception("消息发送失败", e);
}
}
/**
* 根据消息类型获取对应的stream key
*
* @param messagetype 消息类型
* @return stream键名
*/
private string getstreamkeybymessagetype(string messagetype) {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
throw new runtimeexception("未配置任何消费组");
}
// 根据消息类型前缀匹配stream key
string uppermessagetype = messagetype.touppercase();
if (uppermessagetype.contains("order")) {
return findstreamkey("order");
} else if (uppermessagetype.contains("payment") || uppermessagetype.contains("pay")) {
return findstreamkey("payment");
} else if (uppermessagetype.contains("notification") || uppermessagetype.contains("notify")) {
return findstreamkey("notification");
} else {
// 默认使用第一个stream
log.warn("未匹配到消息类型对应的stream,使用默认stream: {}", messagetype);
return streamgroupproperties.getgroups().get(0).getkey();
}
}
/**
* 根据关键字查找stream key
*
* @param keyword 关键字
* @return stream键名
*/
private string findstreamkey(string keyword) {
if (streamgroupproperties.getgroups() != null) {
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
if (groupconfig.getkey().tolowercase().contains(keyword.tolowercase())) {
return groupconfig.getkey();
}
}
}
// 如果找不到,返回第一个stream
if (streamgroupproperties.getgroups() != null && !streamgroupproperties.getgroups().isempty()) {
return streamgroupproperties.getgroups().get(0).getkey();
}
throw new runtimeexception("未找到匹配的stream key: " + keyword);
}
}package com.lijw.mp.event;
import com.lijw.mp.config.redisstream.streamgroupproperties;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.data.redis.stream.streamlistener;
import org.springframework.stereotype.component;
import java.time.duration;
import java.util.map;
import java.util.concurrent.concurrenthashmap;
/**
* redis stream消息消费者服务
* 负责消费redis stream中的消息并进行业务处理
*/
@component
@slf4j
public class messageconsumerservice implements streamlistener<string, maprecord<string, string, string>> {
/**
* redis字符串模板
*/
@autowired
private stringredistemplate stringredistemplate;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 缓存stream key到group name的映射关系
*/
private final map<string, string> streamtogroupmap = new concurrenthashmap<>();
/**
* 消息消费处理方法
* 当redis stream中有新消息时,会调用此方法
*
* @param message stream消息记录
*/
@override
public void onmessage(maprecord<string, string, string> message) {
string messageid = message.getid().tostring();
string streamkey = message.getstream();
map<string, string> messagebody = message.getvalue();
// 获取对应的消费组名称
string groupname = getgroupnamebystreamkey(streamkey);
if (groupname == null) {
log.error("未找到stream key对应的消费组: {}, 消息id: {}", streamkey, messageid);
return;
}
// 幂等性检查:防止重复处理
if (ismessageprocessed(messageid)) {
log.info("消息已处理,跳过: {}", messageid);
acknowledgemessage(message, groupname);
return;
}
try {
log.info("消费者收到消息 - stream: {}, group: {}, id: {}, 内容: {}", streamkey, groupname, messageid, messagebody);
// 处理业务逻辑(根据stream key路由到不同的业务处理)
boolean processsuccess = processbusiness(streamkey, messagebody);
if (processsuccess) {
// 标记消息已处理
markmessageprocessed(messageid);
// 清除重试次数
clearretrycount(messageid);
// 手动确认消息
acknowledgemessage(message, groupname);
log.info("消息处理完成: {}", messageid);
} else {
log.error("业务处理失败,消息将重试: {}", messageid);
handleretry(messageid, messagebody, message, groupname, "业务处理失败");
}
} catch (exception e) {
log.error("消息处理异常: {}", messageid, e);
handleretry(messageid, messagebody, message, groupname, "消息处理异常");
}
}
/**
* 根据stream key获取对应的消费组名称
*
* @param streamkey stream键名
* @return 消费组名称,如果未找到返回null
*/
private string getgroupnamebystreamkey(string streamkey) {
// 先从缓存中获取
if (streamtogroupmap.containskey(streamkey)) {
return streamtogroupmap.get(streamkey);
}
// 从配置中查找
if (streamgroupproperties.getgroups() != null) {
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
if (groupconfig.getkey().equals(streamkey)) {
streamtogroupmap.put(streamkey, groupconfig.getgroup());
return groupconfig.getgroup();
}
}
}
return null;
}
/**
* 幂等性检查
* 使用redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
*
* @param messageid 消息id
* @return 如果消息已处理返回true,否则返回false
*/
private boolean ismessageprocessed(string messageid) {
// 使用redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
return stringredistemplate.opsforvalue().get("processed:" + messageid) != null;
}
/**
* 标记消息已处理
* 使用redis存储事件处理id,设置过期时间
*
* @param messageid 消息id
*/
private void markmessageprocessed(string messageid) {
// 使用redis存储事件处理id,设置过期时间
stringredistemplate.opsforvalue().set("processed:" + messageid, "1",
duration.ofhours(24));
}
/**
* 业务处理逻辑(根据stream key路由到不同的业务处理)
*/
private boolean processbusiness(string streamkey, map<string, string> messagebody) {
try {
// 根据stream key路由到不同的业务处理
if (streamkey.contains("order")) {
return processorderbusiness(messagebody);
} else if (streamkey.contains("payment")) {
return processpaymentbusiness(messagebody);
} else if (streamkey.contains("notification")) {
return processnotificationbusiness(messagebody);
} else {
log.warn("未识别的stream key: {}, 使用默认处理逻辑", streamkey);
return processdefaultbusiness(messagebody);
}
} catch (exception e) {
log.error("业务处理异常", e);
return false;
}
}
/**
* 处理订单业务
*/
private boolean processorderbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
string orderid = messagebody.get("orderid");
log.info("处理订单{}消息,订单id: {}", messagetype, orderid);
// todo: 实现订单业务处理逻辑
try {
// 模拟10秒处理业务
thread.sleep(10000);
} catch (interruptedexception e) {
throw new runtimeexception(e);
}
return true;
}
/**
* 处理支付业务
*/
private boolean processpaymentbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
string paymentid = messagebody.get("paymentid");
log.info("处理支付{}消息,支付id: {}", messagetype, paymentid);
// todo: 实现支付业务处理逻辑
return true;
}
/**
* 处理通知业务
*/
private boolean processnotificationbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
string userid = messagebody.get("userid");
log.info("处理通知{}消息,用户id: {}", messagetype, userid);
// todo: 实现通知业务处理逻辑
return true;
}
/**
* 默认业务处理
*/
private boolean processdefaultbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
log.info("处理默认业务消息: {}", messagetype);
// todo: 实现默认业务处理逻辑
return true;
}
/**
* 手动确认消息
*
* @param message 消息记录
* @param groupname 消费组名称
*/
private void acknowledgemessage(maprecord<string, string, string> message, string groupname) {
try {
stringredistemplate.opsforstream()
.acknowledge(groupname, message);
} catch (exception e) {
log.error("消息确认失败: {}", message.getid(), e);
}
}
/**
* 增加重试次数
*
* @param messageid 消息id
* @return 当前重试次数
*/
private int incrementretrycount(string messageid) {
string retrykey = "retry:count:" + messageid;
string countstr = stringredistemplate.opsforvalue().get(retrykey);
int retrycount = countstr == null ? 0 : integer.parseint(countstr);
retrycount++;
// 设置重试次数,过期时间为24小时
stringredistemplate.opsforvalue().set(retrykey, string.valueof(retrycount), duration.ofhours(24));
return retrycount;
}
/**
* 清除重试次数
*
* @param messageid 消息id
*/
private void clearretrycount(string messageid) {
string retrykey = "retry:count:" + messageid;
stringredistemplate.delete(retrykey);
}
/**
* 处理消息重试逻辑
*
* @param messageid 消息id
* @param messagebody 消息内容
* @param message 消息记录
* @param groupname 消费组名称
* @param errordescription 错误描述
*/
private void handleretry(string messageid, map<string, string> messagebody,
maprecord<string, string, string> message, string groupname, string errordescription) {
// 记录重试次数
int retrycount = incrementretrycount(messageid);
// 检查是否超过最大重试次数
int maxretrycount = 3; // 最大重试次数,可根据业务需求调整
if (retrycount >= maxretrycount) {
log.error("消息{}重试次数已达上限({}),将停止重试并记录: {}", errordescription, maxretrycount, messageid);
// 记录失败消息到死信队列或告警(可根据业务需求实现)
handlemaxretryexceeded(messageid, messagebody, retrycount);
// 确认消息,避免无限重试
acknowledgemessage(message, groupname);
} else {
log.warn("消息{},当前重试次数: {}/{}, 消息id: {}", errordescription, retrycount, maxretrycount, messageid);
// 不确认消息,等待重试
}
}
/**
* 处理超过最大重试次数的消息
* 记录失败消息详情,可根据业务需求扩展(如存储到数据库、发送告警等)
*
* @param messageid 消息id
* @param messagebody 消息内容
* @param retrycount 重试次数
*/
private void handlemaxretryexceeded(string messageid, map<string, string> messagebody, int retrycount) {
try {
// 记录失败消息详情(可根据业务需求实现,如存储到数据库、发送告警等)
string failedkey = "failed:message:" + messageid;
string failedinfo = string.format("消息id: %s, 重试次数: %d, 消息内容: %s, 失败时间: %s",
messageid, retrycount, messagebody, system.currenttimemillis());
stringredistemplate.opsforvalue().set(failedkey, failedinfo, duration.ofdays(7));
log.error("失败消息已记录: {}", failedinfo);
// todo: 可根据业务需求添加其他处理逻辑,如:
// 1. 发送告警通知
// 2. 存储到数据库死信表
// 3. 发送到死信队列
} catch (exception e) {
log.error("处理超过最大重试次数消息异常: {}", messageid, e);
}
}
}package com.lijw.mp.config.redisstream;
import com.lijw.mp.event.messageconsumerservice;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.context.event.applicationreadyevent;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.context.event.eventlistener;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.connection.stream.consumer;
import org.springframework.data.redis.connection.stream.maprecord;
import org.springframework.data.redis.connection.stream.readoffset;
import org.springframework.data.redis.connection.stream.streamoffset;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.data.redis.stream.streammessagelistenercontainer;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
import java.net.inetaddress;
import java.time.duration;
import java.lang.management.managementfactory;
import java.util.arraylist;
import java.util.hashmap;
import java.util.list;
import java.util.map;
import java.util.uuid;
import java.util.concurrent.executorservice;
/**
* redis stream配置类
* 负责创建消费组、配置消息监听容器等
*/
@configuration
@slf4j
public class redisstreamconfig {
/**
* redis连接工厂
*/
@autowired
private redisconnectionfactory redisconnectionfactory;
/**
* redis字符串模板
*/
@autowired
private stringredistemplate stringredistemplate;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 应用启动完成后,验证所有消费组是否已创建
* 主要用于日志记录和验证
*/
@eventlistener(applicationreadyevent.class)
public void verifyconsumergroups() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
log.warn("未配置任何消费组");
return;
}
log.info("验证所有消费组状态...");
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
// 验证消费组是否存在
stringredistemplate.opsforstream().pending(groupconfig.getkey(), groupconfig.getgroup());
log.info("消费组验证成功: stream={}, group={}", groupconfig.getkey(), groupconfig.getgroup());
} catch (exception e) {
log.warn("消费组验证失败,尝试重新创建: stream={}, group={}",
groupconfig.getkey(), groupconfig.getgroup());
// 尝试重新创建
try {
ensurestreamexists(groupconfig.getkey());
stringredistemplate.opsforstream()
.creategroup(groupconfig.getkey(), groupconfig.getgroup());
log.info("重新创建消费者组成功: stream={}, group={}",
groupconfig.getkey(), groupconfig.getgroup());
} catch (exception ex) {
log.error("重新创建消费者组失败: stream={}, group={}, error={}",
groupconfig.getkey(), groupconfig.getgroup(), ex.getmessage());
}
}
}
}
/**
* 确保stream存在,如果不存在则创建
*
* @param streamkey stream键名
*/
private void ensurestreamexists(string streamkey) {
try {
// 检查stream是否存在
long size = stringredistemplate.opsforstream().size(streamkey);
if (size == null || size == 0) {
// stream不存在,创建一个空消息然后立即删除,以创建stream
// 或者直接使用xgroup create的mkstream选项
// 由于spring data redis可能不支持mkstream,我们通过添加一条临时消息来创建stream
map<string, string> tempdata = new hashmap<>();
tempdata.put("_init", "true");
tempdata.put("_timestamp", string.valueof(system.currenttimemillis()));
try {
stringredistemplate.opsforstream().add(streamkey, tempdata);
log.debug("创建stream成功: {}", streamkey);
} catch (exception ex) {
log.debug("stream可能已存在或创建失败: {}, {}", streamkey, ex.getmessage());
}
}
} catch (exception e) {
// stream不存在,创建它
try {
map<string, string> tempdata = new hashmap<>();
tempdata.put("_init", "true");
tempdata.put("_timestamp", string.valueof(system.currenttimemillis()));
stringredistemplate.opsforstream().add(streamkey, tempdata);
log.info("创建stream成功: {}", streamkey);
} catch (exception ex) {
log.warn("创建stream失败: {}, {}", streamkey, ex.getmessage());
}
}
}
/**
* 为每个消费组配置stream消息监听容器
* 在创建监听容器之前,先确保所有消费组都已创建
*/
@bean
public list<streammessagelistenercontainer<string, maprecord<string, string, string>>>
streammessagelistenercontainers(messageconsumerservice messageconsumerservice) {
list<streammessagelistenercontainer<string, maprecord<string, string, string>>> containers = new arraylist<>();
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
log.warn("未配置任何消费组,无法创建监听容器");
return containers;
}
// 在创建监听容器之前,先确保所有消费组都已创建
createallconsumergroups();
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
streammessagelistenercontainer<string, maprecord<string, string, string>> container =
createlistenercontainer(groupconfig, messageconsumerservice);
containers.add(container);
}
return containers;
}
/**
* 创建所有消费者组(如果不存在)
* 如果stream不存在,会先创建stream
*/
private void createallconsumergroups() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
log.warn("未配置任何消费组");
return;
}
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
// 先检查stream是否存在,如果不存在则创建
ensurestreamexists(groupconfig.getkey());
// 创建消费组
stringredistemplate.opsforstream()
.creategroup(groupconfig.getkey(), groupconfig.getgroup());
log.info("创建消费者组成功: stream={}, group={}", groupconfig.getkey(), groupconfig.getgroup());
} catch (exception e) {
// 如果消费组已存在,这是正常情况
string errormsg = e.getmessage() != null ? e.getmessage() : "";
if (errormsg.contains("busygroup") || errormsg.contains("already exists")) {
log.info("消费者组已存在: stream={}, group={}", groupconfig.getkey(), groupconfig.getgroup());
} else {
log.warn("创建消费者组失败: stream={}, group={}, error={}",
groupconfig.getkey(), groupconfig.getgroup(), errormsg);
// 即使创建失败,也继续处理其他消费组
}
}
}
}
/**
* 为单个消费组创建监听容器
*
* @param groupconfig 消费组配置
* @param messageconsumerservice 消息消费者服务
* @return stream消息监听容器
*/
private streammessagelistenercontainer<string, maprecord<string, string, string>>
createlistenercontainer(streamgroupproperties.streamgroupconfig groupconfig,
messageconsumerservice messageconsumerservice) {
// 容器配置
streammessagelistenercontainer<string, maprecord<string, string, string>> container =
streammessagelistenercontainer.create(redisconnectionfactory,
streammessagelistenercontainer.streammessagelistenercontaineroptions.builder()
.polltimeout(duration.ofseconds(2)) // 轮询超时时间
.batchsize(10) // 批量处理提高性能
.executor(createthreadpool()) // 自定义线程池
.build());
// 为每个实例生成唯一消费者名称
string consumername = generateuniqueconsumername(groupconfig.getconsumerprefix());
// 配置消费偏移量(从最后消费的位置开始)
streamoffset<string> offset = streamoffset.create(groupconfig.getkey(), readoffset.lastconsumed());
// 创建消费者
consumer consumer = consumer.from(groupconfig.getgroup(), consumername);
// 构建读取请求(手动确认模式)
streammessagelistenercontainer.streamreadrequest<string> request =
streammessagelistenercontainer.streamreadrequest.builder(offset)
.consumer(consumer)
.autoacknowledge(false) // 手动确认确保可靠性
.cancelonerror(e -> false) // 错误时不停止消费
.build();
// 注册监听器并启动容器
container.register(request, messageconsumerservice);
container.start();
log.info("redis stream消费者启动成功 - stream: {}, group: {}, consumer: {}",
groupconfig.getkey(), groupconfig.getgroup(), consumername);
return container;
}
/**
* 生成唯一消费者名称(支持多实例部署的关键)
* 使用ip+进程id确保集群环境下唯一性
*
* @param prefix 消费者名称前缀
* @return 唯一的消费者名称
*/
private string generateuniqueconsumername(string prefix) {
try {
string hostaddress = inetaddress.getlocalhost().gethostaddress();
string processid = managementfactory.getruntimemxbean().getname().split("@")[0];
return (prefix != null ? prefix : "consumer_") + hostaddress + "_" + processid + "_" + system.currenttimemillis();
} catch (exception e) {
// fallback:使用uuid
return (prefix != null ? prefix : "consumer_") + uuid.randomuuid().tostring().substring(0, 8);
}
}
/**
* 创建专用线程池
* 用于处理redis stream消息消费
*
* @return 线程池执行器
*/
private executorservice createthreadpool() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
executor.setcorepoolsize(2); // 核心线程数
executor.setmaxpoolsize(5); // 最大线程数
executor.setqueuecapacity(100); // 队列容量
executor.setthreadnameprefix("redis-stream-"); // 线程名前缀
executor.setdaemon(true); // 守护线程
executor.initialize();
return executor.getthreadpoolexecutor();
}
}当消息堵塞达到一定,则出发告警,提醒需要扩容实例服务
package com.lijw.mp.config.redisstream;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.connection.stream.pendingmessagessummary;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.scheduling.annotation.scheduled;
import org.springframework.stereotype.component;
/**
* 待处理消息告警服务
* 定时检查redis stream中未被确认(ack)的消息,仅用于监控和告警
* 注意:不进行实际的消息处理,通过增加消费实例来提升处理能力
*/
@component
@slf4j
public class pendingmessagealertservice {
/**
* redis字符串模板
*/
@autowired
private stringredistemplate stringredistemplate;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 定时监控未确认的消息
* 执行时机:每30秒执行一次(通过@scheduled注解配置)
* 仅用于监控和告警,不进行实际的消息处理
* 建议通过增加消费实例来提升处理能力
*/
@scheduled(fixeddelay = 30000) // 每30秒执行一次
public void monitorpendingmessages() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
return;
}
// 遍历所有配置的消费组
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
// 获取待处理消息摘要
pendingmessagessummary pendingsummary = stringredistemplate.opsforstream()
.pending(groupconfig.getkey(), groupconfig.getgroup());
if (pendingsummary != null && pendingsummary.gettotalpendingmessages() > 0) {
// 记录监控信息
logpendingmessageinfo(pendingsummary, groupconfig);
}
} catch (exception e) {
log.error("监控待处理消息异常 - stream: {}, group: {}",
groupconfig.getkey(), groupconfig.getgroup(), e);
}
}
}
/**
* 记录pending消息监控信息
* 根据pending消息数量进行不同级别的告警
*
* @param pendingsummary 待处理消息摘要
* @param groupconfig 消费组配置
*/
private void logpendingmessageinfo(pendingmessagessummary pendingsummary,
streamgroupproperties.streamgroupconfig groupconfig) {
long totalpending = pendingsummary.gettotalpendingmessages();
// 根据pending消息数量进行分级告警
if (totalpending > 2) {
// 严重告警:pending消息数量超过1000
log.error("【严重告警】待处理消息数量过多 - stream: {}, group: {}, 数量: {}, " +
"建议:1.检查消费者是否正常运行 2.增加消费实例 3.检查消息处理逻辑",
groupconfig.getkey(), groupconfig.getgroup(), totalpending);
} else if (totalpending > 1) {
// 警告:pending消息数量超过100
log.warn("【警告】待处理消息数量较多 - stream: {}, group: {}, 数量: {}, " +
"建议:考虑增加消费实例或检查消息处理性能",
groupconfig.getkey(), groupconfig.getgroup(), totalpending);
} else {
// 信息:pending消息数量正常
log.info("监控待处理消息 - stream: {}, group: {}, 数量: {}",
groupconfig.getkey(), groupconfig.getgroup(), totalpending);
}
}
}package com.lijw.mp.controller;
import com.lijw.mp.event.messageproducerservice;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.http.responseentity;
import org.springframework.web.bind.annotation.*;
import java.util.hashmap;
import java.util.map;
/**
* 测试发送redis stream事件消息
*/
@restcontroller
@requestmapping("/api/message")
@slf4j
public class messagecontroller {
@autowired
private messageproducerservice messageproducerservice;
/**
* 发送订单消息
*/
@postmapping("/send-order")
public responseentity<map<string, object>> sendordermessage(
@requestparam string orderid,
@requestparam string amount,
@requestparam(required = false) string streamkey) {
map<string, string> message = new hashmap<>();
message.put("orderid", orderid);
message.put("amount", amount);
message.put("messagetype", "order_created");
try {
string messageid;
if (streamkey != null && !streamkey.isempty()) {
// 指定stream key发送
messageid = messageproducerservice.sendmessage(streamkey, "order", message);
} else {
// 自动根据消息类型路由
messageid = messageproducerservice.sendmessage("order", message);
}
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid);
result.put("streamkey", streamkey != null ? streamkey : "auto-routed");
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("发送订单消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(500).body(result);
}
}
/**
* 发送支付消息
*/
@postmapping("/send-payment")
public responseentity<map<string, object>> sendpaymentmessage(
@requestparam string paymentid,
@requestparam string amount,
@requestparam string status,
@requestparam(required = false) string streamkey) {
map<string, string> message = new hashmap<>();
message.put("paymentid", paymentid);
message.put("amount", amount);
message.put("status", status);
message.put("messagetype", "payment_" + status.touppercase());
try {
string messageid;
if (streamkey != null && !streamkey.isempty()) {
messageid = messageproducerservice.sendmessage(streamkey, "payment", message);
} else {
messageid = messageproducerservice.sendmessage("payment", message);
}
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid);
result.put("streamkey", streamkey != null ? streamkey : "auto-routed");
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("发送支付消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(500).body(result);
}
}
/**
* 发送通知消息
*/
@postmapping("/send-notification")
public responseentity<map<string, object>> sendnotificationmessage(
@requestparam string userid,
@requestparam string title,
@requestparam string content,
@requestparam(required = false) string streamkey) {
map<string, string> message = new hashmap<>();
message.put("userid", userid);
message.put("title", title);
message.put("content", content);
message.put("messagetype", "notification");
try {
string messageid;
if (streamkey != null && !streamkey.isempty()) {
messageid = messageproducerservice.sendmessage(streamkey, "notification", message);
} else {
messageid = messageproducerservice.sendmessage("notification", message);
}
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid);
result.put("streamkey", streamkey != null ? streamkey : "auto-routed");
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("发送通知消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(500).body(result);
}
}
/**
* 发送自定义消息(支持指定stream key和消息类型)
*/
@postmapping("/send-custom")
public responseentity<map<string, object>> sendcustommessage(
@requestbody map<string, string> messagedata,
@requestparam(required = false) string streamkey,
@requestparam(required = false) string messagetype) {
try {
// 如果未指定消息类型,从消息数据中获取或使用默认值
string msgtype = messagetype != null ? messagetype :
messagedata.getordefault("messagetype", "custom");
string messageid;
if (streamkey != null && !streamkey.isempty()) {
// 指定stream key发送
messageid = messageproducerservice.sendmessage(streamkey, msgtype, messagedata);
} else {
// 自动根据消息类型路由
messageid = messageproducerservice.sendmessage(msgtype, messagedata);
}
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid);
result.put("streamkey", streamkey != null ? streamkey : "auto-routed");
result.put("messagetype", msgtype);
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("发送自定义消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(500).body(result);
}
}
}
post http://localhost:8083/api/message/send-order # 请求参数 streamkey:order_stream orderid:order123456 amount:500 status:ok

post http://localhost:8083/api/message/send-payment # 请求参数 streamkey:payment_stream paymentid:payid123456 amount:500 status:ok

post http://localhost:8083/api/message/send-notification # 请求参数 streamkey:notification_stream userid:userid123456 title:通知消息 content:通知内容

post http://localhost:8083/api/message/send-custom
# 请求参数
streamkey:custom_stream
# 请求体
{
"key1": "value1",
"key2": "valuie2"
}跟案例1的操作一致,以发送订单消息的事件作为验证示例
实例1
server: port: 8083
实例2
server: port: 8084
实例1

实例2

可以看到只有一个实例处理 1764865303698-0 事件,并没有被重复消费

模拟大量出现订阅消息,导致事件堵塞的情况。
# 查看当前redis的所有keys 172.17.0.6:6379> keys * 1) "payment_stream" 2) "mystream" 3) "notification_stream" 4) "custom_stream" 5) "order_stream" 172.17.0.6:6379>
xrange order_stream - +
# 查询消息如下:
140) 1) "1764898453817-0"
2) 1) "amount"
2) "500"
3) "messagetype"
4) "order"
5) "orderid"
6) "order123456"
7) "messageid"
8) "98e48bde-88df-46c1-a0a1-ac917ac14fa6"
9) "timestamp"
10) "1764898453753"
141) 1) "1764898468568-0"
2) 1) "amount"
2) "500"
3) "messagetype"
4) "order"
5) "orderid"
6) "order123456"
7) "messageid"
8) "93978d1c-d18e-4760-8657-8cf0042906a2"
9) "timestamp"
10) "1764898468507"
172.17.0.6:6379> # 分页查询,每次返回3条
xrange order_stream - + count 3
# 执行如下:
172.17.0.6:6379> xrange order_stream - + count 3
1) 1) "1764268213474-0"
2) 1) "source"
2) "test_controller"
3) "type"
4) "test_message"
5) "content"
6) "test msg"
7) "timestamp"
8) "1764268202906"
2) 1) "1764268564758-0"
2) 1) "amount"
2) "500"
3) "messagetype"
4) "order"
5) "orderid"
6) "order123456"
7) "messageid"
8) "eb1a1fc1-12a9-4a80-84f4-f219a05094b5"
9) "timestamp"
10) "1764268564695"
3) 1) "1764268699963-0"
2) 1) "amount"
2) "500"
3) "messagetype"
4) "order"
5) "orderid"
6) "order123456"
7) "messageid"
8) "419c1c9a-7bcb-4f4e-b08c-70e86c5e7387"
9) "timestamp"
10) "1764268693593"
172.17.0.6:6379> # 反向查询(从新到旧) xrevrange order_stream + - count 3
172.17.0.6:6379> xlen order_stream (integer) 141 172.17.0.6:6379>
上面的项目是通过 stringredistemplate 做队列消息的读取,有些时候项目不能直接使用。
@autowired private stringredistemplate stringredistemplate;
所以下一章节,我将会使用 redissonclient 来做替代实现的方法,增加符合生产环境使用,并且还会增加消息队列总数的监控、查询消息队列总数接口、清除消息队列的运维接口。
<parent>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-parent</artifactid>
<version>2.7.18</version>
<relativepath/> <!-- lookup parent from repository -->
</parent>
<!-- spring boot 2.7.18 兼容的 redisson 版本 -->
<!-- 注意:redisson 3.20.1需要spring boot 3.x,spring boot 2.7使用3.17.x版本 -->
<dependency>
<groupid>org.redisson</groupid>
<artifactid>redisson-spring-boot-starter</artifactid>
<version>3.17.7</version>
</dependency>配置多个消费组key,跟上一个案例的配置一致
# redis stream 配置多个消费组key
app:
stream:
groups:
- key: "order_stream"
group: "order_group"
consumer-prefix: "consumer_"
- key: "payment_stream"
group: "payment_group"
consumer-prefix: "consumer_"
- key: "notification_stream"
group: "notification_group"
consumer-prefix: "consumer_"
spring:
# redisson配置
redisson:
config: |
singleserverconfig:
address: "redis://127.0.0.1:6379"
database: 0
# 连接超时时间(毫秒),默认3000
connectiontimeout: 10000
# 命令执行超时时间(毫秒),默认3000,增加超时时间避免ping超时
timeout: 10000
# 空闲连接超时时间(毫秒),默认10000
idleconnectiontimeout: 10000
# ping连接间隔(毫秒),默认0(禁用),设置为0可以禁用ping避免超时
# 如果redis服务器稳定,可以设置为0禁用ping
pingconnectioninterval: 0
# 重试次数
retryattempts: 3
# 重试间隔(毫秒)
retryinterval: 1500
# 保持连接活跃
keepalive: true
# tcp无延迟
tcpnodelay: true
# 连接池配置
connectionpoolsize: 64
connectionminimumidlesize: 24
# 如果redis设置了密码,取消下面的注释并填写密码
# password: your_passwordpackage com.lijw.mp.config.redisstream;
import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.stereotype.component;
import java.util.list;
/**
* redis stream消费组配置属性
* - 读取 app.stream 配置前缀下的所有配置项
*
* @author aron.li
* @date 2025/11/30 11:21
*/
@data
@component
@configurationproperties(prefix = "app.stream")
public class streamgroupproperties {
/**
* 消费组列表
*/
private list<streamgroupconfig> groups;
/**
* 单个消费组配置
*/
@data
public static class streamgroupconfig {
/**
* stream键名
*/
private string key;
/**
* 消费组名称
*/
private string group;
/**
* 消费者名称前缀
*/
private string consumerprefix;
}
}package com.lijw.mp.event;
import com.lijw.mp.config.redisstream.streamgroupproperties;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rstream;
import org.redisson.api.redissonclient;
import org.redisson.api.streammessageid;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import java.util.map;
import java.util.uuid;
/**
* redis stream消息生产者服务
* 负责向redis stream发送消息
*/
@service
@slf4j
public class messageproducerservice {
/**
* redisson客户端
*/
@autowired
private redissonclient redissonclient;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 发送消息到redis stream(根据消息类型自动选择stream)
*
* @param messagetype 消息类型
* @param data 消息数据
* @return 消息id
*/
public string sendmessage(string messagetype, map<string, string> data) {
// 根据消息类型自动选择stream key
string streamkey = getstreamkeybymessagetype(messagetype);
return sendmessage(streamkey, messagetype, data);
}
/**
* 发送消息到指定的redis stream
*
* @param streamkey stream键名
* @param messagetype 消息类型
* @param data 消息数据
* @return 消息id
*/
/**
* 发送消息到指定的redis stream
*
* @param streamkey stream键名
* @param messagetype 消息类型
* @param data 消息数据
* @return 消息id
*/
public string sendmessage(string streamkey, string messagetype, map<string, string> data) {
try {
// 添加消息类型和时间戳
data.put("messagetype", messagetype);
data.put("timestamp", string.valueof(system.currenttimemillis()));
data.put("messageid", uuid.randomuuid().tostring());
// 使用redisson发送消息
rstream<string, string> stream = redissonclient.getstream(streamkey);
// redisson 3.17.7中,add方法直接接受map参数
streammessageid messageid = stream.addall(data);
log.info("消息发送成功 - stream: {}, messageid: {}", streamkey, messageid);
return messageid.tostring();
} catch (exception e) {
log.error("消息发送失败 - stream: {}", streamkey, e);
throw new runtimeexception("消息发送失败", e);
}
}
/**
* 根据消息类型获取对应的stream key
*
* @param messagetype 消息类型
* @return stream键名
*/
private string getstreamkeybymessagetype(string messagetype) {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
throw new runtimeexception("未配置任何消费组");
}
// 根据消息类型前缀匹配stream key
string uppermessagetype = messagetype.touppercase();
if (uppermessagetype.contains("order")) {
return findstreamkey("order");
} else if (uppermessagetype.contains("payment") || uppermessagetype.contains("pay")) {
return findstreamkey("payment");
} else if (uppermessagetype.contains("notification") || uppermessagetype.contains("notify")) {
return findstreamkey("notification");
} else {
// 默认使用第一个stream
log.warn("未匹配到消息类型对应的stream,使用默认stream: {}", messagetype);
return streamgroupproperties.getgroups().get(0).getkey();
}
}
/**
* 根据关键字查找stream key
*
* @param keyword 关键字
* @return stream键名
*/
private string findstreamkey(string keyword) {
if (streamgroupproperties.getgroups() != null) {
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
if (groupconfig.getkey().tolowercase().contains(keyword.tolowercase())) {
return groupconfig.getkey();
}
}
}
// 如果找不到,返回第一个stream
if (streamgroupproperties.getgroups() != null && !streamgroupproperties.getgroups().isempty()) {
return streamgroupproperties.getgroups().get(0).getkey();
}
throw new runtimeexception("未找到匹配的stream key: " + keyword);
}
}package com.lijw.mp.event;
import com.lijw.mp.config.redisstream.streamgroupproperties;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rbucket;
import org.redisson.api.rstream;
import org.redisson.api.redissonclient;
import org.redisson.api.streammessageid;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import java.util.map;
import java.util.concurrent.completablefuture;
import java.util.concurrent.concurrenthashmap;
import java.util.concurrent.timeunit;
/**
* redis stream消息消费者服务
* 负责消费redis stream中的消息并进行业务处理
*/
@component
@slf4j
public class messageconsumerservice {
/**
* redisson客户端
*/
@autowired
private redissonclient redissonclient;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 缓存stream key到group name的映射关系
*/
private final map<string, string> streamtogroupmap = new concurrenthashmap<>();
/**
* 消息消费处理方法
* 当redis stream中有新消息时,会调用此方法
*
* 多实例部署安全性:
* 1. redis stream consumer group 保证每条消息只会被一个消费者读取
* 2. 分布式锁防止多实例同时处理同一条消息
* 3. 幂等性检查作为双重保障
*
* @param streamkey stream键名
* @param messageid 消息id
* @param messagebody 消息内容
* @param groupname 消费组名称
*/
public void onmessage(string streamkey, string messageid, map<string, string> messagebody, string groupname) {
if (groupname == null) {
log.error("未找到stream key对应的消费组: {}, 消息id: {}", streamkey, messageid);
return;
}
// 幂等性检查:防止重复处理(第一道防线)
if (ismessageprocessed(messageid)) {
log.info("消息已处理,跳过: {}", messageid);
acknowledgemessage(streamkey, messageid, groupname);
return;
}
// 分布式锁:防止多实例同时处理同一条消息(第二道防线)
string lockkey = "message:process:lock:" + messageid;
rbucket<string> lockbucket = redissonclient.getbucket(lockkey);
boolean lockacquired = lockbucket.tryset("1", 5, timeunit.minutes);
if (!lockacquired) {
log.info("消息正在被其他实例处理,跳过: {}", messageid);
return;
}
try {
log.info("消费者收到消息 - stream: {}, group: {}, id: {}, 内容: {}", streamkey, groupname, messageid, messagebody);
// 再次幂等性检查(双重检查,防止在获取锁的过程中消息已被处理)
if (ismessageprocessed(messageid)) {
log.info("消息在处理过程中已被其他实例处理,跳过: {}", messageid);
acknowledgemessage(streamkey, messageid, groupname);
return;
}
// 处理业务逻辑(根据stream key路由到不同的业务处理)
boolean processsuccess = processbusiness(streamkey, messagebody);
if (processsuccess) {
// 标记消息已处理
markmessageprocessed(messageid);
// 清除重试次数
clearretrycount(messageid);
// 手动确认消息
acknowledgemessage(streamkey, messageid, groupname);
log.info("消息处理完成: {}", messageid);
} else {
log.error("业务处理失败,消息将重试: {}", messageid);
handleretry(streamkey, messageid, messagebody, groupname, "业务处理失败");
}
} catch (exception e) {
log.error("消息处理异常: {}", messageid, e);
handleretry(streamkey, messageid, messagebody, groupname, "消息处理异常");
} finally {
// 释放分布式锁
try {
lockbucket.delete();
} catch (exception e) {
log.warn("释放分布式锁失败: {}", messageid, e);
}
}
}
/**
* 根据stream key获取对应的消费组名称
*
* @param streamkey stream键名
* @return 消费组名称,如果未找到返回null
*/
private string getgroupnamebystreamkey(string streamkey) {
// 先从缓存中获取
if (streamtogroupmap.containskey(streamkey)) {
return streamtogroupmap.get(streamkey);
}
// 从配置中查找
if (streamgroupproperties.getgroups() != null) {
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
if (groupconfig.getkey().equals(streamkey)) {
streamtogroupmap.put(streamkey, groupconfig.getgroup());
return groupconfig.getgroup();
}
}
}
return null;
}
/**
* 幂等性检查
* 使用redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
*
* @param messageid 消息id
* @return 如果消息已处理返回true,否则返回false
*/
private boolean ismessageprocessed(string messageid) {
rbucket<string> bucket = redissonclient.getbucket("processed:" + messageid);
return bucket.isexists();
}
/**
* 标记消息已处理
* 使用redis存储事件处理id,设置过期时间
*
* @param messageid 消息id
*/
private void markmessageprocessed(string messageid) {
rbucket<string> bucket = redissonclient.getbucket("processed:" + messageid);
bucket.set("1", 24, timeunit.hours);
}
/**
* 业务处理逻辑(根据stream key路由到不同的业务处理)
*/
private boolean processbusiness(string streamkey, map<string, string> messagebody) {
try {
// 根据stream key路由到不同的业务处理
if (streamkey.contains("order")) {
return processorderbusiness(messagebody);
} else if (streamkey.contains("payment")) {
return processpaymentbusiness(messagebody);
} else if (streamkey.contains("notification")) {
return processnotificationbusiness(messagebody);
} else {
log.warn("未识别的stream key: {}, 使用默认处理逻辑", streamkey);
return processdefaultbusiness(messagebody);
}
} catch (exception e) {
log.error("业务处理异常", e);
return false;
}
}
/**
* 处理订单业务
*/
private boolean processorderbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
string orderid = messagebody.get("orderid");
log.info("处理订单{}消息,订单id: {}", messagetype, orderid);
// todo: 实现订单业务处理逻辑
// try {
// // 模拟10秒处理业务
// thread.sleep(100000);
// } catch (interruptedexception e) {
// throw new runtimeexception(e);
// }
// throw new runtimeexception("订单业务处理异常");
return true;
}
/**
* 处理支付业务
*/
private boolean processpaymentbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
string paymentid = messagebody.get("paymentid");
log.info("处理支付{}消息,支付id: {}", messagetype, paymentid);
// todo: 实现支付业务处理逻辑
return true;
}
/**
* 处理通知业务
*/
private boolean processnotificationbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
string userid = messagebody.get("userid");
log.info("处理通知{}消息,用户id: {}", messagetype, userid);
// todo: 实现通知业务处理逻辑
return true;
}
/**
* 默认业务处理
*/
private boolean processdefaultbusiness(map<string, string> messagebody) {
string messagetype = messagebody.get("messagetype");
log.info("处理默认业务消息: {}", messagetype);
// todo: 实现默认业务处理逻辑
return true;
}
/**
* 手动确认消息
*
* @param streamkey stream键名
* @param messageid 消息id
* @param groupname 消费组名称
*/
private void acknowledgemessage(string streamkey, string messageid, string groupname) {
try {
rstream<string, string> stream = redissonclient.getstream(streamkey);
// 解析消息id字符串为streammessageid
streammessageid streammessageid = parsestreammessageid(messageid);
stream.ack(groupname, streammessageid);
} catch (exception e) {
log.error("消息确认失败: {}", messageid, e);
}
}
/**
* 解析消息id字符串为streammessageid对象
*
* @param messageid 消息id字符串,格式如 "1234567890-0"
* @return streammessageid对象
*/
private org.redisson.api.streammessageid parsestreammessageid(string messageid) {
// streammessageid格式为 "timestamp-sequence"
string[] parts = messageid.split("-");
if (parts.length == 2) {
return new org.redisson.api.streammessageid(long.parselong(parts[0]), long.parselong(parts[1]));
}
// 如果格式不正确,尝试直接解析
return new org.redisson.api.streammessageid(long.parselong(messageid), 0);
}
/**
* 增加重试次数
*
* @param messageid 消息id
* @return 当前重试次数
*/
private int incrementretrycount(string messageid) {
string retrykey = "retry:count:" + messageid;
rbucket<string> bucket = redissonclient.getbucket(retrykey);
string countstr = bucket.get();
int retrycount = countstr == null ? 0 : integer.parseint(countstr);
retrycount++;
// 设置重试次数,过期时间为24小时
bucket.set(string.valueof(retrycount), 24, timeunit.hours);
return retrycount;
}
/**
* 清除重试次数
*
* @param messageid 消息id
*/
private void clearretrycount(string messageid) {
string retrykey = "retry:count:" + messageid;
rbucket<string> bucket = redissonclient.getbucket(retrykey);
bucket.delete();
}
/**
* 处理消息重试逻辑
*
* @param streamkey stream键名
* @param messageid 消息id
* @param messagebody 消息内容
* @param groupname 消费组名称
* @param errordescription 错误描述
*/
private void handleretry(string streamkey, string messageid, map<string, string> messagebody,
string groupname, string errordescription) {
// 记录重试次数
int retrycount = incrementretrycount(messageid);
// 检查是否超过最大重试次数
int maxretrycount = 3; // 最大重试次数,可根据业务需求调整
if (retrycount >= maxretrycount) {
log.error("消息{}重试次数已达上限({}),将停止重试并记录: {}", errordescription, maxretrycount, messageid);
// 记录失败消息到死信队列或告警(可根据业务需求实现)
handlemaxretryexceeded(messageid, messagebody, retrycount);
// 确认消息,避免无限重试
acknowledgemessage(streamkey, messageid, groupname);
return;
}
log.warn("消息{},当前重试次数: {}/{}, 消息id: {}", errordescription, retrycount, maxretrycount, messageid);
// 异步延迟重新处理消息
scheduleretry(streamkey, messageid, messagebody, groupname, retrycount);
}
/**
* 调度消息重试
* 使用异步方式延迟重新处理消息,避免立即重试
*
* @param streamkey stream键名
* @param messageid 消息id
* @param messagebody 消息内容
* @param groupname 消费组名称
* @param retrycount 当前重试次数
*/
private void scheduleretry(string streamkey, string messageid, map<string, string> messagebody,
string groupname, int retrycount) {
// 计算延迟时间:重试次数越多,延迟时间越长(指数退避策略)
long delayseconds = (long) math.pow(2, retrycount - 1);
log.info("消息将在{}秒后重新处理,消息id: {}", delayseconds, messageid);
// 异步延迟重新处理消息
completablefuture.runasync(() -> {
try {
thread.sleep(delayseconds * 1000);
log.info("开始重新处理消息,消息id: {}", messageid);
// 重新调用onmessage处理消息
onmessage(streamkey, messageid, messagebody, groupname);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
log.warn("消息重试被中断,消息id: {}", messageid);
} catch (exception e) {
log.error("消息重试处理异常,消息id: {}", messageid, e);
}
});
}
/**
* 处理超过最大重试次数的消息
* 记录失败消息详情,可根据业务需求扩展(如存储到数据库、发送告警等)
*
* @param messageid 消息id
* @param messagebody 消息内容
* @param retrycount 重试次数
*/
private void handlemaxretryexceeded(string messageid, map<string, string> messagebody, int retrycount) {
try {
// 记录失败消息详情(可根据业务需求实现,如存储到数据库、发送告警等)
string failedkey = "failed:message:" + messageid;
string failedinfo = string.format("消息id: %s, 重试次数: %d, 消息内容: %s, 失败时间: %s",
messageid, retrycount, messagebody, system.currenttimemillis());
rbucket<string> bucket = redissonclient.getbucket(failedkey);
bucket.set(failedinfo, 7, timeunit.days);
log.error("失败消息已记录: {}", failedinfo);
// todo: 可根据业务需求添加其他处理逻辑,如:
// 1. 发送告警通知
// 2. 存储到数据库死信表
// 3. 发送到死信队列
} catch (exception e) {
log.error("处理超过最大重试次数消息异常: {}", messageid, e);
}
}
}package com.lijw.mp.config.redisstream;
import com.lijw.mp.event.messageconsumerservice;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rstream;
import org.redisson.api.redissonclient;
import org.redisson.api.streammessageid;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.context.event.applicationreadyevent;
import org.springframework.context.annotation.configuration;
import org.springframework.context.event.eventlistener;
import org.springframework.scheduling.concurrent.threadpooltaskexecutor;
import java.net.inetaddress;
import java.lang.management.managementfactory;
import java.util.hashmap;
import java.util.list;
import java.util.map;
import java.util.uuid;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
/**
* redis stream配置类
* 负责创建消费组、配置消息监听容器等
* 使用redisson实现消息消费
*/
@configuration
@slf4j
public class redisstreamconfig {
/**
* redisson客户端
*/
@autowired
private redissonclient redissonclient;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 消息消费者服务
*/
@autowired
private messageconsumerservice messageconsumerservice;
/**
* 消费者线程池
*/
private executorservice consumerexecutor;
/**
* 应用启动完成后,初始化消费组并启动消费者
*/
@eventlistener(applicationreadyevent.class)
public void initstreamconsumers() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
log.warn("未配置任何消费组");
return;
}
// 创建消费者线程池
consumerexecutor = createconsumerthreadpool();
// 创建所有消费组
createallconsumergroups();
// 启动所有消费者
startallconsumers();
}
/**
* 确保stream存在,如果不存在则创建
*
* @param streamkey stream键名
*/
private void ensurestreamexists(string streamkey) {
try {
rstream<string, string> stream = redissonclient.getstream(streamkey);
// 检查stream是否存在
long size = stream.size();
if (size == null || size == 0) {
// stream不存在,创建一条临时消息来初始化stream
map<string, string> tempdata = new hashmap<>();
tempdata.put("_init", "true");
tempdata.put("_timestamp", string.valueof(system.currenttimemillis()));
stream.addall(tempdata);
log.debug("创建stream成功: {}", streamkey);
}
} catch (exception e) {
// stream不存在,创建它
try {
rstream<string, string> stream = redissonclient.getstream(streamkey);
map<string, string> tempdata = new hashmap<>();
tempdata.put("_init", "true");
tempdata.put("_timestamp", string.valueof(system.currenttimemillis()));
stream.addall(tempdata);
log.info("创建stream成功: {}", streamkey);
} catch (exception ex) {
log.warn("创建stream失败: {}, {}", streamkey, ex.getmessage());
}
}
}
/**
* 启动所有消费者
*/
private void startallconsumers() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
log.warn("未配置任何消费组,无法启动消费者");
return;
}
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
startconsumer(groupconfig);
}
}
/**
* 创建所有消费者组(如果不存在)
* 如果stream不存在,会先创建stream
*/
private void createallconsumergroups() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
log.warn("未配置任何消费组");
return;
}
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
// 先检查stream是否存在,如果不存在则创建
ensurestreamexists(groupconfig.getkey());
// 创建消费组
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
stream.creategroup(groupconfig.getgroup(), streammessageid.all);
log.info("创建消费者组成功: stream={}, group={}", groupconfig.getkey(), groupconfig.getgroup());
} catch (exception e) {
// 如果消费组已存在,这是正常情况
string errormsg = e.getmessage() != null ? e.getmessage() : "";
if (errormsg.contains("busygroup") || errormsg.contains("already exists")) {
log.info("消费者组已存在: stream={}, group={}", groupconfig.getkey(), groupconfig.getgroup());
} else {
log.warn("创建消费者组失败: stream={}, group={}, error={}",
groupconfig.getkey(), groupconfig.getgroup(), errormsg);
// 即使创建失败,也继续处理其他消费组
}
}
}
}
/**
* 启动单个消费组的消费者
*
* @param groupconfig 消费组配置
*/
private void startconsumer(streamgroupproperties.streamgroupconfig groupconfig) {
// 为每个实例生成唯一消费者名称
string consumername = generateuniqueconsumername(groupconfig.getconsumerprefix());
// 启动消费者线程
consumerexecutor.submit(() -> {
consumemessages(groupconfig, consumername);
});
log.info("redis stream消费者启动成功 - stream: {}, group: {}, consumer: {}",
groupconfig.getkey(), groupconfig.getgroup(), consumername);
}
/**
* 消费消息的循环方法
*
* @param groupconfig 消费组配置
* @param consumername 消费者名称
*/
private void consumemessages(streamgroupproperties.streamgroupconfig groupconfig, string consumername) {
string streamkey = groupconfig.getkey();
string groupname = groupconfig.getgroup();
rstream<string, string> stream = redissonclient.getstream(streamkey);
log.info("开始消费消息 - stream: {}, group: {}, consumer: {}", streamkey, groupname, consumername);
while (!thread.currentthread().isinterrupted()) {
try {
// 读取消息(从未确认的消息开始,读取最多10条)
map<streammessageid, map<string, string>> messages = stream.readgroup(
groupname,
consumername,
10
);
if (messages != null && !messages.isempty()) {
// 处理每条消息
for (map.entry<streammessageid, map<string, string>> entry : messages.entryset()) {
streammessageid messageid = entry.getkey();
map<string, string> messagebody = entry.getvalue();
// 调用消费者服务处理消息
messageconsumerservice.onmessage(streamkey, messageid.tostring(), messagebody, groupname);
}
} else {
// 没有消息时,短暂休眠避免cpu空转
thread.sleep(100);
}
} catch (interruptedexception e) {
log.info("消费者线程被中断 - stream: {}, group: {}, consumer: {}", streamkey, groupname, consumername);
thread.currentthread().interrupt();
break;
} catch (exception e) {
log.error("消费消息异常 - stream: {}, group: {}, consumer: {}",
streamkey, groupname, consumername, e);
// 发生异常时短暂休眠,避免快速重试导致cpu占用过高
try {
thread.sleep(1000);
} catch (interruptedexception ie) {
thread.currentthread().interrupt();
break;
}
}
}
log.info("消费者线程结束 - stream: {}, group: {}, consumer: {}", streamkey, groupname, consumername);
}
/**
* 生成唯一消费者名称(支持多实例部署的关键)
* 使用ip+进程id确保集群环境下唯一性
*
* @param prefix 消费者名称前缀
* @return 唯一的消费者名称
*/
private string generateuniqueconsumername(string prefix) {
try {
string hostaddress = inetaddress.getlocalhost().gethostaddress();
string processid = managementfactory.getruntimemxbean().getname().split("@")[0];
return (prefix != null ? prefix : "consumer_") + hostaddress + "_" + processid + "_" + system.currenttimemillis();
} catch (exception e) {
// fallback:使用uuid
return (prefix != null ? prefix : "consumer_") + uuid.randomuuid().tostring().substring(0, 8);
}
}
/**
* 创建专用线程池
* 用于处理redis stream消息消费
*
* @return 线程池执行器
*/
private executorservice createconsumerthreadpool() {
threadpooltaskexecutor executor = new threadpooltaskexecutor();
executor.setcorepoolsize(2); // 核心线程数
executor.setmaxpoolsize(10); // 最大线程数
executor.setqueuecapacity(100); // 队列容量
executor.setthreadnameprefix("redis-stream-consumer-"); // 线程名前缀
executor.setdaemon(false); // 非守护线程,确保应用关闭时能正常结束
executor.initialize();
return executor.getthreadpoolexecutor();
}
}package com.lijw.mp.config.redisstream;
import com.lijw.mp.event.messageconsumerservice;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rbucket;
import org.redisson.api.rstream;
import org.redisson.api.redissonclient;
import org.redisson.api.streammessageid;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.scheduling.annotation.scheduled;
import org.springframework.stereotype.component;
import java.time.duration;
import java.util.hashmap;
import java.util.map;
import java.util.concurrent.timeunit;
/**
* 待处理消息重试服务
* 定时检查并处理redis stream中未被确认(ack)的消息
*
* 生产环境特性:
* 1. 定时扫描pending消息
* 2. 读取pending消息并重新处理
* 3. 复用messageconsumerservice进行业务处理
* 4. 支持消息空闲时间检查和告警
*/
@component
@slf4j
public class pendingmessageretryservice {
/**
* redisson客户端
*/
@autowired
private redissonclient redissonclient;
/**
* stream消费组配置属性
*/
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 消息消费者服务(用于重新处理pending消息)
*/
@autowired
private messageconsumerservice messageconsumerservice;
/**
* 定时监控各个消费队列的数量
* 执行时机:每10秒执行一次
*/
@scheduled(fixeddelay = 10000)
public void monitorqueuestats() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
return;
}
long totalmessages = 0;
long totalpendingmessages = 0;
stringbuilder statsbuilder = new stringbuilder("消息队列监控统计 | ");
// 遍历所有配置的消费组
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
// 获取总消息数
long messagecount = stream.size();
messagecount = messagecount != null ? messagecount : 0l;
// 获取pending消息数
long pendingcount = getpendingmessagecount(groupconfig);
// 计算已处理消息数
long processedcount = messagecount - pendingcount;
totalmessages += messagecount;
totalpendingmessages += pendingcount;
// 构建单条日志信息
statsbuilder.append(string.format("[%s/%s: 总数=%d, pending=%d, 已处理=%d] ",
groupconfig.getkey(),
groupconfig.getgroup(),
messagecount,
pendingcount,
math.max(0, processedcount)));
} catch (exception e) {
log.error("监控stream统计信息失败 - stream: {}, group: {}",
groupconfig.getkey(), groupconfig.getgroup(), e);
statsbuilder.append(string.format("[%s/%s: 统计失败] ",
groupconfig.getkey(), groupconfig.getgroup()));
}
}
// 添加总计信息
statsbuilder.append(string.format("| 总计: 总数=%d, pending=%d, 已处理=%d",
totalmessages,
totalpendingmessages,
math.max(0, totalmessages - totalpendingmessages)));
// 打印单条日志
log.info(statsbuilder.tostring());
}
/**
* 定时处理未确认的消息
* 执行时机:每30秒(30000)执行一次(通过@scheduled注解配置)
*/
@scheduled(fixeddelay = 30000)
public void retrypendingmessages() {
if (streamgroupproperties.getgroups() == null || streamgroupproperties.getgroups().isempty()) {
return;
}
// 遍历所有配置的消费组
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
// 获取待处理消息数量(通过读取pending消息来统计)
long totalpending = getpendingmessagecount(groupconfig);
if (totalpending > 0) {
log.info("发现待处理消息 - stream: {}, group: {}, 待处理数量: {}",
groupconfig.getkey(), groupconfig.getgroup(), totalpending);
// 根据pending消息数量进行告警
if (totalpending > 1000) {
log.error("【严重告警】待处理消息数量过多 - stream: {}, group: {}, 数量: {}, " +
"建议:1.检查消费者是否正常运行 2.增加消费实例 3.检查消息处理逻辑",
groupconfig.getkey(), groupconfig.getgroup(), totalpending);
} else if (totalpending > 100) {
log.warn("【警告】待处理消息数量较多 - stream: {}, group: {}, 数量: {}, " +
"建议:考虑增加消费实例或检查消息处理性能",
groupconfig.getkey(), groupconfig.getgroup(), totalpending);
}
// 读取并处理pending消息
readandprocesspendingmessages(groupconfig);
}
} catch (exception e) {
log.error("处理待处理消息异常 - stream: {}, group: {}",
groupconfig.getkey(), groupconfig.getgroup(), e);
}
}
}
/**
* 获取待处理消息数量
* 通过读取pending消息来统计数量
*
* @param groupconfig 消费组配置
* @return 待处理消息数量
*/
private long getpendingmessagecount(streamgroupproperties.streamgroupconfig groupconfig) {
try {
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
string consumername = "count-consumer";
// 尝试读取pending消息(使用0作为起始id,读取最多1000条来统计)
map<streammessageid, map<string, string>> messages = stream.readgroup(
groupconfig.getgroup(),
consumername,
1000,
new streammessageid(0, 0)
);
return messages != null ? messages.size() : 0;
} catch (exception e) {
log.warn("获取pending消息数量失败 - stream: {}, group: {}",
groupconfig.getkey(), groupconfig.getgroup(), e);
return 0;
}
}
/**
* 读取并处理pending消息
* 使用xreadgroup命令读取pending消息(使用0作为起始id)
*
* 在redis stream中,使用xreadgroup group group consumer streams key 0
* 可以读取该消费者组中所有pending的消息
*
* @param groupconfig 消费组配置
*/
private void readandprocesspendingmessages(streamgroupproperties.streamgroupconfig groupconfig) {
try {
// 使用固定的消费者名称来读取pending消息
string retryconsumername = "retry-consumer";
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
log.info("尝试读取pending消息 - stream: {}, group: {}, consumer: {}",
groupconfig.getkey(), groupconfig.getgroup(), retryconsumername);
// 读取pending消息(使用0作为起始id读取所有pending消息,最多100条)
// 在redis stream中,0表示读取所有pending消息
map<streammessageid, map<string, string>> messages = stream.readgroup(
groupconfig.getgroup(),
retryconsumername,
100,
new streammessageid(0, 0)
);
log.info("读取结果 - stream: {}, group: {}, consumer: {}, 记录数量: {}",
groupconfig.getkey(), groupconfig.getgroup(), retryconsumername,
messages != null ? messages.size() : 0);
if (messages != null && !messages.isempty()) {
log.info("读取到{}条pending消息,开始重新处理 - stream: {}, group: {}",
messages.size(), groupconfig.getkey(), groupconfig.getgroup());
int successcount = 0;
int failcount = 0;
// 复用messageconsumerservice的业务处理方法
for (map.entry<streammessageid, map<string, string>> entry : messages.entryset()) {
streammessageid streammessageid = entry.getkey();
string messageid = streammessageid.tostring();
map<string, string> messagebody = entry.getvalue();
string lockkey = "pending:retry:lock:" + messageid;
try {
// 使用分布式锁防止多实例重复处理
rbucket<string> lockbucket = redissonclient.getbucket(lockkey);
boolean lockacquired = lockbucket.tryset("1", 5, timeunit.minutes);
if (!lockacquired) {
// 锁已被其他实例获取,跳过处理
log.info("pending消息正在被其他实例处理,跳过 - stream: {}, group: {}, messageid: {}",
groupconfig.getkey(), groupconfig.getgroup(), messageid);
continue;
}
// 幂等性检查:检查消息是否已被处理
if (ismessageprocessed(messageid)) {
log.info("pending消息已处理,跳过 - stream: {}, group: {}, messageid: {}",
groupconfig.getkey(), groupconfig.getgroup(), messageid);
// 释放锁
lockbucket.delete();
continue;
}
log.info("开始处理pending消息 - stream: {}, group: {}, messageid: {}",
groupconfig.getkey(), groupconfig.getgroup(), messageid);
log.debug("消息内容: {}", messagebody);
// 复用messageconsumerservice的onmessage方法进行业务处理
messageconsumerservice.onmessage(groupconfig.getkey(), messageid, messagebody, groupconfig.getgroup());
successcount++;
log.info("pending消息重新处理成功 - stream: {}, group: {}, messageid: {}",
groupconfig.getkey(), groupconfig.getgroup(), messageid);
// 处理成功后释放锁
lockbucket.delete();
} catch (exception e) {
failcount++;
log.error("pending消息重新处理失败 - stream: {}, group: {}, messageid: {}, 错误: {}",
groupconfig.getkey(), groupconfig.getgroup(), messageid,
e.getmessage(), e);
// 处理失败后释放锁,允许重试
try {
rbucket<string> lockbucket = redissonclient.getbucket(lockkey);
lockbucket.delete();
} catch (exception ex) {
log.warn("释放锁失败: {}", ex.getmessage());
}
}
}
log.info("pending消息处理完成 - stream: {}, group: {}, 成功: {}, 失败: {}",
groupconfig.getkey(), groupconfig.getgroup(), successcount, failcount);
} else {
log.warn("未读取到pending消息 - stream: {}, group: {}, 可能原因:1.消息已被其他消费者处理 2.消费者名称不匹配 3.消息已被ack",
groupconfig.getkey(), groupconfig.getgroup());
}
} catch (exception e) {
log.error("读取pending消息失败 - stream: {}, group: {}, 错误类型: {}, 错误消息: {}",
groupconfig.getkey(), groupconfig.getgroup(),
e.getclass().getsimplename(), e.getmessage(), e);
}
}
/**
* 幂等性检查:检查消息是否已被处理
* 与messageconsumerservice使用相同的幂等性检查机制
*
* @param messageid 消息id
* @return 如果消息已处理返回true,否则返回false
*/
private boolean ismessageprocessed(string messageid) {
rbucket<string> bucket = redissonclient.getbucket("processed:" + messageid);
return bucket.isexists();
}
}package com.lijw.mp.controller;
import com.lijw.mp.config.redisstream.streamgroupproperties;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rstream;
import org.redisson.api.redissonclient;
import org.redisson.api.streammessageid;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.http.httpstatus;
import org.springframework.http.responseentity;
import org.springframework.web.bind.annotation.*;
import java.time.localdatetime;
import java.util.arraylist;
import java.util.hashmap;
import java.util.list;
import java.util.map;
@restcontroller
@requestmapping("/api/test/stream")
@slf4j
public class streamtestcontroller {
@autowired
private redissonclient redissonclient;
@autowired
private streamgroupproperties streamgroupproperties;
/**
* 获取默认的stream key(订单stream)
*/
private string getdefaultstreamkey() {
if (streamgroupproperties.getgroups() != null && !streamgroupproperties.getgroups().isempty()) {
// 优先查找order_stream,否则返回第一个
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
if (groupconfig.getkey().contains("order")) {
return groupconfig.getkey();
}
}
return streamgroupproperties.getgroups().get(0).getkey();
}
throw new runtimeexception("未配置任何消费组");
}
/**
* 发送json格式复杂消息
*/
@postmapping("/send-json")
public responseentity<map<string, object>> sendjsonmessage(
@requestbody map<string, object> messagedata,
@requestparam(required = false) string streamkey) {
try {
// 如果未指定stream key,使用默认的
if (streamkey == null || streamkey.isempty()) {
streamkey = getdefaultstreamkey();
}
// 添加元数据
messagedata.put("timestamp", system.currenttimemillis());
messagedata.put("messagetype", "custom_event");
// 转换map类型以适应redis template
map<string, string> stringmessage = new hashmap<>();
messagedata.foreach((k, v) -> stringmessage.put(k, v.tostring()));
rstream<string, string> stream = redissonclient.getstream(streamkey);
streammessageid messageid = stream.addall(stringmessage);
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid.tostring());
result.put("data", messagedata);
log.info("json消息发送成功: {}", messageid);
return responseentity.ok(result);
} catch (exception e) {
log.error("发送json消息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(httpstatus.internal_server_error).body(result);
}
}
/**
* 发送订单事件测试消息
*/
@postmapping("/send-order")
public responseentity<map<string, object>> sendordermessage(
@requestparam string orderid,
@requestparam double amount,
@requestparam string status,
@requestparam(required = false) string streamkey) {
try {
// 如果未指定stream key,使用默认的订单stream
if (streamkey == null || streamkey.isempty()) {
streamkey = getdefaultstreamkey();
}
map<string, string> message = new hashmap<>();
message.put("eventtype", "order_" + status.touppercase());
message.put("orderid", orderid);
message.put("amount", amount.tostring());
message.put("status", status);
message.put("eventtime", localdatetime.now().tostring());
rstream<string, string> stream = redissonclient.getstream(streamkey);
streammessageid messageid = stream.addall(message);
map<string, object> result = new hashmap<>();
result.put("success", true);
result.put("messageid", messageid.tostring());
result.put("eventtype", message.get("eventtype"));
result.put("orderid", orderid);
log.info("订单事件发送成功: {} - {}", message.get("eventtype"), orderid);
return responseentity.ok(result);
} catch (exception e) {
log.error("发送订单事件失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(httpstatus.internal_server_error).body(result);
}
}
/**
* 查看所有配置的stream信息
*/
@getmapping("/info/all")
public responseentity<map<string, object>> getallstreaminfo() {
try {
map<string, object> result = new hashmap<>();
map<string, long> streams = new hashmap<>();
if (streamgroupproperties.getgroups() != null) {
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
long streamsize = stream.size();
streams.put(groupconfig.getkey(), streamsize);
} catch (exception e) {
log.warn("获取stream信息失败: {}", groupconfig.getkey(), e);
streams.put(groupconfig.getkey(), -1l);
}
}
}
result.put("streams", streams);
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("获取所有stream信息失败", e);
return responseentity.status(httpstatus.internal_server_error).build();
}
}
/**
* 查询消息队列总数情况(监控接口)
* 返回所有stream的详细信息,包括总消息数、pending消息数、消费组信息等
*/
@getmapping("/stats")
public responseentity<map<string, object>> getstreamstats() {
try {
map<string, object> result = new hashmap<>();
list<map<string, object>> streamstats = new arraylist<>();
long totalmessages = 0;
long totalpendingmessages = 0;
if (streamgroupproperties.getgroups() != null) {
for (streamgroupproperties.streamgroupconfig groupconfig : streamgroupproperties.getgroups()) {
try {
map<string, object> stats = getstreamstatistics(groupconfig);
streamstats.add(stats);
// 累计统计
totalmessages += (long) stats.getordefault("messagecount", 0l);
totalpendingmessages += (long) stats.getordefault("pendingcount", 0l);
} catch (exception e) {
log.warn("获取stream统计信息失败: {}", groupconfig.getkey(), e);
map<string, object> errorstats = new hashmap<>();
errorstats.put("streamkey", groupconfig.getkey());
errorstats.put("groupname", groupconfig.getgroup());
errorstats.put("error", e.getmessage());
streamstats.add(errorstats);
}
}
}
result.put("streams", streamstats);
// 创建汇总信息
map<string, object> summary = new hashmap<>();
summary.put("totalstreams", streamstats.size());
summary.put("totalmessages", totalmessages);
summary.put("totalpendingmessages", totalpendingmessages);
result.put("summary", summary);
result.put("timestamp", system.currenttimemillis());
return responseentity.ok(result);
} catch (exception e) {
log.error("获取stream统计信息失败", e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(httpstatus.internal_server_error).body(result);
}
}
/**
* 获取单个stream的统计信息
*
* @param groupconfig 消费组配置
* @return stream统计信息
*/
private map<string, object> getstreamstatistics(streamgroupproperties.streamgroupconfig groupconfig) {
map<string, object> stats = new hashmap<>();
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
// 总消息数
long messagecount = stream.size();
stats.put("streamkey", groupconfig.getkey());
stats.put("groupname", groupconfig.getgroup());
stats.put("messagecount", messagecount != null ? messagecount : 0l);
// pending消息数(通过读取pending消息来统计)
long pendingcount = getpendingmessagecount(groupconfig);
stats.put("pendingcount", pendingcount);
// 已处理消息数(总消息数 - pending消息数)
long processedcount = (messagecount != null ? messagecount : 0l) - pendingcount;
stats.put("processedcount", math.max(0, processedcount));
return stats;
}
/**
* 获取待处理消息数量
*
* @param groupconfig 消费组配置
* @return 待处理消息数量
*/
private long getpendingmessagecount(streamgroupproperties.streamgroupconfig groupconfig) {
try {
rstream<string, string> stream = redissonclient.getstream(groupconfig.getkey());
string consumername = "stats-consumer";
// 尝试读取pending消息(使用0作为起始id,读取最多1000条来统计)
map<streammessageid, map<string, string>> messages = stream.readgroup(
groupconfig.getgroup(),
consumername,
1000,
new streammessageid(0, 0)
);
return messages != null ? messages.size() : 0;
} catch (exception e) {
log.warn("获取pending消息数量失败 - stream: {}, group: {}",
groupconfig.getkey(), groupconfig.getgroup(), e);
return 0;
}
}
/**
* 清除消息队列(运维接口)
* 注意:此操作会删除stream中的所有消息,请谨慎使用
*
* @param streamkey stream键名(必填,防止误操作)
* @param confirm 确认参数,必须为"delete"才会执行删除操作
* @return 操作结果
*/
@deletemapping("/clear")
public responseentity<map<string, object>> clearstream(
@requestparam string streamkey,
@requestparam(required = false, defaultvalue = "") string confirm) {
try {
map<string, object> result = new hashmap<>();
// 安全检查:必须提供确认参数
if (!"delete".equals(confirm)) {
result.put("success", false);
result.put("error", "清除操作需要确认,请在confirm参数中传入'delete'");
result.put("message", "此操作会删除stream中的所有消息,请谨慎使用");
return responseentity.status(httpstatus.bad_request).body(result);
}
// 检查stream是否存在
rstream<string, string> stream = redissonclient.getstream(streamkey);
long sizebefore = stream.size();
if (sizebefore == null || sizebefore == 0) {
result.put("success", true);
result.put("message", "stream为空,无需清除");
result.put("streamkey", streamkey);
result.put("deletedcount", 0);
return responseentity.ok(result);
}
// 删除stream(删除整个stream会清除所有消息)
// 注意:redisson中删除stream需要使用redis命令,这里使用trim到0的方式
// 或者直接删除key
redissonclient.getkeys().delete(streamkey);
result.put("success", true);
result.put("message", "stream清除成功");
result.put("streamkey", streamkey);
result.put("deletedcount", sizebefore);
result.put("timestamp", system.currenttimemillis());
log.warn("stream已清除 - stream: {}, 删除消息数: {}", streamkey, sizebefore);
return responseentity.ok(result);
} catch (exception e) {
log.error("清除stream失败 - stream: {}", streamkey, e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(httpstatus.internal_server_error).body(result);
}
}
/**
* 清除指定stream的pending消息(运维接口)
* 只清除pending消息,保留已处理的消息
*
* @param streamkey stream键名
* @param groupname 消费组名称
* @param confirm 确认参数,必须为"delete_pending"才会执行删除操作
* @return 操作结果
*/
@deletemapping("/clear-pending")
public responseentity<map<string, object>> clearpendingmessages(
@requestparam string streamkey,
@requestparam string groupname,
@requestparam(required = false, defaultvalue = "") string confirm) {
try {
map<string, object> result = new hashmap<>();
// 安全检查:必须提供确认参数
if (!"delete_pending".equals(confirm)) {
result.put("success", false);
result.put("error", "清除pending消息操作需要确认,请在confirm参数中传入'delete_pending'");
result.put("message", "此操作会删除stream中所有pending消息,请谨慎使用");
return responseentity.status(httpstatus.bad_request).body(result);
}
rstream<string, string> stream = redissonclient.getstream(streamkey);
// 读取所有pending消息并确认它们(通过ack来清除pending状态)
string consumername = "clear-pending-consumer";
int clearedcount = 0;
int maxiterations = 100; // 防止无限循环
int iteration = 0;
while (iteration < maxiterations) {
map<streammessageid, map<string, string>> messages = stream.readgroup(
groupname,
consumername,
100,
new streammessageid(0, 0)
);
if (messages == null || messages.isempty()) {
break;
}
// 确认所有pending消息
for (streammessageid messageid : messages.keyset()) {
stream.ack(groupname, messageid);
clearedcount++;
}
iteration++;
}
result.put("success", true);
result.put("message", "pending消息清除成功");
result.put("streamkey", streamkey);
result.put("groupname", groupname);
result.put("clearedcount", clearedcount);
result.put("timestamp", system.currenttimemillis());
log.warn("pending消息已清除 - stream: {}, group: {}, 清除数量: {}",
streamkey, groupname, clearedcount);
return responseentity.ok(result);
} catch (exception e) {
log.error("清除pending消息失败 - stream: {}, group: {}", streamkey, groupname, e);
map<string, object> result = new hashmap<>();
result.put("success", false);
result.put("error", e.getmessage());
return responseentity.status(httpstatus.internal_server_error).body(result);
}
}
}get http://localhost:8083/api/test/stream/stats

get http://localhost:8083/api/test/stream/info/all

post http://localhost:8083/api/test/stream/send-json
# param
streamkey=order_stream
# body
{
"orderid": "orderid123345123",
"amount": 500,
"messagetype": "order"
}
post http://localhost:8083/api/test/stream/send-order # param streamkey:order_stream orderid:orderid2025120701 amount:500 status:ok messagetype:order

上面的几个接口已经说明了使用方式,下面截图展示一下相应的执行日志:
消息的发送以及处理

消息的定时监控

消息队列监控统计 | [order_stream/order_group: 总数=212, pending=0, 已处理=212] [payment_stream/payment_group: 总数=2, pending=0, 已处理=2] [notification_stream/notification_group: 总数=2, pending=0, 已处理=2] | 总计: 总数=216, pending=0, 已处理=216
到此这篇关于spring boot集成redis stream消息队列从入门到实战指南的文章就介绍到这了,更多相关spring boot集成redis stream消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论