0%

如何用Redis实现消息队列

什么是消息队列(MQ)

顾名思义,将消息以队列的形式缓存,生产者(producer)和消费者(consumer)分别使用队列的两个端口进行消息的收发

核心能力

消息队列有两个核心能力:解耦和削峰

  • 解耦:

    如果没有消息队列,那么在一次业务流程中,上游的http或者rpc发出请求之后,下游的消费者需要马上进行反馈,否则整个流程就会一直阻塞,这即浪费时间,又浪费CPU性能

    因此我们需要一个能够持久化存放请求的容器,这就是消息队列的解耦——将生产者和消费者的应用进行解耦

  • 削峰

    如果在某一时刻,生产者给消费者生成了大量的请求,而消费者无法一次性全部消费,那么会产生消息的丢失

    而有了消息队列,就可以分批次处理数量过多的请求

基础要求

作为消息队列组件,需要满足一些要求:

  • 消息不丢失

    很好理解,毕竟如果消息易失,那么有没有消息队列就都没太大区别了,这一方面可以分为三个部分来看待:

    • 生产者将消息投递到MQ的时候不出现丢失
    • 消息存放在MQ时不丢失
    • 消费者从MQ消费消息时不出现丢失

    针对第二点,各个MQ组件基本都是基于数据落盘+数据备份的方式来完成的

    而对于第一第三点,则是通过两个交互环节中的ack机制保证的,譬如生产者向MQ中投递消息,如果没有收到MQ的ack返回确认,那么生产者就应当一直投递这个消息给MQ;另一方面,消费者也需要避免接收重复的消息,所以对于下游的消费者,同样需要具备消息幂等去重的能力

  • 支持消息存储

    就像前面提到的,MQ至少需要支持一定规模的数据的存放,而且这种存放需要持久性,能够让消费者自由选择时间进行消费操作

流程类型

根据消费者的消费流程,MQ可以被分为两种类型:

  • Push型:

    指当生产者将消息投递到MQ时,由MQ主动将消息以推送的方式发送给各个订阅了的消费者

  • Pull型:

    当MQ中存在消息时,由消费者主动执行拉取消息的操作来获取消息

两种类型各有优劣,实际操作中需要按需取舍

如何用Redis实现消息队列

Redis虽然是一种非关系型数据库,但是其部分数据结构是能够支持实现消息队列组件的

可能存在的问题

首先需要指出可能存在的一些问题

  • 存储昂贵

    由于Redis是基于内存实现的缓存中间件,所以存储消息容量的限制比较大

  • 数据丢失

    • 由于Redis是基于内存实现的缓存中间件,所以不可避免地会产生丢失数据的风险(比如断电和宕机),虽然有rdb/aof这种持久化机制,但是无法做到百分百安全

    • 此外,Redis走的是ap高可用流派,数据的主从复制流程是异步的,主从切换时数据存在弱一致的问题

Redis List

具体方法

一种比较容易想到的思路就是使用Redis的List结构,这是一个双向链表,天然契合MQ的队列模型,只需要使用LPUSH和RPOP进行消息的投递和读取即可

这种方法的缺点也是显而易见的:如果生产者生产消息的速度赶不上消费者的消费速度,那么消费者使用RPOP拉取消息的时候就会立刻返回空值nil,也就是说,需要消费者不断轮询地访问,这种高频的自旋对于CPU是一种无用的损耗;另一方面,如果让消费者每次轮询之后休眠一段时间,那么可能会导致消息处理不及时,也是我们不希望看到的情况

最理想的方案是:在List中有数据到达时,消费者马上能够意识到,并且处理数据,此外始终保持睡眠状态,也就是阻塞态

因此,我们可以使用Redis中的BRPOP指令来代替RPOP,即可弥补上述的缺点

1
BRPOP my_topic 0

其中topic后面的数字代表阻塞等待时长,达到此阈值仍未获取数据时会返回nil;如果设置为 0 ,则代表没有这个超时限制

局限性分析

尽管解决了阻塞问题,List仍然不能算是一个合格的消息队列组件,原因如下:

  • 无法支持发布/订阅模式

    显然List的消费者和生产者是一对一的,因为数据在POP出去之后就不复存在,只此一份,如果我们有多个消费者,每个都需要消费者所生产的数据,那么List就束手无策了

  • 无法支持消费端ack机制

    当消费者出现了宕机等意外,没有一种有效的手段告诉MQ消息处理失败的反馈,在这种情况下,一旦数据POP,就真的完全丢失了

Redis pub/sub

为了解决无法支持发布/订阅模式的问题,Redis提供了pub/sub机制,全称为publisher/subscriber

具体方法

pub/sub模式会在两者之间建立一个用于实时通信的信道channel,在传递消息时,会根据channel查找到所有建立订阅关系的subscriber,一一传送消息

