什么是消息队列(MQ)
顾名思义,将消息以队列的形式缓存,生产者(producer)和消费者(consumer)分别使用队列的两个端口进行消息的收发
核心能力
消息队列有两个核心能力:解耦和削峰
解耦:
如果没有消息队列,那么在一次业务流程中,上游的http或者rpc发出请求之后,下游的消费者需要马上进行反馈,否则整个流程就会一直阻塞,这即浪费时间,又浪费CPU性能
因此我们需要一个能够持久化存放请求的容器,这就是消息队列的解耦——将生产者和消费者的应用进行解耦
削峰
如果在某一时刻,生产者给消费者生成了大量的请求,而消费者无法一次性全部消费,那么会产生消息的丢失
而有了消息队列,就可以分批次处理数量过多的请求
基础要求
作为消息队列组件,需要满足一些要求:
消息不丢失
很好理解,毕竟如果消息易失,那么有没有消息队列就都没太大区别了,这一方面可以分为三个部分来看待:
- 生产者将消息投递到MQ的时候不出现丢失
- 消息存放在MQ时不丢失
- 消费者从MQ消费消息时不出现丢失
针对第二点,各个MQ组件基本都是基于数据落盘+数据备份的方式来完成的
而对于第一第三点,则是通过两个交互环节中的ack机制保证的,譬如生产者向MQ中投递消息,如果没有收到MQ的ack返回确认,那么生产者就应当一直投递这个消息给MQ;另一方面,消费者也需要避免接收重复的消息,所以对于下游的消费者,同样需要具备消息幂等去重的能力
支持消息存储
就像前面提到的,MQ至少需要支持一定规模的数据的存放,而且这种存放需要持久性,能够让消费者自由选择时间进行消费操作
流程类型
根据消费者的消费流程,MQ可以被分为两种类型:
Push型:
指当生产者将消息投递到MQ时,由MQ主动将消息以推送的方式发送给各个订阅了的消费者
Pull型:
当MQ中存在消息时,由消费者主动执行拉取消息的操作来获取消息
两种类型各有优劣,实际操作中需要按需取舍
如何用Redis实现消息队列
Redis虽然是一种非关系型数据库,但是其部分数据结构是能够支持实现消息队列组件的
可能存在的问题
首先需要指出可能存在的一些问题
存储昂贵
由于Redis是基于内存实现的缓存中间件,所以存储消息容量的限制比较大
数据丢失
由于Redis是基于内存实现的缓存中间件,所以不可避免地会产生丢失数据的风险(比如断电和宕机),虽然有rdb/aof这种持久化机制,但是无法做到百分百安全
此外,Redis走的是ap高可用流派,数据的主从复制流程是异步的,主从切换时数据存在弱一致的问题
Redis List
具体方法
一种比较容易想到的思路就是使用Redis的List结构,这是一个双向链表,天然契合MQ的队列模型,只需要使用LPUSH和RPOP进行消息的投递和读取即可
这种方法的缺点也是显而易见的:如果生产者生产消息的速度赶不上消费者的消费速度,那么消费者使用RPOP拉取消息的时候就会立刻返回空值nil,也就是说,需要消费者不断轮询地访问,这种高频的自旋对于CPU是一种无用的损耗;另一方面,如果让消费者每次轮询之后休眠一段时间,那么可能会导致消息处理不及时,也是我们不希望看到的情况
最理想的方案是:在List中有数据到达时,消费者马上能够意识到,并且处理数据,此外始终保持睡眠状态,也就是阻塞态
因此,我们可以使用Redis中的BRPOP指令来代替RPOP,即可弥补上述的缺点
1 | BRPOP my_topic 0 |
其中topic后面的数字代表阻塞等待时长,达到此阈值仍未获取数据时会返回nil;如果设置为 0 ,则代表没有这个超时限制
局限性分析
尽管解决了阻塞问题,List仍然不能算是一个合格的消息队列组件,原因如下:
无法支持发布/订阅模式
显然List的消费者和生产者是一对一的,因为数据在POP出去之后就不复存在,只此一份,如果我们有多个消费者,每个都需要消费者所生产的数据,那么List就束手无策了
无法支持消费端ack机制
当消费者出现了宕机等意外,没有一种有效的手段告诉MQ消息处理失败的反馈,在这种情况下,一旦数据POP,就真的完全丢失了
Redis pub/sub
为了解决无法支持发布/订阅模式的问题,Redis提供了pub/sub机制,全称为publisher/subscriber
具体方法
pub/sub模式会在两者之间建立一个用于实时通信的信道channel,在传递消息时,会根据channel查找到所有建立订阅关系的subscriber,一一传送消息
操作指令为:
发布者:publish topic_name message
比如:publish my_new_message 今天天气怎么样
订阅者:subscribe topic_name
订阅者会使用阻塞模式进行监听,解决了List方法中的CPU浪费问题
这里解释一下背后的原理:
- 首先,消费方 subscriber 通过 subscribe 指令建立和指定 channel 之间的订阅关系. 这时在 redis 中会维护好 channel 和对应 subscriber 列表的映射关系,并在内存中为每个在线活跃的 subscriber 分配好一个缓冲区 buffer,用以承载后续到来的消息数据
- 接下来随着 publisher 执行 publish 指令,往对应 channel 中投递消息后,此时 redis 会实时查看 channel 对应 subscriber 名单,往每个 subscriber 的缓冲区 buffer 中推送这条数据
- 各执行了 subscribe 指令的 subscriber 会处于阻塞监听缓冲区 buffer 的状态,随着新数据到达,subscriber 会获取到这笔数据
基于这个流程,我们能看出来,pub/sub 对于 channel 以及 subscribers 之间的实时映射关系存在强依赖. 因此在操作的执行顺序上,我们需要保证先执行 subscribe 指令,再执行 publish 执行,否则前几笔 publish 投递的数据就会因为不存在 subscriber 而被直接丢弃
优缺点分析
pub/sub模式最大的优点就是实现了发布/订阅能力,然而其缺点也很明显:关于消息丢失的处理
缺乏ack机制:
与List相同,没有ack意味着pub/sub模式依然没有办法提醒发布者消息处理的成功与否,无法执行消息的重放
缺乏消息储存能力
Redis的pub/sub模式相当于golang中的无缓冲型channel,仅仅是维护了channel和subscribers之间的映射关系,每当消息来临,不会停留在channel中,而是直接送往映射的buffer中,所以会出现以下问题:
- subscriber 宕机:倘若某个 subscriber 中途宕机,则会被踢出名单,在恢复前的这段时间内,到达的消息都会彻底与这个 subscriber 无缘
- Redis 宕机:每条 publish 的消息都会第一时间分发到 subscriber 对应的内存缓冲区中,而这个缓冲区是完全基于内存实现的易失性存储,一旦 Redis 服务端宕机,缓冲区中的数据就完全丢失且不可恢复了;此外,pub/sub 模式下的消息数据不属于 Redis 中的基本数据类型,因此 redis 中的持久化机制 rdb 和 aof 对于 pub/sub 中的数据是完全不生效的,数据丢失的可能性大幅度提高
- subscriber消息积压:由于消息数据会被放在 Redis 侧各 subscriber 的缓冲区 buffer 中,这部分空间是相对有限的,一旦某个 subscriber 因为消费能力弱,导致 buffer 中的的数据发生积压,此时 Redis 很可能会自动把 subscriber 踢除下线,于是这部分数据也丢失了
对于最后这一点,可以在redis.conf文件中配置:
1
client-output-buffer-limit pubsub 32mb 8mb 60s
对应的含义是,倘若某个 subscriber 的缓冲区 buffer 大小达到 32MB,则 subscriber 会被踢下线;倘若缓冲区内数据量在连续 60s 内达到 8MB 大小,subscriber 也会踢下线
Redis Streams
操作指令
首先需要介绍一下几个核心的操作指令,所有指令都可以在官方文档中找到:
生产消息:
使用该指令可以向topic中投放一组键值对消息
1
2127.0.0.1:6379> XADD my_streams_topic * key1 value1
"1710412272535-0"- My_streams_topic:topic名称
- *:表示该消息自动生成唯一标识id,基于时间戳+自增序列号生成
- Key1、value1:输入的键值对
消费消息:
使用该指令可以从对应的topic中获取消息
1
2
3
4
5127.0.0.1:6379> xread [BLOCK] [Time] streams my_streams_topic 0-0
1) 1) "my_streams_topic"
2) 1) 1) "1710412272535-0"
2) 1) "key1"
2) "value1"- BLOCK:表示是否使用阻塞消费模式
- Time:如果加入BLOCK参数,那么此处需要填写time表示阻塞等待时间,超过这个时间就会返回nil;设置为0表示不设置超时阈值
- streams:表示从一个streams对象读取消息
- my_streams_topic:topic名称
- 0-0:表示从头开始消费;这里如果填写的是某条消息的id的话,就会从这条消息之后开始消费
此外streams支持发布/订阅模式,可以保证消息被多个消费者组访问
创建消费者组:
1
2127.0.0.1:6379> XGROUP CREATE my_streams_topic my_group 0-0
OK- my_streams_topic:topic 名称
- my_group:消费者组名称
- 0-0:从头开始消费
基于消费者组消费信息:
同一份数据在同一个消费者组下只会被消费到一次. 不同消费者组各自能获取到独立完整的消息数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14127.0.0.1:6379> XREADGROUP GROUP my_group consumer1 BLOCK 0 STREAMS my_streams_topic >
1) 1) "my_streams_topic"
2) 1) 1) "1710412272535-0"
2) 1) "key1"
2) "value1"
2) 1) "1710413025712-0"
2) 1) "key2"
2) "value2"
3) 1) "1710413031719-0"
2) 1) "key3"
2) "value3"
4) 1) "1710413036009-0"
2) 1) "key4"
2) "value4"- my_group: 消费者组名称
- Consumer1:消费者名称
- my_streams_topic:topic 名称
- BLOCK 0: 采用阻塞等待的模式,0 代表没有超时上限
- >:读最新的消息 (尚未分配给某个 consumer 的消息)
此外还有一种消费模式,读取的是已经分配给当前消费者,但是还未经确认的旧消息:
1
2
3
4
5127.0.0.1:6379> XREADGROUP GROUP my_group consumer1 BLOCK 0 STREAMS my_streams_topic 0-0
1) 1) "my_streams_topic"
2) 1) 1) "1710413250364-0"
2) 1) "key5"
2) "value5"- 0-0:标识读取已分配给当前 consumer ,但是还没经过 xack 指令确认的消息
> 与 0-0,两者之间的区别在于,“>”读取新消息,“0-0”读取旧消息
确认消息:
通过 xack 指令,携带上消费者组、topic 名称以及消息 id,能够完成对某条消息的确认操作
1
2127.0.0.1:6379> XACK my_streams_topic my_group 1710413250364-0
(integer) 1- my_streams_topic:topic 名称
- my_group:消费者组名称
- 1710413250364-0:消息 id
优缺点分析
首先是最明显的优点:
支持发布/订阅模式
Redis Streams 引入了消费者组 group 的概念,因此是能够保证各个消费者组 consumer group 均能够获取到一份独立而完整的消息数据
数据可持久化
Redis 中的 streams 和 string、list 等数据类型一样,都能够通过 rdb( redis database)、aof( append only file) 的持久化机制进行落盘存储,能够在很大程度上降低数据丢失的概率
支持消费端 ack 机制
Redis Streams 中另一项非常重要的改进,是支持 consumer 的 ack 能力,consumer 在处理好某条消息后,能通过 xack 指令对该消息进行确认。这样对于没经过 ack 确认的消息,Redis Streams 还是为 consumer 保留了重新消费的能力
支持消息缓存
和 pub/sub 模式不同的是,Redis Streams 中会实际开辟内存空间用于存储 Streams 中的数据,因此哪怕某个 consumer group 是在消息生产之后才完成注册操作,也能够进行消息溯源,从 topic 起点开始执行消息的消费操作
然而由于Redis是基于内存实现的存储,因此如果消息量过于庞大,可能会造成很大的资源压力甚至out of memory。因此,可以在XADD指令中加上maxlen,显式地设定topic中能缓存的数据长度
1
XADD my_topic MAXLEN 10000 * key1 value1
- 最多缓存10000条数据
整体对比
现在对Redis实现MQ的各个方法做个比较:
| MQ 实现方案 | 发布/订阅能力 | 消费端ACK机制 | 消息缓存能力 | 数据丢失风险 |
|---|---|---|---|---|
| List | 不支持 | 不支持 | 支持 | 低 |
| pub/sub | 支持 | 不支持 | 不支持 | 高 |
| Streams | 支持 | 支持 | 支持 | 低 |
可以看到,在各项能力上 List 和 pub/sub 互有千秋,而 Streams 可以说是兼具了各方面的优势,称得上是已经趋近于成熟的MQ实现方案
下面我们再进一步拿 Redis Streams 和业界专业的 MQ 组件进行对比
| MQ组件 | 消息存储介质 | 消息分区/并发能力 | 数据丢失风险 | 运维成本 |
|---|---|---|---|---|
| Redis Streams | 内存 | 不支持 | 低 | 低 |
| Kafka | 磁盘 | 支持 | 理论上不存在 | 偏高 |
由于Redis Streams在存储上需要使用内存,因此消息存储容量相对有限;且同一个 topic 的数据由于对应为同一个 key,因此会被分发到相同节点,无法实现数据的纵向分治,因此不具备类似于 kafka 纵向分区以提高并发度的能力
因此使用Redis作为MQ的主要优势就在于运维成本低,如果在实际的业务流程中,对于数据的精度没有特别高的要求,那么使用Redis Streams这种轻量化的MQ方案不失为一种好的选择