消息队列:6.消息存储原理

消息队列必须保证消息存储的可靠性,这是消息队列可以做到异步、削峰填谷的基础。
本文将讲解消息队列是如何实现消息的存储的。

写入

硬盘

类似MySQL或Redis,RocketMQ的消息持久化也是直接存储到本地硬盘中的。
在面对削峰填谷的需求下,可靠性、性能、存储量都无比重要:消息不能丢失,消息得快速写入(但是不急着读取),并且需要存储大量消息。
硬盘非常适合满足上述需求:

  • 可靠性:可以通过设置RAID等级来保证数据可靠性。
  • 快速写入:硬盘支持顺序写入,能够快速写入大量数据。
  • 存储量:硬盘是存储量非常大的存储介质,要想超越硬盘只能使用云盘,但云盘的速度较慢。

为什么不使用类似MySQL或Redis的软件作为存储层?原因很简单,引入额外的软件作为中介会降低性能和可靠性。在削峰填谷、超高并发的场景下,减少中间层可以显著提升性能和可靠性。

commitlog

RocketMQ将消息存储到一个commitlog文件中,不同Topic的消息会被存储在同一个commitlog文件中,每个消息都会占据一行。
示意图:

将不同Topic的消息存在一块呢,是为了充足利用硬盘的顺序写的特性。虽然硬盘可能支持同时写入多个文件来保持顺序写入,但如果同时写入的文件数等于topic的数量,且Topic数量较多,就无法保证顺序写入(因为不同文件通常会存储在不同扇区,导致随机写入;而在硬盘空间充足的情况下,将不同topic的消息存储在同一个文件中可以确保顺序写入)。

读取

将不同的Topic存入同一个commitlog文件中,会带来一个问题:消费者如何快速找到commitlog里的消息呢?

消费队列(ConsumeQueue)

这里就要用到一个非常重要的概念:消费队列(ConsumeQueue)
当消费者获得一个消息时,其是从通过ConsumeQueue获得这个消息的具体地址;一个主题会包含多个消息队列,一个消息队列对应一个ConsumeQueue…

那么消息队列是如何获得消息的呢?自然是从commitlog中来的。RocketMQ会定时分发commitlog中的消息到ConsumeQueue(大概是1ms解析一条),分发程序会根据消息行中的信息,将消息的起始偏移量消息长度放入ConsumeQueue,对应到MessageQueue就是放入了一条消息;等到消费者读取这条消息时,就会再次访问硬盘获得该消息并发送到消费者。

为什么不直接在ConsumeQueue中存储消息,随机读取硬盘不是很慢吗?这是为了减少内存的空间使用,以时间换空间,分发消息没有那么高的时间性能要求,因此这里决定使用时间换空间。

示意图:

注意:ConsumeQueue也是需要持久化的,否则会丢失消息的位置。

读取流程

在之前的消息队列:5.发布订阅模式中,我们就提到过一个概念:offset,用来记录消费到哪个位置。该词的全称是consumerOffset,需要区别于前面提到过的消息存储的起始偏移量commitlog offset

消费者消费消息的流程

  1. 根据订阅关系,找到订阅的consumerQueue。
  2. 通过consumerOffset获得consumerQueue中对应的消息指针。
  3. 根据消息指针中的commitlog offset和size(或length)信息,访问硬盘,获得完整消息。

索引

一个消息中可以添加key属性,以此记录在索引表中,当需要检索该消息时快速找到。
该索引表文件,即为indexFile

indexFile的格式如下图所示:

给新消息建立索引的过程:

  1. 获得新消息的key。
  2. 计算新消息的keyhash=hash(key)。
  3. 取余,获得对应的槽位:slotpos=keyhash%500w。
  4. 查看该槽位是否被占用。
    • 如果未被占用,这将该消息的commitlog offset、size、keyhash信息存到一个空的index item位置,然后将该index item的下标放入slot。
  5. 如果被占用,查看该slot记录的index item的下标,寻找一个空的index item,将当前消息存入,并将冲突消息的下标存到当前index item的prevIndex字段,然后重置slot的下标为当前消息的index item的下标。(类似链表的头插法)。

索引消息的过程:

  1. 获得key。
  2. 计算keyhash。
  3. 计算slotpos。
  4. 查看该槽位记录下标的index item中的keyhash是否对应,如果不对应就看prevIndex字段对应的index item中的keyhash是否对应,一直找到对应的那个,然后返回。如果没有对应的,就是不存在。

这样的设计有些类似简单文件系统中superblock、inode table、data region的设计。

底层优化

操作系统会为每个进程提供虚拟内存,虚拟内存通过页表映射到物理内存。
在物理内存中,操作系统会留出一定内存用于运行内核程序,以此区分用户态和内核态的空间。
当进程处于用户态时,需要进行系统调用,才能进入内核态,并访问所有的内存空间和I/O设备。

初始情况

举例:Java读取文件并发送到网络(通过文件系统调用和网卡I/O)