操作指令为:

发布者:publish topic_name message

比如:publish my_new_message 今天天气怎么样

订阅者:subscribe topic_name

订阅者会使用阻塞模式进行监听,解决了List方法中的CPU浪费问题

这里解释一下背后的原理:

  • 首先,消费方 subscriber 通过 subscribe 指令建立和指定 channel 之间的订阅关系. 这时在 redis 中会维护好 channel 和对应 subscriber 列表的映射关系,并在内存中为每个在线活跃的 subscriber 分配好一个缓冲区 buffer,用以承载后续到来的消息数据
  • 接下来随着 publisher 执行 publish 指令,往对应 channel 中投递消息后,此时 redis 会实时查看 channel 对应 subscriber 名单,往每个 subscriber 的缓冲区 buffer 中推送这条数据
  • 各执行了 subscribe 指令的 subscriber 会处于阻塞监听缓冲区 buffer 的状态,随着新数据到达,subscriber 会获取到这笔数据

基于这个流程,我们能看出来,pub/sub 对于 channel 以及 subscribers 之间的实时映射关系存在强依赖. 因此在操作的执行顺序上,我们需要保证先执行 subscribe 指令,再执行 publish 执行,否则前几笔 publish 投递的数据就会因为不存在 subscriber 而被直接丢弃

优缺点分析

pub/sub模式最大的优点就是实现了发布/订阅能力,然而其缺点也很明显:关于消息丢失的处理

  • 缺乏ack机制:

    与List相同,没有ack意味着pub/sub模式依然没有办法提醒发布者消息处理的成功与否,无法执行消息的重放

  • 缺乏消息储存能力

    Redis的pub/sub模式相当于golang中的无缓冲型channel,仅仅是维护了channel和subscribers之间的映射关系,每当消息来临,不会停留在channel中,而是直接送往映射的buffer中,所以会出现以下问题:

    • subscriber 宕机:倘若某个 subscriber 中途宕机,则会被踢出名单,在恢复前的这段时间内,到达的消息都会彻底与这个 subscriber 无缘
    • Redis 宕机:每条 publish 的消息都会第一时间分发到 subscriber 对应的内存缓冲区中,而这个缓冲区是完全基于内存实现的易失性存储,一旦 Redis 服务端宕机,缓冲区中的数据就完全丢失且不可恢复了;此外,pub/sub 模式下的消息数据不属于 Redis 中的基本数据类型,因此 redis 中的持久化机制 rdb 和 aof 对于 pub/sub 中的数据是完全不生效的,数据丢失的可能性大幅度提高
    • subscriber消息积压:由于消息数据会被放在 Redis 侧各 subscriber 的缓冲区 buffer 中,这部分空间是相对有限的,一旦某个 subscriber 因为消费能力弱,导致 buffer 中的的数据发生积压,此时 Redis 很可能会自动把 subscriber 踢除下线,于是这部分数据也丢失了

    对于最后这一点,可以在redis.conf文件中配置:

    1
    client-output-buffer-limit pubsub 32mb 8mb 60s

    对应的含义是,倘若某个 subscriber 的缓冲区 buffer 大小达到 32MB,则 subscriber 会被踢下线;倘若缓冲区内数据量在连续 60s 内达到 8MB 大小,subscriber 也会踢下线

Redis Streams

操作指令

首先需要介绍一下几个核心的操作指令,所有指令都可以在官方文档中找到:

  • 生产消息:

    使用该指令可以向topic中投放一组键值对消息

    1
    2
    127.0.0.1:6379> XADD my_streams_topic * key1 value1
    "1710412272535-0"
    • My_streams_topic:topic名称
    • *:表示该消息自动生成唯一标识id,基于时间戳+自增序列号生成
    • Key1、value1:输入的键值对
  • 消费消息:

    使用该指令可以从对应的topic中获取消息

    1
    2
    3
    4
    5
    127.0.0.1:6379> xread [BLOCK] [Time] streams my_streams_topic 0-0
    1) 1) "my_streams_topic"
    2) 1) 1) "1710412272535-0"
    2) 1) "key1"
    2) "value1"
    • BLOCK:表示是否使用阻塞消费模式
    • Time:如果加入BLOCK参数,那么此处需要填写time表示阻塞等待时间,超过这个时间就会返回nil;设置为0表示不设置超时阈值
    • streams:表示从一个streams对象读取消息
    • my_streams_topic:topic名称
    • 0-0:表示从头开始消费;这里如果填写的是某条消息的id的话,就会从这条消息之后开始消费

