RabbitMQ:订阅模型-消息订阅模式
创始人
2024-05-01 15:13:58
0

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

RabbitMQ 单生产单消费模型主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

文章目录

    • 一、RabbitMQ 订阅模型-消息订阅(Fanout)模式
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、消息订阅(Fanout)模式组成
        • 3、消息订阅(Fanout)模式流程
    • 二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现
        • 1、添加 Maven 依赖
        • 2、封装工具类 ConnectionUtil
        • 3、生产者实现
        • 4、消费者-1 实现
        • 5、消费者-2 实现
        • 6、消费者-3 实现
    • 三、订阅模型 三种模式区别
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、RabbitMQ 路由(direct)模式
        • 3、RabbitMQ 主题(topic)模式


一、RabbitMQ 订阅模型-消息订阅(Fanout)模式

1、RabbitMQ 消息订阅(Fanout)模式

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

image-20221201012238266

2、消息订阅(Fanout)模式组成

RabbitMQ 订阅模型-消息订阅(Fanout)模式主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

3、消息订阅(Fanout)模式流程

消息订阅(Fanout)模式流程:

  1. 消息订阅(Fanout)模式 可以有多个消费者
  2. 每个消费者有自己的 queue(队列)
  3. 每个队列都要绑定到 Exchange(交换机)
  4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给那个队列,生产者无法决定
  5. 交换机把消息发送给绑定过的所有队列
  6. 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

com.rabbitmqamqp-client5.16.0junitjunit4.13.2test...

2、封装工具类 ConnectionUtil

package com.lizhengi.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author liziheng* @version 1.0.0* @description 连接工具类封装* @date 2022-12-26 11:39 上午**/
public class ConnectionUtil {/*** 创建 MQ 连接工厂对象*/private static final ConnectionFactory CONNECTION_FACTORY;// 类加载时,重量级资源加载static {CONNECTION_FACTORY = new ConnectionFactory();//设置连接MQ主机CONNECTION_FACTORY.setHost("127.0.0.1");//设置连接MQ端口号CONNECTION_FACTORY.setPort(5672);//设置连接MQ虚拟主机CONNECTION_FACTORY.setVirtualHost("/test");//设置连接MQ虚拟主机的用户名密码CONNECTION_FACTORY.setUsername("admin");CONNECTION_FACTORY.setPassword("123456");}/*** 建立与RabbitMQ连接 工具方法** @return Connection*/public static Connection getConnection() {try {//返回连接对象return CONNECTION_FACTORY.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 关闭通道和连接 工具方法** @param channel    Channel* @param connection Connection*/public static void closeConnectionAndChanel(Channel channel, Connection connection) {try {if (channel != null) {channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {e.printStackTrace();}}
}

3、生产者实现

package com.lizhengi.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 生产者* @date 2022-12-26 11:44 上午**/
public class Producer {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();/*  为通道声明交换机*  参数1:交换机名称;参数2:交换机类型*/channel.exchangeDeclare("logs", "fanout");//发送消息for (int i = 0; i < 100; i++) {channel.basicPublish("logs", "", null, ("fanout type message" + i).getBytes());}//释放资源ConnectionUtil.closeConnectionAndChanel(channel, connection);}
}

4、消费者-1 实现

package com.lizhengi.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 消费者* @date 2022-12-26 11:45 上午**/
public class Customer1 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs", "fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义channel.queueBind(queueName, "logs", "");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者1:" + new String(body));}});}
}

5、消费者-2 实现

package com.lizhengi.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 消费者* @date 2022-12-26 11:45 上午**/
public class Customer2 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs", "fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义channel.queueBind(queueName, "logs", "");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者2:" + new String(body));}});}
}

6、消费者-3 实现

package com.lizhengi.fanout;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-消息订阅(Fanout)模式 消费者* @date 2022-12-26 11:45 上午**/
public class Customer3 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs", "fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义channel.queueBind(queueName, "logs", "");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者3:" + new String(body));}});}
}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

