it编程 > 前端脚本 > Erlang

RabbitMQ的介绍和使用

180人参与 2024-08-06 Erlang

1.同步通讯和异步通讯

        举个例子,同步通讯就像是在打电话,因此它时效性较强,可以立即得到结果,但如果你正在和一个mm打电话,其他mm找你的话,你们之间是不能进行消息的传递和响应的

        异步通讯就像是微信,你可以和多个mm进行消息的传递,对方可以立即响应或者有空了在”回“你

        同步调用问题

        微服务间基于feign的调用就属于同步方式,存在一些问题。

        1.代码耦合:如果后续需要添加业务,需要不断修改支付服务的代码

        2.性能下降,吞吐量下降:支付服务一直在等待,订单等服务的响应,cpu资源一直在占用。

        3.级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题。

2.异步调用方案

        异步调用常见的就是事件驱动模式:broker是事件代理者,一旦用户支付成功,这就是一个事件,这个事件就会被broker管理,订单服务、仓储服务、短信服务。就会找broker,这个叫做订阅事件。一旦用户支付成功之后,broker就会通知被订阅过事件的服务,支付服务完成事件发布之后,就结束了服务,返回给用户

        优点:

        解决代码解耦:添加业务时不需要再更改支付服务的代码,支付服务只需要发布事件就行。至于后面的业务支付服务可以不用考虑。

        性能提升,吞吐量提供:相比较同步服务,业务处理时间的累加,支付服务还需要等待其他服务完成并响应,通过异步的方式,支付服务发布时间之后就结束服务,无需等待其他服务响应。提升了性能,和吞吐量

        服务没有依赖关系,不用担心级联失败问题

        流量削峰: 有多个事件发布,可以囤积到broker上,订阅该事件的服务可以按自己的处理能力来稳步进行。broker起到缓冲作用

        缺点:

        依赖broker的可靠性、安全性、吞吐能力

        架构复杂了,业务没有明显的流程线,不好追踪管理

3.什么是mq

