本文旨在对RocketMQ的消息实现原理进行解释,主要包括延迟消息, 事务消息, 顺序消息。
什么是消息
消息是一种用于系统之间通信的介质,可以在分布式系统中传递数据或命令。消息的传递通过消息中间件实现,例如RocketMQ、Kafka等。消息中间件通过异步解耦的方式,帮助系统实现高性能、高可靠性的数据传输。
RocketMQ主要支持以下几种消息模型:
- 普通消息:最常见的消息类型,即生产者将消息发送到队列,消费者从队列拉取消息并进行处理。
- 延迟消息:允许消息在指定时间之后才被消费者消费。
- 事务消息:支持分布式事务,在消息发送和业务执行之间实现事务一致性。
- 顺序消息:确保消息按照生产者发送的顺序被消费者消费。 本文将主要关注延迟消息、事务消息以及顺序消息的实现原理。
延迟消息
延迟消息是立马被生产者
投递出去的,而不是到了时间后才被生产者
将消息发往Broker, 将压力给到了Broker, 使Broker堆积大量消息,削峰填谷。为防止Broker中队列消息堵塞或offset跳来跳去(局部性原理
进行预加载)且需要过多’闹钟’去单独处理每条消息, 使得性能变差, 所以RocketMQ直接约定了18个等级的延迟投递: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h。然后建立一个共享的定时任务线程池, 里面共有18个核心线程,来管理所有延迟消息的投递, 这样延迟消息也有统一归类和约束,便于管理和调配。
为达成这一目的, RocketMQ专门搞个存放延迟消息的Topic(SCHEDULE_TOPIC_XXXX
),延迟消息先发往这个Topic, 此Topic可以复用commitLog, 将消息分发到consumeQueue,消费者并不会订阅这个Topic,因此此时消费者无法消费到这条消息。然后线程池定时调度Topic下的每个队列的消息,一旦有到期的消息,就分发到原Topic供消费者消费。
每个延迟等级对应一个独立的任务(DeliverDelayedMessageTimerTask), 延迟时间是100ms,这些任务会被提交到线程池中,这些任务的内容就是根据传入的QueueID, 得到对应的consumeQueue,当然还有对应的offset。Broker会定时保存SCHEDULE_TOPIC_XXXX下consumeQueue的消费offset。如果线程池(deliverExecutorService)大小足够(例如等于或大于延迟等级数),那么这些任务可能会被并行处理。
如果任务到期立马新建一个任务扔到线程池中,延迟时间是100ms,任务的入参会塞入更新后的offset,这样线程就会继续消费后面的消息,如此往复循环。如果拿到的对应延迟消息还未到时间,那么offset不变,也立马新建一个任务塞入到线程池中,这样100ms后又会来看这个消息是否到期。
从实现层面来看,这些方式大大减少了延迟消息开发的复杂度,但是这样的实现对延迟时间来说是不准的
,
任务的执行时间, 队列消费等待时间都会导致延迟时间不准。
事务消息
说到事务肯定就是ACID(原子性、一致性、隔离性和持久性
), 面对同一个数据库的数据操作可以用数据库自身的机制来保证事务, 但面对属于不同的数据库的数据操作,就用不上数据库自身的事务了,只能上分布式事务
了。
实现分布式事务的方案有很多,比如:2PC、3PC、TCC、本地消息、事务消息。这篇讲的就是事务消息。
事务消息更适合应用在异步更新场景,用来保证最终一致性
, 过程允许两端数据的不一致。
事务开始时, 先发送一条半消息(half message)给Broker,也就是不完整的消息,这种消息会被消费者跳过消费, 然后根据本地事务的执行结果来决定是向Broker发送提交消息,还是发送回滚消息。RocketMQ设计了一个反查机制:Broker会通过生产者暴露的接口向发送的生产者来反查这个事务是否成功, 当出现意外情况, 就通过回滚消息来保证事务的一致性。如果发送的生产者挂了,还有同一个生产组的其他生产者可以供Broker反查,这就是 producer group
的作用之一。
发送半消息的的时候,发往的不是原先的Topic,而是将发往特定的Topic:RMQ_SYS_TRANS_HALF_TOPIC, 队列默认为0。Broker起了一个定时线程TransactionalMessageCheckService
服务,它会定时的扫描RMQ_SYS_TRANS_HALF_TOPIC
这个Topic下的消息,去请求生产者的反查接口看看事务成功了没,如果成功就恢复原先的Topic供消费者消费,失败的话就不重新投递。
顺序消息
顺序消息需要从发送、存储、消费保证顺序性, 也就是在这三个阶段保证其时间顺序
或因果顺序
。
在发送阶段需保证单个生产者
和单个生产者内对顺序消息是单线程(串行)发送的
,防止多生产者和多线程带来的顺序无法保证的问题。
在存储阶段需保证单个队列
内相关消息是顺序存储的,让相关的顺序消息都分配到同一个consumeQueue 。方法是通过orderId取余队列数使得一样的订单都会被发往相同的队列, 指定队列。
消费要求和发送阶段大体一致, 但顺序消息的处理在业务上需要前面消息消费失败后支持相关联的消息都直接失败,然后可以在另外的地方持久化保存这些消息,待后续修复后可以重新消费。还有就是消费者的负载均衡(c)
面对消费场景,RocketMQ利用三把锁常见的锁来尽可能地保证消息的消费顺序性。
- 分布式锁: 把对应的队列与消费者绑定, 如果发现已经被别的消费者绑定了,那么就无法拉取消息消费, 消费者会定时(默认每 20s)的去续这个锁来保证分布式锁的拥有。
- Synchronized: 保证了同一时刻只有一个线程去消费这个队列(使得同一消费者可以并发消费不同队列的消息)
- ReentrantLock : 这把锁更像一个标记位,来表明当前这个队列还有消息在消费,无法重平衡,等待下一次重平衡。(这把锁拥有更高的细粒度)
顺序性和可用性的辩证关系 : 如果要保证绝对的顺序性,这个时候只能放弃可用性,也就是不能发往不同的队列,反之如果要保证可用性就无法保证绝对的顺序性。
RocketMQ 对这两个模式都提供了方案:如果要绝对的顺序性,则创建 Topic 时要指定 -o 参数(–order)为true,且 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。上面的条件只要有任何一个是 false 则是选择了可用性。
总结
以上就是我对RocketMQ三种特性消息实现原理的解释