it编程 > 前端脚本 > Erlang

【RabbitMQ】使用手册

151人参与 2024-07-31 Erlang

rabbitmq

同步调用

优点:时效性强,等待到结果后才返回

缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能执行)

异步调用

异步调用通常是基于消息通知的方式,包含三个角色:

消息发送者:投递消息的人,就是原来的调用者

消息接收者:接收和处理消息的人,就是原来的服务提供者

消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器

image-20240620225011677

优点:

耦合度低,拓展性强

异步调用,无需等待,性能好

故障隔离,下游服务故障不影响上游业务

缓存消息,流量削峰填谷

缺点:

不能立即得到调用结果,时效性差

不确定下游业务执行是否成功

业务安全依赖于broker(消息代理)的可靠性

mq技术选型

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

rabbitmq安装

  1. 查找镜像:docker search rabbitmq

    image-20240622142056336

  2. 拉取镜像:docker pull rabbitmq:3.8.19,指定拉取版本为3.18.19,如果不指定则默认拉取latest

    image-20240622142226656

  3. 查看镜像:docker images

    image-20240622142422222

  4. 启动镜像:设置账号登录为admin,登录密码为admin,不指定镜像版本,默认启动rabbitmq:latest

    docker run \
    -e rabbitmq_default_user=admin \
    -e rabbitmq_default_pass=admin \
    -v mq-plugins:/plugins \
    --name mq \
    --hostname localhost \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3.8.19
    
    • 15672是rabbitmq的后台管理端口
    • 5672是rabbitmq的aqmp端口
    • /plugins是rabbitmq容器存放插件的路径

    image-20240622142648269

  5. 查看容器:docker ps

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  6. 进入rabbitmq容器:docker exec -it 4df /bin/bash

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  7. 开启rabbitmq后台访问:rabbitmq-plugins enable rabbitmq_management

    image-20240622143150816

  8. 退出容器bash:exit

    image-20240622143240346

  9. 网页访问rabbitmq后台:访问http://localhost:15672,账号admin,密码admin

    image-20240622143517023

常见问题:

  1. 后台管理系统的可视化界面中出现:all stable feature flags must be enabled after completing an upgrade

    **解决方案:**点击admin -> feature flags,确保所有稳定的特性标志都是启用状态。如果有任何标志未启用,请将其启用。

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 后台管理系统的可视化界面中出现:stats in management ui are disabled on this node

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    **解决方案:**进入rabbitmq容器,运行命令:echo management_agent.disable_metrics_collector=false>/etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf,退出rabbitmq容器,然后运行docker restart 容器id重启rabbitmq容器。

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  3. 后台管理系统的可视化界面中 overview 不显示图形的问题

    解决方案:同《2. 后台管理系统的可视化界面中出现:stats in management ui are disabled on this node》

rabbitmq介绍

image-20240621002222015

work queues(任务模型)

任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处理。多个消费者绑定到一个队列,可以加快消息处理速度。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

默认情况下,rabbitmq的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置prefetch值为1,确保同一时刻最多投递给消费者1条消息,消费者处理完后再投递下一条消息。

image-20240623005915858

fanout交换机

fanout exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 fanout交换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。

image-20240623010648252

应用场景:用户支付成功后,交易服务更新订单状态,短信服务通知用户,积分服务为用户增加积分。

实现:交易服务的queue、短信服务的queue、积分服务的queue都绑定到fanout交换机,用户支付成功后,支付服务将消息发送到fanout交换机,这样交易服务、短信服务、积分服务九都能收到这条消息了。

案例演示

实现思路:

  1. 在rabbitmq控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在rabbitmq控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向hmall.fanout发送消息

代码实现:

image-20240623013810879

发送者:

@springboottest
class springamqptest {
    @autowired
    private rabbittemplate rabbittemplate;
    @test
    public void testsimplequeue(){
        string exchangename="hmall.fanout";
        string message="hello everyone";    						rabbittemplate.convertandsend(exchangename,null,message);
    }
}

消费者:

@component
@slf4j
public class springrabbitlistener {
    @rabbitlistener(queues = "fanout.queue1")
    public void listenerworkqueue1(string message){
        log.info("消费者1接收到消息:{},---{}",message, localtime.now());
    }
    @rabbitlistener(queues = "fanout.queue2")
    public void listenerworkqueue2(string message){
        log.info("消费者2接收到消息:{},---{}",message,localtime.now());
    }
}