相关内容

热门资讯

安卓系统自带的网页,功能与特色... 你有没有发现,每次打开安卓手机,那熟悉的系统界面里总有一个默默无闻的小家伙——安卓系统自带的网页浏览...
美咖云系统安卓版,开启智能生活... 你有没有发现,最近手机上多了一个叫“美咖云系统安卓版”的小家伙?它就像一个魔法师,轻轻一点,就能让你...
安卓系统推荐最好的手机,盘点性... 你有没有想过,拥有一部性能卓越的手机,就像是拥有了移动的宝藏库?在这个信息爆炸的时代,一部好手机不仅...
安卓11系统能精简吗,释放潜能 你有没有发现,随着手机越来越智能,系统也越来越庞大?安卓11系统,这个最新的操作系统,是不是也让你觉...
安卓自动重启系统软件,揭秘原因... 手机突然自动重启,是不是感觉整个人都不好了?别急,今天就来和你聊聊这个让人头疼的安卓自动重启系统软件...
苹果手机x刷安卓系统,探索安卓... 你有没有想过,你的苹果手机X竟然也能刷上安卓系统?是的,你没听错,就是那个一直以来都和我们苹果手机X...
安卓系统智商低吗,智商低下的真... 你有没有想过,为什么安卓系统的智商总被调侃得好像有点低呢?是不是觉得它总是慢吞吞的,有时候还犯点小错...
安卓系统手机联系人,揭秘你的社... 你有没有发现,手机里的联系人列表就像是一个小小的社交圈呢?里面藏着我们的亲朋好友、工作伙伴,甚至还有...
安卓系统免费铃声下载,打造个性... 手机里那首老掉牙的铃声是不是让你觉得有点out了呢?别急,今天就来给你支个招,让你轻松给安卓手机换上...
安卓系统用哪个桌面好,打造个性... 你有没有发现,手机桌面可是我们每天都要面对的“脸面”呢?换一个好看的桌面,心情都能跟着好起来。那么,...
虚拟大师是安卓10系统,功能与... 你知道吗?最近在手机圈里,有个新玩意儿引起了不小的轰动,那就是虚拟大师!而且,更让人惊喜的是,这个虚...
安卓系统与苹果优缺点,系统优缺... 说到手机操作系统,安卓和苹果绝对是两大巨头,它们各有各的特色,就像两道不同的美味佳肴,让人难以抉择。...
安卓win双系统主板,融合与创... 你有没有想过,一台电脑如果既能流畅运行安卓系统,又能轻松驾驭Windows系统,那该有多爽啊?没错,...
安卓系统可精简软件,轻松提升手... 你有没有发现,手机里的安卓系统越来越庞大,软件也越装越多,有时候感觉手机就像个“大肚子”,不仅运行速...
安卓系统基于linux的代码,... 你有没有想过,那个陪伴你每天刷抖音、玩游戏、办公的安卓系统,其实背后有着一套复杂的基于Linux的代...
苹果和安卓的拍照系统,谁更胜一... 你有没有发现,现在手机拍照已经成为我们生活中不可或缺的一部分呢?无论是记录生活的点滴,还是捕捉美丽的...
苹果和安卓系统不同吗,系统差异... 你有没有想过,为什么你的手机里装的是苹果的iOS系统,而朋友的手机却是安卓系统呢?这两种系统,看似都...
安卓系统有多少级,揭秘其多级架... 你有没有想过,那个陪伴我们日常生活的安卓系统,它其实有着丰富的层级结构呢?没错,就是那个让我们的手机...
华为鸿蒙系统与安卓的,技术融合... 你知道吗?最近科技圈可是炸开了锅,华为鸿蒙系统与安卓的较量成为了大家热议的话题。这不,今天我就来给你...
什么安卓手机是苹果系统,搭载苹... 你有没有想过,为什么有些人宁愿花大价钱买苹果手机,而有些人却对安卓手机情有独钟呢?其实,这个问题背后...