it编程 > 前端脚本 > Erlang

消息队列-RabbitMQ

172人参与 2024-08-01 Erlang

消息队列-rabbitmq

中间件

中间件就是帮助连接多个系统,能让多个系统紧密协作的技术或者组件。比如:redis、消息队列。

比如在分布式系统中,将整个系统按业务进行拆分。分成不同的子系统,系统a负责往 redis 存数据,系统b从 redis 中取数据。两个系统借助 redis 进行协作。这时 redis 就充当了中间人的角色,连接起了两个系统,这就是中间件的概念。

消息队列

消息队列是一种很常见的中间件。字面意思来看就是存储消息的队列。有三个关键词:存储、消息、队列。就是能够存储任意数据结构的数据,例如字符串、二进制数据、json等,且有着队列先进先出的数据结构特点。

消息队列的应用场景(作用):在多个不同的系统、应用之间实现消息的传输,不用考虑应用的编程语言、系统、框架等。比如让 java 开发的应用发消息,让 python 开发的应用收消息,这样不用把所有代码写到同一个项目里,还能充分发挥不同系统的优势,实现应用的解耦。

消息队列的模型

消息队列主要由五部分组成:消息生产者(producer)、消息消费者(consumer)、消息(message)、消息队列(queue)。

以收寄快递的场景为例:发件人小明(消息生产者)将一个包裹(消息)交给快递员,快递员将包裹直接送到快递柜(消息队列),等收件人小王(消息消费者)什么时候有时间就去快递柜取快递。

消息队列的优点

异步处理

同步和异步。同步就像是打电话,电话接通后就要停下手头的工作先处理电话回应。异步就像是发邮件,只要把邮件发出去就完事,接着干自己的活,不管对面什么时候才回应。

有了异步处理意味着生产者发送完消息后可以立即转而进行其它任务,无需等待消费者的处理响应,避免了阻塞。如下图所示。

如果是同步的情况下,发布支付成功事件,假设其它服务都在150ms完成返回,这样完成耗时就是50ms+10ms+150ms,足足要210ms。但如果是异步的话,发布支付成功事件就不管了,这样仅需50ms+10ms。

image-20240202162000320

流量削峰

流量削峰是指当前消费者的处理能力有限(例如,ai 应用一次回答时间要隔几秒才能返回响应),而用户的请求量又很大。我们可以将用户的请求存储到消息队列中,利用队列数据结构的特性,按照实际情况,处理完一个请求后再从队列中取出下一个请求。这样就很好的保护了系统,不管请求量多大都能将流量高峰像放到水管的水流一样,以恒定流速稳定的处理。

数据持久化

它能将消息集中存储到硬盘里实现消息持久化,即使服务器宕机重启后数据也不会丢失。

分布式消息队列的优势

消息队列运行在单个应用程序内部,用于实现应用程序内部的异步处理,对于提高应用程序性能非常有用,但它们不具备跨进程或跨网络的能力,此时它仅仅是一般的消息队列。

而分布式消息队列是一种跨网络、跨系统的,可以在多个应用程序之间进行消息的发送和接收。它作用于多个系统构成紧密协作,此时的消息队列就是分布式消息队列。

分布式消息队列的优势

分布式消息队列除了有异步处理、流量削峰、数据持久化的优势还有可扩展性、应用解耦

可扩展性,这是分布式与单机最大的区别。如果一个服务器只能处理1000个用户请求,超出这个数量服务器可能就无法承受。分布式消息队列的节点可以水平扩展,系统可以根据需要动态地添加或移除节点,以应对不断增长的消息流量和用户请求。

应用解耦,一个大系统拆分成不同的子系统,每个子系统负责一部分业务互不干扰,还可以连接不同语言、框架开发的系统,让这些系统能更灵活,这个就是分布式架构的系统。利用消息队列完成消息的传递,衔接起各个子系统构成一个大系统。

消息队列的应用场景

  1. 耗时的场景(异步)
  2. 高并发场景(异步、流量削峰)
  3. 分布式系统协作(应用解耦)
  4. 强稳定性的场景(比如金融业务,持久化、可靠性、流量削峰)

消息队列的缺点

消息队列并非适合所有情况,某些情况下我们应该避免使用消息队列,或者需要认清使用消息队列可能会遇到的问题。

投入成本

