最近看了《RocketMQ技术内幕》一书并看了下RocketMQ代码,从书中和源码中摘了一些核心东西存档。

源码结构

  • broker: broker模块
  • client: 消息客户端,包含生产者、消费者相关类
  • common: 公共包
  • dev: 开发者信息(非源码)
  • distribution: 部署实例文件夹(非源码)
  • example: 示例代码
  • filter:消息过滤相关基础类
  • logappender: 日志实现相关类
  • namesrv: NameServer实现相关类
  • openmessaging: 消息开放标准,制定中。
  • remoting: 远程通信模块,基于Netty
  • srvutil: 服务器工具类。
  • store: 消息存储实现相关类
  • style: checkstyle相关实现
  • test: 测试相关类
  • tools: 工具类,监控命令相关实现类

特点

  • 基于主题的发布与订阅模式
  • 不使用Zookeeper,自研NameServer,因为Topic路由信息无须在集群之间保持强一致,追求最终一致性,并且能容忍分钟级的不一致。NameServer集群之间互不通信。
  • 存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制。消息存储基于顺序写,同时兼顾消息消费与查找引入了消息消费队列文件与索引文件。
  • 某些工作下放给使用者,如只保证消息被消费者消费,但设计上允许消息被重复消费。

设计目标:

  • 架构模式:采用发布订阅模式,主要组件包括:消息发送者、消息服务器(消息存储)、消息消费、路由发现。
  • 顺序消息:严格保证消息有序。
  • 消息过滤:支持服务端与消费端消息过滤机制。
  • 消息存储:考虑消息堆积能力和消息存储性能,
  • 消息高可用性:同步刷盘机制可确保不丢失消息,异步刷盘模式会丢失少量消息。单点故障时该节点的消息全部丢失,开启异步复制机制可保证只丢失少量消息。
  • 消息到达(消费)低延迟:不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。
  • 确保消息必须被消费一次:通过ACK机制确保至少被消费一次,但有重复消费的可能。
  • 回溯消息:支持按时间回溯消息,精确到毫秒,可向前或向后回溯。
  • 消息堆积:默认保留3天
  • 定时消息:支持特定延迟级别的定时消息。(支持任意精度需要排序,有性能损耗)
  • 消息重试机制:消费时发送异常需要重新投递。

路由中心NameServer

保存的元数据

  • topicQueueTable: topic -> List 。Topic消息队列路由信息,发送时根据路由表进行负载均衡
    • brokerName
    • readQueueNums
    • writeQueueNums
    • perm
    • topicSynFlag
  • brokerAddrTable: brokerName -> BrokerData 。Broker基础信息,包含以下信息:
    • String cluster: 所属集群名称
    • String brokerName: broker名称
    • HashMap<Long(brokerId), String(broker address)> brokerAddrs: 准备broker地址,brokerId为0的为主。
  • clusterAddrTable: clusterName -> Set<String(brokerName)> 。 Broker集群信息,存储集群中所有Broker名称。
  • brokerLiveTable: brokerAddr -> BrokerLiveInfo 。 Broker状态信息,NameServer每次收到心跳包时会替换该信息。
    • long lastUpdateTimestamp
    • DataVersion dataVersion
    • Channel channel
    • String haServerAddr
  • filterServerTable: brokerAddr -> List<String(filterServer)> 。 Broker上的FilterServer列表,用于类模式过滤。

路由注册 & 心跳

  • 默认Broker每30s发送一次心跳【BrokerOuterAPI#registerBrokerAll】
  • NameServer收到心跳包后更新brokerLiveTable中的BrokerLiveInfo以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)【RouteInfoManager#registerBroker】
  • NameServer使用读写锁,可以并发读,但同一时刻只处理一个Broker心跳包。

路由删除

  • NameServer每10s扫描一次brokerLiveTable,如果BrokerLiveInfo的lastUpdateTimestamp的时间戳距当前时间超过120s,则移除Broker信息
  • Broker正常关闭的情况下,执行unregisterBroker指令

路由发现

  • 非实时,由客户端定时拉取【DefaultRequestProcessor#getRouteInfoByTopic】

消息发送

