消息队列
消息队列是一种中间件,用于在不同系统和组件之间通信,核心功能是解耦生产者和消费者支持【异步处理】、【服务解耦】、【削峰填谷】
异步处理
消息队列的思想是:减少请求等待时间,让服务异步并发处理,提升总体性能。
当一个请求的请求链越来越长时,一个请求得到的响应时间就会越来越大,在大的电商系统中,严重影响用户体验,比如我要买一个冰箱,非得等我下单短信收到,会员注册成功,邮件发送成功后,手机界面才显示我已经成功购买,那还得了,买一个东西竟然让我等这么长的时间。
将一个请求链中不紧急响应的请求拎出来,而需要紧急响应的请求在完成后往消息队列中放入消息,被拎出来的不紧急的请求并发处理消息队列中取消息完成自己的业务逻辑

服务解耦
消息队列的思想:将请求链中需要立刻响应客户请求的这部分,该部分的业务逻辑完成后立刻响应客户端,并产生一个消息放在消息队列中,请求链的下游部分,可以订阅这个消息完成自己的业务逻辑,业务扩充时只需要新增下游的业务代码就行
当用户发起请求在电商后台就有一条请求链,为了业务扩充,一条请求链会越来越长,如果不使用消息队列,但凡一个接口出错,都有可能会影响到整个请求链的核心业务,如果下游有业务增加,程序员还需要更改上游的程序,这也太麻烦了,而且容易出问题

削峰填谷
消息队列的思想:使用队列暂存请求,服务按照自己的处理速度从请求队列中拿请求去处理
使用消息队列做缓冲,在面对秒杀这类大活动时,在某一时刻就会接收大量的请求, Redis和MySQL 是无法承受的,消息队列就起到了缓冲的作用。

消息队列模型
1. 队列模型
发送消息的是生产者,接收消息的是消费者,多个生产者可以往同一个消息队列中发送消息,但是队列中的消息只能被一个消费者消费。

2. 发布订阅模型
将消息发送到一个主题中,订阅了这个主题的消费者就可以消费这条消息,解决了一条消息只能被一个消费者消费的问题。
发布订阅模型的本质:维护若干个队列,生产者使用某种机制或算法往消息队列中发送消息,如果一个主题中有n个队列,那么并发度就是n。
订阅有两种类型:1. 临时订阅; 2. 持久订阅
①临时订阅:订阅只有在消费者启动并且运行的时候才存在,一旦消费者退出,相应的订阅和消息就会丢失
②持久订阅:这种订阅会一直存在,除非主动删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以继续被处理
如何保证消息不丢失
【造成消息丢失的原因】
- 消费者消费消息失败
MQ向消费者推送一条消息,消费者收到这条消息后会立刻返回一条ack,MQ收到ack后会删除这条消息,如果消费者来不及消费这条消息就宕机,那么这条消息就丢失了。
- 生产者生产消息失败
生产者向MQ推送消息,由于网络原因MQ没有收到这条消息,就造成了消息丢失
- MQ消息丢失
MQ中的消息是存在于内存中的,若断电则消息丢失
保证消息不丢失是生产者、broker、消费者三个共同决定的

