RabbitMQ
一、消息队列
MQ的相关概念
1. 什么是MQ
MQ(message queue)是消息队列,本质是个队列,遵循FIFO先入先出原则,只不过队列中存放的内容是message而已。MQ是一种跨进程的通信机制,用于微服务节点之间的通信,所以在分布式系统中起着重要作用,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,微服务之间的消息传递只需要依赖MQ,不需要依赖其他服务。
2. 为什么要用MQ
流量削峰
举例,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单绰绰有余,正常时段我们下单一秒后就能返回结果,但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。这种情况,使用消息队列做缓冲,把一秒内下的订单分散成一段时间来处理,允许用户下单,将用户的订单信息(订单请求)通过消息队列进行排队。这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复,在这几分钟的时间里,物流系统要处理的订单信息被缓存在消息队列中,用户的下单操作可以正常完成,当物流系统恢复后,继续处理订单信息即可。用户感受不到物流系统的故障,提升了系统的可用性。
异步处理
服务间调用是异步的(一个微服务在分布式系统中既可以是提供者,也可以是消费者或者说调用者),例如A调用B,B需要花费很长时间执行,A需要知道B什么时候执行完。使用消息总线,可以很方便地解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送消息给MQ,A服务会通过MQ获取到B已经执行完业务的消息。通过消息队列,A服务能及时地得到异步处理成功的消息。
3. MQ的分类
简要介绍,具体知识需单独学习、查阅。
ActiveMQ
优点:单机吞吐量万级,时效性ms级,可用性高。
kafka
优点:性能高,吞吐量高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据。消费者采用pull方式消费消息,消息有序,通过控制能够保证所有消息被消费且只消费一次。在日志领域比较成熟,被多家公司和多个开源项目使用。在大数据领域的实时计算以及日志采集被大规模使用。
适用于产生大量数据的互联网服务的数据收集业务。如果是开发日志采集功能,肯定首选kafka。
RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰。
RabbitMQ
性能好,时效性微秒级,社区活跃度较高,管理界面方便。如果数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ。
RabbitMQ
RabbitMQ的概念
RabbitMQ是一个消息中间件,负责接收并转发消息。可以把它当作一个快递站点,当你要发送一个包裹时,把包裹放到快递站,快递员最终会把快递送到收件人那里。RabbitMQ负责接收、存储和转发消息数据。
四大核心概念
生产者
产生消息,发送消息给消息队列的程序。
交换机
一个交换机可以绑定多个队列。
交换机是RabbitMQ中一个非常重要的部件,一方面它接收来自生产者的消息,一方面它将消息推送至队列中,交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定。
队列
队列是RabbitMQ内部使用的一种数据结构,消息存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据,这是我们使用队列的方式。
消费者
消费与接收具有相似的含义,消费者大多时候是一个等待接收消息的程序。请注意生产者、消费者和消息中间件很多时候并不在同一机器上,同一个应用程序既可以是生产者也可以是消费者。
名词介绍
Broker:接收和分发消息的应用。
包含两部分:交换机Exchange和队列Queue
Connection:producer和consumer与Broker之间的TCP连接。
Channel:
每一个生产者与MQ之间会建立一个连接Connection,每一个Connection里会有多个信道Channel,信道就是发送消息的通道。
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个线程创建单独的channel进行通讯,channel之间是完全隔离的。
Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
可以理解为连接池。
Exchange:
message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe),fanout(multicast)
Queue:
消息最终被送到这里等待consumer取走
Binding:
exchange和queue之间的虚拟连接,binding中可以包含routing key,binding信息被保存到exchange中的查询表中,用于message的分发依据。
安装
(30条消息) Linux安装RabbitMQ详细教程_☞精◈彩◈猿◈笔◈记☜的博客-CSDN博客_linux安装rabbitmq教程
一、环境准备
1、RabbitMQ版本 和 Erlang 版本兼容性关系 https://www.rabbitmq.com/which-erlang.html 2、官方安装包下载地址
【erlang下载地址】:https://hub.fastgit.org/rabbitmq/erlang-rpm/releases
【socat下载地址】:http://www.rpmfind.net/linux/rpm2html/search.php?query=socat(x86-64)
【rabbitmq下载地址】:https://github.com/rabbitmq/rabbitmq-server/releases
3、安装包中说明,请下载对应的安装包 el6:CentOS 6.x 的下载 el7:CentOS 7.x 的下载 el8:CentOS 8.x 的下载
二、安装操作步骤 1、安装C++依赖环境
*****************命令如下******************
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2、准备安装包 【我这里选择目前最新版本,具体根据自己需要选择对应的版本】
erlang-23.3.4.8-1.el7.x86_64.rpm
rabbitmq-server-3.9.11-1.el7.noarch.rpm
socat-1.7.3.2-2.el7.x86_64.rpm
3、在【/opt】路径下,创建【rabbitmq】文件夹
******************命令如下******************
cd /opt
mkdir rabbitmq
cd rabbitmq
4、将安装包上传到【rabbitmq】文件夹下
5、安装Erlang
******************命令如下******************
## 安装命令
rpm -ivh erlang-23.3.4.8-1.el7.x86_64.rpm
6、检查Erlang是否安装成功
******************命令如下******************
## 安装成功,按两次ctrl+c退出命令模式
erl -v
7、安装socat
******************命令如下******************
## 安装命令
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
或者
yum install socat -y
8、安装rabbitmq
******************命令如下******************
## 安装命令
rpm -ivh rabbitmq-server-3.9.11-1.el7.noarch.rpm
## 检查是否安装成功命令
rpm -qa|grep rabbitmq
9、开启管理界面
******************命令如下******************
## 开启管理界面命令
rabbitmq-plugins enable rabbitmq_management
10、添加配置文件,解决只能localhost访问的问题
******************命令如下******************
## 进入【/etc/rabbitmq】文件夹下
cd /etc/rabbitmq
## 编辑【rabbitmq.config】文件
vim rabbitmq.config
11、在rabbitmq.config文件中写入下面的命令,不要忘了后面的点
******************命令如下******************
[{rabbit,[{loopback_users,[]}]}].
12、启动rabbitmq
******************命令如下******************
## 启动rabbitmq命令:
systemctl start rabbitmq-server
## 查看启动状态命令:
systemctl status rabbitmq-server
13、停止rabbitmq
******************命令如下******************
## 停止rabbitmq命令:
systemctl stop rabbitmq-server
## 查看启动状态命令:
systemctl status rabbitmq-server
14、重启rabbitmq
******************命令如下******************
## 重启rabbitmq命令:
systemctl restart rabbitmq-server
## 查看启动状态命令:
systemctl status rabbitmq-server
15、开放端口
******************命令如下******************
## 开放5672端口命令
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
## 开放15672端口命令
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
16、登录管理界面 URL地址:http://192.168.109.101:15672/ 默认账号:guest 默认密码:guest
三、卸载操作步骤 1、卸载rabbitmq相关文件
1.1、卸载前先停止rabbitmq服务
systemctl stop rabbitmq-server
1.2、查看rabbitmq安装的相关列表
yum list | grep rabbitmq
1.3、卸载rabbitmq-server.noarch
yum -y remove rabbitmq-server.noarch
2、卸载erlang
2.1、查看erlang安装的相关列表
yum list | grep erlang
2.2、卸载erlang已安装的相关内容
yum -y remove erlang-*
3、删除有关的所有文件
rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmq rm -rf /usr/local/erlang rm -rf /usr/local/rabbitmq
二、核心部分
简单模式--Hello World
写两个程序,分别是生产者程序和消费者程序,生产者发送单个消息,消费者接收消息并打印消息(在实际开发中,便是处理消息)。
新建maven工程
修改pom文件
<!--指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
生产者代码
// 生产者:发消息,发给消息队列 public class Producer { // 实际开发中,此字符串常量应写在常量类,或者根据需求写在配置文件中 public static final String QUEUE_NAME = "hello"; public static void main(String[] args) { // 以下部分在实际开发中应写在工具类,这是连接RabbitMQ的工具类中的方法 // 连接RabbitMQ // 1. 创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2. 设置工厂ip,连接RabbitMQ队列。 connectionFactory.setHost("42.192.182.4"); // 3. 设置用户名和密码 connectionFactory.setUsername("admin"); connectionFactory.setPassword("123"); try { // 4. 创建连接 Connection connection = connectionFactory.newConnection(); // 5. 获取信道 Channel channel = connection.createChannel(); // 6. 生成一个队列 // 参数: // (1) 是队列名称 // (2) durable:是队列里面的消息是否持久化(即存储在磁盘中),默认情况消息存储在内存中即不持久化 // (3) exclusive:排他,是否只供一个消费者进行消费,是否进行消息共享,如果是true就是允许多个消费者消费,false就是消息不共享,只有一个消费者能消费。 // (4) autoDelete:表示是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除,如果是true表示自动删除,反之亦然。 // 其他参数.... channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 7.发送消息 String message = "hello world"; // (1)交换机,目前不写 // (2)路由的key值,本次是队列名称 // (3)其他参数信息,null // (4)发送的消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送完毕"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
消费者代码
//消费者,接收消息 public class Consumer { // 实际开发中,此字符串常量应写在常量类,或者根据需求写在配置文件中 public static final String QUEUE_NAME = "hello2"; public static void main(String[] args) { // 以下部分在实际开发中应写在工具类,这是连接RabbitMQ的工具类中的方法 // 连接RabbitMQ // 1. 创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2. 设置工厂ip,连接RabbitMQ队列。 connectionFactory.setHost("42.192.182.4"); // 3. 设置用户名和密码 connectionFactory.setUsername("admin"); connectionFactory.setPassword("123"); try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //消费者接收(消费)消息 // 1.队列名; // 2.自动应答。true代表自动应答,false代表手动应答。 // 3.DeliverCallback 消费接收后回调(成功之后的回调),处理消息的回调函数 // 4.CancelCallback 取消接收消息的回调 channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> System.out.println(new String(message.getBody())), s -> System.out.println("消费消息被中断")); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
工作模式--Work queues
概述
工作队列的主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。
把任务封装为消息并发送到队列。
当有多个工作线程时,这些工作线程将一起处理这些任务。
在工作模式下,消费者是工作线程,是处理任务、执行任务的线程。
注意:队列中的消息,只能被处理一次,不可以被处理多次,被工作线程A处理了,就不能被工作线程B处理。消息不能被重复消费。
DEMO
启动两个工作线程(处理消息、处理任务的线程),这两个线程就是两个消费者,任务被封装成了消息。
启动一个生产者线程。
两个工作线程是轮询地执行消息队列中的任务。
写工具类,将重复的连接代码封装在工具类中。
public class RabbitMqUtils { public static Channel getChannel() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("42.192.182.4"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
写两个工作线程(消费者)
// 这是一个工作线程,相当于之前的消费者。 public class Worker01 { //队列名称 public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //消息接收 System.out.println("C1等待接收消息...."); channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> System.out.println("接收到的消息" + ":" + new String(message.getBody())), s -> System.out.println("消费消息被中断,执行回调逻辑")); } }
写生产者程序,即任务程序,任务会发送到队列
public class Task01 { public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 从控制台输入消息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息完成:" + message); } } }
消息应答
概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
简单描述就是消费者处理完成之后,告诉rabbitmq消息已经处理完成。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡, 因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
自动应答相当于消费者接收到消息就认为消息传送成功,就认为消息可以删除。其实消费者接收到消息之后,还要对消息进行处理,这才是目的。
手动应答的方法
Channel.basicAck--用于肯定确认
RabbitMQ知道该消息处理成功,可以将其丢弃
Channel.basicNack--用于否定确认
Channel.basicReject--用于否定确认
与Channel.basicNack相比少了一个参数multiple(为是否批量处理的参数),不处理该消息了直接拒绝,可以将其丢弃。
批量应答
multiple参数:
true代表批量应答channel上未应答的消息
比如说channel上有传送tag的消息5、6、7、8,当前tag是8,那么此时5-8的这些还未应答的消息都会被应答。
false
只会应答tag=8的消息,5、6、7这三个消息依然不会被确认收到消息应答。
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认(应答),RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
手动应答代码
生产者
public class Task { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); //从控制台中输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息: " + message); } } }
消费者
worker02
public class Worker02 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //消息接收 System.out.println("C1等待接收消息.处理时间较短..."); boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, new DeliverCallback() { //这是接收消息成功的回调函数 @Override public void handle(String s, Delivery delivery) throws IOException { //沉睡1s SleepUtils.sleep(1); String message = new String(delivery.getBody(), "UTF-8"); System.out.println("C1接收到的消息:" + message); //第一个参数表示应答的消息是哪个消息,就是当前的消息tag //第二个参数表示是否批量应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }, s -> System.out.println("消费消息被中断,执行回调逻辑")); } }
public class Worker03 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //消息接收 System.out.println("C2等待接收消息.处理时间较长..."); boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, new DeliverCallback() { //这是接收消息成功的回调函数 @Override public void handle(String s, Delivery delivery) throws IOException { //沉睡30s SleepUtils.sleep(30); String message = new String(delivery.getBody(), "UTF-8"); System.out.println("C2接收到的消息:" + message); //第一个参数表示应答的消息是哪个消息,就是当前的消息tag //第二个参数表示是否批量应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }, s -> System.out.println("消费消息被中断,执行回调逻辑")); } }
生产者先生产任务aa,被worker02消费并处理
接着生产任务bb,被worker03消费并处理,但是在处理消息的回调函数DeliverCallback中,沉睡30秒,沉睡30秒之后,再通过Channel.basicAck进行手动应答,在手动应答之前,RabbitMQ都会认为消息没有被处理完成。
再生产任务cc,被worker02消费并处理,因为是轮询机制
再生产任务dd,被worker03消费并处理,仍然在handler函数中,会睡眠30秒,这是对消息的处理,此时还没有应答,也就是RabbitMQ不会认为这个消息被处理完成,我们在睡眠的时候,手动停止掉worker03,即断开连接,那么没有收到应答的这个dd任务会重回消息队列,并被可用的消费者worker02处理。
C1等待接收消息.处理时间较短... C1接收到的消息:aa C1接收到的消息:cc C1接收到的消息:dd
C2等待接收消息.处理时间较长... C2接收到的消息:bb
RabbitMQ持久化
概念
消息应答的手动应答,以及消息自动重新入队,可以使得任务不丢失。自动应答不可以,因为自动应答就是只要被消费者接收,即使消费者程序在处理消息的过程中出了问题,只要接收了消息都被RabbitMQ认为是消息发送完成,可以标记为已接收并进行删除。
现在如何保障当RabbitMQ服务停掉以后,消息生产者发送过来的消息不丢失呢?
确保消息不丢失,需要做两件事:
- 队列持久化
- 消息持久化
队列如何实现持久化
之前我们创建的队列都是非持久化的,rabbitmq如果重启,该队列就会被删除掉,如果要队列实现持久化,需要在声明队列的时候把durable参数设置为持久化。
消息如何实现持久化
修改生产者代码,添加属性
MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
将消息标记为持久化,只是告诉RabbitMQ将消息保存到磁盘,并不会保证完全不会丢失消息。当消息刚准备存储在磁盘的时候,但是还没有存储完,在这个时间点,持久性保证并不强,但是对于我们的简单任务队列而言,足够了,更强有力的持久化策略后面介绍。
不公平分发
我们之前学习RabbitMQ,都是轮询分发消息到消费者。但是在某种场景下,这种策略并不是很好,比如说有两个消费者在处理任务,其中一个消费者处理任务的速度很快,而另一个则很慢,这个时候如果还是采用轮询的方式,那么会使得处理速度快的消费者很大一部分时间处于空闲状态,而处理慢的消费者一直在干活,RabbitMQ并不知道这种情况,它依然很公平地进行分发。
为了避免这种情况,我们可以设置参数channel.basicQos(1)。那么处理任务快的消费者会被分配更多的消息进行处理。避免了处理任务快的消费者在大部分时间处于空闲状态。
//设置不公平分发 int prefetchCount = 1; channel.basicQos(prefetchCount);
预取值
预取值是信道可以一次性获取队列中的消息的数量。
当为1时,只能获取一条,处理完之后(消息被ack之后)才能获取下一条,因为这个未确认的消息缓冲区的大小相当于是1。
当为0时,不限制,所以队列中的消息可以轮询着发送给消费者。
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息。另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。
发布/订阅模式--Publish/Subscribe
概述
工作队列,每个任务都恰好交付给一个消费者(线程)。
在这一部分,将消息传达给多个消费者(消费者线程),这种模式称为“发布/订阅”。
发布的消息多个消费者获得,多个消费者共享。
为了说明这种模式,我们将构建一个简单的日志系统,它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。我们会启动两个消费者,其中一个消费者接收到消息后将日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上。即,生产者发布的消息广播给了所有的消费者。
Fanout介绍
Fanout类型交换机将接收到的所有消息广播到它知道的所有队列中。
发布订阅模式代码示例
生产者发布的消息广播给所有的消费者
生产者
//生产者,负责发送消息
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// 1.交换机 2.routing key 3.其他参数 4.消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
}
}
}
消费者1
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
//第一个参数是交换机名称,第二个类型是交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个临时队列,队列的名称是随机的。
//当消费者断开与RabbitMQ的连接,这个临时队列就删除了
String queueName = channel.queueDeclare().getQueue();
//交换机与队列之间要进行绑定
//第一个参数为队列名,第二个参数为交换机名,第三个参数为routing key
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕上");
channel.basicConsume(queueName, true, (s, delivery) ->
System.out.println("ReceiveLogs01打印接收到的消息:" + new String(delivery.getBody(), "UTF-8")),
s -> {});
}
}
消费者2同理
生产者发送的消息,会被多个消费者都接收到。
路由模式--Routing
概述
采用直接交换机。
消息只去到交换机根据routing key绑定的队列中去。
示例代码
生产者
public class LogProducer {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String str = scanner.nextLine();
String[] s = str.split(" ");
String message = s[0];
String routingKey = s[1];
if (!(routingKey.equals("info") || routingKey.equals("error") || routingKey.equals("warning"))) {
System.out.println("此消息被丢弃");
}
// 1.交换机 2.routing key 3.其他参数 4.消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
}
}
}
消费者--ConsoleConsumer,我们写的都是消费者线程
public class ConsoleConsumer {
public static final String EXCHANGE_NAME = "direct_logs";
public static final String QUEUE_NAME = "console";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
//第一个参数是交换机名称,第二个类型是交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个临时队列,队列的名称是随机的。
//当消费者断开与RabbitMQ的连接,这个临时队列就删除了
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//交换机与队列之间要进行绑定
//第一个参数为队列名,第二个参数为交换机名,第三个参数为routing key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
System.out.println("等待接收消息,把接收到的消息打印在屏幕上");
channel.basicConsume(QUEUE_NAME, true, (s, delivery) ->
System.out.println("ConsoleConsumer打印接收到的消息:" + new String(delivery.getBody(), "UTF-8")),
s -> {});
}
}
消费者--DiskConsumer
public class DiskConsumer {
public static final String EXCHANGE_NAME = "direct_logs";
public static final String QUEUE_NAME = "disk";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
//第一个参数是交换机名称,第二个类型是交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个临时队列,队列的名称是随机的。
//当消费者断开与RabbitMQ的连接,这个临时队列就删除了
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//交换机与队列之间要进行绑定
//第一个参数为队列名,第二个参数为交换机名,第三个参数为routing key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
System.out.println("等待接收消息,把接收到的消息存储在磁盘");
channel.basicConsume(QUEUE_NAME, true, (s, delivery) ->
System.out.println("DiskConsumer打印接收到的消息:" + new String(delivery.getBody(), "UTF-8")),
s -> {});
}
}
效果
LogProducer
aa info
bb warning
cc info
dd error
ee debug
此消息被丢弃
ConsoleConsumer
等待接收消息,把接收到的消息打印在屏幕上
ConsoleConsumer打印接收到的消息:aa
ConsoleConsumer打印接收到的消息:bb
ConsoleConsumer打印接收到的消息:cc
DiskConsumer
等待接收消息,把接收到的消息存储在磁盘
DiskConsumer打印接收到的消息:dd
主题模式--Topics
概述
- 发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。
- 在这个规则列表中,其中有两个替换符是大家需要注意的 *(星号)可以代替一个单词 #(井号)可以替代零个或多个单词
发布确认模式--Publisher Confirms
原理
生产者生产消息到RabbitMQ的队列,RabbitMQ将消息保存到磁盘上后(这是开启了消息持久化),给生产者反馈即确认,这就是发布确认。
以下三个条件必须都满足才能保证消息不丢失,保证消息保存在磁盘上:
- 设置队列持久化
- 设置消息持久化
- 发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认的策略
开启发布确认的方法
Channel channel = RabbitMqUtils.getChannel(); //开启发布确认 channel.confirmSelect();
// 消息在手动应答时不丢失,放回队列中重新消费 public class Task { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //开启发布确认 channel.confirmSelect(); //开启队列持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //从控制台中输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); //MessageProperties.PERSISTENT_TEXT_PLAIN:开启消息持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息: " + message); } } }
单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
这种确认方式有一个最大的缺点就是发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布。
public static void publishMessageIndividually() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); //单个确认就马上进行发布确认 channel.waitForConfirms(); // if (channel.waitForConfirms()) { // System.out.println("消息发送成功!"); // } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条消息耗时:" + (end - begin) + "ms"); }
批量确认发布
先发布一批消息,然后一起确认可以极大地提高吞吐量,这种方式的缺点就是当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍是同步的,仍然阻塞消息的发布。
//批量确认消息的大小 int batchSize = 100; for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); //批量发送消息,批量确认 if ((i + 1) % batchSize == 0) { channel.waitForConfirms(); } }
异步确认发布
生产者负责发消息就可以了,不需要等待RabbitMQ返回确认。发送的消息哪些成功了,哪些失败了,都会通知,但是是异步的,就是不需要等到收到确认通知后,再发送消息,只管发送消息就可以了。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //开始发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //准备消息的监听器,来监听哪些消息成功了,哪些消息失败了 //消息确认成功,回调函数 ConfirmCallback ackCallback = new ConfirmCallback() { @Override public void handle(long l, boolean b) throws IOException { System.out.println("确认的消息:" + l); } }; //消息确认失败,回调函数 ConfirmCallback nackCallback = new ConfirmCallback() { @Override public void handle(long l, boolean b) throws IOException { System.out.println("未确认的消息:" + l); } }; channel.addConfirmListener(ackCallback, nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条消息耗时:" + (end - begin) + "ms"); }
以上这段代码里有两个线程,一个负责监听,一个负责发布消息,并打印结果。
所以在控制台可以看到,以上负责发布消息的线程最后一行打印,打印了之后,负责监听的线程还在工作。
异步发布确认方法,如何处理未确认的消息,即如何写未确认消息的回调函数
把未确认的消息放到一个基于内存的能被发布线程访问的数据结构。
public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); //开始发布确认 channel.confirmSelect(); //准备线程安全的哈希表,适用于高并发的情况 /** * 1. 轻松地将序号与消息进行关联 * 2. 根据消息序号(id),批量删除条目 * 3. 支持高并发(多线程) */ ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap(); //准备消息的监听器,来监听哪些消息成功了,哪些消息失败了,这是单独的线程,而下面发布消息的线程则是另一个线程,所以这个方法里两个线程在工作 //消息确认成功,回调函数 ConfirmCallback ackCallback = (tag, multiple) -> { // 删除掉已经确认的消息 if (multiple) { //headMap就是获取第一个key到输入key为止的map ConcurrentNavigableMap confirmed = concurrentSkipListMap.headMap(tag); confirmed.clear(); } else { //不用headMap,也就是不批量处理,那么就是只对当前标记的消息做处理。 concurrentSkipListMap.remove(tag); } System.out.println("确认的消息:" + tag); }; //消息确认失败,回调函数 ConfirmCallback nackCallback = (l, b) -> { System.out.println("未确认的消息tag:" + l + "\t" + "未确认的消息内容: " + concurrentSkipListMap.get(l)); }; channel.addConfirmListener(ackCallback, nackCallback); //开始时间 long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); //此处记录下消息的总和 concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条消息耗时:" + (end - begin) + "ms"); }
交换机
概述
我们之前发布消息,写交换机,写的都是
""
空字符串,这是使用的默认交换机。默认交换机会帮我们路由到指定的队列。
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列,实际上,通常生产者都不知道这些消息被传递到了哪些队列中。
生产者只能将消息发送到交换机,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将他们推入队列,交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是丢弃他们,这由交换机的类型来决定。
交换机的类型
- direct--路由类型
- topic--主题类型
- headers(不常用)
- fanout--扇出(发布订阅类型)
空字符串表示默认交换机。
消息能发送到队列中其实是由
routingKey(bindingkey)
绑定key指定的。临时队列
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
绑定
生产者发消息给交换机,交换机是和队列绑定的,通过Routing Key绑定。交换机将消息通过Routing Key路由给指定队列。
绑定就是交换机和队列之间的桥梁,它告诉我们交换机和哪个队列进行了绑定。
三、高级部分
1. 死信队列
概述
死信就是无法被消费的消息。
Consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说,用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信的来源
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加消息到mq中)
- 消息被拒绝(basic.reject或basic.nack),并且requeue=false。此时处理不了,先拒绝,并不允许消息重新放入队列,放入死信队列中,后续进行处理。
代码架构图
代码示例
消费者01
// 当前类是消费者1 public class Consumer01 { //普通交换机的名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机的名称 public static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列的名称 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列的名称 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信和普通交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列,普通队列必须设置一定的参数才能当消息成为死信后转发到死信交换机 //要指定最后一个参数的内容 Map<String, Object> arguments = new HashMap<>(); //正常队列设置死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信交换机路由到死信队列的routing key arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机和普通队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机和死信队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收消息..."); channel.basicConsume(NORMAL_QUEUE, true, (s, delivery) -> System.out.println("consumer01接收的消息:" + new String(delivery.getBody(), "UTF-8")), s -> {}); } }
生产者(设置消息过期)
public class Producer { //普通交换机的名称 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //死信消息,设置TTL时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i <= 10; i++) { String message = "info" + i; //本次第三个参数要写,因为要模拟发送死信消息 channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8)); } } }
这次在生产者这边设置了消息的过期时间为10秒钟。
启动生产者之前,把消费者01停掉,于是启动了生产者,生产的10条消息没有消费者消费,超过10秒钟,那么过期,过期之后这些消息成了死信消息,死信消息进入死信队列。
为什么消费者01都停掉了,死信消息还是能进入死信队列,是因为之前启动消费者01已经建立好了普通交换机、普通队列、死信交换机和死信队列之间的关系!
要注意生产者一定是和普通交换机、普通队列对接的,生产者不用管死信交换机和死信队列。
设置普通队列长度。展示因为队列长度,消息进入死信队列。
设置普通队列长度是在消费者01设置。
比如设置普通队列长度为6,那么发送10个消息,有4个消息会进入死信队列。
arguments.put("x-max-length", 6);
先启动消费者01,然后停掉,再启动生产者。
停掉消费者01的目的是,为了不让消费者很快地把队列中的消息都消费掉,停掉消费者01后,才会出现消息堆积在队列中的结果,于是普通队列只能最多堆积6个,剩下4个消息会到死信队列中。
设置手动拒绝消息,手动拒绝的消息进入死信队列。
//开启手动应答,如果是自动应答,就不存在拒绝的情况 channel.basicConsume(NORMAL_QUEUE, false, (s, delivery) -> { String msg = new String(delivery.getBody(), "UTF-8"); if ("info5".equals(msg)) { System.out.println(msg + " 被拒绝"); //第一个参数获得标签,第二个参数表示不放回普通队列,此被拒绝的消息就会成为死信,会进入死信队列。 //如果第二个参数为true,此被拒绝的消息还会重新放回普通队列 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); } else { System.out.println("consumer01接收的消息:" + msg); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }, s -> {});
生产者发送10条消息,其中info5被拒绝,并且第二个参数为false,说明不重新放回普通队列,进入死信队列。
其余9条消息,被消费者01消费掉。
2. 延时队列
概述
延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
TTL
TTL是RabbitMQ中一个消息或一个队列的属性,表明一条消息或者该队列中所有的消息的最大存活时间,单位是毫秒。
如果一条消息设置了TTL,或者进入了设置TTL属性的队列,那么这条消息如果在TTL时间内没有被消费,则会成为死信。
要注意并不是设置了TTL,在TTL之后,队列中的消息就一定会成为死信,而是TTL时间内,消息没有被消费,那么消息会成为死信。
如果同时配置了消息和队列的TTL,那么较小的那个值将被使用。
前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,延时队列就可以新鲜出炉了。延时队列,就是想要消息延迟多久被处理,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。
整合springboot
新建springboot项目
修改pom文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.2</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.atguigu.rabbitmq</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <!--swagger--> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!--RabbitMQ 测试依赖--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
修改application.properties文件
spring.rabbitmq.host=42.192.182.4 spring.rabbitmq.username=admin spring.rabbitmq.password=123
添加swagger配置类
@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo() { return new ApiInfoBuilder() .title("rabbitmq接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .build(); } }
代码实现延迟队列
代码架构
需要写一个生产者,一个消费死信队列消息的消费者。
通过死信队列和TTL实现普通队列中的消息经过TTL,没有被消费者消费之后,放到死信队列,被对应消费者消费,实现延时队列消费。
没整合springboot之前,普通交换机、死信交换机、普通队列、死信队列都是放在消费者代码中进行声明,现在整合了springboot,在专门的配置文件类代码中进行声明。不需要生产者和消费者再负责这块业务。
@Configuration public class TTLQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; //在springboot容器中,声明创建的对象,以前通过配置文件声明的对象,现在通过springboot配置类创建 //创建好的对象放在springboot容器中,可以使用 //声明普通交换机对象 @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } //声明死信交换机对象 @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明普通队列A @Bean("queueA") public Queue queueA() { Map<String, Object> args = new HashMap<>(); //声明当前普通队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前普通队列的死信路由(死信交换机通过死信路由将消息传到死信队列) args.put("x-dead-letter-routing-key", "YD"); //声明队列的TTL args.put("x-message-ttl", 10000); Queue queueA = QueueBuilder.durable(QUEUE_A).withArguments(args).build(); return queueA; } //声明普通队列A和普通交换机xExchange绑定 @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //声明普通队列B @Bean("queueB") public Queue queueB() { Map<String, Object> args = new HashMap<>(); //声明当前普通队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前普通队列的死信路由 args.put("x-dead-letter-routing-key", "YD"); //声明队列的TTL args.put("x-message-ttl", 40000); Queue queueA = QueueBuilder.durable(QUEUE_B).withArguments(args).build(); return queueA; } //声明普通队列B和普通交换机xExchange绑定 @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //声明死信队列QD @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } //声明死信队列QD和死信交换机yExchange绑定关系 @Bean public Binding queuedBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
生产者
controller作为生产者,客户端的请求作为发送的消息,作为生产者。
//发送延时消息 @RestController @Slf4j @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sedMsg(@PathVariable String message) { log.info("当前时间:{}, 发送一条信息给两个ttl队列,消息:{}", new Date(), message); rabbitTemplate.convertAndSend("X", "XA", "消息发送至ttl为10s的队列:" + message); rabbitTemplate.convertAndSend("X", "XB", "消息发送至ttl为40s的队列:" + message); } }
消费者,此消费者是消费死信队列的
//死信队列的消费者 @Component @Slf4j public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receiveD(Message message) throws Exception { String msg = new String(message.getBody(), "UTF-8"); log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg); } }
因为在TTL时间内,没有消费者消费普通队列中的消息,那么TTL时间后,普通队列中的消息则成为死信消息,发送至死信队列,这是在配置类配置好的。
此消费者便消费死信队列中的消息
延时队列的优化
问题:
每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,岂不是要增加无数个队列才能满足需求?
解决
将设置队列TTL改为设置消息TTL,即在消息生产者设置发送消息的TTL。
新增一个普通队列QC,该队列不设置TTL时间。
//声明普通队列 C 死信交换机 @Bean("queueC") public Queue queueC(){ Map<String, Object> args = new HashMap<>(); //声明当前普通队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前普通队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //没有声明 TTL 属性 return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } //声明队列 C 绑定普通交换机 @Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }
生产者
//发消息 @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) { log.info("当前时间:{}, 发送一条时长为{}的信息给C队列,消息:{}", new Date(), ttlTime, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { //发送消息的延时 //相当于在发送消息的时候设置延时,想延时多久就延时多久,超过了延时,由于没有消费者消费普通队列中的消息,消息成为死信消息进入死信队列。 msg.getMessageProperties().setExpiration(ttlTime); return msg; }); }
这就是在生产者方设置消息的延时。
设置消息延时有两种方式
给队列设置TTL
给消息设置TTL
以上在生产者方发送消息的时候,给消息设置延时,就是这种方式。
一旦发送两条以上消息时,消息是排队的,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
第二个消息明明TTL时间很短,比第一个消息时间短,仍然按照第一个消息时间延时消费。
基于插件的延时队列
为了解决上一小节提到的“一旦发送两条以上消息时,消息是排队的,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。”这个问题。
基于死信实现的延时队列
基于插件实现的延时队列
延时的位置是交换机而不是队列。
实现
安装延时队列插件
执行命令:
whereis rabbitmq 找到rabbitmq路径
下载插件:rabbitmq_delayed_message_exchange-3.8.0.ez
放到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录下
在plugins目录下,执行命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使该插件生效
重启rabbitmq
写配置类代码:
写延迟交换机、延迟队列,并将延迟交换机和延迟队列绑定,就不基于死信来实现延迟队列了。
@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean("delayedQueue") public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } //延迟交换机 //通过自定义交换机来实现 @Bean("delayedExchange") public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } //绑定延迟交换机和延迟队列 @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs(); } }
写生产者代码
//发消息 @GetMapping("/sendDelayedMsg/{message}/{delayTime}") public void sendDelayedMsg(@PathVariable String message, @PathVariable Integer delayTime) { log.info("当前时间:{}, 发送一条时长为{}的信息给延迟交换机,消息:{}", new Date(), delayTime, message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> { //发送消息的延时 //相当于在发送消息的时候设置延时,想延时多久就延时多久,超过了延时,由于没有消费者消费普通队列中的消息,消息成为死信消息进入死信队列。 msg.getMessageProperties().setDelay(delayTime); return msg; }); }
写消费者代码
@Component @Slf4j public class DelayedQueueConsumer { //接收消息 @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveD(Message message) throws Exception { String msg = new String(message.getBody(), "UTF-8"); log.info("当前时间:{},收到延时消息:{}", new Date(), msg); } }
效果
2022-04-21 17:37:52.584 INFO 16732 --- [nio-8001-exec-1] c.a.r.controller.SendMsgController : 当前时间:Thu Apr 21 17:37:52 CST 2022, 发送一条时长为20000的信息给延迟交换机,消息:tmac 2022-04-21 17:37:56.343 INFO 16732 --- [nio-8001-exec-2] c.a.r.controller.SendMsgController : 当前时间:Thu Apr 21 17:37:56 CST 2022, 发送一条时长为2000的信息给延迟交换机,消息:tmac1 2022-04-21 17:37:58.387 INFO 16732 --- [ntContainer#1-1] c.a.r.consumer.DelayedQueueConsumer : 当前时间:Thu Apr 21 17:37:58 CST 2022,收到死信队列的消息:tmac1 2022-04-21 17:38:12.649 INFO 16732 --- [ntContainer#1-1] c.a.r.consumer.DelayedQueueConsumer : 当前时间:Thu Apr 21 17:38:12 CST 2022,收到死信队列的消息:tmac
可以看到并没有出现消息排队的效果,后发的消息延时少,为2秒,先发的消息延时多,有20秒,但是后发的消息,因为延时少,仍然被优先处理,从控制台可以看到。
3. 发布确认高级
概述
在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间,生产者消息投递失败,导致消息丢失,需要手动处理和恢复。
如何才能进行rabbitmq的消息可靠投递呢?
交换机和队列只要有一个不存在,那么消息都会丢失。
所以应该有缓存,临时存储消息,通过定时任务将未发送的消息进行重新投递。
所以这部分发布确认高级内容,就是考虑了我们之前没有考虑的内容,就是发送消息给交换机,交换机不存在,或者说交换机收不到,那么发出的消息应该怎么处理。
生产者发送消息给mq,一旦交换机接收不到消息,我们要对消息进行缓存处理。
交换机接收不到生产者发送的消息,要做两件事情
- 让生产者知道发送的消息,交换机没有接收到
- 将未发送成功的消息进行缓存
3.1 发布确认
配置文件
spring.rabbitmq.publisher-confirm-type=correlated
表示发布消息到交换机后会触发回调方法
添加配置类
@Configuration public class ConfirmConfig { //交换机 public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; //队列 public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //routing key public static final String CONFIRM_ROUTING_KEY = "key1"; //声明交换机 @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); } //声明队列 @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding bindingConfirmQueue(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY); } }
生产者
回调接口
作用就是当消息发不出去,交换机收不到了,要通过回调接口将消息保存下来,缓存下来。
我们要写ConfirmCallback这个回调接口的实现类。
@FunctionalInterface public interface ConfirmCallback { void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3); }
第一个参数是消息内容
第二个参数是发送成功或失败
第三个参数是如果发送失败,那么发送失败的原因是什么
实现回调接口
@Component//这个注解的作用是创建实例化对象 @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; //注入 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); } /* 这个回调方法什么时候会被调用呢? 1.发消息,交换机接收到了,回调 1.1 correlationData 保存回调消息的id及相关消息 1.2 交换机收到消息 true 1.3 发送消息失败原因,为null 2. 发消息,交换机接收失败了,回调 2.1 correlationData 保存回调消息的id及相关消息 2.2 交换机未收到消息,false 2.3 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到消息,id为:{}", id); } else { log.info("交换机未收到消息,id为:{}, 原因为:{}", id, cause); } } }
消费者
@Component @Slf4j public class ConfirmMessageConsumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMessage(Message message) throws UnsupportedEncodingException { log.info("接收到的消息" + new String(message.getBody(), "UTF-8")); } }
测试
测试时将交换机名写错
2022-04-21 19:23:53.839 INFO 7348 --- [nio-8001-exec-1] c.a.r.controller.ProducerController : 发送消息内容为:hello 2022-04-21 19:23:53.881 ERROR 7348 --- [.192.182.4:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange1' in vhost '/', class-id=60, method-id=40) 2022-04-21 19:23:53.884 INFO 7348 --- [nectionFactory2] com.atguigu.rabbitmq.config.MyCallBack : 交换机未收到消息,id为:1, 原因为:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange1' in vhost '/', class-id=60, method-id=40)
交换机如果收到了消息,但是routing key有错导致消息不可路由,那么消息会被直接丢弃。
解决:通过设置mandatory参数可以当消息到达交换机,但是路由不到队列的时候,将消息返回给生产者。
3.2 回退消息
修改配置文件
spring.rabbitmq.publisher-returns=true
写回调接口的实现类
@Component//这个注解的作用是创建实例化对象 @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; //注入 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(this); } /* 这个回调方法什么时候会被调用呢? 1.发消息,交换机接收到了,回调 1.1 correlationData 保存回调消息的id及相关消息 1.2 交换机收到消息 true 1.3 发送消息失败原因,为null 2. 发消息,交换机接收失败了,回调 2.1 correlationData 保存回调消息的id及相关消息 2.2 交换机未收到消息,false 2.3 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到消息,id为:{}", id); } else { log.info("交换机未收到消息,id为:{}, 原因为:{}", id, cause); } } // 当消息传递过程中,不可到达目的地时,将消息返回给生产者 @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.info("回退的内容:" + returnedMessage); } }
测试
将routing key写错,也就是交换机能接收到消息,但是路由不到队列,此时消息会被退回
2022-04-21 19:54:04.757 INFO 11516 --- [nio-8001-exec-1] c.a.r.controller.ProducerController : 发送消息内容为:hello 2022-04-21 19:54:04.802 INFO 11516 --- [nectionFactory1] com.atguigu.rabbitmq.config.MyCallBack : 回退的内容:ReturnedMessage [message=(Body:'hello' MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=key11] 2022-04-21 19:54:04.803 INFO 11516 --- [nectionFactory1] com.atguigu.rabbitmq.config.MyCallBack : 交换机已经收到消息,id为:1
从日志可以看到消息被回退了,但是交换机接收到了消息,但是由于routing key有错,交换机无法将消息路由到队列,于是消息被回退。
3.3 备份交换机
无法发送给交换机的消息,发送给备份交换机
如果消息无法发送给普通交换机(交换机出问题)或者无法路由到队列(队列或routing key出问题),转发给备份交换机,代码如下
备份交换机对象的创建,两个队列的创建,以及他们与备份交换机之间的绑定都和之前是一样的。