消息队列:7.消息消费

推模式与拉模式

推消息

推消息是指,当Broker接受到消息后,会主动将消息推送给消费者。

该模式的优缺点:

  • 优点:实时性高,消费者简单。
  • 缺点:Broker无法得知消费者状态,可能导致推送过多消息。

改进推模式

上述模式中,Broker无法得知消费者的状态,因此添加反馈机制:

Broker在推送消息前,先获得消费者的状态,再进行消息推送。

优缺点:

  • 优点:可以实现削峰填谷,及时性高。
  • 缺点:Broker职责过多(需维护消费者状态),影响性能(如果消费者较多)。

所以推模式适用在消费者不多、消息量不大、及时性要求高的场景

拉消息

拉消息是指,消费者主动从Broker获得消息。

优缺点:

  • 优点:该方式可以充足考虑到消费者的性能,消费者会自行决定处理消息的速率。
  • 缺点:及时性低,如果消费者请求频率过高,会导致消息忙请求;频率低则会导致消息及时性低。

改进拉模式

拉模式存在一种变种:长轮询

在传统的轮询中,客户端会定期向服务器发送请求,询问是否有新的数据。这种方式效率较低,因为大部分时间服务器都没有新数据可供返回。

长轮询改进了这一点。在长轮询中,客户端发送一个请求到服务器,服务器不会立即返回结果。相反,服务器会保持连接打开,直到有新数据可供返回,或者超时时间到达。一旦有新数据到达,服务器就会立即返回响应给客户端,客户端收到响应后立即发起下一次长轮询请求。
长轮询的优点是可以实现实时更新,而不需要客户端频繁地发送请求。然而,它也有一些缺点,比如增加了服务器的负载和连接维持的开销。另外,长轮询也可能导致延迟,因为服务器只有在有新数据到达时才会返回响应。

比如,消费者发送拉取请求到 Broker 时,如果此时有消息,那么 Broker 直接响应返回消息,如果没消息就 hold 住这个请求,比如等 15s,在 15s 内如果有消息过来就立马响应这个请求返回消息。

通过长轮询,消费请求即避免了忙请求的情况,也进一步的提升了消息的及时性。RocketMQ和Kafka都是使用这种方式来实现获取消息的。

RocketMQ的pushConsumer和pullConsumer

RocketMQ即有pushConsumer也有pullConsume,但是RocketMQ本质上只实现了拉模式,pullConsumer就是去拉消息很好理解,至于还有个pushConsumer,实际上是伪推模式,底层的实现还是基于长链接的长轮询去拉取消息。pushConsumer的实现是背后有个线程会一直从Broker拉取消息,如果当前有过多的消息未被消费,那就过一会儿再执行,一旦有消息返回就回调用户定义的MessageListener来消费消息。

消费过程

1. 消费者启动

消费者启动后,需要先访问名称服务器(namesrv),获得订阅的Topic的路由信息,然后即可连接到Broker。
示意图:

2. 消费者负载均衡

队列粒度负载均衡

队列粒度负载均衡策略中,同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列仅被一个消费者消费

示意图:

图中,主题中的三个队列Queue1、Queue2、Queue3被分配给消费者分组中的两个消费者,每个队列只能分配给一个消费者消费,该示例中由于队列数大于消费者数,因此,消费者A2被分配了两个队列。若队列数小于消费者数量,可能会出现部分消费者无绑定队列的情况。

当发生消费者上/下线时,或过一定时间,Broker就会发生重平衡,以此实现动态的负载均衡功能

注意:队列粒度负载均衡策略保证同一个队列仅被一个消费者处理,该策略的实现依赖消费者和服务端的信息协商机制,RocketMQ 并不能保证协商结果完全强一致。因此,在消费者数量、队列数量发生变化时,可能会出现短暂的队列分配结果不一致,从而导致少量消息被重复处理。

相对于后续的消息粒度负载均衡策略,队列粒度负载均衡策略分配粒度较大,不够灵活。但该策略在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。

*消息粒度负载均衡

5.0版本之后的RocketMQ支持消息粒度负载均衡,对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略。
消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费。

示意图:

图中,消费者分组Group A中有三个消费者A1、A2和A3,这三个消费者将共同消费主题中同一队列Queue1中的多条消息。
注意:消息粒度负载均衡策略保证同一个队列的消息可以被多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,并不能指定消息被哪一个特定的消费者处理。

消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。

