3人参与 • 2025-10-24 • Java
下面将详细介绍如何在springboot中使用rocketmq实现延迟消息,包括基于延迟级别和基于具体时间两种方式的完整实现。
rocketmq提供了两种类型的延迟消息机制:
这两种机制在订单超时取消、会议提醒、定时任务调度等场景中有广泛应用。
<dependency>
<groupid>org.apache.rocketmq</groupid>
<artifactid>rocketmq-spring-boot-starter</artifactid>
<version>2.2.3</version>
</dependency>
在application.yml中配置rocketmq连接信息:
rocketmq:
name-server: localhost:9876
producer:
group: delay-message-producer-group
rocketmq默认提供18个延迟级别,定义在messagestoreconfig类中:
messagedelaylevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
对应关系:
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.component;
@component
public class delaylevelproducer {
@autowired
private rocketmqtemplate rocketmqtemplate;
/**
* 发送基于延迟级别的消息
* @param topic 主题
* @param tag 标签
* @param message 消息内容
* @param delaylevel 延迟级别(1-18)
*/
public void sendmessagebydelaylevel(string topic, string tag, string message, int delaylevel) {
// 创建消息
message<string> springmessage = messagebuilder.withpayload(message).build();
// 发送延迟消息
sendresult sendresult = rocketmqtemplate.syncsend(
topic + ":" + tag,
springmessage,
3000, // 超时时间
delaylevel // 延迟级别
);
system.out.println("延迟级别消息发送成功: " + sendresult);
}
/**
* 发送订单超时取消消息(延迟15分钟)
*/
public void sendordertimeoutmessage(string orderid) {
string message = "订单超时取消: " + orderid;
// 15分钟对应level=14(根据默认配置)
sendmessagebydelaylevel("ordertopic", "timeout", message, 14);
}
}
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.messageconst;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.component;
import java.util.date;
@component
public class scheduledmessageproducer {
@autowired
private rocketmqtemplate rocketmqtemplate;
/**
* 发送延迟指定毫秒数的消息
*/
public void sendmessagewithdelayms(string topic, string message, long delayms) {
// 计算投递时间
long delivertimems = system.currenttimemillis() + delayms;
// 创建消息并设置投递时间
message<string> springmessage = messagebuilder.withpayload(message)
.setheader(messageconst.property_delay_time_ms, string.valueof(delayms))
.setheader(messageconst.property_timer_deliver_ms, string.valueof(delivertimems))
.build();
sendresult sendresult = rocketmqtemplate.syncsend(topic, springmessage);
system.out.println("延迟毫秒消息发送成功: " + sendresult);
}
/**
* 发送指定时间点投递的消息
*/
public void sendmessageattime(string topic, string message, date delivertime) {
long delivertimems = delivertime.gettime();
// 创建消息并设置投递时间
message<string> springmessage = messagebuilder.withpayload(message)
.setheader(messageconst.property_timer_deliver_ms, string.valueof(delivertimems))
.build();
sendresult sendresult = rocketmqtemplate.syncsend(topic, springmessage);
system.out.println("定时投递消息发送成功: " + sendresult);
}
/**
* 发送10秒后投递的消息
*/
public void sendtensecondslatermessage(string topic, string message) {
sendmessagewithdelayms(topic, message, 10000l);
}
}
延迟消息的消费者与普通消息消费者相同,无需特殊配置:
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.component;
import java.time.localdatetime;
import java.time.format.datetimeformatter;
@component
@rocketmqmessagelistener(
topic = "ordertopic",
consumergroup = "delay-message-consumer-group",
selectorexpression = "timeout"
)
public class ordertimeoutconsumer implements rocketmqlistener<string> {
@override
public void onmessage(string message) {
string now = localdatetime.now().format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss"));
system.out.println("[" + now + "] 接收到订单超时消息: " + message);
// 处理订单取消逻辑
processordercancellation(message);
}
private void processordercancellation(string message) {
// 提取订单id
string orderid = message.substring(message.indexof(":") + 2);
system.out.println("执行订单取消操作,订单id: " + orderid);
// 这里可以调用订单服务进行取消操作
}
}
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.format.annotation.datetimeformat;
import org.springframework.web.bind.annotation.*;
import java.util.date;
@restcontroller
@requestmapping("/api/delay")
public class delaymessagecontroller {
@autowired
private delaylevelproducer delaylevelproducer;
@autowired
private scheduledmessageproducer scheduledmessageproducer;
/**
* 发送基于延迟级别的消息
*/
@postmapping("/level")
public string sendbydelaylevel(
@requestparam string topic,
@requestparam string tag,
@requestparam string message,
@requestparam(defaultvalue = "3") int delaylevel) {
delaylevelproducer.sendmessagebydelaylevel(topic, tag, message, delaylevel);
return "延迟级别消息发送成功,延迟级别: " + delaylevel;
}
/**
* 发送订单超时取消消息
*/
@postmapping("/order/timeout")
public string sendordertimeout(@requestparam string orderid) {
delaylevelproducer.sendordertimeoutmessage(orderid);
return "订单超时取消消息已发送,订单id: " + orderid;
}
/**
* 发送延迟指定毫秒的消息
*/
@postmapping("/milliseconds")
public string sendbydelayms(
@requestparam string topic,
@requestparam string message,
@requestparam long delayms) {
scheduledmessageproducer.sendmessagewithdelayms(topic, message, delayms);
return "延迟毫秒消息发送成功,延迟: " + delayms + "ms";
}
/**
* 发送指定时间点的消息
*/
@postmapping("/scheduled")
public string sendscheduled(
@requestparam string topic,
@requestparam string message,
@requestparam @datetimeformat(pattern = "yyyy-mm-dd hh:mm:ss") date delivertime) {
scheduledmessageproducer.sendmessageattime(topic, message, delivertime);
return "定时消息发送成功,投递时间: " + delivertime;
}
}
在broker的配置文件中可以自定义延迟级别:
# 在broker.conf文件中添加 messagedelaylevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h
重启broker使其生效。注意,修改延迟级别后,所有使用延迟级别的消息都会使用新的配置。
| 特性 | 基于延迟级别 | 基于具体时间 |
|---|---|---|
| 灵活性 | 较低,只能使用预定义级别 | 高,可以精确到毫秒 |
| 适用版本 | 全版本支持 | rocketmq 5.x及以上版本完整支持 |
| 使用场景 | 固定延迟时间的场景 | 需要精确控制投递时间的场景 |
| 配置复杂度 | 简单,无需额外配置 | 可能需要在broker端开启相关功能 |
延迟精度:
版本兼容性:
性能考虑:
消息可靠性:
发送订单超时取消消息(延迟15分钟):
post /api/delay/order/timeout?orderid=order123456
发送10秒后投递的消息:
post /api/delay/milliseconds?topic=testtopic&message=hellodelay&delayms=10000
发送指定时间点的消息:
post /api/delay/scheduled?topic=testtopic&message=helloscheduled&delivertime=2024-12-25%2000:00:00
通过以上配置和代码,您可以在springboot项目中轻松实现基于rocketmq的延迟消息功能,满足各种定时任务和延迟处理的业务需求。
到此这篇关于springboot+rocketmq实现延迟消息的示例代码的文章就介绍到这了,更多相关springboot rocketmq 延迟内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论