151人参与 • 2024-07-31 • Erlang
优点:时效性强,等待到结果后才返回
缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能执行)
异步调用通常是基于消息通知的方式,包含三个角色:
消息发送者:投递消息的人,就是原来的调用者
消息接收者:接收和处理消息的人,就是原来的服务提供者
消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
优点:
耦合度低,拓展性强
异步调用,无需等待,性能好
故障隔离,下游服务故障不影响上游业务
缓存消息,流量削峰填谷
缺点:
不能立即得到调用结果,时效性差
不确定下游业务执行是否成功
业务安全依赖于broker(消息代理)的可靠性
查找镜像:docker search rabbitmq
拉取镜像:docker pull rabbitmq:3.8.19
,指定拉取版本为3.18.19,如果不指定则默认拉取latest
查看镜像:docker images
启动镜像:设置账号登录为admin,登录密码为admin,不指定镜像版本,默认启动rabbitmq:latest
docker run \
-e rabbitmq_default_user=admin \
-e rabbitmq_default_pass=admin \
-v mq-plugins:/plugins \
--name mq \
--hostname localhost \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8.19
查看容器:docker ps
进入rabbitmq容器:docker exec -it 4df /bin/bash
开启rabbitmq后台访问:rabbitmq-plugins enable rabbitmq_management
退出容器bash:exit
网页访问rabbitmq后台:访问http://localhost:15672,账号admin,密码admin
常见问题:
后台管理系统的可视化界面中出现:all stable feature flags must be enabled after completing an upgrade
**解决方案:**点击admin -> feature flags,确保所有稳定的特性标志都是启用状态。如果有任何标志未启用,请将其启用。
后台管理系统的可视化界面中出现:stats in management ui are disabled on this node
**解决方案:**进入rabbitmq容器,运行命令:echo management_agent.disable_metrics_collector=false>/etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf
,退出rabbitmq容器,然后运行docker restart 容器id
重启rabbitmq容器。
后台管理系统的可视化界面中 overview 不显示图形的问题
解决方案:同《2. 后台管理系统的可视化界面中出现:stats in management ui are disabled on this node》
publisher:消息发送者
comsumer:消息消费者
queue:队列-存储消息
exchange:交换机-接收发送者发送的消息,并将消息路由到与其绑定的队列
virtual-host:虚拟主机-将数据隔离(多个项目使用同一个rabbitmq时,可以为每个项目建立一个virtual-host,将不同项目之间的exchange和queue隔离)
任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处理。多个消费者绑定到一个队列,可以加快消息处理速度。
默认情况下,rabbitmq的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置prefetch值为1,确保同一时刻最多投递给消费者1条消息,消费者处理完后再投递下一条消息。
fanout exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 fanout交换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。
应用场景:用户支付成功后,交易服务更新订单状态,短信服务通知用户,积分服务为用户增加积分。
实现:交易服务的queue、短信服务的queue、积分服务的queue都绑定到fanout交换机,用户支付成功后,支付服务将消息发送到fanout交换机,这样交易服务、短信服务、积分服务九都能收到这条消息了。
案例演示:
实现思路:
代码实现:
发送者:
@springboottest
class springamqptest {
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
string exchangename="hmall.fanout";
string message="hello everyone"; rabbittemplate.convertandsend(exchangename,null,message);
}
}
消费者:
@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(queues = "fanout.queue1")
public void listenerworkqueue1(string message){
log.info("消费者1接收到消息:{},---{}",message, localtime.now());
}
@rabbitlistener(queues = "fanout.queue2")
public void listenerworkqueue2(string message){
log.info("消费者2接收到消息:{},---{}",message,localtime.now());
}
}
消费者输出:
direct exchange 会将接收到的消息根据规则路由到指定的queue,因此称为定向路由。
应用场景:用户取消后,只需要给交易服务发送消息,通知交易服务更新订单状态,而不需要给短信服务和积分服务发送消息。
案例演示:
实现思路:
代码实现:
消费者
@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(queues = "direct.queue1")
public void listenerworkqueue1(string message){
log.info("消费者1接收到消息:{},---{}",message, localtime.now());
}
@rabbitlistener(queues = "direct.queue2")
public void listenerworkqueue2(string message){
log.info("消费者2接收到消息:{},---{}",message,localtime.now());
}
}
发送者
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
//交换机名称
string exchangename="hmall.direct";
//消息
string message_blue="hello blue";
string message_yellow="hello yellow";
string message_red="hello red";
//发送消息
rabbittemplate.convertandsend(exchangename,"blue",message_blue);
rabbittemplate.convertandsend(exchangename,"yellow",message_yellow);
rabbittemplate.convertandsend(exchangename,"red",message_red);
}
消费者输出:
topicexchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,并且以.分割。
queue与exchange指定routingkey时可以使用通配符:
案例演示:
实现思路:
代码实现:
消费者:
@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(queues = "topic.queue1")
public void listenerworkqueue1(string message){
log.info("消费者1接收到消息:{},---{}",message, localtime.now());
}
@rabbitlistener(queues = "topic.queue2")
public void listenerworkqueue2(string message){
log.info("消费者2接收到消息:{},---{}",message,localtime.now());
}
}
发送者1:
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
//交换机名称
string exchangename="hmall.topic";
//消息
string message="中国新闻";
//发送消息
rabbittemplate.convertandsend(exchangename,"china.news",message);
}
消费者输出:
发送者2:
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
//交换机名称
string exchangename="hmall.topic";
//消息
string message="中国天气";
//发送消息
rabbittemplate.convertandsend(exchangename,"china.weather",message);
}
消费者输出:
advanced message queuing protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
spring amqp是基于amqp协议定义的一套api规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
amqptemplate
是一个接口,定义了基本的 amqp 操作,如发送消息、接收消息、转换消息等。它提供了与 amqp(包括 rabbitmq)通信的基本功能的抽象。
rabbittemplate
是 amqptemplate
的默认实现类,专门用于与 rabbitmq 进行交互。它实现了 amqptemplate
接口,并提供了更多与 rabbitmq 交互的具体功能和配置选项。
rabbittemplate
比 amqptemplate
更加丰富,提供了一些额外的高级特性和配置选项,如事务支持、消息确认机制、消息转换器等。这些功能可以更好地满足与 rabbitmq 高级交互需求。
综上所述,amqptemplate
是一个通用的 amqp 操作接口,而 rabbittemplate
是对其的具体实现,提供了更多与 rabbitmq 交互的功能和默认配置,使得在 spring 应用中使用 rabbitmq 变得更加简单和方便。
创建用户
创建虚拟主机
为用户添加可访问的虚拟主机
注意:当前登录用户默认有权限访问其创建的所有虚拟主机。
创建队列
durability:
durable:持久化队列,rabbit服务器重启后,这个队列还会存在
transient:临时队列,rabbit服务器重启后,这个队列将会被删除
查看队列的消费者
向队列中发布消息
获取队列中消息
队列中可以存储消息。当队列中的消息未被消费时,消息将存储在队列中,此时可以查看队列中的消息。
act mode:
nack message requeue true:获取消息,但是不做ack应答确认,消息重新入队
ack message requeue false:获取消息,应答确认,消息不重新入队,将会从队列中删除
reject requeue true:拒绝获取消息,消息重新入队
reject requeue false:拒绝获取消息,消息不重新入队,将会被删除
encoding:可以选择将消息进行base64编码
messages:从队列中获取的消息数量
清理消息
引入依赖
<!-- amqp依赖,包含rabbitmq-->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
application.yaml中配置rabbitmq
spring:
rabbitmq:
host: 192.168.1.2 # rabbitmq地址
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: jack # 用户名
password: jack # 密码
springamqp提供了几个类,用来声明队列、交换机及其绑定关系:
如果已经存在交换机、队列、绑定关系,运行代码时则不会进行创建,而且也不会报错。
通常发送者只需要关心消息发送,消费者关心队列、交换机、以及绑定关系,所以创建操作一般写在消费者中。
sping提供了基于java bean和基于@rabbitlistener
注解两种方式创建。
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class fanoutconfiguration {
//声明交换机
@bean
public fanoutexchange fanoutexchange(){
// 方式1
// return new fanoutexchange("hmall.fanout");
// 方式2
return exchangebuilder.fanoutexchange("hmall.fanout").build();
}
//声明队列
@bean
public queue fanoutqueue1(){
// 方式1
// return new queue("fanout.queue1",true);
// 方式2
return queuebuilder.durable("fanout.queue1").build();
}
//将队列和交换机绑定
@bean
public binding fanoutqueue1binding(queue fanoutqueue1,fanoutexchange fanoutexchange){//spring会去找beanname='fanoutqueue1'的bean作为参数传进来
return bindingbuilder.bind(fanoutqueue1).to(fanoutexchange);
}
@bean
public queue fanoutqueue2(){
return queuebuilder.durable("fanout.queue2").build();
}
@bean
public binding fanoutqueue2binding(queue fanoutqueue2,fanoutexchange fanoutexchange){//spring会去找beanname='fanoutqueue2'的bean作为参数传进来
return bindingbuilder.bind(fanoutqueue2).to(fanoutexchange);
}
}
@rabbitlistener
注解代码演示:@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(bindings = @queuebinding( //将交换机和队列绑定
value = @queue(name="direct.queue1",durable = "true"), //如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
exchange = @exchange(name = "hmall.direct",type = exchangetypes.direct), //如果没有交换机hmall.direct则创建交换机
key = {"blue","red"} //routingkey
))
public void listenerworkqueue1(string message){
log.info("消费者1接收到消息:{},---{}",message, localtime.now());
}
@rabbitlistener(bindings = @queuebinding(
value = @queue(name="direct.queue2",durable = "true"),
exchange = @exchange(name = "hmall.direct",type = exchangetypes.direct),
key = {"yellow","red"}
))
public void listenerworkqueue2(string message){
log.info("消费者2接收到消息:{},---{}",message,localtime.now());
}
}
直接发送给队列
方法:public void convertandsend(string routingkey, final object object)
,直接发给队列时,routingkey相当于队列名。
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimp lequeue() {
//队列名称
string queuename = "simple. queue";
//消息
string message = "hello, spring amqp!";
//发送消息
rabbittemplate.convertandsend(queuename, message);
}
注意:队列不显示绑定交换机时,默认还是会绑定到defalut exchange上
发送给fanout exchange
方法:public void convertandsend(string exchange, string routingkey, final object object)
,使用fanout exchange时,routingkey相当于队列名,发送给fanout exchange时,routingkey传null或""
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
//交换机名称
string exchangename="hmall.fanout";
//消息
string message="hello everyone";
//发送消息
rabbittemplate.convertandsend(exchangename,null,message);
}
发送给direct交换机
方法:public void convertandsend(string exchange, string routingkey, final object object)
,routingkey就是交换机和队列绑定时的routingkey
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
//交换机名称
string exchangename="hmall.direct";
//消息
string message_blue="hello blue";
string message_yellow="hello yellow";
string message_red="hello red";
//发送消息
rabbittemplate.convertandsend(exchangename,"blue",message_blue);
rabbittemplate.convertandsend(exchangename,"yellow",message_yellow);
rabbittemplate.convertandsend(exchangename,"red",message_red);
}
发送给topic交换机
方法:方法:public void convertandsend(string exchange, string routingkey, final object object)
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
//交换机名称
string exchangename="hmall.topic";
//消息
string message="中国新闻";
//发送消息
rabbittemplate.convertandsend(exchangename,"china.news",message);
}
@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(queues = "队列名")
public void listenersimplequeue(string message){
log.info("消费者收到消息:{}",message);
}
}
convertandsend方法会先将消息进行序列化,然后再发送。
spring的对消息对象的处理是由org.springframework.amap.support.converter.messageconverter来处理的。而
默认实现是simplemessageconverter,如果消息实现了serializable接口,则会使用serialize方法进行序列化,而serialize方法是基于jdk的objectoutputstream完成序列化的。存在下列问题:
建议采用json序列化代替默认的jdk序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖,发送者和消费者要使用相同的消息转换器:
<dependency>
<groupid>com.fasterxml.jackson.core</groupid>
<artifactid>jackson-databind</artifactid>
</dependency>
在publisher和consumer中都要配置messageconverter:
@bean
public messageconverter messageconverter() {
return new jackson2jsonmessageconverter();
}
测试:
使用默认的消息转换器
发送者:
package com.itheima.publisher;
import lombok.allargsconstructor;
import lombok.data;
import org.junit.jupiter.api.test;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import java.io.serializable;
@springboottest
class springamqptest {
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue() {
user jack = new user("jack", 18);
rabbittemplate.convertandsend("testconvertmessage", jack);
}
}
@data
@allargsconstructor
class user implements serializable { //要实现serializable接口,否则convertandsend方法进行消息转换时会抛出异常
private string name;
private integer age;
}
查看消息:
消费者:
package com.itheima.consumer.mq;
import lombok.allargsconstructor;
import lombok.data;
import lombok.noargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
import java.io.serializable;
@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(queues = "testconvertmessage")
public void listenerworkqueue(user message){
log.info("消费者接收到消息:{}",message);
}
}
@data
@allargsconstructor
@noargsconstructor
class user implements serializable {
private string name;
private integer age;
}
消费者输出:
配置消息转换器
发送者:
package com.itheima.publisher;
import lombok.allargsconstructor;
import lombok.data;
import org.junit.jupiter.api.test;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
@springboottest
class springamqptest {
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue() {
user jack = new user("jack", 18);
rabbittemplate.convertandsend("testconvertmessage", jack);
}
}
@data
@allargsconstructor
class user {
private string name;
private integer age;
}
查看消息:
消费者:
package com.itheima.consumer.mq;
import lombok.allargsconstructor;
import lombok.data;
import lombok.noargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
@slf4j
public class springrabbitlistener {
@rabbitlistener(queues = "testconvertmessage")
public void listenerworkqueue(user message){ //自动将json字符串转为user独享
log.info("消费者接收到消息:{}",message);
}
}
@data
@allargsconstructor
@noargsconstructor //消费者将消息转为user对象时,user对象一定要有空参构造器
class user {
private string name;
private integer age;
}
消费者输出:
消息丢失三种情况:
有的时候由于网络波动,可能会出现发送者连接mq失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。
spring:
rabbitmq:
connection-timeout: 1s #设置mq的连接超时时间,超过1秒钟还没有连上mq则表示连接超时
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval * multiplier
max-attempts: 3 # 最大重试次数
案例演示:
停止mq
开启重连
spring:
rabbitmq:
host: 192.168.1.2
port: 5672
virtual-host: /hmall
username: jack
password: jack
connection-timeout: 1s
template:
retry:
enabled: true
initial-interval: 1000ms
multiplier: 1
max-attempts: 3
发送者发送消息
package com.itheima.publisher;
import org.junit.jupiter.api.test;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
@springboottest
class springamqptest {
@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue() {
rabbittemplate.convertandsend("testconvertmessage", "你好");
}
}
消息发送失败
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过springamqp提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
springamqp提供了publisher confirm和publisher return两种确认机制。开启确人机制后,当发送者发送消息给mq后,mq会返回确认结果给发送者。返回的结果有以下几种情况:
开启发送者确认机制:
开启配置
spring:
rabbitmq:
publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
publisher-returns: true # 开局publisher return机制
这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制
simple:同步阻塞等待mq的回执消息
correlated:mq异步回调方式返回回执消息
为rabbittemplate配置returnscallback
每个rabbittemplate只能配置一个returnscallback,因此需要在项目启动过程中配置:
每次发送消息时,指定消息id、消息confirmcallback
案例演示:
开启发送者确认配置
spring:
rabbitmq:
host: 192.168.1.2 # rabbitmq地址
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: jack # 用户名
password: jack # 密码
publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
publisher-returns: true # 开局publisher return机制
定义returnscallback
package com.itheima.publisher;
import lombok.allargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.context.annotation.configuration;
import javax.annotation.postconstruct;
@configuration
@slf4j
@allargsconstructor
public class mqconfig {
private final rabbittemplate rabbittemplate;
@postconstruct
public void init() {
rabbittemplate.setreturnscallback(returnedmessage -> {
log.info("监听到了消息return callback");
log.info("exchange: {}", returnedmessage.getexchange());
log.info("routingkey: {}", returnedmessage.getroutingkey());
log.info("message:{}", returnedmessage.getmessage());
log.info("replycode: {}", returnedmessage.getreplycode());
log.info("replytext: {}", returnedmessage.getreplytext());
});
}
}
定义confirmcallback并发送消息
3.1 发送成功
package com.itheima.publisher;
import lombok.extern.slf4j.slf4j;
import org.junit.jupiter.api.test;
import org.springframework.amqp.rabbit.connection.correlationdata;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.util.concurrent.listenablefuturecallback;
import java.util.uuid;
import java.util.concurrent.timeunit;
@springboottest
@slf4j
class springamqptest {
@autowired
private rabbittemplate rabbittemplate;
@test
public void testconfirmcallback() {
//0. 创建correlationdata,并设置消息id
correlationdata cd = new correlationdata(uuid.randomuuid().tostring());
cd.getfuture().addcallback(new listenablefuturecallback<correlationdata.confirm>() {
@override
public void onfailure(throwable ex) {
log.error("spring amqp 处理确认结果异常", ex);
}
@override
public void onsuccess(correlationdata.confirm result) {
if (result.isack()) {
log.info("收到confirmcallback ack,消息发送成功!");
} else {
log.info("收到confirmcallback nack,消息发送失败!", result.getreason());
}
}
});
//1. 交换机名称
string exchangename = "hmall.direct";
//2. 消息
string message = "测试发送者确认";
//3. 发送消息
rabbittemplate.convertandsend(exchangename, "blue", message, cd);
//4. 此单元测试方法执行完,main线程就结束了,因此需要睡眠2s接收回调函数
try {
timeunit.seconds.sleep(2);
} catch (interruptedexception e) {
e.printstacktrace();
}
}
}
3.2 发送失败-路由失败
rabbittemplate.convertandsend(exchangename, "blue22", message, cd);
注意:发送者确认机制需要发送者和mq进行确认,会大大影响消息发送的效率,通常情况下不建议开启发送者确认机制。
在默认情况下,rabbitmq会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
rabbitmq实现数据持久化包括3个方面,设置为持久化后,重启mq,交换机、队列、消息也不会丢失。
交换机持久化(新建交换机默认就是持久化)
d表示持久化
队列持久化(新建队列默认就是持久化)
消息持久化(可视化界面发送消息时默认是非持久化,springamqp发送消息时默认是持久化的)
案例演示:
mq接收非持久化消息
发送者发送1百万条非持久化消息
发送耗时:
mq收到了一百万条非持久化消息
注意:本测试使用的mq是3.13.3,默认使用的是lazy queue模式:所有的消息直接存入磁盘,不再存储到内存,所以in memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,persistent表示存入磁盘且持久化的消息的数量)
重启mq后,一百万条非持久化消息全部丢失
mq接收持久化消息
发送者发送1百万条持久化消息
发送耗时:
mq收到了一百万条持久化消息
注意:本测试使用的mq是3.13.3,默认使用的是lazy queue模式:所有的消息直接存入磁盘,不再存储到内存,所以in memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,persistent表示存入磁盘且持久化的消息的数量)
重启mq后,一百万条持久化消息不会丢失
结论:
在接收非持久化消息时,mq收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致mq阻塞),然后再继续接收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。
在接收持久化消息时,mq会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。
发送一千万条非持久化消息耗时:
发送一千万条持久化消息耗时:
从上面发送者发送一百万条消息的耗时来看,发送持久化消息比发送非持久化消息耗时更少(不需要paged out),而且持久化消息在mq重启后不会丢失,所以建议发送持久化消息。
从rabbitmq的3.6.0版本开始,就增加了lazy queue的概念,也就是惰性队列。
惰性队列的特征如下:
在3.12版本后,所有队列都是lazy queue模式,无法更改。
3.12版本之前的mq设置lazy queue模式有三种方式:
可视化界面设置
要设置一个队列为情性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
spring bean方式设置
注解方式设置
非lazy queue模式+持久化消息和lazy queue模式+持久化消息mq接收消息速度对比:
消费者确认机制(consumer acknowledgement)是为了确认消费者是否成功处理消息。mq将一条消息发送给消费者后,mq上的这条消息处理待确认状态,当消费者处理消息结束后,应该向rabbitmo发送一个回执,告知rabbitmq自己消息处理状态:
springamqp已经实现了消息确认功能。并允许我们通过配置文件选择ack处理方式,有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从mq删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式(默认模式)。springamqp利用 aop对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:
案例演示-自动模式:
消费者配置
spring:
rabbitmq:
host: 192.168.1.2 # rabbitmq地址
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: jack # 用户名
password: jack # 密码
listener:
simple:
prefetch: 1
acknowledge-mode: auto
消费者
发送者
查看消息状态:
因为消费者抛出业务异常,所以会给mq发送nack,然后mq不停地向消费者投递消息
查看消息内容
案例演示-手动模式:
消费者配置
spring:
rabbitmq:
host: 192.168.1.2 # rabbitmq地址
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: jack # 用户名
password: jack # 密码
listener:
simple:
prefetch: 1
acknowledge-mode: manual
消费者
发送者
3.1 发送者发送ackxxxx
程序会运行到消费者在21行的断点处,消费者输出
查看消息状态
放行消费者21行断点,查看消息状态
3.2 发送者发送nackxxxx,程序会运行到消费者在26行的断点处,消费者输出
查看消息状态
放行断点,消费者输出
查看消息状态,消息又被重新放回了队列,并且mq又将消息投递给了消费者
取消断点26行断点,消费者不停地输出,说明mq不停地向消费者进行消息重投
查看消息状态
停掉消费者进程,查看消息状态
3.3 发送者发送xxxx,消费者停在30行的断点处,消费者输出
查看消息状态
放行断点,查看消息状态
springamqp提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:
案例演示:
spring:
rabbitmq:
host: 192.168.1.2 # rabbitmq地址
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: jack # 用户名
password: jack # 密码
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true
initial-interval: 1000ms
multiplier: 1
max-attempts: 3
stateless: true
消费者
发送者
消费者输出
查看消息状态
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有messagerecoverer接口来处理,它包含三种不同的实现:
将失败处理策略改为republishmessagerecoverer:
首先,定义接收失败消息的交换机、队列及其绑定关系。
然后,定义republishmessagerecoverer:
案例演示:
消费者
消费者输出
查看error.queue上的消息
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),例如求绝对值的函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
消除非幂等性的手段:
唯一消息id
案例演示:
配置消息转换器
发送者发送消息
查看消息
消费者使用message接收
业务判断
延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter)
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(dead letter exchange,简称dlx)
案例演示:
消费者中定义交换机和队列,并监听
定义交换机和队列,并将交换机dlx.direct声明为死信交换机,并与队列normal.queue绑定。
查看队列、交换机、绑定关系
发送者,发送消息时设置消息的死亡时间
大约10s后消费者收到消息
说明
向交换机normal.direct中投递消息m,指定routingkey为hi,消息m的死亡时间设置为10s,消息m会被路由到队列normal.queue中,因为队列normal.queue没有消费者监听,碰巧队列normal.queue绑定了死信交换机dlx.direct,所以投递到队列normal.queue的消息m死亡后,会被转投到死信交换机dlx.direct中,因为指定的routingkey为hi,所以死信交换机dlx.direct会将消息m路由到队列dlx.queue中,而队列dlx.queue有消费者c监听,所以消费者c会消费消息m。这就实现了消息m延迟10秒后被消费。
使用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,rabbitmq的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是设计了一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。这种设计大大简化了延迟消息的处理过程,提高了系统的效率和可靠性。
下载:
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择下载的版本,插件的版本要和rabbitmq的版本保持一致
查看rabbitmq的版本:
docker run -d --name containerid -p 5672:5672 rabbitmq:3-management
选择3.13.x版本的插件
安装:
将插件复制到rabbitmq容器中
docker cp 插件路径 容器id或名称:/plugins/
安装
docker exec -it 容器id或名称 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用:
创建延迟交换机,三种方式
图形化界面操作
注解方式
springbean方式
查看延迟交换机
发送消息时需要通过消息头x-delay来设置过期时间
消费者大约10s后收到消息
https://b11et3un53m.feishu.cn/wiki/a9sawkuxsikj6dk3icacvwb4n3g
https://blog.csdn.net/karry_zzj/article/details/119513541
https://blog.csdn.net/weixin_42050545/article/details/121487823
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论