在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。
比如,队列Queue1中有4条顺序消息,这4条消息属于同一消息组G1,存储顺序由M1到M4。在消费过程中,前面的消息M1、M2被消费者1处理时,只要消费状态没有提交,消费者2是无法并行消费后续的M3、M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息。

消息粒度消费负载均衡策略下,同一队列内的消息离散地分布于多个消费者,适用于绝大多数在线事件处理的场景。只需要基本的消息处理能力,对消息之间没有批量聚合的诉求。而对于流式处理、聚合计算场景,需要明确地对消息进行聚合、批处理时,更适合使用队列粒度的负载均衡策略。

3. 拉取消息

(继队列粒度负载均衡)

为了应对消费者的拉请求,RocketMQ内部实现有一套复杂的机制。
其涉及到三个组件:

  1. PullMessageService:RocketMQ消费者端在拉取消息时的核心服务线程。在RocketMQ的拉取模式(PULL模式)下工作,负责周期性地从Broker服务器拉取消息。它主要执行以下功能:
    1. 监听并处理pullRequestQueue中的PullRequest请求,这些请求代表了需要从特定MessageQueue拉取消息的任务。
    2. 向Broker发起远程调用,根据拉取策略和配置从指定的消息队列中批量拉取消息。
    3. 将从Broker获取到的一批消息添加到对应的ProcessQueue中。
    4. 如果Broker返回了消息,那么PullMessageService会在处理这批消息的同时,根据一定的拉取策略(如基于长轮询、固定间隔时间等),构建一个新的PullRequest,这个新的PullRequest会继续请求从同一个或者下一个MessageQueue拉取更多的消息。
  2. ProcessQueue:RocketMQ在消费端维护的一个内存队列结构,用来缓存从Broker拉取回来但尚未完全消费完的消息集合。它的作用主要包括:
    1. 存储已拉取但未消费或正在消费的消息,实现本地缓存和流量控制。
    2. 实现消费过程中的幂等性和顺序消费,例如通过跟踪每个消息的消费状态来避免重复消费。
    3. 参与消费限流和堆积管理,如果ProcessQueue中的消息过多,则PullMessageService可能会停止拉取新的消息,防止消费端因处理能力不足而造成消息积压。
  3. ConsumeMessageService:RocketMQ消费者端用于消费消息的服务,无论是在Push模式还是Pull模式下都起着关键作用。它通常包含一个线程池来处理消息消费任务:
    1. 在Push模式下,Broker主动推送消息给消费者时,ConsumeMessageService接收到消息后将其分发到对应的线程池进行消费。
    2. 在Pull模式下,虽然实际拉取操作由PullMessageService完成,但是拉取回来的消息最终也会交由ConsumeMessageService的线程池进行处理和消费。
    3. 负责消费者的负载均衡和消息消费进度的持久化,确保消息被正确且唯一地消费。
    4. …(ConsumeMessageService功能较多,此处仅介绍这些)

示意图:

保存Offset

  • 广播模式:在广播模式下,消费点位将存储在消费者本地磁盘上,因为广播模式是将消息广播给每个消费者,它不需要有个统一的地方来管理这个位置,每个消费者自己维护就行。
  • 集群模式:在集群模式下,消费点位存储在 Broker ,这样其他的消费者可以从Broker获取消费点位。
    • 当消费者成功消费一条消息后,它会向Broker发送消费确认请求,其中包括了本次消费的消息所在MessageQueue的标识和新的消费位点(即下一条待消费消息的Offset)。Broker接收到消费确认请求后,会更新该消费组在这个MessageQueue上的Offset信息。

消费重试

消费者出现异常,消费某条消息失败时,RocketMQ会根据消费重试策略重新投递该消息进行故障恢复。

推荐场景:

  • 业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功
  • 消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。

消费重试:消费者在消费某条消息失败后,RocketMQ服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

导致消息重试的原因:

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
  • 消息处理超时,包括在PushConsumer中排队超时。

PushConsumer消费重试策略

状态机示意图:

  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
  • WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

一个消息被处理后可能会有三种结果:

  1. 消费成功。
  2. 消费失败或过期
    1. 重试次数未达到最大次数,进入待重试状态。
    2. 重试次数达到最大次数,进入死信状态。

时间轴图:

Wait Retry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。

具体重试间隔、重试次数可查看官网

SimpleConsumer消费重试策略

状态机示意图:

Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

SimpleConsumer类型下,消费者的重试间隔是预分配的,每次获取消息消费者会在调用API时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。

