消息队列:8.消息功能特性
普通消息
普通消息是RocketMQ中最基础的消息。常用于微服务解耦、数据集成等场景。
数据集成:在不同系统、应用程序或数据存储之间传输、转换、共享和同步数据的情况。
生命周期
- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
- 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被
处理(包括消费成功和失败)。 - 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中
删除。
使用方式
- 在broker中先提前创建NORMAL类型的消息主题:
sh mqadmin updateTopic-n <nameserver_address>-t <topic_name>-c <cluster_name>-a +message.type=NORMAL
- 在客户端SDK中向该主题发送消息即可。
普通消息没什么好讲的,补充一下同步消息和异步消息的概念吧。
同步消息
同步消息:生产者发送一条消息给Broker,需要阻塞等待Broker返回响应,然后才会继续发送后续的消息。
示意图:
如果在发行消息时,发送消息未送达,或未接受到Broker的ack响应,都会重新发送,直到失误一定次数后抛出异常。
异步消息
异步消息:发送一条消息给Broker,不需要阻塞等待Broker返回响应,可直接继续发送后续的消息。
当异步消息发送成功或失败时,会回调在发送时设置的回调函数进行操作。
示意图:
延时消息
延时消息为RocketMQ中的高级特性消息。常用于分布式定时调度、任务超时处理场景。
分布式定时调度:如每天5点执行文件清理
任务超时处理:订单下单后暂未支付,需要等待一段时间后关闭订单
RocketMQ延时消息设置的延时时间是当前系统时间后的某一个时间戳,而不是一段延时时长,定时时间的格式为毫秒级的Unix时间戳。
社区版的定时消息支持18个等级的延迟:
延迟等级 | 延迟时间 | 延迟等级 | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 15s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
生命周期
- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
- 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
- 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。
- 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
使用方式
使用mqadmin工具创建延迟类型的主题:sh mqadmin updateTopic-n <nameserver_address>-t <topic_name>-c <cluster_name>-a +message.type=DELAY
实现原理
1.SCHEDULE_TOPIC_XXXX
当Broker接收到延迟消息时,会将该消息的Topic和MessageQueueID等属性作为消息扩展属性存储起来,然后将该消息存储到SCHEDULE_TOPIC_XXXX主题中,根据延时等级存储到对应的18个消息队列。
2.定时线程池
Broker 会开启一个定时线程池,里面一共有18个核心线程,这个线程池的任务就是定时调度(间隔100ms)查看SCHEDULE_TOPIC_XXXX下的每个队列的消息,一旦有到期的消息,就分发到原Topic的队列中供消费者消费。
示意图:
顺序消息
顺序消息为RocketMQ中的高级特性消息,常用于有序事件处理、数据库变更增量同步等场景。
数据库变更增量同步:如数据库操作日志,不可乱序。
生命周期
顺序消息生命周期与普通消息生命周期相同。
使用方式
- 使用mqadmin工具创建顺序消息类型的主题:
sh mqadmin updateTopic-n <nameserver_address>-t <topic_name>-c <cluster_name>-a +message.type=FIFO
- 发送消息时需要设置MessageGroup,该值用于为消息设置GroupID,同一个GroupID的消息只会被其中一个消费者(如果有一个包含多个消费者的消费者组订阅顺序主题)消费,从而确保消息的顺序性。
- 消费消息时,必须提交消费结果。同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
保证顺序性
为保证顺序性,需同时保证生产顺序性和消费顺序性。
- 保证消息生产的顺序性,必须满足单一生产者、串行发送两个条件。
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法保证先后顺序。
- 保证消息消费的顺序性,必须满足投递顺序、有限重试两个条件。
- 投递顺序:RocketMQ通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。(消费者类型为PushConsumer时,RocketMQ保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。)
- 有限重试:RocketMQ顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。(顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。)
服务端顺序存储逻辑如下:相同消息组的消息按照先后顺序被存储在同一个队列;不同消息组的消息可以混合在同一个队列中,且不保证连续:
如上图所示,同组的消息均保证顺序。
全局与分区
全局顺序消息:需要将所有的消息发送到同一个队列(设置同一个MessageGroup),并且消费不能并发消息,性能差。
分区顺序消息:仅保证同一批业务逻辑相关消息的顺序,而并发度取决于Topic下的队列数,不同队列之间可以并发处理顺序消息,互不影响,性能较好。
底层实现
rocketMq的顺序消息,通过两种机制来保证消息只会被一个线程处理:
- borker维护全局队列锁,保证只有一个客户端可获得该队列消息。
- 客户端内维护了局部队列锁,保证在一个JVM应用里面,这个队列只会被一个线程消费。
1. 维护全局队列锁
在 RocketMQ 中,有两种基本的消息消费模型:并发消费(MessageListenerConcurrently 实现)和有序消费(MessageListenerOrderly 实现)。
并发消费不保证消息的消费顺序,而有序消费则保证了在同一个消息队列(Queue)中消息的消费顺序与生产顺序一致。
ConsumeMessageOrderlyService
该类在初始化的时候会生成一个定时线程池,然后在ConsumeMessageOrderlyService
启动的时候,会创建一个任务,1s执行一次lockMQPeriodically
这个方法,就是给当前客户端所消费的所有队列去broker进行上锁。(ConsumeMessageOrderlyService执行在消费者本地)lockMQPeriodically
内部会调用lockBatchMQ
发送上全局锁请求到Broker,对MessageQueue进行上锁,并获得上锁结果。
上全局锁过程:
- 循环判断当前队列,是否被当前的客户端持有,如果已经持有了,那么更新下持有时间。对未持有锁的队列,存入
notLocakedMqs
。 - 循环
notLocakedMqs
,尝试获取全局锁。尝试过程如下:- 获得用来锁全局锁信息的锁。
- 每个队列(当前客户端所消费的所有队列)的全局锁信息。
- 当锁信息为空时,说明当前队列没有被上锁,设置该队列锁为当前客户所有。
- 如果队列锁为当前队列所有,则更新持有时间。
- 如果队列锁已过期,无论之前是什么客户端持有,都获得该队列锁并更新持有时间。
- 否则说明该锁已被其他客户端持有,获得失败。
当获得了全局队列锁后,即可进入本地队列锁。
2.维护本地队列锁
ConsumeMessageOrderlyService
中对消息消费的策略:
- 当从broker拉取到的消息后,会放入
ProcessQueue
,ProcessQueue
内部会根据offset对消息做有序存储,保证单个队列的消息是有序的。 - 只有当
ProcessQueue
中存在消息且目前不在消费(目前无线程正在消费),才会构建ConsumerRequest
消费任务到线程池去进行消费。 ConsumerRequest
的消费策略(run
方法):- 获取本地的队列锁,本地的队列锁获取成功,则进行处理;如果获取失败则调用tryLockLaterAndReconsume延迟消费消息。
- 再次判断全局队列锁有无获得成功,未成功则退出。
- 调用监听器消费信息,获得结果。
- 当结果为异常时,重试。
当消费者数量变更的时候,会触发负载均衡,客户端会重新计算消费的队列,这个时候会把不需要再消费的队列的全局锁释放掉,同时还是去borker里面对新消费的队列进行上锁,如果上锁失败,那么这个队列的消息是不能消费的,只有上锁成功才能被消费。
事务消息
事务消息为RocketMQ中的高级特性消息,应用于保证分布式事务的最终一致性场景。
RocketMQ的事务消息支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
需要注意的是,事务消息的特点是支持回滚消息(即二阶段未通过后可将消息移出),而不是通过RocketMQ来管理分布式事务并实现最终一致性。比如有一段代码,要求具有原子性,其中包括发送一些消息、更改一些数据等功能,当发生异常时,要求数据库可回滚,消息也可回滚。
事务消息处理流程
- 生产者将消息发送至Broker。
- Broker持久化消息,返回ACK。目前消息状态为(事务待提交)。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
生命周期
- 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
- 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。
- 消息删除:RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
使用方法
- 使用mqadmin工具创建事务类型主题:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION
。 - 事务生产者需要提供一个事务检查器,用于检查确认异常半事务的中间状态,比如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
注意:消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。如果事务执行较慢,消息回查太快,可以将第一次事务回查时间设置较大一些或让程序能正确识别正在进行中的事务。
实现原理
1. RMQ_SYS_TRANS_HALF_TOPIC
和顺序消息类似,发送事务消息到Broker时,Broker会检查是否为事务消息,是则会将原先的Topic、queue存储在属性中,并将该事务消息的topic替换为RMQ_SYS_TRANS_HALF_TOPIC
,存入该主题。
2. TransactionMessageCheckService
该主题内消息会不被消费者消费,而是等到生产者提交或回滚,再将该消息发往Topic或直接丢弃。
会有一个**定时线程TransactionMessageCheckService
**,定时扫描RMQ_SYS_TRANS_HALF_TOPIC
下的消息,反查生产者的反差接口来判断事务有无成功,成功则投递到原Topic,失败则丢弃,未知则跳过。