消费者输出:

image-20240623015031061

direct交换机

direct exchange 会将接收到的消息根据规则路由到指定的queue,因此称为定向路由。

image-20240623124250467

应用场景:用户取消后,只需要给交易服务发送消息,通知交易服务更新订单状态,而不需要给短信服务和积分服务发送消息。

案例演示

实现思路:

  1. 在rabbitmq控制台中,声明队列direct.queue1和direct.queue2
  2. 在rabbitmq控制台中,声明交换机hmall. direct,将两个队列与其绑定,routekey 为blue时路由到direct.queue1,为yellow时路由到direct.queue2,为red时路由到direct.queue1和direct.queue2
  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  4. 在publisher中编写测试方法,利用不同的routingkey向hmall. direct发送消息

代码实现:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消费者

@component
@slf4j
public class springrabbitlistener {
    @rabbitlistener(queues = "direct.queue1")
    public void listenerworkqueue1(string message){
        log.info("消费者1接收到消息:{},---{}",message, localtime.now());
    }
    @rabbitlistener(queues = "direct.queue2")
    public void listenerworkqueue2(string message){
        log.info("消费者2接收到消息:{},---{}",message,localtime.now());
    }
}

发送者

@autowired
private rabbittemplate rabbittemplate;
@test
public void testsimplequeue(){
  //交换机名称
  string exchangename="hmall.direct";
  //消息
  string message_blue="hello blue";
  string message_yellow="hello yellow";
  string message_red="hello red";
  //发送消息
 rabbittemplate.convertandsend(exchangename,"blue",message_blue);
 rabbittemplate.convertandsend(exchangename,"yellow",message_yellow);
 rabbittemplate.convertandsend(exchangename,"red",message_red);
}

消费者输出:

image-20240623130816236

topic交换机

topicexchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,并且以.分割。

queue与exchange指定routingkey时可以使用通配符:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示

实现思路:

  1. 在rabbitmq控制台中,声明队列topic.queue1和topic.queue2
  2. 在rabbitmq控制台中,声明交换机hmall. topic,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
  4. 在publisher中编写测试方法,利用不同的routingkey向hmall. topic发送消息

代码实现:

image-20240623133252103

消费者:

@component
@slf4j
public class springrabbitlistener {
    @rabbitlistener(queues = "topic.queue1")
    public void listenerworkqueue1(string message){
        log.info("消费者1接收到消息:{},---{}",message, localtime.now());
    }
    @rabbitlistener(queues = "topic.queue2")
    public void listenerworkqueue2(string message){
        log.info("消费者2接收到消息:{},---{}",message,localtime.now());
    }
}

发送者1:

    @autowired
    private rabbittemplate rabbittemplate;
    @test
    public void testsimplequeue(){
        //交换机名称
        string exchangename="hmall.topic";
        //消息
        string message="中国新闻";
        //发送消息
        rabbittemplate.convertandsend(exchangename,"china.news",message);
    }

消费者输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

发送者2:

    @autowired
    private rabbittemplate rabbittemplate;
    @test
    public void testsimplequeue(){
        //交换机名称
        string exchangename="hmall.topic";
        //消息
        string message="中国天气";
        //发送消息
        rabbittemplate.convertandsend(exchangename,"china.weather",message);
    }

消费者输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

amqp

advanced message queuing protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

spring amqp是基于amqp协议定义的一套api规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

amqptemplate和rabbittemplate

amqptemplate 是一个接口,定义了基本的 amqp 操作,如发送消息、接收消息、转换消息等。它提供了与 amqp(包括 rabbitmq)通信的基本功能的抽象。

rabbittemplateamqptemplate 的默认实现类,专门用于与 rabbitmq 进行交互。它实现了 amqptemplate 接口,并提供了更多与 rabbitmq 交互的具体功能和配置选项。

rabbittemplateamqptemplate 更加丰富,提供了一些额外的高级特性和配置选项,如事务支持、消息确认机制、消息转换器等。这些功能可以更好地满足与 rabbitmq 高级交互需求。

综上所述,amqptemplate 是一个通用的 amqp 操作接口,而 rabbittemplate 是对其的具体实现,提供了更多与 rabbitmq 交互的功能和默认配置,使得在 spring 应用中使用 rabbitmq 变得更加简单和方便。