按时间顺序说明其过程:

  1. 打开本地文件
    1. 在Java程序中,首先通过FileInputStreamFiles.newInputStream()打开磁盘上的目标文件,这一步通常发生在用户空间。
    2. Java运行时系统发出系统调用到操作系统内核,请求打开文件。
    3. 内核验证权限并找到文件,创建一个描述符(句柄),用于后续对该文件的读取操作。
  2. 读取文件数据
    1. 使用FileInputStreamread(byte[] buffer)方法从文件中读取数据,这是一个阻塞操作。
    2. 当调用read()时,内核开始从磁盘读取数据到内核空间的缓冲区(如果数据不在缓存中)。
    3. 数据一旦被内核读取到缓冲区,内核将其复制到用户空间提供的缓冲区中(即Java程序的byte[]数组)。
  3. 创建Socket连接
    1. 在Java程序中,创建一个Socket实例并连接到远程服务器地址。
    2. 这会触发TCP/IP协议栈的操作,通过系统调用进入内核,建立网络连接。
  4. 获取Socket输出流
    1. 调用Socket.getOutputStream()获取一个OutputStream实例,这个实例与套接字关联,用于向网络发送数据。
    2. 再次涉及到内核调用,因为实际的网络数据发送是由内核的TCP/IP协议栈处理的。
  5. 将文件数据写入Socket
    1. 循环读取文件输入流的数据,并调用OutputStreamwrite(byte[] buffer, int offset, int length)方法,将数据写入Socket。
    2. 每次调用write()方法时,Java运行时会将用户空间的数据复制到内核空间的网络缓冲区。
    3. 内核在适当时候负责将这些数据封装成网络帧并通过物理网络接口发送出去。

通过在内核空间建立文件的缓存,对于读写操作均有一定性能的优化:

  1. 读:再次发送读该文件时,可以直接返回缓存,不用读磁盘。
  2. 写:可将写保存到缓存中,再延迟批量写入,减少I/O调用次数。\

但是这样存在一个问题:如果仅进行转发(如消息队列将消息发送到消费者时),该操作拷贝此时过多,并且占用内存较大。因此,引入零拷贝进行优化。

零拷贝

sendFile

如果一个程序,对文件不做任何处理,就直接发送到网络,那么可以直接忽略将磁盘缓存区复制到用户空间再复制到网络缓冲区的过程,改为直接将内核空间中的磁盘缓存区复制到网络缓冲区,该方式通过sendFile(Linux的系统调用)实现。

在Java NIO(非阻塞I/O)库中,可以通过FileChannel.transferTo()方法实现这一优化:

  1. 打开文件通道:使用FileChannel.open()方法打开文件,并获取到FileChannel对象。
  2. 创建Socket通道:创建SocketChannel实例并与远程服务器建立连接。
  3. 使用transferTo()方法:直接调用FileChannel.transferTo(long position, long count, WritableByteChannel target)方法,这里的target参数就是之前创建的SocketChannel
    此时,内核可以直接将文件数据从磁盘缓存区复制到网络缓冲区,无需经过用户空间,从而实现了零拷贝。

DMA Gather

在某些现代操作系统和硬件支持下,我们可以进一步利用DMA(Direct Memory Access,直接内存访问)Gather特性对上述的零拷贝操作进行优化。
DMA允许硬件(如网卡)直接访问内存,而无需CPU的干预,这样可以减少CPU工作负载。DMA Gather则允许一次数据传输操作中,从内存中多个不连续的位置收集数据并发送到目标设备(例如网络接口)。
即文件从硬盘到内核空间的磁盘缓存区后,提供文件描述符和数据长度给网络缓存区,网卡即可使用DMA收集功能直接从磁盘缓存区中拷贝,进而减少了冲磁盘缓存区复制到网络缓冲区的操作。

对于Java应用程序来说,虽然不能直接控制DMA Gather的具体实现,但在底层操作系统和硬件的支持下,FileChannel.transferTo()方法调用可能已经隐式地利用了DMA Gather特性。当操作系统检测到数据需要从磁盘读取并发送到网络时,它可能会执行以下步骤:

  1. DMA读取:内核启动磁盘控制器的DMA读取功能,直接从磁盘读取数据到内核空间的缓冲区,无需CPU参与数据搬移。
  2. DMA Gather:网卡同样启用DMA模式,并在内核的协调下执行Gather操作,从内核空间中存放文件数据的不同缓冲区(通常是页缓存的不连续区域)收集数据,一次性将这些数据打包并发送到网络接口。

sendFile+DMA Gather的机制很好,但是存在一个问题,就是java程序是无法获得消息的实际内容的。如果要开启消息过滤,就需要获得消息的内容,因此引入另一种机制:mmap。

mmap

mmap(Memory Mapping)是一种操作系统提供的内存管理功能,它允许将文件或者其他对象的内容映射到进程的地址空间中,形成内存映射文件。通过mmap,进程可以直接像访问普通内存一样来访问文件内容,而无需每次都调用readwrite等系统调用来读写文件。