生产者生产消息阶段:发送消息到broker中,需要接收Broker的响应并处理这个响应,如果响应内容失败,生产者还需要重新发送消息到broker中
Broker存储消息阶段:①消息持久化,即将收到的消息刷盘,接收到消息需要进行刷盘,刷盘成功后再响应生产者,才算成功接收生产者的消息。②RabbitMQ或Kafka这类中间件使用时是部署的一个集群,队列中间件通常会写多个节点,实现消息的多个副本
消费者消费消息阶段:消费者拿到消息并且真正执行完这个消息的业务逻辑,再响应Broker,这才算消费成功
如何处理重复消息
处理思想:幂等处理重复消息
先来了解一下消息为什么会被重复消费?
可能性1:生产者重复发送同一个消息
消息重复接收和发送是不可避免的,broker再刷盘完成后,给生产者发送响应信息,然后此时因为网络不稳定导致这个响应信息被丢失,生产者因为超时重传机制重传消息,导致Broker重复接收了消息,消费者取到消息了,并且执行完业务逻辑了,刚准备获取下一个消息时挂掉,其它消费者顶替前一个消费者,这条消息就会被重复处理了
可能性2:Broker多次发送同一个消息
在网络不稳定、消费者消费速度慢、消息积压的场景下都有可能导致消息被重复消费。因为Broker在发送消息给消费者的时候是希望能收到消费者的ACK的,这样Broker才确定自己发送出去的消息已经被消费者接收了,但是当网络不稳定的时候,消费者发送的ACK可能会丢失,当消息积压的时候,Broker发送出去的消息迟迟不能被消费者消费,Broker迟迟收不到ACK,它就认为消费者挂了或者以为自己发送的消息丢失了,Broker就会重新发送消息。
可能性3:消费者挂了没来的及发送ACK
消费者确实接收到消息了,也正在处理消息,可能消费者刚处理完这条消息它就蹦了(这很正常,一个线程可能就会在运行的过程中崩溃,突然卡死或者退出程序),它没有来的及给Broker发送ACK,Broker没有收到消费者的ACK,它就一直不放心,然后重复发送消息,直到收到消费者的ACK为止,这样消费者就会消费重复消息了。
那如何处理消息被重复消费的问题呢?
解决消息被重复消费很重要,总不能让用户在买东西的时候,下了一笔单结果扣了两笔甚至更多的钱吧,那还得了!
【幂等处理】:解决消息重复被消费的方式可能有多种,但是核心的思想就是对于接到重复的消息,不进行处理,不论接收到多少个重复消息,处理结果都和第一次处理这个消息的结果一致,这就是幂等处理思想。
【具体的做法】:Redis缓存几分钟内消息ID + 数据库去重表
给每条消息做个标记,比如可以用消息ID和业务ID做标记(都是唯一的),先从redis中检查消息是否被消费过,如果消费过直接丢弃不处理,如果没有在redis中找到消息,那么就去数据库去重表中查询消息,如果数据库去重表中也没有这种消息,说明消息被首次消费,将唯一ID写入redis和数据库去重表中。
这种方法避免了redis中占用过多的内存。因为redis中缓存了5~10分钟内的消息ID,所以超过这个时间段内的重复消息还是比较少的,也不会频繁访问数据库表,不用担心访问数据库表导致系统性能的下降。
1 | 创建数据库去重表 |

如何保证消息的有序性
全局有序:只有一个生产者发送消息,一个单线程消费者消费这个队列,主题中也只有一个队列

局部有序:在主题中划分出若干个队列,每个队列都有一个单线程消费者消费,生产者按照某种机制往队列中发送消息

如何保证消息被顺序消费
保证消息是按照顺序放入队列中的,但这不能保证消息一定是被顺序处理的,因为消费者是多线程的,消费者顺序拿消息,但是不能保证处理结果也是顺序的。
保证消息顺序被消费是:给一个业务添加一个唯一的ID,这要是和这个业务相关的消息生产出,就使用Hash 取模方法,将一个业务产生的信息发送到同步发送一个队列中

如何处理堆积消息
导致消息积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了要解决积压的问题,可以通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务
总结和补充
如何保证消息不丢失、如何处理重复消息、如何保证消息的有序性、如何处理堆积消息都是在保证消息队列消费消息的一致性,总而言之,保证消息的一致性是通过“重试机制”+“幂等性设计”+“死信队列”+“延迟队列”+“补偿机制”+“监控机制”共同实现的。
【重试机制】:由生产者、Broker、消费者共同决定的,生产者在收到Broker消息刷盘后发送的ACK才能确保消息被成功发送;Broker在刷盘成功后发送ACK才能保证消息被持久化不会丢失;消费者在消费成功后发送ACK给Broker才能保证消息被成功消费。所以对于生产者和消费者,没有收到ACK时会进行消息重试机制,可以使用指数退避的策略重发消息
**【幂等性设计】:**通过redis+数据库去重表的方式,解决消息被重复消费的问题
【死信队列】:①指数退避策略达到最大限制后还没有成功投递消息,这个消息就放在死信队列中 ②消费者消费消息失败,将该消息放入死信队列中 ③消费者消费消息超时
**【延迟队列】:**如果消费者业务执行失败,例如库存不足,消费者可以将消息投递到延迟队列中(RocketMQ 中有延迟队列),稍后重试。
**【补偿机制】:**可以定期扫描生产者消息和消费者消费的消息,是否一致,如果不一致,可以触发重试或者回滚
【监控机制】:监控生产者发送重复消息率,监控消费者消费重复消息率,并设置阈值进行告警,监控延迟队列、死信队列的容量,监控去重数据库表的容量不足告警,并定期进行处理
Kafka
【Kafka是一种高吞吐量的分布式发布订阅消息系统】
当一个服务器的处理能力是每秒处理100个QPS, 而客户端每秒发送1000个消息,如果没有任何措施,服务端肯定会被客户端打垮。