rabbitmq使用

后台可视化界面操作

代码操作

  1. 引入依赖

    <!-- amqp依赖,包含rabbitmq-->
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-amqp</artifactid>
    </dependency>
    
  2. application.yaml中配置rabbitmq

    spring:
      rabbitmq:
        host: 192.168.1.2 # rabbitmq地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
    
创建队列和交换机

springamqp提供了几个类,用来声明队列、交换机及其绑定关系:

image-20240623135444692

如果已经存在交换机、队列、绑定关系,运行代码时则不会进行创建,而且也不会报错。

通常发送者只需要关心消息发送,消费者关心队列、交换机、以及绑定关系,所以创建操作一般写在消费者中。

sping提供了基于java bean和基于@rabbitlistener注解两种方式创建。

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class fanoutconfiguration {
  //声明交换机
    @bean
    public fanoutexchange fanoutexchange(){
//        方式1
//        return new fanoutexchange("hmall.fanout");
//        方式2
        return exchangebuilder.fanoutexchange("hmall.fanout").build();
    }
  //声明队列
    @bean
    public queue fanoutqueue1(){
//        方式1
//        return new queue("fanout.queue1",true);
//        方式2
        return queuebuilder.durable("fanout.queue1").build();
    }
  //将队列和交换机绑定
    @bean
    public binding fanoutqueue1binding(queue fanoutqueue1,fanoutexchange fanoutexchange){//spring会去找beanname='fanoutqueue1'的bean作为参数传进来
        return bindingbuilder.bind(fanoutqueue1).to(fanoutexchange);
    }
    @bean
    public queue fanoutqueue2(){
        return queuebuilder.durable("fanout.queue2").build();
    }
    @bean
    public binding fanoutqueue2binding(queue fanoutqueue2,fanoutexchange fanoutexchange){//spring会去找beanname='fanoutqueue2'的bean作为参数传进来
        return bindingbuilder.bind(fanoutqueue2).to(fanoutexchange);
    }
}
@component
@slf4j
public class springrabbitlistener {
    @rabbitlistener(bindings = @queuebinding( //将交换机和队列绑定
            value = @queue(name="direct.queue1",durable = "true"), //如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
            exchange = @exchange(name = "hmall.direct",type = exchangetypes.direct), //如果没有交换机hmall.direct则创建交换机
            key = {"blue","red"} //routingkey
            ))
    public void listenerworkqueue1(string message){
        log.info("消费者1接收到消息:{},---{}",message, localtime.now());
    }

    @rabbitlistener(bindings = @queuebinding(
            value = @queue(name="direct.queue2",durable = "true"),
            exchange = @exchange(name = "hmall.direct",type = exchangetypes.direct),
            key = {"yellow","red"}
    ))
    public void listenerworkqueue2(string message){
        log.info("消费者2接收到消息:{},---{}",message,localtime.now());
    }
}
发送消息
接收消息
@component
@slf4j
public class springrabbitlistener {
    @rabbitlistener(queues = "队列名")
    public void listenersimplequeue(string message){
        log.info("消费者收到消息:{}",message);
    }
}
配置消息转换器

convertandsend方法会先将消息进行序列化,然后再发送。

spring的对消息对象的处理是由org.springframework.amap.support.converter.messageconverter来处理的。而
默认实现是simplemessageconverter,如果消息实现了serializable接口,则会使用serialize方法进行序列化,而serialize方法是基于jdk的objectoutputstream完成序列化的。存在下列问题:

image-20240623151642723

建议采用json序列化代替默认的jdk序列化,要做两件事情:

  1. 在publisher和consumer中都要引入jackson依赖,发送者和消费者要使用相同的消息转换器:

    <dependency>
      <groupid>com.fasterxml.jackson.core</groupid>
      <artifactid>jackson-databind</artifactid>
    </dependency>
    
  2. 在publisher和consumer中都要配置messageconverter:

    @bean
    public messageconverter messageconverter() {
      return new jackson2jsonmessageconverter();
    }
    

测试:

消息可靠性

消息丢失三种情况:

发送者的可靠性

发送者重连

有的时候由于网络波动,可能会出现发送者连接mq失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。

spring:
  rabbitmq:
    connection-timeout: 1s #设置mq的连接超时时间,超过1秒钟还没有连上mq则表示连接超时
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

