it编程 > 前端脚本 > Erlang

RabbitMQ详解与实战(绝对足够惊喜)

174人参与 2024-07-28 Erlang

在这里插入图片描述

什么是rabbitmq

rabbitmq与kafka的区别

rabbitmq与kafka的各自适用场景

rabbitmq的安装

erlang下载安装

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm

rabbitmq下载安装

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm

rabbotmq的启动

systemctl start rabbitmq-server  #启动
systemctl enable rabbitmq-server #设置开机自启
systemctl status rabbitmq-server #查看状态

rabbitmq web界面管理

默认情况下,是没有安装web端的客户端插件,需要安装插件才可以生效。执行命令:

rabbitmq-plugins enable rabbitmq_management

然后需要重启服务

systemctl restart rabbitmq-server

由于web管理界面访问端口为15672,所以防火墙需要放行该端口

对于 centos 7 上的防火墙,要放行端口 15672(默认 rabbitmq 管理界面的端口),可以按照以下步骤进行操作:

  1. 登录到 centos 7 的服务器上,以具有管理员权限的用户身份。

  2. 检查防火墙状态,确认是否已安装 firewalld 防火墙:

    systemctl status firewalld
    
  3. 如果防火墙处于开启状态,可以直接跳转到第 6 步。如果防火墙停止运行,则需要启动,请继续执行以下步骤。

  4. 启动 firewalld 服务:

    systemctl start firewalld
    
  5. 设置 firewalld 开机自启:

    systemctl enable firewalld
    
  6. 添加端口规则,允许在防火墙上开放 15672 端口和5672端口:

    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    
  7. 重新加载防火墙配置,使更改生效:

    firewall-cmd --reload
    

现在,centos 7 的防火墙应该已经放行了 15672 端口和5672 端口,允许对 rabbitmq 管理界面进行访问。请注意,为了安全起见,建议仅在需要时才开放必要的端口,并在完成使用后关闭不必要的端口。

web界面访问管理

rabbitmq 默认的管理界面账号和密码通常是:

这对默认凭据在 rabbitmq 安装后可用于访问管理界面(只限于本地)。然而,出于安全考虑,强烈建议在生产环境中修改默认凭据或创建新的管理员帐户,并使用更强大的密码来加强安全性。

要在 rabbitmq 中添加新用户,您需要使用 rabbitmq 提供的命令行工具或者管理界面进行操作。下面是两种方法的简要说明:

方法一:使用 rabbitmq 命令行工具

  1. 打开命令行终端。

  2. 导航到 rabbitmq 安装目录的 sbin 文件夹(例如,在 linux 上可能是 /usr/lib/rabbitmq/sbin)。

  3. 运行以下命令来添加新用户:

    rabbitmqctl add_user <username> <password>
    

    <username> 替换为要创建的用户名,将 <password> 替换为所需的密码。

  4. 运行以下命令来赋予用户管理员权限:

    rabbitmqctl set_user_tags <username> administrator
    

    <username> 替换为刚创建的用户名。

方法二:使用 rabbitmq 管理界面

  1. 打开您的浏览器并访问 rabbitmq 管理界面。默认地址为 http://localhost:15672
  2. 使用默认的管理员账号和密码(通常是 guest/guest)登录到管理界面。
  3. 在管理界面上导航到 “admin” -> “users” 选项卡。
  4. 单击 “add a user” 按钮。
  5. 输入用户名和密码,并选择 “tag” 为 “administrator”。
  6. 单击 “add user” 按钮以创建新用户。

无论您使用哪种方法,确保为新用户选择一个强大的密码,并在生产环境中遵循安全最佳实践。

访问方式为 主机ip地址配合端口号,例如 192.168.18.14:15672

image-20230616170104156

image-20230616170123959

注意,此处需要进行一次授权,否则在代码中连接rabbitmq会失败

image-20230616194600354

image-20230616194636865

springboot+rabbitmq实战

引入依赖

      <!--rabbitmq-->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-amqp</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

修改配置文件

server:
    port: 8080
spring:
    application:
        name: rabbitmqexample
    rabbitmq:
        host: 192.168.18.14
        port: 5672
        username: admin
        password: admin

rabbitmq五种消息模型