如何让服务端不被大量的客户端请求打垮的同时还能处理掉客户端发送的消息?那么就加一层中间件,这个中间件用于暂存客户端发送的消息,服务端按照自身的处理能力从这个中间件中取出请求。根据消息是先生产的先被消费的特性,很容易想到使用队列去存储消息,可以在服务端的进程中创建一个队列,存储客户端发送的消息。但是这种方式存在较大的安全隐患,一旦服务端的进程崩溃或者重启,那么未消费的消息都会丢失。

为了避免由于服务端进程崩溃或重启导致的消息丢失问题,需要在服务端进程之外去创建一个队列去存储消息

如何提高系统的吞吐量呢?可以增加消息发送的速度和消息消费的速度,这就需要增加多个生产者和多个消费者(发送消息的就是生产者,消费消息的就是消费者)

当消费者和生产者的速度越来越多时,对于共享资源消息队列,在访问时会产生冲突,一般通过加互斥锁的方式实现,只要有人拿到互斥锁那么其他的生产者和消费者都要等待,也就是会出现当生产者和消费者数量超过一定值时反而会使得系统的性能下降,解决方法就是将单个队列扩展成多个队列,生产者按照消息类型放进不同的队列中。如何区分不同队列的特性呢?那么可以为不同的队列创建不同的主题topic,消费者订阅不同的topic 队列,这样就大大降低了对单个消息队列的争抢。

在高并发的情况下,单个消息队列中的消息可能还是会被打满,怎么解决这个问题呢?就是将单个消息队列再次进行拆分,拆分成不同的分区 partition,每个分区由不同的消费者负责。

当所有的partion都存储在一台机器上时,生产者发送消息,消费者消费消息,都会导致 partition 所在的机器CPU 和内存过载,解决办法是什么呢?可以将partion 分散到不同的机器中,就像负载 均衡所做的那样,每一台机器就代表一个 broker,一个broker上有多个partition

但是如果某台服务器宕机了,那么这台服务器上所有的partition都会丢失,也就是消息丢失,为了保证可靠性,可以为每个 broker上的partition创建副本,并且将副本存放到不同的broker 上,以防止单点故障。当主节点宕机时,可以从从节点中选取一个新的主节点,供生产者和消费者存取。

那么在极端情况下,可能会发生所有的服务器下线,这就导致所有消息以及消息副本的丢失,所以必须要考虑消息的持久化操作,将消息持久化就是不能将数据存放在断电就消失的内存中,而要存放在硬盘中,所有要将broker中的数据存放在硬盘中,系统长久的运行会产生很多的消息,磁盘空间再大也会有打满的一天,所以要采取一些淘汰策略,将使用过的消息并且今后可能不会再使用的消息永久删除。但是由谁来判断broker的运行状态呢,如果主broker宕机了,由谁来决定选举哪一个broker为新主节点呢?组件一旦多了,就需要一个管理员,就像redis集群中的哨兵节点,kafka的管理员是ZooKeeper,ZooKeeper会定期和kafka集群进行通信,获取kafka 集群的状态