发送方式

  • 同步(sync):发现消息后,同步等待,直到消息服务器返回发送结果。
  • 异步(async):指定回调函数,回调在新的线程执行
  • 单向(oneway):只发送,不在乎消息是否成功存储在消息服务器上。

Message

基础属性

  • topic: String,主题topic
  • flag: int,消息flag,RocketMQ不做处理,定义在MessageSysFlag
    • COMPRESSED_FLAG
    • MULTI_TAGS_FLAG
    • TRANSACTION_NOT_TYPE
    • TRANSACTION_PREPARED_TYPE
    • TRANSACTION_COMMIT_TYPE
    • TRANSACTION_ROLLBACK_TYPE
  • properties: Map<String, String>, 拓展属性
    • TAGS:消息tag,用于消息过滤
    • KEYS:Message索引键,多个用空格隔开,根据key快速检索消息
    • WAIT:是否等消息存储完成后再返回
    • DELAY:消息延迟级别,用于定时消息或消息重试
  • body: byte[], 消息体
  • transactionId:String

MessageBatch

MessageBatch继承于Message,用于批量发送消息。把所有Message编码成一个byte[](MessageDecoder#encodeMessages),每个Message格式如下,之后的发送逻辑和单条发送一样。

关键类

  • DefaultMQProducer: 对外的API,提供send等方法。
  • DefaultMQProducerImpl: send的实际实现,DefaultMQProducer调用该类中的相应方法。
    • ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable: 保存了topic路由信息
  • MQClientInstance: 保存Broker等信息,同一个clientId只会创建一个,封装了网络处理API,是Producer、Consumer与NameServer、Broker打交道的网络通道。
    • ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable
    • ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable
    • ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable
    • ConcurrentMap<String/* group */, MQProducerInner> producerTable
    • ConcurrentMap<String/* group */, MQConsumerInner> consumerTable
    • ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable
  • MQClientAPIImpl: 与NameServer、Broker等通信的封装。
  • TopicPublishInfo: Client解析后的Topic信息,包含MessageQueue列表、是否有序、原始TopicRouteData等信息。
  • TopicRouteData: 从NameServer获取到的Topic路由信息。包含QueueData列表、BrokerData列表等。

消息发送流程

  1. 验证消息:长度等
  2. 查找Topic路由信息:本地不存在时从NameServer获取该topic路由信息(TopicRouteData)后转为TopicPublishInfo。
  3. 选择消息队列:从TopicPublishInfo中的MessageQueue列表挑选一个MessageQueue(MQFaultStrategy#selectOneMessageQueue)。
    • 所有的broker延迟信息都会被记录
    • 发送消息时会选择延迟最低的broker来发送
    • broker延迟过高会自动减少它的消息分配
  4. 发送
    1. 根据MessageQueue获取对应的Broker的地址。如果Broker信息不存在,则从NameServer更新topic的路由信息。
    2. 分配全局唯一ID,超过4k进行压缩。
    3. 调用注册的hook的sendMessageBefore方法
    4. 构造消息发送请求包
    5. 根据发送方式(同步、异步、单向)进行网络传输
    6. 调用注册的hook的sendMessageAfter方法

消息存储

存储的文件包括:

  • CommitLog:消息存储文件,所有Topic的消息都存储在CommitLog文件中。
  • ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到消息消费队列,供消息消费者消费。
  • IndexFile:消息索引文件,主要存储消息Key与Offset的对应关系。

其他:

  • 事务状态服务:存储每条消息的事务状态
  • 定时消息服务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度。

关键类:

  • DefaultMessageStore
  • CommitLog
  • ConsumeQueue
  • IndexService
  • MappedFile

存储文件、内容

CommitLog

默认存储位置为${ROCKET_HOME}/store/commitLog,每个文件默认1G,以该文件中第一个偏移量为文件名,补齐到20位。每个文件最少会空闲8个字节,高4位为当前文件剩余空间,低4位为magic number(0xcbd43194)。

单条数据结构:

  • TOTALSIZE: 4字节,消息总大小
  • MAGICCODE: 4字节,0xdaa320a7
  • BODYCRC: 4字节,消息体crc校验码
  • QUEUEID: 4字节,消息消费队列id
  • FLAG: 4字节,消息flag,RocketMQ不做处理,供应用程序使用
  • QUEUEOFFSET: 8字节,消息在消息消费队列的偏移量
  • PHYSICALOFFSET: 8字节,消息在CommitLog文件中的偏移量
  • SYSFLAG: 4字节,消息系统flag(是否压缩、是否是事务消息)
  • BORNTIMESTAMP: 8字节,Producer调用发送API的时间戳
  • BORNHOST: 8字节,Producer的IP、端口号
  • STORETIMESTAMP: 8字节,消息存储时间戳
  • STOREHOSTADDRESS: 8字节,Broker服务器IP+端口号
  • RECONSUMETIMES: 4字节,消息重试次数
  • Prepared Transaction Offset: 8字节,事务消息物理偏移量
  • BODY LENGTH: 4字节,消息体长度
  • BODY: N字节,消息体内容
  • TOPIC LENGTH: 1字节,topic长度
  • TOPIC: N字节,topic
  • PROPERTIES LENGTH: 2字节,消息属性长度
  • PROPERTIES: N字节,消息属性。格式:key1 0x01 value1 0x02 key2 0x01 value2

单条消息长度:91 + N(body) + N(topic) + N(properties)

ConsumeQueue

consumequeue的第一级目录为topic,第二级目录为topic的消息队列。
单个ConsumeQueue文件默认包含30w个条目,每个20字节。

Index

用于提高根据主题与消息队列检索消息的速度。

其中,hash槽中保存改槽最新加入的一条记录的Index索引。
根据Index索引计算在Index文件中的实际位置:Head大小(40字节)+ hashNum(500w)* 4字节 + Index索引 * 20字节

Index条目按顺序存储。
Index条目字段含义:
hashcode: key的hashcode,查找时进一步比较hashcode是否一致
phyoffset: 消息对应在commitLog中的文件偏移量
timedif: 该消息存储时间与第一条消息的时间差值,必然大于0
preIndexNo: 前一条记录的Index索引

checkpoint文件

记录CommitLog、ConsumeQueue、Index文件的刷盘时间点,各占8字节。文件固定长度4k,但只用到前24个字节。

实现类

MappedFile

CommitLog对应的实现类。根据transientStorePoolEnabIe的配置,行为略有不同。

  • commit操作:需达到最小提交页数。
  • flush操作:调用MappedByteBufferFileChannelforce方法。

TransientStorePool

单独创建的MappedByteBuffer内存缓存池,如果启用,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目的物理文件对应的内存映射中。
RocketMQ会把这个堆外内存锁定在内存中,避免被交换到磁盘。

开启TransientStorePool时,由ByteBuffer执行写等操作,不开启由MappedByteBuffer执行。

指针:

  • wrotePosition
  • commitPosition: 开启TransientStorePool时使用
  • flushedPosition
  • readPostion: 非属性,提供方法获取。开启TransientStorePool时返回commitPosition,不开启时返回wrotePosition。

MappedFileQueue

用于管理MappedFile。

存储流程

  1. 消息校验。(Broker是否为Master、topic长度等)
  2. 处理延迟消息,原topic和queueId当成属性,topic和queueId设置为SCHEDULE_TOPIC_XXXX和对应的queueId。
  3. 设置消息参数。(CRC、时间戳、IP等)
  4. 获取当前可写入的CommitLog(MappedFile)。不存在时创建(创建下一个和下下个文件)
  5. 追加消息到CommitLog
    1. 生成全局唯一消息ID。16字节,4字节IP + 4字节端口 + 8字节消息偏移量
    2. 计算消息长度
    3. 如果所需长度大于剩余长度,追加CommitLog最后8字节,返回END_OF_FILE
    4. 写入到ByteBuffer,返回结果。
    5. 异步刷盘(设置为同步刷盘除外)
  6. ReputMessageService异步分发,写ConsumeQueue和Index

ConsumeQueue和Index恢复机制

RocketMQ在启动时创建abort文件,在退出时删除,如果启动时存在该文件,说明异常退出。

启动时,主要完成flushedPosition、committedWhere指针的设置、消息消费队列最大偏移量加载到内存、删除flushedPosition之后的所有的文件。如果异常启动,会将最后一个有效文件中的所有消息重新转发到消息消费队列与索引文件,确保不丢失消息,但同时可能导致消息重复的问题。

刷盘机制

  • 同步刷盘
  • 异步刷盘
    • 使用TransientStorePool: 单独申请一个与目标物理文件同样大小的堆外内存,使用内存锁定,消息先追加到堆外内存,然后提交到与物理文件的内存映射内存中,再flush到磁盘。
    • 不使用TransientStorePool: 消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中。

过期文件删除

非当前写文件在一定时间间隔内没有再次被更新(默认72小时),则认为是过期文件,不会关注这个文件上的消息是否被完全消费。

时机:

  • 每天固定时间(默认凌晨4点)
  • 磁盘空间是否充足。通过-Drocketmq.broker.diskSpaceCleanForciblyRatio设置 ,默认0.85。而-Drocketmq.broker.diskSpaceWarningLevelRatio设置最大阈值,默认0.90,超过该阈值将设置磁盘不可写,会拒绝新消息写入。
  • 手动调用excuteDeleteFilesManualy

消息消费

一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
同一消息队列保证消息顺序消费,不支持全局顺序消费。如需全局顺序消息消费,只能把该topic的队列数设置为1。
消息推模式,消息拉模式。
集群模式,广播模式。

消息队列负载由 RebalanceService 线程默认每隔 20s 进行一次消息队列负载,根据当前消费组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同 一个消息消费队列同一时间只会分配给一个消费者。

ProcessQueue

一个ProcessQueue对应一个MessageQueue,从Broker拉取到的消息会先存入ProcessQueue,默认每次拉取32条消息,按消息的队列偏移量顺序存入,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。

消费进度管理

广播模式

在消费者本地保存进度,默认保存在./.rocketmq_offsets目录下,文件为.rocketmq_offsets/{clientId}/groupName/offsets.json,保存MessageQueue: Offset

实现类为LocalFileOffsetStore,默认5s持久化一次。

集群模式

消费进度保存在Broker。实现类为RemoteBrokerOffsetStore

保存在${RocketMQ_HOME}/store/config/consumerOffset.json

定时消息

不支持任意时间精度的定时消息(因为支持任意精度需要做消息排序,会带来巨大的性能消耗),只支持特定级别,ls 5s 10s 30s lm 2m 3m 4m 5m 6m 7m 8m 9m lOm 20m 30m lh 2h,对应延迟级别1 2 3…

收到延迟消息时,将topic改为SCHEDULE_TOPIC_XXXX,消息队列为delayLevel - 1,原topic等信息设置到消息的属性中,每种延迟级别会有一个对应的任务,根据上次偏移量会定期拉取出消费队列中的消息,如果达到分发时间,则从CommitLog中拉取消息,还原topic等信息,存入coimmitLog文件。

关键类:ScheduleMessageService

顺序消息

  • 全局顺序消息:通过将topic设置只有1个消息队列实现
  • 局部顺序消息:确保一个消息队列的消息被顺序消费,发送时通过MessageQueueSelector按照自定义的规则选择消息队列

顺序消费由ConsumeMessageOrderlyService实现。消费时会按MessageQueue获取锁。启动时会向Broker锁定MessageQueue

消息过滤

  • 表达式过滤
    • TAG: 消费队列保存了Tag的hashcode,服务端会比对hashcode,客户端再进行精确比对
    • SQL92
  • 类过滤:消费者上传代码或从某个地址拉取代码,broker端会启动FilterServer,编译并加载,消费者获取消息时从FilterServer获取。 目前这些代码已移除。

主从同步机制

从服务器启动时主动向主服务器建立TCP长连接,从向主提交自身的偏移量。
消费者首先向主拉取消息,主返回一批消息,根据主服务器负载压力情况(当前拉取偏移与最大偏移的差值与Message最大占用内存的差值),会与消息一起返回建议的下次拉取的服务器(配置文件中whichBrokerWhenConsumeSlowly,默认为1)

事务消息

TransactionListener包含2个方法:executeLocalTransactioncheckLocalTransaction,前者在prepare后调用,如果此时返回提交或回滚,则在broker会进行相应操作,否则返回UNKNOW,此时broker会进行回查,调用checkLocalTransaction方法。

注意:回查时,会从相同Producer Group中随机选择一个Producer回查,如果运行在不同实例,需要维护全局的事务状态表