消息队列RabbitMQ
消息队列是一种消息中间件,不负责处理消息,仅负责消息的接受存储和转发.
RabbitMQ特点:
- **消息传递模式:**支持多种消息传递模式,包括发布订阅、点对点和工作队列等。
- **消息路由和交换机:**RabbitMQ引入了交换机(Exchange)概念,用于将消息路由到一个或者多个队列。允许根据消息内容、标签或者路由键进行灵活的消息路由。
- **消息确认机制:**RabbitMQ支持消息确认机制,保证消息不会被重复消费。
- **可扩展性:**RabbitMQ可以通过添加更多的节点和集群来增加吞吐量和可用性。
- 支持多种编程语言。
- **消息持久性:**Rabbit允许消息和队列进行持久化设置,确保消息在RabbitMQ重启后不会丢失。
- **灵活的插件系统:**RabbitMQ具有丰富的插件系统,可以拓展多种功能。
- 具有易于管理的Web界面。
为什么要使用消息队列
- 流量削峰:队列缓存请求
- 应用解耦
- 异步处理
1 AMQP
AMQP(Advanced Message Queuing Protocol)不是一个具体的消息中间件产品,而是一个协议规范,一种为面向消息的中间件设计的应用层协议。AMQP提供了一种统一的消息服务,是的不同程序之间可以通过消息队列进行通信。SpringBoot框架默认就提供了对AMQP协议的支持。
AMQP 本质上是一个开放的标准,他不光可以被 RabbitMQ 实现,也可以被其他产品实现。通过这种标准的协议,实际上是可以在不同的消息中间件系统之间进行灵活的消息传递。只不过,目前具体实现这种标准的产品目前并不多,RabbitMQ 则是最有影响力的一个产品。因此,RabbitMQ 成了 AMQP 协议事实上的代表。SpringBoot 框架默认提供的 AMQP 协议支持底层也是基于 RabbitMQ 产品实现的。
AMQP 协议的三层:
- Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
- Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
- TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
AMQP 模型的三大组件:
- 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
- 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
- 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
2 RabbitMQ各组件及其功能
- **Broker:**RabbitMQ服务器,接受客户端连接,实现AMQP实体服务
- **Virtual Host:**虚拟主机,实现逻辑隔离,用于隔离不同环境或不同应用的消息流。每个虚拟主机都有自己的Exchange和Queue。
- **Connection:**连接,管理和维护RabbitMQ服务器的TCP链接,生产者和消费者通过这个连接与Broker建立物理网络连接(RabbitMQ消息基于TCP进行传输)。
- **Channel:**信道,是在Connection中创建的轻量级通道,客户端可以建立多个信道,可以减小建立TCP Connection的开销,信道数量没有限制。
- **Exchange:**交换机,负责接受来自生产者的消息,并将其路由到一个或多个队列。有direct、topic、fanout、headers四种模式。
- **Queue:**队列是消息的存储位置,每个队列都有唯一的名称。
- **Binding:**绑定,是Exchange和Queue之间的关联规则,定义了消息如何从交换机路由到特定队列。
此外生产者和消费者也是消息队列中的核心组件,生产者负责发送消息到Exchange或者Queue,消费者负责从Queue中订阅和处理消息。
- 生产者:生产者是消息的发送方,负责产生并发送消息到 RabbitMQ。生产者通常将消息发送到交换机(Exchange)。
- 消费者:消费者是消息的接收方,负责从队列中获取消息并进行处理。消费者通过订阅队列来接收消息。
- 消息:消息是生产者和消费者之间传递的数据单元。消息通常包含消息体和可选的属性,如路由键等。
3 RabbitMQ中交换机类型
- Direct Exchange:根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。
- Topic Exchange:根据消息的路由键与队列绑定时指定的路由键模式匹配程度,将消息进行路由到一个或多个队列。路由键可以使用通配符(*【匹配一个单词】,#【匹配零个或多个单词】)。
- Headers Exchange:根据消息的表头信息决定消息的路由,而不是路由键,当消息的表头与绑定规则完全匹配时,才会被路由到该队列。
- 消费方指定的headers中需要有一个x-match键
- all:所有键匹配才可以接受消息
- any:只要有键值对匹配成功,就可以接受消息
- 消费方指定的headers中需要有一个x-match键
- Fanout Exchange:广播路由,用于发布/订阅模式。
- Default Exchange:默认实现的交换机,不需要手动创建。当消息被发布到默认交换机时,路由键会被解释为队列的名称,实现点对点通信。
4 RabbitMQ工作原理
AMQP协议模型有三部分组成:生产者、消费者和服务端。
执行流程:
- 生产者连接到服务端,建立连接,开启信道
- 生产者声明交换器和队列,设置相关属性,并通过路由关键词将交换器和队列进行绑定
- 消费者建立连接,开启信道,监听消息
- 生产者发送消息
- 虚拟主机根据路由关键词选择路由,发送道不同的消息队列
- 消费者拿到消息进行消费
5 RabbitMQ工作模式
- Simple模式:一对一。
- Work Queue工作队列模式:生产者发送消息到Queue,多个消费者同时消费,消息会均匀分配给多个消费者。
- 发布订阅模式:生产者发送消息到Queue,所有的消费者都会消费同一个消息。
- 路由模式:在发布订阅模式上增加了路由键,根据路由键判断将消息转发到哪些Queue中。
- 主题模式:在路由模式基础上,添加了模糊匹配的功能。
6 消息持久化
RabbitMQ 允许消息的持久化,以确保即使在 RabbitMQ 服务器重新启动后,消息也不会丢失。RabbitMQ 可以通过以下方式实现消息的持久化:
- 消息持久化:在 RabbitMQ 中,只需要在发送消息时,将delivery_mode属性设置为 2,就可以将消息标记为持久化。
- 队列持久化:在 RabbitMQ 中声明队列时,也可以将队列声明为持久化。RabbitMQ 中的队列分为三种不同类型经典队列,仲裁队列和流式队列。其中,经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。
- 交换机持久化:与经典队列类似,RabbitMQ 也可以在声明交换机时,将交换机的 durable 属性设置为true,这样就可以将交换机标记为持久化。
RabbitMQ 的持久化机制会对其性能产生影响。因此,需要根据具体的业务场景和需求来权衡是否需要持久化以及需要哪种类型的持久化。
7 如何保证RabbitMQ消息的顺序性
- 拆分多个queue,对同一数据的操作放在一个队列中
- 只是用一个队列和一个消费者
8 有哪些情况会发生消息丢失,如何保证不丢失
- 生产者发送丢失:
- 原因:网络问题,代码问题
- 解决:发布确认机制,生产者设置信道为confirm模式,broker收到消息会发送确认消息给生产者
- 或者开启AMQP事务处理,但是这个方式是同步的,会阻塞,不推荐
- RabbitMQ存储丢失
- 原因:消息没有持久化,网络问题
- 解决:消息回退(设置mandatory参数,消息不可达时返回给生产者)、持久化
- 消费者丢失
- 原因:消费端宕机或消息处理异常
- 解决:手动ack确认
9 Rabbit如何保证消息被消费
- 消费端配置手动ACK确认机制
- 结合数据库进行状态标记
- 在新增场景,可以采用数据库唯一约束
- 在更新场景,可以考虑乐观锁+版本号
- 插入记录时 ,在业务代码中检查
- 利用一个去重表(可以基于Redis实现,并利用TTL设置过期时间),插入成功的进入消费流程,失败的查看是否被消费,被消费则返回成功,没有则进行延时消费。
10 Rabbit中如何进行事务处理
RabbitMQ 提供了事务处理机制,允许生产者在发送消息时将操作包装在一个事务中,以确保消息的可靠性传递。在 RabbitMQ 中,事务是通过通道(Channel)来实现的。可以通过以下步骤进行事务处理:
- 开启事务:在生产者端,可以通过调用 Channel 的 tx_select 方法来开启一个事务。这将启动一个新的事务,并将所有后续的消息发布操作放在该事务内。
- 发送消息:接下来在事务中,可以正常发送消息。如果消息发送失败,事务会自动回滚。
- 提交事务:如果事务中所有消息发送成功后,需要提交事务。可以通过调用 Channel 的tx_commit方法提交事务。
- 处理异常:如果在事务过程中发生异常,可以使用 try/catch 快来捕获异常。然后在异常处理过程中,调用 Channel 的 tx_rollback 方法来回滚 RabbitMQ 相关的事务操作。
需要注意的是,RabbitMQ 的事务处理是基于存储过程的,它可以保证在事务中的操作要么全部成功,要么全部失败。但是,由于 RabbitMQ 是一个异步的消息队列系统,事务处理可能会对其性能产生影响。因此,需要根据具体的应用场景和需求来权衡是否需要使用事务以及如何使用事务。
11 如何解决消息堆积问题
产生堆积的原因:
- 消费者处理速度慢
- 队列容量小
- 网络故障
- 消费者故障
- 队列使用不当
- 消息太大
- 业务逻辑复杂且耗时
- 生产速度大于消费速度
解决方案:
- 增加消费者数量,水平拓展
- 提高消费者处理效率,代码调优、增加资源
- 消息预取限制
- 增加队列容量
- 采用死信队列等容错机制
- 对大型消息进行分片
- 优化业务逻辑
12 死信队列
死信队列是 RabbitMQ 提供的一种特殊序列,处理那些无法被正常消费的消息。有三种情况会产生死信:
- 消息被消费者明确拒绝。
- 消息达到预设的过期时间仍没有消费者消费。
- 消息由于队列已经达到最大长度限制而被丢弃。
在 RabbitMQ 中,实现死信队列只需要给正常队列增加三个核心参数即可:
- dead-letter-exchange:指定当前队列对应的死信队列
- dead-letter-routing-key:指定消息转入死信队列时的路由键
- message-ttl:消息在队列中的过期时间。
接下来,就可以往正常队列中发送消息。如果消息满足了某些条件,就会成为死信,并被重新发送到对应的死信队列中。而此时,RabbitMQ 会在消息的头部添加一些与死信相关的补充信息,例如时间、成为死信的原因、原队列等。应用程序可以按需处理这些补充的信息。
最后,死信队列中的消息都是正常业务处理失败的消息,应用程序需要创建一个消费者来专门处理这些被遗漏的消息。例如记录日志、发送警报等。这样才能保证业务数据的完整性。
13 延时队列
延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
- 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
- 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
14 优先级队列
RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。
可以通过x-max-priority
参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
15 集群
RabbitMQ 支持两种主要类型的集群:普通集群(Classic Cluster)和镜像集群(Mirrored Cluster)。他们之间有一些重要的区别:
- 普通集群: 这种模式使用Erlang语言天生具备的集群方式搭建。这种集群模式下,集群的各个节点之间只会有相同的元数据,即队列结构,而消息不会进行冗余,只存在一个节点中。消费时,如果消费的不是存有数据的节点, RabbitMQ会临时在节点之间进行数据传输,将消息从存有数据的节点传输到消费的节点。很显然,这种集群模式的消息可靠性不是很高。因为如果其中有个节点服务宕机了,那这个节点上的数据就无法消费了,需要等到这个节点服务恢复后才能消费,而这时,消费者端已经消费过的消息就有可能给不了服务端正确应答,服务起来后,就会再次消费这些消息,造成这部分消息重复消费。 另外,如果消息没有做持久化,重启就消息就会丢失。并且,这种集群模式也不支持高可用,即当某一个节点服务挂了后,需要手动重启服务,才能保证这一部分消息能正常消费。所以这种集群模式只适合一些对消息安全性不是很高的场景。而在使用这种模式时,消费者应该尽量的连接上每一个节点,减少消息在集群中的传输。
- 镜像集群:这种模式是在普通集群模式基础上的一种增强方案,这也就是RabbitMQ的官方HA高可用方案。需要在搭建了普通集群之后再补充搭建。其本质区别在于,这种模式会在镜像节点中间主动进行消息同步,而不是在客户端拉取消息时临时同步。并且在集群内部有一个算法会选举产生master和slave,当一个master挂了后,也会自动选出一个来。从而给整个集群提供高可用能力。这种模式的消息可靠性更高,因为每个节点上都存着全量的消息。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯大量的消耗,进而降低整个集群的性能。这种模式下,队列数量最好不要过多
总的来说,普通集群适用于对性能要求高,但可以接受数据丢失的情况。而镜像集群则适用于对数据持久性和可用性有更高要求,并愿意付出一些性能代价的场景。