消息重试间隔=不可见时间-消息实际处理时长

InvisibleDuration参数主要用于控制消息可见性的延迟时间,特别是在Pull模式下具有特殊的意义。当消费者在Pull模式下拉取消息时,InvisibleDuration指的是消息被拉取之后,在这段时间内这条消息对于其他消费者是不可见的,也就是消息处于“隐形”状态。

  • 当消费者拉取到一条消息后开始处理,但处理过程中还未完成消费确认(ACK),此时若该消息立即对其他消费者可见,可能导致消息被重复消费。设置InvisibleDuration可以在一段时间内阻止其他消费者看到这条消息,给当前消费者足够的时间来完成消息处理并提交确认。
  • 如果某个消费者在处理消息时出现异常,未能在InvisibleDuration结束前完成ACK,消息将会在InvisibleDuration过后再次变为可见,允许其他消费者或者其他副本的消费者有机会重新拉取并处理该消息。

时间轴图:

消息过滤

RocketMQ提供了两种方法:Tag标签过滤、SQL属性过滤。

当消费者拉取消息时,Broker只会个给消费者想要的消息,不需要的会被自动认为已消费。

Tag标签过滤

每一条消息仅支持设置一个tag。message.setTags("TagA")

当消费者进行订阅时,可指定自己需要的Tag。该订阅关系会随着心跳消息发送给Broker,Broker即可知道消费者们的过滤条件,然后在Broker端进行过滤,并将过滤后的消息发送给消费者。

消费者订阅Tag有多个类型:

  • 单Tag匹配:只接收具有某个Tag的标签,consumer.subscribe("TopicA", "TagA")
  • 多Tag匹配:可接收多个Tag,Tag之间使用||隔开,consumer.subscribe("TopicA", "TagA||TagB||TagC")
  • 全匹配:忽略Tag,consumer.subscribe("TopicA", "*)

Tag标签过滤原理

每条消息在ConsumeQueue中,处理commitlog offset、size属性外,还有一个tag hashcode属性。
Broker在消费者来拉消息的时候,利用请求的 offset 的从ConsumerQueue能直接得到消息的tag hashcode,且本地已经存储了当前消费者的订阅消息,可直接利用 hashcode 对比当前消息是否应该被该消费者拉取。如果hashcode不一致,则跳过这条消息以达到过滤的作用。

使用tag Hashcode而不是Tag原字符串的原因是tag Hashcode是定长的,更适合放到ConsumeQueue中(ConsumeQueue也是定长的)。至于可能引起的Hash冲突,只要让消费者再次进行过滤即可。

SQL属性过滤

生产者可在消息中添加多个属性,如:message.putUserProperty("propertyA","123")
然后消费者即可设置对应的SQL过滤条件:(SQL过滤支持使用Tags中属性)

1
consumer.subscribe("TopicA", MessageSelector.bySQL("(TAGS is not null and TAGS in ('TagA','TagB')) and (propertyA is not null and a between 0 and 200)"));

SQL属性过滤是使用SQL92语法来作为过滤规则表达式的:

语法 说明 示例
IS NULL 判断属性不存在。 a IS NULL :属性a不存在。
IS NOT NULL 判断属性存在。 a IS NOT NULL:属性a存在。
> >= < <= 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。(可转化为数字的字符串也被认为是数字。) a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。 a IS NOT NULL AND a > ‘abc’:错误示例,abc为字符串,不能用于比较大小。
BETWEEN xxx AND xxx 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。 a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
NOT BETWEEN xxx AND xxx 用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。 a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
IN (xxx, xxx) 表示属性的值在某个集合内。集合的元素只能是字符串。 a IS NOT NULL AND (a IN (‘abc’, ‘def’)):属性a存在且属性a的值为abc或def。
= <> 等于和不等于。可用于比较数字和字符串。 a IS NOT NULL AND (a = ‘abc’ OR a<>’def’):属性a存在且属性a的值为abc或a的值不为def。
AND OR 逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。 a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。

SQL属性过滤原理

SQL属性过滤需要从commitlog获取消息,然后解析其中的属性,接着再做SQL匹配,不匹配的消息被过滤,校验通过的消息被消费者拉取到本地。

此时因为不会存在 hash 碰撞的情况,所以消费者本地不需要再进行二次校验。

SQL由于需要进行SQL解析,且还要从磁盘中获得消息实际数据,所以性能较差。