RocketMQ
RoketMQ消息队列和Kafka的区别?
RocketMQ 也是将消息分类,然后消费者去订阅 topic进行消费,RocketMQ参考了Kafka的设计思想,但是在架构上做了一些调整,在功能上做了一些增加。
kafka通过创建多个 topic 对消息分类,为了提高每个topic 的并发性能,将每个topic 分成多个partition,为了防止系统单点故障以及拓展系统的性能,将partition放在不同的 broker 上,为了防止broker故障导致消息丢失,创建broker副本机制,并且使用 ZooKeeper节点协调kafka集群的多个节点。
ZooKeeper是通用的分布式协调服务,功能较多RocketMQ 使用 Nameserver管理分布式节点
kafka的partition中存储的是完整的消息,而RocketMQ的 partition中存储的是消息的简要信息,例如消息的偏移 offset,消息的完整信息存放在commitLog中 ,通过 offset定位commitLog中的消息。kafka读取消息只需要读取一次,而RocketMQ需要读取两次

kafka的每一个partition对应一个文件,文件内部是顺序写,但是 kafka集群中Partion较多,每个 partition的文件随机分布在磁盘的任何位置上,这就导致kafka写文件的会退化成随机写,写性能下降。而RocketMQ将多个topic下的数据全部写到一个文件中CommitLog文件中。
kafaka创建多个topic对消息进行分类,RocketMQ不仅支持创建Topic对消息进行分配,还支对消息打上标签。

RocketMQ支持延时队列,kafka实现延时队列需要程序员自己实现
RocketMQ 支持死信队列,当消息多次发送失败的时候,就可以将这个消息存放到死信队列中,而kafka不支持这个队列。
为什么kafka的性能高于RocketMQ?
0 CPU 拷贝
kafka的性能优于RocketMQ 的主要原因之一是kafka使用了0拷贝技术。kafka使用了sendfile技术, RocketMQ使用 mmap+write技术
文件发送的一般方式
在说明0拷贝技术性能之前先说明普通发送文件的过程:消息队列将消息持久化到硬盘,将硬盘上的消息发送出去需要用户态调用read()和write()函数,read 和 write 是操作系统提供的系统函数,用户态每调用一个系统函数需要经历2次上下文切换,第一次是从用户态陷入内核,第二次是从内核态返回内存中。
想要将消息发送出去,用户态需要将消息从硬盘中读出来,访问硬盘操作不能让用户态来做,只能让操作系统帮助,用户态调用 read() 函数陷入内核,操作系统将磁盘中的数据读到内核缓冲区,这个搬运过程需要DMA完成,不需要CPU操作,然后将内核缓冲区中的数据拷贝到用户缓冲区中,这个操作需要CPU完成,read()调用返回后,从内核态回到用户态;然后用户态调用 write()函数将消息拷贝到socket 缓冲区发送出去, 从用户态又陷入内核中,这个拷贝操作是由CPU完成的,然后操作系统将socket缓冲区中的数据拷贝到网卡的内存中发送出去,这个拷贝操作是由DMA完成的。借用一下小林coding对零拷贝的图解
总之,消息发送的最终结果就是将磁盘中的数据拷贝到网卡中发送出去,但是将内核缓冲区中的数据拷贝到用户态然后从用户态又拷贝到内核缓冲区中,这个搬来搬去的操作实际上没有什么用处,因为数据拷贝到用户缓冲区中并不会被用户更改,多次的上下文操作和数据拷贝操作,会增加耗时,降低性能。

0拷贝
0拷贝并不是指没有经过任何一次的拷贝,而是CPU不参与的拷贝,是0 CPU拷贝
0拷贝有两种方式:mmap+write和sendfile方式
mmap+write方式
mmap是内存映射,将内核空间映射到了用户空间,避免了一次将内核空间数据拷贝到内核空间的操作,但是mmap和write都是系统调用,调用它们都需要再经过4次的上下文切换。相比较与一般的文件发送方式,mmap+write方式减少了一次拷贝。借用小林coding的图解

sendfile方式
sendfile方式是将内核缓冲区的中的数据直接拷贝到网卡中发送出去,相比较于一般的文件发送方式可以减少2次拷贝操作,2次上下文切换。借用小林coding的图解