案例演示:

  1. 停止mq

    image-20240623173037488

  2. 开启重连

    spring:
      rabbitmq:
        host: 192.168.1.2 
        port: 5672 
        virtual-host: /hmall 
        username: jack 
        password: jack 
        connection-timeout: 1s 
        template:
          retry:
            enabled: true 
            initial-interval: 1000ms 
            multiplier: 1 
            max-attempts: 3 
    
  3. 发送者发送消息

    package com.itheima.publisher;
    
    import org.junit.jupiter.api.test;
    import org.springframework.amqp.rabbit.core.rabbittemplate;
    import org.springframework.beans.factory.annotation.autowired;
    import org.springframework.boot.test.context.springboottest;
    
    @springboottest
    class springamqptest {
        @autowired
        private rabbittemplate rabbittemplate;
    
        @test
        public void testsimplequeue() {
            rabbittemplate.convertandsend("testconvertmessage", "你好");
        }
    }
    
  4. 消息发送失败

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过springamqp提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

发送者确认

springamqp提供了publisher confirm和publisher return两种确认机制。开启确人机制后,当发送者发送消息给mq后,mq会返回确认结果给发送者。返回的结果有以下几种情况:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

开启发送者确认机制:

  1. 开启配置

    spring:
      rabbitmq:
        publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
        publisher-returns: true # 开局publisher return机制
    

    这里publisher-confirm-type有三种模式可选:

    • none:关闭confirm机制

    • simple:同步阻塞等待mq的回执消息

    • correlated:mq异步回调方式返回回执消息

  2. 为rabbittemplate配置returnscallback

    每个rabbittemplate只能配置一个returnscallback,因此需要在项目启动过程中配置:

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  3. 每次发送消息时,指定消息id、消息confirmcallback

    image-20240623175841421

案例演示:

  1. 开启发送者确认配置

    spring:
      rabbitmq:
        host: 192.168.1.2 # rabbitmq地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
        publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
        publisher-returns: true # 开局publisher return机制
    
  2. 定义returnscallback

    package com.itheima.publisher;
    
    import lombok.allargsconstructor;
    import lombok.extern.slf4j.slf4j;
    import org.springframework.amqp.rabbit.core.rabbittemplate;
    import org.springframework.context.annotation.configuration;
    import javax.annotation.postconstruct;
    
    @configuration
    @slf4j
    @allargsconstructor
    public class mqconfig {
        private final rabbittemplate rabbittemplate;
        @postconstruct
        public void init() {
            rabbittemplate.setreturnscallback(returnedmessage -> {
                log.info("监听到了消息return callback");
                log.info("exchange: {}", returnedmessage.getexchange());
                log.info("routingkey: {}", returnedmessage.getroutingkey());
                log.info("message:{}", returnedmessage.getmessage());
                log.info("replycode: {}", returnedmessage.getreplycode());
                log.info("replytext: {}", returnedmessage.getreplytext());
            });
        }
    }
    
  3. 定义confirmcallback并发送消息

    3.1 发送成功

    package com.itheima.publisher;
    
    import lombok.extern.slf4j.slf4j;
    import org.junit.jupiter.api.test;
    import org.springframework.amqp.rabbit.connection.correlationdata;
    import org.springframework.amqp.rabbit.core.rabbittemplate;
    import org.springframework.beans.factory.annotation.autowired;
    import org.springframework.boot.test.context.springboottest;
    import org.springframework.util.concurrent.listenablefuturecallback;
    import java.util.uuid;
    import java.util.concurrent.timeunit;
    
    @springboottest
    @slf4j
    class springamqptest {
        @autowired
        private rabbittemplate rabbittemplate;
    
        @test
        public void testconfirmcallback() {
            //0. 创建correlationdata,并设置消息id
            correlationdata cd = new correlationdata(uuid.randomuuid().tostring());
            cd.getfuture().addcallback(new listenablefuturecallback<correlationdata.confirm>() {
                @override
                public void onfailure(throwable ex) {
                    log.error("spring amqp 处理确认结果异常", ex);
                }
                @override
                public void onsuccess(correlationdata.confirm result) {
                    if (result.isack()) {
                        log.info("收到confirmcallback ack,消息发送成功!");
                    } else {
                        log.info("收到confirmcallback nack,消息发送失败!", result.getreason());
                    }
                }
            });
            //1. 交换机名称
            string exchangename = "hmall.direct";
            //2. 消息
            string message = "测试发送者确认";
            //3. 发送消息
            rabbittemplate.convertandsend(exchangename, "blue", message, cd);
            //4. 此单元测试方法执行完,main线程就结束了,因此需要睡眠2s接收回调函数
            try {
                timeunit.seconds.sleep(2);
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
        }
    }
    

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    3.2 发送失败-路由失败

    rabbittemplate.convertandsend(exchangename, "blue22", message, cd);
    

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

注意:发送者确认机制需要发送者和mq进行确认,会大大影响消息发送的效率,通常情况下不建议开启发送者确认机制。

mq的可靠性

在默认情况下,rabbitmq会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

数据持久化

rabbitmq实现数据持久化包括3个方面,设置为持久化后,重启mq,交换机、队列、消息也不会丢失。

案例演示:

mq接收非持久化消息

发送者发送1百万条非持久化消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

发送耗时:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

mq收到了一百万条非持久化消息

注意:本测试使用的mq是3.13.3,默认使用的是lazy queue模式:所有的消息直接存入磁盘,不再存储到内存,所以in memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,persistent表示存入磁盘且持久化的消息的数量)

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