Java程序可以通过JNI或其他方式调用mmap()系统调用,将文件映射到进程的地址空间(用户空间)。此时,对文件的访问可以直接通过内存访问进行,避免了传统的read()系统调用,从而减少了内核空间到用户空间的一次数据复制。

mmap仅仅是将文件内容与虚拟内存添加了映射关系,实际上没有将文件从硬盘中加载到实际内存,因此需要做好缓存预热。

RocketMQ实际机制

写入磁盘机制:

  1. 异步刷盘:通过mmap,将写入操作写入系统的页缓存,等操作系统异步将修改过的页(脏页)写入到磁盘,可提升一点性能。
  2. 同步刷盘:通过mmap,将写入操作写入系统的页缓存,并立即刷盘,以保证消息不丢失。

刷盘:将数据从 page cache 写入到磁盘中。

读取磁盘并发送到网络的机制:

  1. 执行mmap,建立映射。(mmap操作后,物理内存中实际并没有分配资源,只有当进程访问到,发现内存中没数据才会进行缺页中断,分配资源,而这个缺页中断是系统调用,涉及上下文切换,比较耗费时间,因此进行内存预热)
  2. 文件预热:
    1. 将当前映射的文件每个页都写入0字节,以保证触发页中断,将文件从磁盘加载到缓存。
    2. 调用 mlockmadvise(MADV_WILLNEED),保证预热的页不会被页置换,并且建议进行页预读。
  3. write操作:当消费者拉取信息时,使用write从用户缓存写入到网络缓冲区,实际上由于mmap的原因,是直接从内核的磁盘缓存区write到网络缓存区。

示意图:

对比RocketMQ与Kafka

RocketMQ

RocketMQ采用多Topic混合存储一个文件的方式来保存消息,即一个CommitLog文件中会包含分给此Broker的所有消息,不论消息属于哪个Topic的哪个Queue。

然后再以Topic+队列维度,存储ConsumeQueue(一个MessageQueue对应一个ConsumeQueue文件)。消费者具体是通过ConsumeQueue得到消息的真实物理地址再去访问CommitLog获取消息的,所有ConsumeQueue可理解为消息的索引。

每条消息存储至commitlog,都会在对应的ConsumeQueue生成一条记录,因此这个索引也叫稠密索引

Kafka

分区(Partition)

Kafka和RocketMQ一样,Topic下也分了多个队列提高消费的并发度,但是在Kafka中不叫队列,叫分区(Partition)
Kafka是以Partition为单位来存储消息的:

每个Topic的每个分区都会拥有自己的消息文件,且对应会有索引文件(还有一个时间索引文件这里不多介绍),它们的文件名一样(以第一条消息的offset命名,后缀为.log和.index)。

稀疏索引

Kafka不会为每条消息都对应生成一个索引,而是每隔几条消息再创建一条索引,这样能节省存储空间,能在内存中保存更多的索引,这样的索引叫稀疏索引

索引逻辑:

  1. 首先通过 offset 找到对应的索引文件。
  2. 再通过二分法遍历索引文件找到离目标消息最近的索引。
  3. 再利用这个索引内容从消息文件找到最近这条消息的位置。
  4. 再从这个位置开始顺序遍历消息文件找到目标消息。

这样一次寻址的时间复杂度为$O(log_2n)+O(m)$,其中 n 为索引文件中的索引个数,m 为索引的稀疏程度。

Kafka选择 用更少的空间,就需要花费更多的时间;而 RocketMQ 用的时间更少则花了更多的空间。

冷热分区

其实这个 Kafka 索引的二分查找是经过工程优化冷热分区的二分查找。如果按照正常的二分查找,那么需要读取索引的头和尾内容,尾的内容是最新写入的,很有可能已经在 pageCache ,而头的内容可能是很久之前的,很大概率不在 pageCache 中,因此需要从磁盘加载读取到 pageCache。

而内存是有限的,操作系统会通过类 LRU 机制进行页替换,当内存不足,很有可能因为加载这些很久以前的数据,导致内存中一些最近的 pageCache 被置换到磁盘中,而最近的 pageCache 的消息正常而言是近期会被消费者读取消费的,但这些消息又被挤出了内存,这样一来会频繁触发页中断,对性能很不好。且按照一般的逻辑,消费者要拉取的消息肯定是在索引文件的尾部,也就是最近写入的,而不是时间久远的头部,从头部找意义不大。

因此 Kafka 给索引文件做了冷热分区,修改过的二分是先查热区的二分,如果查不到再从冷区开始,由于热区的数据本身都已经在 pageCache 中,因此对缓存友好,不会污染缓存,且很大可能性能找到对应的消息。


消息队列:6.消息存储原理
http://shoumingchilun.github.io/2024/03/13/技能/开发/消息队列/RocketMQ/rocket-storage/
作者
寿命齿轮
发布于
2024年3月13日
许可协议