使用消息队列意味着你需要学习和掌握一个新的工具,并在你的系统中引入一个额外的中间件。这将会使你的系统变得更复杂,并需要更多的维护工作。若你在公司实施此类解决方案,我们通常会选择由第三方大公司提供的稳定中间件,而这会产生额外的成本。即使你自己部署和维护,也需要额外的资源投入。

消息的顺序性

因为队列数据结构的特性,消息需要按照特定的顺序被消费。

数据的一致性

这是任何分布式系统都要考虑的问题,要保证数据的一致性,防止消息的重复消费,避免同一个系统多次处理同一条消息。分布式锁就是为了解决分布式系统中多个服务器之间的一致性问题。

消息队列的可靠性

开始使用消息队列,就需要承担由此带来的各种可能问题,例如消息丢失。并非消息队列就可以保证消息不会丢失。比如在发送消息的过程中,可能就因为某些原因而失败;或者消息队列宕机情况的发送。

主流消息队列选型

比较常见的mq实现:

image-20240203214744783

追求可用性:kafka、 rocketmq 、rabbitmq

追求可靠性:rabbitmq、rocketmq

追求吞吐能力:rocketmq、kafka

追求消息低延迟:rabbitmq、kafka

rabbitmq 的使用

rabbitmq 官网教程:https://www.rabbitmq.com/getstarted.html

在 rabbitmq 的官网教程中提供了5个不同的demo示例,每种都有不同语言实现的版本,这里我们选择使用java教程。

image-20240202173024856

helloword

第一种 helloworld 快速入门,它是基于最基础的消息队列模型来实现的,是一对一的消息队列模型。包括三个角色:

image-20240202173314203

示例代码如下

生产者代码

package com.gdit.rabbitmq.mq_java.helloworld;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

// 定义一个名为 singleproducer 的类,用于实现消息发送功能
public class singleproducer {

    // 定义消息队列名称
    private final static string queue_name = "hello";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.138.*.*"); // 设置 rabbitmq 所在服务器主机名,本地就localhost
        factory.setusername("***"); // 指定连接用户名
        factory.setpassword("***"); // 指定连接密码

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            // 在通道上声明(创建)队列 参数一:队列名称 参数二:是否持久化 参数三:是否为独占队列 参数四:无人使用是否自动删除
            channel.queuedeclare(queue_name, false, false, false, null);
            // 要发送的消息内容
            string message = "hello world!";
            // 使用 channel.basicpublish 方法将消息发送到指定队列 参数一:交换机名称 参数二:路由键(队列名称) 参数三:其它 参数四:消息内容
            channel.basicpublish("", queue_name, null, message.getbytes());
            system.out.println(" [x] sent '" + message + "'");
        }
    }
}

运行生产者代码,查看 idea 和 rabbitmq 控制台

image-20240202175631652

image-20240202175748783

可以看到 idea 控制台发送成功,rabbitmq 控制台多了个 hello 队列,队列内消息总数 1 条,已准备 1 条

消息发送之后,我们根据官方文档编写消费者代码去消费消息

消费者代码

package com.gdit.rabbitmq.mq_java.helloworld;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;

public class singleconsumer {

    // 定义消息队列名称
    private final static string queue_name = "hello";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        connection connection = factory.newconnection();
        // 通过已建立的连接创建一个的通道
        channel channel = connection.createchannel();

        // 在通道上声明(创建)队列,在该通道上声明要监听的队列
        channel.queuedeclare(queue_name, false, false, false, null);
        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 处理接收的消息
        delivercallback delivercallback = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            system.out.println(" [x] received '" + message + "'"); // 打印消息
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给 delivercallback 来处理,会持续阻塞
        // 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
        channel.basicconsume(queue_name, true, delivercallback, consumertag -> { });

    }

}

运行消费者代码,查看 idea 和 rabbitmq 控制台

image-20240202181059940

image-20240202180939686

可以看到 idea 控制台接收消息成功,rabbitmq 控制台hello队列内消息总数 0 条,已准备 0 条。说明消费成功了

work queues

第二种 work queues,它是一对多的消息队列模型,就是一个生产者给一个队列发送消息,可以有多个消费者从这个队列取消息。也是包括三个角色:

image-20240202195903748