重启mq后,一百万条非持久化消息全部丢失

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

mq接收持久化消息

发送者发送1百万条持久化消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

发送耗时:

image-20240623222929928

mq收到了一百万条持久化消息

注意:本测试使用的mq是3.13.3,默认使用的是lazy queue模式:所有的消息直接存入磁盘,不再存储到内存,所以in memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,persistent表示存入磁盘且持久化的消息的数量)

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

重启mq后,一百万条持久化消息不会丢失

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

结论

在接收非持久化消息时,mq收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致mq阻塞),然后再继续接收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。

在接收持久化消息时,mq会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。

发送一千万条非持久化消息耗时:

image-20240623223503631

发送一千万条持久化消息耗时:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

从上面发送者发送一百万条消息的耗时来看,发送持久化消息比发送非持久化消息耗时更少(不需要paged out),而且持久化消息在mq重启后不会丢失,所以建议发送持久化消息。

lazy queue

从rabbitmq的3.6.0版本开始,就增加了lazy queue的概念,也就是惰性队列。

惰性队列的特征如下:

在3.12版本后,所有队列都是lazy queue模式,无法更改。

3.12版本之前的mq设置lazy queue模式有三种方式:

非lazy queue模式+持久化消息和lazy queue模式+持久化消息mq接收消息速度对比:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消费者的可靠性

消费者确认机制

消费者确认机制(consumer acknowledgement)是为了确认消费者是否成功处理消息。mq将一条消息发送给消费者后,mq上的这条消息处理待确认状态,当消费者处理消息结束后,应该向rabbitmo发送一个回执,告知rabbitmq自己消息处理状态:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

springamqp已经实现了消息确认功能。并允许我们通过配置文件选择ack处理方式,有三种方式:

案例演示-自动模式

  1. 消费者配置

    spring:
      rabbitmq:
        host: 192.168.1.2 # rabbitmq地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
        listener:
          simple:
            prefetch: 1
            acknowledge-mode: auto
    
  2. 消费者

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  3. 发送者

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    查看消息状态:

    image-20240624004948237

    • 因为消费者抛出业务异常,所以会给mq发送nack,然后mq不停地向消费者投递消息

      image-20240624005751210

    查看消息内容

    image-20240624005115377

    • 查看队列中的消息,提示队列是空的,所以得出结论:待确认的消息不保存在队列中

案例演示-手动模式

  1. 消费者配置

    spring:
      rabbitmq:
        host: 192.168.1.2 # rabbitmq地址
        port: 5672 # 端口
        virtual-host: /hmall # 虚拟主机
        username: jack # 用户名
        password: jack # 密码
        listener:
          simple:
            prefetch: 1
            acknowledge-mode: manual
    
  2. 消费者

    image-20240624233521995

  3. 发送者

    3.1 发送者发送ackxxxx

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    程序会运行到消费者在21行的断点处,消费者输出

    image-20240624233857040

    查看消息状态

    image-20240624234215403

    放行消费者21行断点,查看消息状态

    image-20240624234412916

    3.2 发送者发送nackxxxx,程序会运行到消费者在26行的断点处,消费者输出

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    查看消息状态

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    放行断点,消费者输出

    image-20240624235531700

    查看消息状态,消息又被重新放回了队列,并且mq又将消息投递给了消费者

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    取消断点26行断点,消费者不停地输出,说明mq不停地向消费者进行消息重投

    image-20240624235844464

    查看消息状态

    image-20240625000011663

    停掉消费者进程,查看消息状态

    image-20240625000247413

    3.3 发送者发送xxxx,消费者停在30行的断点处,消费者输出

    image-20240625001012535

    查看消息状态

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    放行断点,查看消息状态

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