为什么kafka使用sendfile, rocketMQ使用mmap,RocketMQ为什么不能使用 sendfile 呢?
区别这两种方式之前先了解一个sendfile和mmap函数的区别
1 | sendfile |
1 | mmap |
sendfile 函数返回的是发送出去多少字节,不管发送出去的信息内容是什么,而mmap返回的是消息的具体内容,应用层在接收到这些消息可以做自己的逻辑处理,RocketMQ 功能中存在死信队列,死信队列就是那些总是发送不出去的消息存放的队列,所以如果不使用 mmap 返回消息内容的话,就无法获知发送不出去的消息将它们存在死信队列中。而kafka没有死信队列机制,所以它使用sendfile返回发送出去多少消息就可以了。
顺序写
kafka的每个partion对应一个文件,写文件时就是在追加文件,写入磁盘操作,顺序写的性能是优于随机写的
RabbitMQ
传统的消息队列,消息模型是基于队列的,一个消息只能被一个消费者处理,吞吐量较低,RabbitMQ是当前主流消息中间件之一。
RabbitMQ 可以根据消息类型创建多个队列并命名这个队列,生产者向命名队列发送消息,消费者通过命名队列获取消息
RabbitMQ 采用的是传统的队列模型,生产者将消息放入队列,消费者从队列中消费消息
Rabbit的消息模型既支持队列,也支持消息交换器
消息交换器 Exchange模型
生产者有时候需要将消息发送给一个队列,有时候需要将消息发送给多个队列,RabiitMQ 定制了一个消息路由分发策略组件Exchange,也就是在Exchange 中绑定队列
消息交换器Exchange:生产者可以把消息发布到消息交换器上而不用知道这些消息都有哪些消费者。每一个订阅了交换器的消费者都会创建一个队列,消息交换器会把生产的消息放入队列中。消息交换器也可以基于各种路由规则为订阅者过滤消息。

RabbitMQ中实现了多种队列,比如优先级队列、死信队列
实现优先级队列本质上是通过生产者给消息打上标签,然后通过大顶堆或者小顶堆实现的
RabbitMQ 架构
RabbitMQ 实际上就是消息分发器 Exchange和多个队列组成,可以通过创建多个 RabbitMQ 实例构建集群实现系统的高可用和高可靠,在不同的服务器上各部署一个 RabbitMQ实例

普通集群模式
普通集群模式中,每个RabbitMQ 都是一个完整的实例,都可以进行读写,集群中每个RabbitMQ 中的Exchange都会进行数据同步,但是不会同步 RabbitMQ中的队列的数据
写操作:将消息写入某个broker的队列中,并不会将发生变化的队列同步到其他broker中的队列中;如果写某个broker中的Exchange,那么会将该Exchange同步到集群中所有其它的broker的Exchange中


**读操作:**如果访问的消息队列信息正好在正在访问的broker中的Exchange中存在,直接返回消息,否则访问该broker中的Exchange,通过Exchange定位到其他的broker中获取队列消息


所以可以通过增加broker的数量提高系统的吞吐量和扩展性,但是因为不存在消息副本,会存在一定的风险,当某个broker宕机了,那么这个broker上的所有消息都会丢失,即使每个broker上的Exchange数据一样,也不能通过访问其它的broker上的Exchange去定位要找的数据了
镜像队列集群
在普通集群模式的基础上,在broker上同步其它broker上的队列,主queue用来读写,从queue来同步主queue,所以从queue也就叫镜像队列,当主queue所在的broker挂了时,从queue就可以上升为主queue。
写操作:将Exchange和queue数据在其他的broker上同 步
读操作:消费者若访问的是主queue所在的broker,则直接返回数据,否则当前broker会从主queue所在的broker上读数据,再由当前的broker返回给消费者

镜像集群的优点是让系统变的高可用,缺点是,当队列越来越多时,同步操作越来越多,占用较大的带宽。
Quorum队列集群
RabbitMQ使用一致性算法 Raft 同步broker集群中的Exchange数据和队列数据,通过选举机制选择主节点。它和kafka的ZooKeeper的区别是,RabbitMQ不需要一个中心节点去协调集群中的broker节点,而是通过集群中的broker节点本身完成数据的同步和选举。