适用场景:多个机器同时去队列接受并处理任务(尤其是每个机器的处理能力有限)。假设当前只有一个消费者,由于其性能有限,无法处理队列中的所有任务。此时我们通过增加机器的方式,让多台机器监听同一队列,处理队列内的消息。就是处理并发

公平分配

在这个模型中,官网有提到一个公平分配。意思是生产者发送多条消息后,这些消息会按照轮询方式将消息分配给消费者,就是一人一条轮着来。这样做的方式可以做到公平分配,但每台机器处理一条消息的时间不同。比如机器a处理一条消息需要10s,而机器b处理一条消息只需要5s。如果还按照公平分配那机器b会因为机器a被拖慢速度。这里我们可以通过控制单个消费者处理任务积压数channel.basicqos(prefetchcount);一次只处理一条消息,也就是说,在消费者未确认前,rabbitmq 不会向该消费者发送更多的消息。而先完成消费确认可以从消息队列中取下一条消息。这样就不用等了。不过它只对手动确认消息生效(后面讲),如果自动确认消息那它收到消息就确认了还会造成公平分配的情况发生。

image-20240202205420928

示例代码如下:

消息生产者代码

package com.gdit.rabbitmq.mq_java.work_queues;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.messageproperties;

import java.util.scanner;

public class multiproducer {

    // 定义消息队列名称
    private static final string task_queue_name = "multi_queue";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            // 声明(创建)队列
            channel.queuedeclare(task_queue_name, true, false, false, null);

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            scanner scanner = new scanner(system.in);
            while (scanner.hasnext()) {
                string message = scanner.nextline();
                // 发布消息到队列,设置消息持久化 messageproperties.persistent_text_plain
                channel.basicpublish("", task_queue_name,
                        messageproperties.persistent_text_plain,
                        message.getbytes("utf-8"));
                system.out.println(" [x] sent '" + message + "'");
            }
        }
    }

}

消息消费者代码

消费者1

package com.gdit.rabbitmq.mq_java.work_queues;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;

public class multiconsumer1 {

    private static final string task_queue_name = "multi_queue";

    public static void main(string[] argv) throws exception {
        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        final connection connection = factory.newconnection();

        // 通过已建立的连接创建一个的通道
        final channel channel = connection.createchannel();

        // 在通道上声明(创建)队列,在该通道上声明要监听的队列
        channel.queuedeclare(task_queue_name, true, false, false, null);
        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 控制单个消费者的处理任务挤压数,每次消费者最多同时处理 1 个任务
        channel.basicqos(1);

        // 处理接收的消息
        delivercallback delivercallback = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);

            try {
                // 打印接受的消息
                system.out.println(" [消费者 1] received '" + message + "'");
                // 消息确认
                channel.basicack(delivery.getenvelope().getdeliverytag(), false);
                thread.sleep(10000); // 休眠10s,模拟业务处理
            } catch (interruptedexception e) {
                e.printstacktrace();
                // 发生异常后,拒绝确认消息,发送失败消息,并不重新投递该消息
                channel.basicnack(delivery.getenvelope().getdeliverytag(), false, false);
            } finally {
                system.out.println(" [x] done");
                // 消息确认
                channel.basicack(delivery.getenvelope().getdeliverytag(), false);
            }
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给 delivercallback 来处理,会持续阻塞
        // 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
        channel.basicconsume(task_queue_name, false, delivercallback, consumertag -> {
        });

    }
}

消费者2

// 消费者2代码与消费者1代码基本一致,只是消费者2休眠5s模拟业务处理
...
try {
    // 打印接受的消息
    system.out.println(" [消费者 1] received '" + message + "'");
    // 消息确认
    channel.basicack(delivery.getenvelope().getdeliverytag(), false);
    thread.sleep(5000); // 休眠10s,模拟业务处理
} 
...

测试运行

测试未设置 channel.basicqos(1);

// 控制单个消费者的处理任务挤压数,每次消费者最多同时处理 1 个任务
// channel.basicqos(1);

image-20240202211839057

测试设置 channel.basicqos(1);

// 控制单个消费者的处理任务挤压数,每次消费者最多同时处理 1 个任务
channel.basicqos(1);

image-20240202212224307

消息确认机制

为了保证消息成功被消费,rabbitmq 提供了消息确认机制,当消费者接收到消息后要给一个反馈:

消息确认机制可以设置为自动确认,不过不建议自动确认。设置为 true 后消息收到立马就确认了,此时工作可能还未完成,建议设置成 false,根据实际情况手动进行确认。

// 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
channel.basicconsume(task_queue_name, false, delivercallback, consumertag -> {});

消息确认操作

// 指定确认某条消息 参数2:表示是否批量确认,即是否需要确认所有历史消息知道这条消息为止
channel.basicack(delivery.getenvelope().getdeliverytag(), false);

// 指定某条消息消费失败 参数2:表示是否批量确认 参数3:是否重新入队,可用于重试
channel.basicnack(delivery.getenvelope().getdeliverytag(), false, false);

// 指定拒绝某条消息 参数2:是否重新入队,可用于重试
channel.basicreject(delivery.getenvelope().getdeliverytag(), false); 

publish/subscribe

发布订阅模型

image-20240202221454684

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

发布订阅模型根据交换机类型不同分为三种:fanout exchange、direct exchange、topic exchange。类似于广播、单播、组播

image-20240202221956540

fanout

fanout模型会将消息交给所有绑定到交换机的队列,它与 work queue 不同的是,fanout会把同一条消息发给所有与之绑定的队列里。而 work queue 则是多个消费者从一个队列里取不同的消息。

image-20240203143658191

在广播模式下,消息发送流程是这样的:

接下来 demo 的流程如下:

  1. 创建一个消息生产者类;
  2. 声明交换机名称 fanout_exchange,消息生产者向交换机发送消息;
  3. 创建一个消息消费者类;
  4. 并创建两个队列与交换机 fanout_exchange 绑定,同时接收消息

demo 示例代码如下:

消息生产者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.fanout;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.util.scanner;

public class fanoutproducer {

    // 定义交换机名称
    private static final string exchange_name = "fanout_exchange";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangedeclare(exchange_name, "fanout");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            scanner scanner = new scanner(system.in);
            while (scanner.hasnext()) {
                string message = scanner.nextline();
                // 发布消息到指定交换机,不指定路由键
                channel.basicpublish(exchange_name, "", null, message.getbytes("utf-8"));
                system.out.println(" [x] sent '" + message + "'");
            }
        }
    }

}

与之前不同的是消息生产者没有队列的出现了,而是出现了交换机。声明交换机并将消息发送至交换机。

消息消费者代码:

package com.gdit.rabbitmq.mq_java.publish_subscribe.fanout;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.io.ioexception;
import java.nio.charset.standardcharsets;
import java.util.concurrent.timeoutexception;

public class fanoutconsumer {

    // 定义交换机名称
    private static final string exchange_name = "fanout_exchange";

    public static void main(string[] args) throws exception {
        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        final connection connection = factory.newconnection();

        // 创建两个通道
        final channel channel1 = connection.createchannel();
        final channel channel2 = connection.createchannel();
        // 声明(创建)交换机
        channel1.exchangedeclare(exchange_name, "fanout");
        // 创建队列1
        string queuename1 = "xiaowang_queue";
        channel1.queuedeclare(queuename1, true, false, false, null); // 声明(创建队列)
        channel1.queuebind(queuename1, exchange_name, ""); // 绑定队列与交换机
        // 创建队列2
        string queuename2 = "xiaoli_queue";
        channel2.queuedeclare(queuename2, true, false, false, null); // 声明(创建队列)
        channel2.queuebind(queuename2, exchange_name, ""); // 绑定队列与交换机

        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 处理队列1接收的消息
        delivercallback delivercallback1 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [小王] received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel1.basicconsume(queuename1, true, delivercallback1, consumertag -> {});

        // 处理队列2接收的消息
        delivercallback delivercallback2 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [小李] received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel2.basicconsume(queuename2, true, delivercallback2, consumertag -> {});

    }

}

在消息消费者的代码中通过创建两个通道来处理两个队列,并新增了交换机,声明交换机后将两个队列都绑定到一个交换机上。

测试运行

image-20240202225842978

可以看到,fanout类型的交换机,消息生产者将消息发给 fanout_exchange 交换机后,与之绑定的两个队列都收到了消息。

direct

在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到direct类型的exchange。

在direct模型下:

接下来 demo 的流程如下:

  1. 创建一个消息生产者类;
  2. 声明交换机名称 direct_exchange,消息生产者向交换机发送消息并指定路由键;
  3. 创建一个消息消费者类;
  4. 并创建两个队列与交换机 direct_exchange 绑定,并绑定路由键,接收消息

demo 示例代码如下:

消息生产者代码:

package com.gdit.rabbitmq.mq_java.publish_subscribe.direct;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.util.scanner;

public class directproducer {

    // 定义交换机名称
    private static final string exchange_name = "direct_exchange";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangedeclare(exchange_name, "direct");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            scanner scanner = new scanner(system.in);
            while (scanner.hasnext()) {
                string userinput = scanner.nextline(); // 获取控制台输入
                string[] strings = userinput.split(" "); // 以空格划分为数组
                string message = strings[0]; // 第一个是消息
                string routingkey = strings[1]; // 第二个是路由键
                // 发布消息到指定交换机,根据输入指定路由键
                channel.basicpublish(exchange_name, routingkey, null, message.getbytes("utf-8"));
                system.out.println(" [x] sent '" + message + "'");
            }
        }
    }

}

消息消费者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.direct;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;

public class directconsumer {

    // 定义交换机名称
    private static final string exchange_name = "direct_exchange";

    public static void main(string[] args) throws exception {
        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        final connection connection = factory.newconnection();

        // 创建两个通道
        final channel channel1 = connection.createchannel();
        final channel channel2 = connection.createchannel();
        // 声明(创建)交换机
        channel1.exchangedeclare(exchange_name, "direct");
        // 创建队列1
        string queuename1 = "tom_queue";
        channel1.queuedeclare(queuename1, true, false, false, null); // 声明(创建队列)
        channel1.queuebind(queuename1, exchange_name, "blue"); // 绑定队列与交换机
        channel1.queuebind(queuename1, exchange_name, "yellow"); // 绑定队列与交换机
        // 创建队列2
        string queuename2 = "jerry_queue";
        channel2.queuedeclare(queuename2, true, false, false, null); // 声明(创建队列)
        channel2.queuebind(queuename2, exchange_name, "red"); // 绑定队列与交换机
        channel2.queuebind(queuename2, exchange_name, "yellow"); // 绑定队列与交换机

        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 处理队列1接收的消息
        delivercallback delivercallback1 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [汤姆] received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel1.basicconsume(queuename1, true, delivercallback1, consumertag -> {});

        // 处理队列2接收的消息
        delivercallback delivercallback2 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [杰瑞] received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel2.basicconsume(queuename2, true, delivercallback2, consumertag -> {});

    }

}

测试运行

image-20240203150802471

可以看到,消息生产者第一条消息指定了路由键 yellow,在消息消费者的两个队列都绑定了 yellow 路由键,两条队列都收到了。第二条消息指定了路由键 red,只有 jerry_queue 队列收到了消息。第三条消息指定了路由键 blue,只有 tom_queue 队列收到了消息。所有 direct 交换机需要指定路由键完成消息的发送与接收。

topic

image-20240203153302186

topic类型的交换机与direct相比,都是可以根据routingkey把消息路由到不同的队列。只不过topic类型exchange可以让队列在绑定routing key 的时候使用通配符!