rabbitmq简单模式(生产者消费者模式)

配置类
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

/**
 * @description mq配置类
 * @author it小辉同学
 * @date 2023/06/16
 */
@configuration
public class rabbitmqconfig {
    /**
     * @return {@link queue }
     * @description 设置队列
     * @author it小辉同学
     * @date 2023/06/16
     */
    @bean
    public queue queue(){
        return new queue("simple.hello");
    }
}

生产者
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

/**
 * @description 生产者(发送消息)
 * @author it小辉同学
 * @date 2023/06/16
 */
@service

public class messagesender {
    @autowired

    private rabbittemplate rabbittemplate;

    /**
     * @param message 消息
     * @description 发送消息
     * @author it小辉同学
     * @date 2023/06/16
     */
    public void sendmessage(string message) {
        system.out.println("发送祝福:" + message);
        rabbittemplate.convertandsend("simple.hello", message);

    }

}


消费者
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

/**
 * @description 消息接收器
 * @author it小辉同学
 * @date 2023/06/16
 */
@service
public class messagereceiver {
    @autowired
    private rabbittemplate rabbittemplate;

    /**
     * @param message 消息
     * @description 处理消息
     * @author it小辉同学
     * @date 2023/06/16
     */
    @rabbitlistener(queues = "simple.hello")
    public void handlemessage(string message) {
        system.out.println("我收到了你的祝福: " + message);
    }

}


测试
import com.xiaohui.service.messagesender;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;

@springboottest
public class mqtestdemo {
    @autowired
    private messagesender messagesender;
    @test
    public void testdemo1(){
            messagesender.sendmessage("我想跟你说:希望你开心快乐!!!");
    }
}

注意:不可以省略@springboottest,否则监听不到mq配置

image-20230616213556350

image-20230616213816564

rabbitmq工作队列模式(广播模式)

配置类
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.fanoutexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

/**
 * @description mq配置类
 * @author it小辉同学
 * @date 2023/06/16
 */
@configuration
public class rabbitmqconfig {
    //队列1
    private static final string queue01 = "queue01";
    //队列2
    private static final string queue02 = "queue02";
    //交换机
    private static final string exchange_name = "fanout_exchange";

    @bean
    public queue queue1() {
        return new queue(queue01);
    }

    @bean
    public queue queue2() {
        return new queue(queue02);
    }

    @bean
    public fanoutexchange fanoutexchange() {
        return new fanoutexchange(exchange_name);
    }

    @bean
    public binding binding01() {
        return bindingbuilder.bind(queue1()).to(fanoutexchange());
    }

    @bean
    public binding binding02() {
        return bindingbuilder.bind(queue2()).to(fanoutexchange());
    }
}

生产者

import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

/**
 * @description 生产者(发送消息)
 * @author it小辉同学
 * @date 2023/06/16
 */
@service

public class messagesender {
    @autowired

    private rabbittemplate rabbittemplate;

    /**
     * @param message 消息
     * @description 发送消息
     * @author it小辉同学
     * @date 2023/06/16
     */
    public void sendmessage(string message) {
        system.out.println( message);
        rabbittemplate.convertandsend("fanout_exchange","", message);

    }

}


消费者
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

/**
 * @description 消息接收器
 * @author it小辉同学
 * @date 2023/06/16
 */
@service
public class messagereceiver {
    @autowired
    private rabbittemplate rabbittemplate;

    /**
     * @param message 消息
     * @description 消费者01
     * @author it小辉同学
     * @date 2023/06/16
     */
    @rabbitlistener(queues = "queue01")
    public void receiver01(string message) {
        system.out.println("队列01:奔赴山海," + message);
    }

    /**
     * @param message 消息
     * @description 消费者012
     * @author it小辉同学
     * @date 2023/06/16
     */
    @rabbitlistener(queues = "queue02")
    public void receiver02(string message) {
        system.out.println("队列02:向阳而生," + message);
    }

}


测试
@springboottest
public class mqtestdemo {
    @autowired
    private messagesender messagesender;
    @test
    public void testdemo1(){
            messagesender.sendmessage("相信梦想。。。。。。");
    }
}

image-20230616222906407

rabbitmq发布/订阅模式

配置类


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