此外streams支持发布/订阅模式,可以保证消息被多个消费者组访问

  • 创建消费者组:

    1
    2
    127.0.0.1:6379> XGROUP CREATE my_streams_topic my_group 0-0
    OK
    • my_streams_topic:topic 名称
    • my_group:消费者组名称
    • 0-0:从头开始消费
  • 基于消费者组消费信息:

    同一份数据在同一个消费者组下只会被消费到一次. 不同消费者组各自能获取到独立完整的消息数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    127.0.0.1:6379> XREADGROUP GROUP my_group consumer1 BLOCK 0 STREAMS my_streams_topic >
    1) 1) "my_streams_topic"
    2) 1) 1) "1710412272535-0"
    2) 1) "key1"
    2) "value1"
    2) 1) "1710413025712-0"
    2) 1) "key2"
    2) "value2"
    3) 1) "1710413031719-0"
    2) 1) "key3"
    2) "value3"
    4) 1) "1710413036009-0"
    2) 1) "key4"
    2) "value4"
    • my_group: 消费者组名称
    • Consumer1:消费者名称
    • my_streams_topic:topic 名称
    • BLOCK 0: 采用阻塞等待的模式,0 代表没有超时上限
    • >:读最新的消息 (尚未分配给某个 consumer 的消息)

    此外还有一种消费模式,读取的是已经分配给当前消费者,但是还未经确认的旧消息:

    1
    2
    3
    4
    5
    127.0.0.1:6379> XREADGROUP GROUP my_group consumer1 BLOCK 0 STREAMS my_streams_topic 0-0
    1) 1) "my_streams_topic"
    2) 1) 1) "1710413250364-0"
    2) 1) "key5"
    2) "value5"
    • 0-0:标识读取已分配给当前 consumer ,但是还没经过 xack 指令确认的消息

    > 与 0-0,两者之间的区别在于,“>”读取新消息,“0-0”读取旧消息

  • 确认消息:

    通过 xack 指令,携带上消费者组、topic 名称以及消息 id,能够完成对某条消息的确认操作

    1
    2
    127.0.0.1:6379> XACK my_streams_topic my_group 1710413250364-0
    (integer) 1
    • my_streams_topic:topic 名称
    • my_group:消费者组名称
    • 1710413250364-0:消息 id

优缺点分析

首先是最明显的优点:

  • 支持发布/订阅模式

    Redis Streams 引入了消费者组 group 的概念,因此是能够保证各个消费者组 consumer group 均能够获取到一份独立而完整的消息数据

  • 数据可持久化

    Redis 中的 streams 和 string、list 等数据类型一样,都能够通过 rdb( redis database)、aof( append only file) 的持久化机制进行落盘存储,能够在很大程度上降低数据丢失的概率

  • 支持消费端 ack 机制

    Redis Streams 中另一项非常重要的改进,是支持 consumer 的 ack 能力,consumer 在处理好某条消息后,能通过 xack 指令对该消息进行确认。这样对于没经过 ack 确认的消息,Redis Streams 还是为 consumer 保留了重新消费的能力

  • 支持消息缓存

    和 pub/sub 模式不同的是,Redis Streams 中会实际开辟内存空间用于存储 Streams 中的数据,因此哪怕某个 consumer group 是在消息生产之后才完成注册操作,也能够进行消息溯源,从 topic 起点开始执行消息的消费操作

    然而由于Redis是基于内存实现的存储,因此如果消息量过于庞大,可能会造成很大的资源压力甚至out of memory。因此,可以在XADD指令中加上maxlen,显式地设定topic中能缓存的数据长度

    1
    XADD my_topic MAXLEN 10000 * key1 value1
    • 最多缓存10000条数据

整体对比

现在对Redis实现MQ的各个方法做个比较:

MQ 实现方案 发布/订阅能力 消费端ACK机制 消息缓存能力 数据丢失风险
List 不支持 不支持 支持
pub/sub 支持 不支持 不支持
Streams 支持 支持 支持

可以看到,在各项能力上 List 和 pub/sub 互有千秋,而 Streams 可以说是兼具了各方面的优势,称得上是已经趋近于成熟的MQ实现方案

下面我们再进一步拿 Redis Streams 和业界专业的 MQ 组件进行对比

MQ组件 消息存储介质 消息分区/并发能力 数据丢失风险 运维成本
Redis Streams 内存 不支持
Kafka 磁盘 支持 理论上不存在 偏高

由于Redis Streams在存储上需要使用内存,因此消息存储容量相对有限;且同一个 topic 的数据由于对应为同一个 key,因此会被分发到相同节点,无法实现数据的纵向分治,因此不具备类似于 kafka 纵向分区以提高并发度的能力

因此使用Redis作为MQ的主要优势就在于运维成本低,如果在实际的业务流程中,对于数据的精度没有特别高的要求,那么使用Redis Streams这种轻量化的MQ方案不失为一种好的选择