失败重试机制

springamqp提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

案例演示

  1. 消费者配置
spring:
  rabbitmq:
    host: 192.168.1.2 # rabbitmq地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true
          initial-interval: 1000ms
          multiplier: 1
          max-attempts: 3
          stateless: true
  1. 消费者

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 发送者

    image-20240624010548871

  3. 消费者输出

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  4. 查看消息状态

    image-20240624011251261

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有messagerecoverer接口来处理,它包含三种不同的实现:

将失败处理策略改为republishmessagerecoverer:

  1. 首先,定义接收失败消息的交换机、队列及其绑定关系。

  2. 然后,定义republishmessagerecoverer:

    image-20240624012025791

案例演示

  1. 定义接收失败消息的交换机、队列、绑定关系、republishmessagerecoverer

image-20240624012657886

  1. 消费者

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 消费者输出

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  3. 查看error.queue上的消息

    image-20240624013151477

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),例如求绝对值的函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

image-20240624013938181

消除非幂等性的手段

案例演示:

  1. 配置消息转换器

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 发送者发送消息

    image-20240624014854578

  3. 查看消息

    image-20240624014817996

  4. 消费者使用message接收

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

延迟任务:设置在一定时间之后才执行的任务

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter)

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(dead letter exchange,简称dlx)

image-20240624021939871

案例演示

  1. 消费者中定义交换机和队列,并监听

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 定义交换机和队列,并将交换机dlx.direct声明为死信交换机,并与队列normal.queue绑定。

    image-20240624184949769

  3. 查看队列、交换机、绑定关系

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    image-20240624190459639

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  4. 发送者,发送消息时设置消息的死亡时间

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  5. 大约10s后消费者收到消息

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    image-20240624193050394

  6. 说明

    向交换机normal.direct中投递消息m,指定routingkey为hi,消息m的死亡时间设置为10s,消息m会被路由到队列normal.queue中,因为队列normal.queue没有消费者监听,碰巧队列normal.queue绑定了死信交换机dlx.direct,所以投递到队列normal.queue的消息m死亡后,会被转投到死信交换机dlx.direct中,因为指定的routingkey为hi,所以死信交换机dlx.direct会将消息m路由到队列dlx.queue中,而队列dlx.queue有消费者c监听,所以消费者c会消费消息m。这就实现了消息m延迟10秒后被消费。

延迟消息插件

使用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,rabbitmq的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是设计了一种特殊的交换机,当消息投递到这种交换机时,它能够暂存一段时间,直到达到设定的延迟时间后再将消息投递到相应的队列。这种设计大大简化了延迟消息的处理过程,提高了系统的效率和可靠性。

下载

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

选择下载的版本,插件的版本要和rabbitmq的版本保持一致

安装

使用

  1. 创建延迟交换机,三种方式

    • 图形化界面操作

      image-20240624214941680

    • 注解方式

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

    • springbean方式

      外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 查看延迟交换机

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  3. 发送消息时需要通过消息头x-delay来设置过期时间

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  4. 消费者大约10s后收到消息

    image-20240624215445214

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文参考文档

https://b11et3un53m.feishu.cn/wiki/a9sawkuxsikj6dk3icacvwb4n3g

https://blog.csdn.net/karry_zzj/article/details/119513541

https://blog.csdn.net/weixin_42050545/article/details/121487823

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

Ubuntu安装RabbitMq(保姆级教学,直接喂饭)

07-31

最全干货!使用Docker构建RabbitMQ高可用负载均衡集群,附答案+考点

07-28

Ruby langchainrb gem and custom configuration for the model setup

07-28

RabbitMQ 安装分享

07-28

RabbitMQ详解与实战(绝对足够惊喜)

07-28

RabbitMQ 消息丢失的场景,如何保证消息不丢失?

07-28

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论