mq(messagequeue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的broker

常见的mq

rabbitmq,适用于中小型企业开发,如果对性能要求比较高的并且需要定制服务的大型企业推荐使用kafka。下面会介绍rabbitmq的使用。

4.rabbitmq概述和安装

rabbitmq概述

rabbitmq是基于erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com

rabbitmq的部署

环境:centos7,docker在线拉取的方式部署。

#拉取rabbitmq镜像

输入:docker pull rabbitmq:3-management

#设置默认用户名密码并启动容器

docker run --name rabbitmq -e rabbitmq_default_user=root -e rabbitmq_default_pass=123 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

端口15672是rabbitmq的ui界面,5672是服务端口

ui界面展示:

rabbitmq结构和概念

publisher是我们的消息(事件)发送者,consumer是消息的消费者,publisher将来会把消息发送到我们的exchange(交换机)上,交换机负责路由,并把消息投射到queue(队列),queue负责暂存消息,consumer负责从queue里面获取消息处理消息。

 

5.rabbitmq的常见消息模型

一、基本消息队列(basicqueue)

helloworld是最基本的消息队列模型,实现的话,包含三个角色

publisher:消息发布者,将消息发送到队列queue

queue:消息队列,负责接受并缓存消息

consumer:订阅队列,处理队列中的消息

官方提供的编码方式非常麻烦,下面我们介绍学习一下springamqp,它可以大大简化我们消息发送和接收api。

springamqp简介

amqp:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言的平台无关,更符合微服务中独立性的要求。

spring amqp:是基于amqp协议定义的一套api规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

利用springamqp实现基础消息队列功能

通过rabbittemplate提供的convertandsend就可以实现消息的发送

引入相关依赖

<dependency>
    <groupid>org.springframework.amqp</groupid>
    <artifactid>spring-rabbit</artifactid>
    <scope>test</scope>
</dependency>

publisher(消息发布者)的application.yml的配置

test的测试代码:

@runwith(springrunner.class)
@springboottest
class publisherapplicationtests {
    @autowired
    private rabbittemplate rabbittemplate;

    @test
    void contextloads() {
        string queuename="simple queue";
        string message="hello,spring amqp";
        rabbittemplate.convertandsend(queuename,message);
    }

}

通过rabbittemplate实现对队列消息的监听

引入依赖

<dependency>
    <groupid>org.springframework.amqp</groupid>
    <artifactid>spring-rabbit</artifactid>
    <scope>test</scope>
</dependency>

配置consumer(消息接收者),的application.yml文件

spring:
  rabbitmq:
    host: 192.168.10.8 #主机名
    port: 5672         #端口
    username: root      #用户名
    password: 123       #密码
    virtual-host: /     #虚拟主机名

监听类的代码

@component
public class simplelistener {
    @rabbitlistener(queues = "simple queue")
    public void listensimplequeue(string msg)
    {
        system.out.println("消费者接收到simple queue的消息:"+msg);
    }
}

ps:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能。

二、工作消息队列(workqueue)

工作队列的结构如下:

工作消息队列的结构,相比于基础消息队列多了个消费者,因为rabbitmq阅后即焚的特性这两个消费者属于共同工作的关系,如果有50个消息,他们两个消费者就会一人分一半,也就是一人25条,为啥会多一个消费者?这是因为如果一个消费者每次处理40个消息,但是publisher一次发布50个消息,多出来的消息会存储在queue里面,又因为queue是占用内存的假以时日,内存就会爆满,新的消息就存不进去了,多一个消费者每次就可以处理80条消息,可以有效解决这个问题。

work queue,工作队列,可以提高消费处理速度,避免队列消息堆积。但是这里有一个消息预取机制 ,消费者会提前把消息拿过来,因此消息是平局分配,并不是“能者多劳”的模式,通过设置prefetch的值来实现每次只能获取一条消息,处理完成才能接取下一个消息。实现“能者多劳”的模式。

  1. 发布订阅(publish、subscribe)

基础消息队列和工作消息队列都是一条信息只被一个消费者消费,消费完就删除,显然不能实现我们之前预想的完成支付之后,通知仓储、短信等服务。这就需要我们了解学习发布订阅模式。发布订阅模式与之前案例的区别就是允许同一消息发送给多个消费者,实现方式是加入exchange(交换机),结构如下:

publisher将消息发送给exchange(交换机),交换机把这个消息转发给队列,因此,发布者(publisher)并不需要知到转发给了那个 队列或多个队列,转发给多个队列,这种方式就能实现被多个消费者消费,那么交换机到底是发给一个还是多个呢?这是由交换机类型来决定的。常见的exchange的类型包括:

广播:fanout

路由:direct

主题:topic

注意: exchange负责消息路由,而不是储存,路由失败则消息丢失。

广播-fanout exchange

fanout exchange 会将接收到的消息路由到每一个绑定的queue。

实现思路:

1.在consumer服务中,利用代码声明队列、交换机、并将两者绑定。

在consumer服务上添加@configuration注解,并声明fanoutexchange、queue和关系对象binding,代码如下:

@configuration
public class fanoutconfig {
    //声明交换机对象
    @bean
    public fanoutexchange fanoutexchange()
    {
        return new fanoutexchange("fanout");
    }
    //声明队列1
    @bean
    public queue fanoutqueue1()
    {
        return new queue("fanoutqueue1");
    }
    //绑定队列一到交换机
    @bean
    public binding fanoutbinding1(queue fanoutqueue1,fanoutexchange fanoutexchange)
    {
        return bindingbuilder.bind(fanoutqueue1).to(fanoutexchange);
    }
    //声明队列2
    @bean
    public queue fanoutqueue2()
    {
        return new queue("fanoutqueue2");
    }
    //绑定队列二到交换机
    @bean
    public binding fanoutbinding2(queue fanoutqueue2,fanoutexchange fanoutexchange)
    {
        return bindingbuilder.bind(fanoutqueue2).to(fanoutexchange);
    }
}

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@rabbitlistener(queues = "fanoutqueue1")
public void listensimplequeue3(string msg) throws interruptedexception {
    system.out.println("消费者222接收到fanoutqueue1的消息:"+msg);
    thread.sleep(100);
}
@rabbitlistener(queues = "fanoutqueue2")
public void listensimplequeue4(string msg) throws interruptedexception {
    system.out.println("消费者222接收到fanoutqueue2的消息:"+msg);
    thread.sleep(100);
}

3.在publisher中编写测试方法,向fanout发送消息。

@test
public void contectfonoutexchange()
{
    //交换机名称
    string exchangename="fanout";
    //消息内容
    string message="hello everyone";
    //发送消息
    rabbittemplate.convertandsend(exchangename,"",message);
}

路由-directexchange

direct exchange 会将接收到的消息根据规则路由到指定的queue,因此称为路由模式(routes)

每一个queue都与exchange设置一个bindingkey,一个队列可以绑定多个bindingkey。

发布者发送消息时,指定消息的routingkey

exchange(交换机)将消息路由到bindingkey与消息routingkey一致的队列

实现思路:

1.利用@rabbitlistenner声明exchange、queue、rountingkey

2.在consumer服务中编写两个消费者方法,分别监听queue1和queue2

在我们的监听类里面增加两个方法,用来声明交换机、队列和rountingkey

//发布订阅directexchange
@rabbitlistener(bindings = @queuebinding(
        value = @queue(name = "queue1"),
        exchange = @exchange(name = "direct",type= exchangetypes.direct),
        key = {"red","blue"}
))
public void listenerdirectqueue1(string msg)
{
    system.out.println("消费者接收到direct.queue1的消息"+msg);
}

@rabbitlistener(bindings = @queuebinding(
        value = @queue(name = "queue2"),
        exchange = @exchange(name = "direct",type= exchangetypes.direct),
        key = {"red","yellow"}
))
public void listenerdirectqueue2(string msg)
{
    system.out.println("消费者接收到direct.queue2的消息"+msg);
}

3.在publisher中编写测试方法,向exchange发送消息。

@test
public void contextdirectexchange()
{
    string exchangename="direct";
    string msgblue="hello blue";
    string msgred="hello red";
    string msgyellow="hello yellow";
    rabbittemplate.convertandsend(exchangename,"blue",msgblue);
}

@test
public void contextdirectexchange()
{
    string exchangename="direct";
    string msgblue="hello blue";
    string msgred="hello red";
    string msgyellow="hello yellow";
    rabbittemplate.convertandsend(exchangename,"red",msgred);
}

话题-topicexchange

topicexchange与directexchange类似,区别在于rountingkey必须是多个单词的列表,并且以"."分割。queue与exchange指定bindingkey可以使用通配符

#:代表0个或多个单词

*:代表一个单词

我们使用direct的时候一个队列如果绑定了很多key,会非常麻烦,通配符的引入就把key的绑定简化许多,原来绑定多个key现在只需要绑定一个key。

实现思路:

1.利用@rabbitlistenter声明exchange、queue、routingkey

2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

//topic话题
@rabbitlistener(bindings = @queuebinding(
        value = @queue(name = "topic.queue1"),
        exchange = @exchange(name = "topic",type = exchangetypes.topic),
        key ="china.#"
))
public void listennertopicqueue1(string msg)
{
    system.out.println("消费者接收到topic.queue1的消息"+msg);
}
@rabbitlistener(bindings = @queuebinding(

        value = @queue(name = "topic.queue2"),
        exchange = @exchange(name = "topic",type = exchangetypes.topic),
        key ="#.news"
))
public void listennertopicqueue2(string msg)
{
    system.out.println("消费者接收到topic.queue2的消息"+msg);
}

3.在publisher中编写测试方法,向交换机topic发送消息

@test
public void contexttopicexchange()
{
    string exchangename="topic";
    string msg="我是懒大王";
    rabbittemplate.convertandsend(exchangename,"china.news",msg);
}

6.消息转换器

在springamqp的发送方法中,接收消息的类型是object,也就是说我们可以发送任意对象类型的消息,springamqp会帮我们序列化为字节后发送。

spring的消息对象处理是由messageconcerter来处理的,而默认实现是simplemessageconverter,基于jdk的objectoutputstream完成序列化,这种序列化方式,比较浪费内存资源,如果需要修改,只需要定义一个messageconverter类型的bean即可。推荐用json方式序列化,步骤如下:

我们在publisher服务引入依赖

我们在publisher服务中声明messageconverter

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

消息队列RabbitMQ在Windows中安装与配置完全解析

08-04

RabbitMQ3.13.x之二_RabbitMQ所有端口说明及开启后台管理功能

08-04

Elasticsearch的使用教程

08-02

关于rabbitmq(docker)部署,启动,访问,连接一系列问题最全面解决办法与思路,rabbitmq报私密连接,user can only log in via localhost,页面访问失败

08-01

消息队列-RabbitMQ

08-01

Erlang、RabbitMQ下载与安装教程(windows超详细)

08-01

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论