routingkey一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#`:能够匹配`item.spu.insert` 或者 `item.spu
item.*`:能匹配`item.spu` `item.abc`

示例代码:

消息生产者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.topic;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.util.scanner;

public class topicproducer {

    // 定义交换机名称
    private static final string exchange_name = "topic_exchange";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangedeclare(exchange_name, "topic");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            scanner scanner = new scanner(system.in);
            while (scanner.hasnext()) {
                string userinput = scanner.nextline(); // 获取控制台输入
                string[] strings = userinput.split(" "); // 以空格划分为数组
                string message = strings[0]; // 第一个是消息
                string routingkey = strings[1]; // 第二个是路由键
                // 发布消息到指定交换机,根据输入指定路由键
                channel.basicpublish(exchange_name, routingkey, null, message.getbytes("utf-8"));
                system.out.println(" [x] sent '" + message + "'");
            }
        }
    }

}

消息消费者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.topic;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;

public class topicconsumer {

    // 定义交换机名称
    private static final string exchange_name = "topic_exchange";

    public static void main(string[] args) throws exception {
        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        final connection connection = factory.newconnection();

        // 创建两个通道
        final channel channel1 = connection.createchannel();
        final channel channel2 = connection.createchannel();
        // 声明(创建)交换机
        channel1.exchangedeclare(exchange_name, "topic");
        // 创建队列1
        string queuename1 = "ironman_queue";
        channel1.queuedeclare(queuename1, true, false, false, null); // 声明(创建队列)
        channel1.queuebind(queuename1, exchange_name, "*.复联.#"); // 绑定队列与交换机
        // 创建队列2
        string queuename2 = "batman_queue";
        channel2.queuedeclare(queuename2, true, false, false, null); // 声明(创建队列)
        channel2.queuebind(queuename2, exchange_name, "#.正义.#"); // 绑定队列与交换机

        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 处理队列1接收的消息
        delivercallback delivercallback1 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [钢铁侠] received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel1.basicconsume(queuename1, true, delivercallback1, consumertag -> {});

        // 处理队列2接收的消息
        delivercallback delivercallback2 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [蝙蝠侠] received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel2.basicconsume(queuename2, true, delivercallback2, consumertag -> {});

    }

}

测试运行

image-20240203154325369

消息过期机制

消息过期机制(time-to-live)就是可以给每条消息指定一个有效期,一段时间内未被消费者处理就过期了。这种机制允许系统白动清理和丢弃那些长时间未被消费的消息,以避免消息队列中积累过多的过期消息,从而保持系统的效率和可靠性。

消息过期机制的应用场景:

订单超时取消:用户提交订单后 5 分钟或者 15 分钟还没进行支付操作,那么该订单就以及失效了。因此消息过期机制非常适合这种过期场景的处理。通过设置合适的过期时间,可以确保即使清理无效的消息,提高系统的效率和准确性。

官方文档中提供了消息过期机制的两种方式,官方文档:https://www.rabbitmq.com/ttl.html。一种是给队列所有消息设置过期时间,另一种是给某条具体消息设置过期时间。

给队列所有消息设置过期时间

也就是说,无论什么时候进入这个队列的消息,在设定过期时间后特定的时间点都会过期失效。是针对整个队列而言的。

image-20240203162906613

给某条具体消息设置过期时间

针对特定消息设置一个独立的过期时间。这样,在到达指定时间后,这条消息就会过期失效。

image-20240203162923001

示例代码1:给队列所有消息设置过期时间,

以 helloword 的代码为例设置过期时间。

消息生产者代码

package com.gdit.rabbitmq.mq_java.ttl;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

import java.util.hashmap;
import java.util.map;

// 定义一个名为 singleproducer 的类,用于实现消息发送功能
public class ttlproducer {

    // 定义消息队列名称
    private final static string queue_name = "ttl_queue";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置 rabbitmq 所在服务器主机名,本地就localhost
        factory.setusername("***"); // 指定连接用户名
        factory.setpassword("***"); // 指定连接密码

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            /*map<string, object> args = new hashmap<>();
            args.put("x-message-ttl", 60000); // 设定队列内消息过期时间 60s
            // 在通道上声明(创建)队列 参数一:队列名称 参数二:是否持久化 参数三:是否为独占队列 参数四:无人使用是否自动删除 参数五:配置参数
            channel.queuedeclare(queue_name, false, false, false, args);*/
            // 要发送的消息内容
            string message = "hello world!";
            // 使用 channel.basicpublish 方法将消息发送到指定队列 参数一:交换机名称 参数二:路由键(队列名称) 参数三:其它 参数四:消息内容
            channel.basicpublish("", queue_name, null, message.getbytes());
            system.out.println(" [x] sent '" + message + "'");
        }
    }
}

消息消费者代码

package com.gdit.rabbitmq.mq_java.ttl;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;
import java.util.hashmap;
import java.util.map;

public class ttlconsumer {

    // 定义消息队列名称
    private final static string queue_name = "ttl_queue";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        connection connection = factory.newconnection();
        // 通过已建立的连接创建一个的通道
        channel channel = connection.createchannel();
        map<string, object> args = new hashmap<>();
        args.put("x-message-ttl", 60000); // 设定队列内消息过期时间 60s
        // 在通道上声明(创建)队列 参数一:队列名称 参数二:是否持久化 参数三:是否为独占队列 参数四:无人使用是否自动删除 参数五:配置参数
        channel.queuedeclare(queue_name, false, false, false, args);
        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 处理接收的消息
        delivercallback delivercallback = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            system.out.println(" [x] received '" + message + "'"); // 打印消息
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给 delivercallback 来处理,会持续阻塞
        // 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
        channel.basicconsume(queue_name, false, delivercallback, consumertag -> { });

    }

}

测试运行

image-20240203164340661

可以看到,消息生产者发送了一条消息,消息消费者也收到了,我们再来看 rabbitmq 控制台。

image-20240203164435404

控制台显示 ttl_queue 队列当前消息总数1,未确认1。因为我们把自动确认关闭了,也没在代码里写确认。但会发现过了 60s 这条消息还在,并没有过期。

为什么?因为消费者确实收到了消息,只是还没确认。就像送快递时快递员打电话给你,你接到了电话只是还没签收,这个快递肯定不能丢。要是快递员打电话联系不上,就会认为这个快递没人处理取消了。我们把消息消费者代码关掉再看控制台。

image-20240203164840286

此时消息就被取消了。

示例代码2:给指定消息设置过期时间

// 要发送的消息内容
string message = "hello world!";
// 给指定消息设置过期时间
amqp.basicproperties properties = new amqp.basicproperties.builder()
    .expiration("10000") // 10s
    .build();
// 使用 channel.basicpublish 方法将消息发送到指定队列 参数一:交换机名称 参数二:路由键(队列名称) 参数三:其它 参数四:消息内容
channel.basicpublish("", queue_name, properties, message.getbytes());
system.out.println(" [x] sent '" + message + "'");

死信队列

死信(dead letter):指过期的消息、被拒收的消息、处理失败的消息还有消息队列已满后加入队列的消息的统称。

死信队列:专门处理死信的队列,实际就是一个普通的队列,但被专门用来接收并处理死信消息。

死信交换机:用于将死信消息转发到死信队列的交换机,也可以设置路由绑定来缺点消息的路由规则。其实也是一个普通的交换机,只是专门用来转发死信消息的的。

死信队列主要用于处理以下情况下的死信消息。根据官方文档https://www.rabbitmq.com/dlx.html的说明,有以下三种情况:

**使用:**根据官网的提示,死信交换机跟死信队列跟普通的一样,正常创建就行。在声明普通队列时将死信交换机作为参数填入,还可以指定死信交换机转发的路由键,意思是死信转发到指定的死信交换机。

image-20240203174619142

示例代码:

示例代码的思路:创建一个死信消息消费者代码,像普通消息消费者一样正常绑定交换机和队列,并处理消息,只是名称换了。在消息生产者代码处正常发送消息,在消息消费者处声明队列时把指定死信队列的参数的map对象,然后正常接收消息,消息消费者收到消息后把这条消息拒绝。拒接后就会把消息转发到死信消息消费者那里。

死信消息消费者代码

package com.gdit.rabbitmq.mq_java.dead_letter;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;

public class dlxconsumer {

    // 定义死信交换机名称
    private static final string dlx_exchange_name = "dlx_direct_exchange";

    public static void main(string[] args) throws exception {
        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        final connection connection = factory.newconnection();

        // 创建两个通道
        final channel channe = connection.createchannel();

        // 声明(创建)死信交换机 参数1:交换机名称 参数2:交换机类型
        channe.exchangedeclare(dlx_exchange_name, "direct");

        // 创建死信队列
        string queuename = "dlx_queue";
        channe.queuedeclare(queuename, true, false, false, null); // 声明(创建队列)
        channe.queuebind(queuename, dlx_exchange_name, "dead"); // 绑定队列与交换机

        // 处理死信队列接收的消息
        delivercallback delivercallback1 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [死信队列] received '" + message + "'");
        };
        // 开始消费队列中的消息,
        channe.basicconsume(queuename, true, delivercallback1, consumertag -> {});

    }
}

消息生产者代码

package com.gdit.rabbitmq.mq_java.dead_letter;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;
import java.util.scanner;

public class workproducer {
    // 定义工作交换机名称
    private static final string work_exchange_name = "work_direct_exchange";

    public static void main(string[] argv) throws exception {

        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        try (connection connection = factory.newconnection();
             // 通过已建立的连接创建一个的通道
             channel channel = connection.createchannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangedeclare(work_exchange_name, "direct");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            scanner scanner = new scanner(system.in);
            while (scanner.hasnext()) {
                string message = scanner.nextline(); // 获取控制台输入
                // 发布消息到指定交换机,并指定路由键
                channel.basicpublish(work_exchange_name, "work", null, message.getbytes("utf-8"));
                system.out.println(" [x] sent '" + message + "'");
            }
        }
    }

}

消息消费者代码

package com.gdit.rabbitmq.mq_java.dead_letter;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.delivercallback;

import java.nio.charset.standardcharsets;
import java.util.hashmap;
import java.util.map;

public class workconsumer {
    // 定义死信交换机名称
    private static final string dlx_exchange_name = "dlx_direct_exchange";
    // 定义工作交换机名称
    private static final string work_exchange_name = "work_direct_exchange";

    public static void main(string[] args) throws exception {
        // 创建连接工厂对象,用于创建到 rabbitmq 服务器的连接
        connectionfactory factory = new connectionfactory();
        factory.sethost("43.**"); // 设置连接属性
        factory.setusername("***");
        factory.setpassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 rabbitmq 服务器进行交互
        final connection connection = factory.newconnection();

        // 创建两个通道
        final channel channel1 = connection.createchannel();

        // 声明(创建)交换机
        channel1.exchangedeclare(work_exchange_name, "direct");

        // 创建用于指定死信队列的参数的map对象
        map<string, object> args1 = new hashmap<>();
        args1.put("x-dead-letter-exchange", dlx_exchange_name);
        args1.put("x-dead-letter-routing-key", "dead");

        // 创建队列,并将死信队列的参数map对象传入
        string queuename = "cat_queue";
        channel1.queuedeclare(queuename, true, false, false, args1); // 声明(创建队列)
        channel1.queuebind(queuename, work_exchange_name, "work"); // 绑定队列与交换机

        system.out.println(" [*] waiting for messages. to exit press ctrl+c");

        // 处理队列接收的消息
        delivercallback delivercallback1 = (consumertag, delivery) -> {
            // 将消息体转换为字符串
            string message = new string(delivery.getbody(), standardcharsets.utf_8);
            // 打印接收的消息
            system.out.println(" [猫] received '" + message + "'");
            // 拒绝消息
            channel1.basicreject(delivery.getenvelope().getdeliverytag(), false);
        };
        // 开始消费队列中的消息,
        channel1.basicconsume(queuename, false, delivercallback1, consumertag -> {});

    }

}

测试运行

image-20240203182022301

可以看到,消息生产者发送消息后,消息消费者收到了,只是在代码处我们拒绝了。然后消息就被死信交换机送到了死信队列,被死信消息消费者收到了就行处理。

spring boot整合rabbitmq的使用

spring boot 天然支持集成rabbitmq,并提供了封装好的框架。类似于jdbc和 mybatis 的关系。使用这种方式的优点是简单易用,只需要进行相应的配置即可直接使用。缺点是封装得非常好,如果你没有学习过相关文档,可能不知道该如何使用。刚刚我们讲的官方客户端,大家很快就能创建出生产者和消费者。但是,如果你使用 spring boot,如果没有看过官方文档,你知道该如何使用吗?你知道如何配置,如何使用特定的语法来发送消息吗?另外一个点是,封装的框架可能不够灵活。这里的不够灵活并不是指它不能实现某些功能,而是指只有 spring boot 官方给你封装了的功能,你才能使用。使用别人框架的一个缺点就是,如果框架没有封装某个功能,你可能无法使用,受到了框架的限制。这是一个双刃剑。所以,选择哪种方式取决于具体场景。

这里提供几个博客供大家参考:

【有道云笔记】03异步通信 https://note.youdao.com/s/php3ltnc

【rabbitmq】springboot整合rabbitmq实现延迟队列、ttl、dlx死信队列

【springboot】 整合rabbitmq 消息单独以及批量的ttl

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

关于rabbitmq(docker)部署,启动,访问,连接一系列问题最全面解决办法与思路,rabbitmq报私密连接,user can only log in via localhost,页面访问失败

08-01

Erlang、RabbitMQ下载与安装教程(windows超详细)

08-01

RabbitMQ 安装&体验

08-01

windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)_erlang和rabbitmq关系

08-01

Elasticsearch的使用教程

08-02

【RabbitMQ】使用手册

07-31

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论