/**
 * @description mq配置类
 * @author it小辉同学
 * @date 2023/06/16
 */
@configuration
public class rabbitmqconfig {
    //队列01
    private static final string queue01= "queue01";
    //队列02
    private static final string queue02= "queue02";
    //交换机
    private static final string exchange_name = "direct_exchange";
    //路由键01
    private static final string routingkey01 = "queue_route01";
    //路由键02
    private static final string routingkey02 = "queue_route02";

    @bean
    public queue queue01(){
        return new queue(queue01);
    }
    @bean
    public queue queue02(){
        return new queue(queue02);
    }
    @bean
    public directexchange directexchange(){
        return new directexchange(exchange_name);
    }

    @bean
    public binding binding1(){
        //将列队01绑定到交换机上为给他设置路由键
        return bindingbuilder.bind(queue01()).to(directexchange()).with(routingkey01);
    }
    @bean
    public binding binding2(){
        //将列队02绑定到交换机上为给他设置路由键
        return bindingbuilder.bind(queue02()).to(directexchange()).with(routingkey02);
    }
}

生产者

import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

/**
 * @description 生产者(发送消息)
 * @author it小辉同学
 * @date 2023/06/16
 */
@service

public class messagesender {
    @autowired

    private rabbittemplate rabbittemplate;

    /**
     * @param message 消息
     * @description 发送消息
     * @author it小辉同学
     * @date 2023/06/16
     */
    public void sendmessage(string message) {
        system.out.println( message);
        rabbittemplate.convertandsend("direct_exchange","queue_route01", message);

    }

}


消费者

import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

/**
 * @description 消息接收器
 * @author it小辉同学
 * @date 2023/06/16
 */
@service
public class messagereceiver {
    @autowired
    private rabbittemplate rabbittemplate;

    /**
     * @param message 消息
     * @description 消费者01
     * @author it小辉同学
     * @date 2023/06/16
     */
    @rabbitlistener(queues = "queue01")
    public void receiver01(string message) {
        system.out.println("队列01——路由01:奔赴山海," + message);
    }

    /**
     * @param message 消息
     * @description 消费者012
     * @author it小辉同学
     * @date 2023/06/16
     */
    @rabbitlistener(queues = "queue02")
    public void receiver02(string message) {
        system.out.println("队列02——路由02:向阳而生," + message);
    }

}


测试
import com.xiaohui.service.messagesender;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;

@springboottest
public class mqtestdemo {
    @autowired
    private messagesender messagesender;
    @test
    public void testdemo1(){
            messagesender.sendmessage("相信梦想。。。。。。");
    }
}

image-20230616224415508

rabbitmq路由模式

配置类

接着,定义一个交换机和两个队列:

@configuration
public class rabbitconfig {

    public static final string exchange_name = "direct_exchange";

    public static final string queue_name_1 = "queue_1";
    public static final string queue_name_2 = "queue_2";

    public static final string routing_key_1 = "key_1";
    public static final string routing_key_2 = "key_2";

    @bean
    public directexchange directexchange() {
        return new directexchange(exchange_name);
    }

    @bean
    public queue queue1() {
        return new queue(queue_name_1);
    }

    @bean
    public queue queue2() {
        return new queue(queue_name_2);
    }

    @bean
    public binding binding1() {
        return bindingbuilder.bind(queue1()).to(directexchange()).with(routing_key_1);
    }

    @bean
    public binding binding2() {
        return bindingbuilder.bind(queue2()).to(directexchange()).with(routing_key_2);
    }
}
生产者
@service
public class messageproducer {

    @autowired
    private amqptemplate amqptemplate;

    public void send(string message, string routingkey) {
        amqptemplate.convertandsend(rabbitconfig.exchange_name, routingkey, message);
        system.out.println("sent message: " + message + ", routing key: " + routingkey);
    }
}
消费者
@service
public class messageconsumer {

    @rabbitlistener(queues = rabbitconfig.queue_name_1)
    public void receivefromqueue1(string message) {
        system.out.println("received message from queue 1: " + message);
    }

    @rabbitlistener(queues = rabbitconfig.queue_name_2)
    public void receivefromqueue2(string message) {
        system.out.println("received message from queue 2: " + message);
    }
}
测试
@service
public class testservice {

