it编程 > 数据库 > Redis

Redis处理MQ消费幂等的实现示例

115人参与 2025-05-16 Redis

本方案参考马哥短连接项目中的消息幂等处理方案

一.生成者模版代码

@slf4j
@component
@requiredargsconstructor
@conditionalonproperty(name = "message-queue.type", havingvalue = "rocketmq")
public class custommessageproducer implements messagequeueproducer {
    
    private final rocketmqtemplate rocketmqtemplate;

    @value("${rocketmq.producer.topic}")
    private string customtopic;

    /**
     * 通用的发送方法,允许自定义消息内容和处理逻辑
     * @param messagepayload 消息体数据
     */
    @override
    public void send(map<string, string> messagepayload) {
        // 生成唯一消息键,用于标识消息的幂等性
        string messageid = uuid.randomuuid().tostring();
        messagepayload.put("messageid", messageid);

        // 构建消息
        message<map<string, string>> message = messagebuilder
                .withpayload(messagepayload)
                .setheader(messageconst.property_keys, messageid)
                .build();

        // 发送消息并处理结果
        try {
            sendresult sendresult = rocketmqtemplate.syncsend(customtopic, message, 2000l);
            log.info("消息发送成功: 状态={}, 消息id={}, 消息键={}", sendresult.getsendstatus(), sendresult.getmsgid(), messageid);
        } catch (exception e) {
            log.error("消息发送失败: 消息内容={}", json.tojsonstring(messagepayload), e);
            // 添加自定义的失败处理逻辑
        }
    }
}

关键说明

这样设计有助于实现通用的消息发送,只需更改 messagepayload 数据结构和自定义处理逻辑即可适应不同业务。

二.消息幂等处理器模版代码

package com.example.project.mq.idempotent;

import lombok.requiredargsconstructor;
import org.springframework.data.redis.core.stringredistemplate;
import org.springframework.stereotype.component;

import java.util.objects;
import java.util.concurrent.timeunit;

/**
 * 消息幂等处理器
 */
@component
@requiredargsconstructor
public class messageidempotenthandler {

    private final stringredistemplate stringredistemplate;
    private static final string idempotent_key_prefix = "message:idempotent:";

    /**
     * 判断消息是否已被处理
     *
     * @param messageid 消息唯一标识
     * @return true 表示消息未处理,可以继续处理;false 表示消息已处理,避免重复消费
     */
    public boolean ismessagenotprocessed(string messageid) {
        string key = idempotent_key_prefix + messageid;
        // 尝试设置新键,如果不存在则返回 true 表示未处理,存在则返回 false
        return boolean.true.equals(stringredistemplate.opsforvalue().setifabsent(key, "0", 2, timeunit.minutes));
    }

    /**
     * 标记消息处理流程完成
     *
     * @param messageid 消息唯一标识
     */
    public void markasprocessed(string messageid) {
        string key = idempotent_key_prefix + messageid;
        stringredistemplate.opsforvalue().set(key, "1", 2, timeunit.minutes);
    }

    /**
     * 查询消息是否已经处理完成
     *
     * @param messageid 消息唯一标识
     * @return true 表示消息处理已完成,false 表示未完成
     */
    public boolean isprocessingcomplete(string messageid) {
        string key = idempotent_key_prefix + messageid;
        return objects.equals(stringredistemplate.opsforvalue().get(key), "1");
    }

    /**
     * 处理异常时删除幂等标识
     *
     * @param messageid 消息唯一标识
     */
    public void clearprocessedflag(string messageid) {
        string key = idempotent_key_prefix + messageid;
        stringredistemplate.delete(key);
    }
}

职责描述

三.生产者代码模版

@override
public void onmessage(map<string, string> producermap) {
    // 获取消息的唯一标识符
    string keys = producermap.get("keys");
    
    // 检查是否已处理过该消息,幂等性控制
    if (!messagequeueidempotenthandler.ismessageprocessed(keys)) {
        // 若该消息流程尚未完成
        if (messagequeueidempotenthandler.isaccomplish(keys)) {
            return; // 跳过已完成流程的消息
        }
        throw new serviceexception("消息未完成流程,需要消息队列重试");
    }
    
    // 业务逻辑处理
    try {
       	//调用业务方法代码
        }
    } catch (throwable ex) {
        log.error("消费异常", ex);
        throw ex;
    }
    
    // 标记消息处理完成
    messagequeueidempotenthandler.setaccomplish(keys);
}


职责描述

onmessage 模板的职责是接收并处理消息队列中的消息,确保幂等性,并在需要时抛出异常让消息队列进行重试。以下是该方法中关键步骤的职责和操作说明:

四.消息消费流程

假设消息消费应用场景如下:

五.总结

在本方案中,通过使用 redis 实现 mq 消息的幂等处理,确保了消息在消费过程中只会被处理一次,避免了重复消费带来的业务异常和资源浪费。其主要特点和优势如下:

总的来说,此方案为消息幂等性控制提供了一种可扩展、通用且高效的实现方式,非常适合在高并发分布式系统中应用,能够有效提高消息消费的稳定性和安全性

到此这篇关于redis处理mq消费幂等的实现示例的文章就介绍到这了,更多相关redis mq消费幂等内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

您想发表意见!!点此发布评论

推荐阅读

K8S redis 部署的项目实践

05-16

Nginx+keepalived配置的实现步骤

05-14

nginx中常见日志分析命令合集

05-14

Redis作为分布式锁的使用详解

05-14

Redis集群部署模式的不同实现过程

05-14

大数据量下Redis分片的5种策略分享

05-19

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论