115人参与 • 2025-05-16 • Redis
本方案参考马哥短连接项目中的消息幂等处理方案
@slf4j
@component
@requiredargsconstructor
@conditionalonproperty(name = "message-queue.type", havingvalue = "rocketmq")
public class custommessageproducer implements messagequeueproducer {
private final rocketmqtemplate rocketmqtemplate;
@value("${rocketmq.producer.topic}")
private string customtopic;
/**
* 通用的发送方法,允许自定义消息内容和处理逻辑
* @param messagepayload 消息体数据
*/
@override
public void send(map<string, string> messagepayload) {
// 生成唯一消息键,用于标识消息的幂等性
string messageid = uuid.randomuuid().tostring();
messagepayload.put("messageid", messageid);
// 构建消息
message<map<string, string>> message = messagebuilder
.withpayload(messagepayload)
.setheader(messageconst.property_keys, messageid)
.build();
// 发送消息并处理结果
try {
sendresult sendresult = rocketmqtemplate.syncsend(customtopic, message, 2000l);
log.info("消息发送成功: 状态={}, 消息id={}, 消息键={}", sendresult.getsendstatus(), sendresult.getmsgid(), messageid);
} catch (exception e) {
log.error("消息发送失败: 消息内容={}", json.tojsonstring(messagepayload), e);
// 添加自定义的失败处理逻辑
}
}
}
messagepayload 中加入 messageid,确保消息唯一性,便于后续在 redis 中验证和处理消息幂等。customtopic 及 messagepayload 内容。这样设计有助于实现通用的消息发送,只需更改 messagepayload 数据结构和自定义处理逻辑即可适应不同业务。
package com.example.project.mq.idempotent;
import lombok.requiredargsconstructor;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.stereotype.component;
import java.util.objects;
import java.util.concurrent.timeunit;
/**
* 消息幂等处理器
*/
@component
@requiredargsconstructor
public class messageidempotenthandler {
private final stringredistemplate stringredistemplate;
private static final string idempotent_key_prefix = "message:idempotent:";
/**
* 判断消息是否已被处理
*
* @param messageid 消息唯一标识
* @return true 表示消息未处理,可以继续处理;false 表示消息已处理,避免重复消费
*/
public boolean ismessagenotprocessed(string messageid) {
string key = idempotent_key_prefix + messageid;
// 尝试设置新键,如果不存在则返回 true 表示未处理,存在则返回 false
return boolean.true.equals(stringredistemplate.opsforvalue().setifabsent(key, "0", 2, timeunit.minutes));
}
/**
* 标记消息处理流程完成
*
* @param messageid 消息唯一标识
*/
public void markasprocessed(string messageid) {
string key = idempotent_key_prefix + messageid;
stringredistemplate.opsforvalue().set(key, "1", 2, timeunit.minutes);
}
/**
* 查询消息是否已经处理完成
*
* @param messageid 消息唯一标识
* @return true 表示消息处理已完成,false 表示未完成
*/
public boolean isprocessingcomplete(string messageid) {
string key = idempotent_key_prefix + messageid;
return objects.equals(stringredistemplate.opsforvalue().get(key), "1");
}
/**
* 处理异常时删除幂等标识
*
* @param messageid 消息唯一标识
*/
public void clearprocessedflag(string messageid) {
string key = idempotent_key_prefix + messageid;
stringredistemplate.delete(key);
}
}
整体职责:messageidempotenthandler 主要职责是保证消息在消费过程中只被处理一次,防止重复消费。它借助 redis 存储和检查消息的唯一标识符,以实现消息的幂等性控制。
方法职责:
ismessagenotprocessed:判断消息是否已处理。此方法尝试设置一个短期过期的标识,如果消息尚未被消费(即 redis 中不存在该键),则可以继续处理,否则表示消息已处理。markasprocessed:标记消息为已完成消费。成功消费后调用该方法,改变 redis 中的键值标识,标记为已完成状态,避免重复处理。isprocessingcomplete:检查消息消费流程是否完成。消费完成后,该键值会被设置为 "1",此方法用于验证该状态。clearprocessedflag:清除幂等标识。在消息消费失败或异常情况下,删除标识以便消息可重新消费。@override
public void onmessage(map<string, string> producermap) {
// 获取消息的唯一标识符
string keys = producermap.get("keys");
// 检查是否已处理过该消息,幂等性控制
if (!messagequeueidempotenthandler.ismessageprocessed(keys)) {
// 若该消息流程尚未完成
if (messagequeueidempotenthandler.isaccomplish(keys)) {
return; // 跳过已完成流程的消息
}
throw new serviceexception("消息未完成流程,需要消息队列重试");
}
// 业务逻辑处理
try {
//调用业务方法代码
}
} catch (throwable ex) {
log.error("消费异常", ex);
throw ex;
}
// 标记消息处理完成
messagequeueidempotenthandler.setaccomplish(keys);
}
onmessage 模板的职责是接收并处理消息队列中的消息,确保幂等性,并在需要时抛出异常让消息队列进行重试。以下是该方法中关键步骤的职责和操作说明:
keys 作为唯一标识符来检查消息是否已被处理过,避免重复消费。messagequeueidempotenthandler.ismessageprocessed(keys) 方法,如果消息已处理,则跳过;如果未完成处理流程,但状态为“已完成”,则直接返回;否则抛出 serviceexception,让消息队列进行重试。try 块中进行消息的实际业务处理,调用相关业务逻辑方法,避免未捕获的异常导致幂等性标记不一致。log.error("消费异常", ex)) 并再次抛出异常。messagequeueidempotenthandler.setaccomplish(keys) 将消息标记为已完成。假设消息消费应用场景如下:
messageid。ismessagenotprocessed 检查 redis 中是否有该消息的标识。true,则说明此消息未被处理,进入消费流程。false,则说明此消息已处理,直接跳过避免重复消费。markasprocessed,在 redis 中将该消息标记为已完成。clearprocessedflag 删除该消息的 redis 标识,允许系统稍后重新尝试处理该消息。在本方案中,通过使用 redis 实现 mq 消息的幂等处理,确保了消息在消费过程中只会被处理一次,避免了重复消费带来的业务异常和资源浪费。其主要特点和优势如下:
setifabsent 来判断消息是否已被处理,确保消息在消费过程中仅被执行一次,避免重复消费的风险。uuid 生成消息的唯一标识 messageid,并将其嵌入到消息体中,保证每条消息的唯一性。messageidempotenthandler,支持多种状态检查和异常处理。markasprocessed 和 clearprocessedflag 实现了消费完成状态标记和异常重试机制,确保消费的可靠性和一致性。总的来说,此方案为消息幂等性控制提供了一种可扩展、通用且高效的实现方式,非常适合在高并发分布式系统中应用,能够有效提高消息消费的稳定性和安全性
到此这篇关于redis处理mq消费幂等的实现示例的文章就介绍到这了,更多相关redis mq消费幂等内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论