    @autowired
    private messageproducer producer;

    @postconstruct
    public void test() {
        producer.send("hello, queue 1", rabbitconfig.routing_key_1);
        producer.send("hello, queue 2", rabbitconfig.routing_key_2);
    }
}

运行程序后,可以看到控制台输出:

sent message: hello, queue 1, routing key: key_1
sent message: hello, queue 2, routing key: key_2
received message from queue 1: hello, queue 1
received message from queue 2: hello, queue 2

说明消息成功发送到了指定的队列。

rabbitmq主题模式

配置类
@configuration
public class rabbitconfig {

    public static final string exchange_name = "topic_exchange";

    public static final string queue_name_1 = "queue_1";
    public static final string queue_name_2 = "queue_2";
    public static final string queue_name_3 = "queue_3";

    public static final string routing_key_1 = "key_1.*";
    public static final string routing_key_2 = "key_2.*";
    public static final string routing_key_3 = "*.key_3";

    @bean
    public topicexchange topicexchange() {
        return new topicexchange(exchange_name);
    }

    @bean
    public queue queue1() {
        return new queue(queue_name_1);
    }

    @bean
    public queue queue2() {
        return new queue(queue_name_2);
    }

    @bean
    public queue queue3() {
        return new queue(queue_name_3);
    }

    @bean
    public binding binding1() {
        return bindingbuilder.bind(queue1()).to(topicexchange()).with(routing_key_1);
    }

    @bean
    public binding binding2() {
        return bindingbuilder.bind(queue2()).to(topicexchange()).with(routing_key_2);
    }

    @bean
    public binding binding3() {
        return bindingbuilder.bind(queue3()).to(topicexchange()).with(routing_key_3);
    }
}
生产者

在生产者端,发送消息到交换机,这里使用了三种不同的路由键:

@service
public class messageproducer {

    @autowired
    private amqptemplate amqptemplate;

    public void send(string message, string routingkey) {
        amqptemplate.convertandsend(rabbitconfig.exchange_name, routingkey, message);
        system.out.println("sent message: " + message + ", routing key: " + routingkey);
    }
}

消费者

在消费者端,使用通配符连接到交换机,并指定一个消费者:

@service
public class messageconsumer {

    @rabbitlistener(queues = rabbitconfig.queue_name_1)
    public void receivefromqueue1(string message) {
        system.out.println("received message from queue 1: " + message);
    }

    @rabbitlistener(queues = rabbitconfig.queue_name_2)
    public void receivefromqueue2(string message) {
        system.out.println("received message from queue 2: " + message);
    }

    @rabbitlistener(queues = rabbitconfig.queue_name_3)
    public void receivefromqueue3(string message) {
        system.out.println("received message from queue 3: " + message);
    }
}
测试

现在,我们来测试一下,这里发送了三条消息,分别匹配了不同的绑定键:

@service
public class testservice {

    @autowired
    private messageproducer producer;

    @postconstruct
    public void test() {
        producer.send("hello, queue 1", rabbitconfig.routing_key_1);
        producer.send("hello, queue 2", rabbitconfig.routing_key_2);
        producer.send("hello, queue 3", rabbitconfig.routing_key_3);
    }
}

运行程序后,可以看到控制台输出:

sent message: hello, queue 1, routing key: key_1.*
sent message: hello, queue 2, routing key: key_2.*
sent message: hello, queue 3, routing key: *.key_3
received message from queue 1: hello, queue 1
received message from queue 2: hello, queue 2
received message from queue 3: hello, queue 3

我的天哪,太不容易了,坑倒是不多,文字东西太多了,还不能违背初心去抄袭,花费时间很长!如果您看到这里,祝贺你,我们一起成长了!感谢相遇,再会有期!!!

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

RabbitMQ 消息丢失的场景,如何保证消息不丢失?

07-28

25道RabbitMQ面试题含答案(很全)

07-28

【RabbitMQ】【Docker】基于docker-compose构建rabbitmq容器

07-28

rabbit启动:Error when reading /var/lib/rabbitmq/.erlang.cookie: eacces auth.erl

07-28

RabbitMQ 安装分享

07-28

Ruby langchainrb gem and custom configuration for the model setup

07-28

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论