通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪

  • 日期:08-18
  • 点击:(790)


“通过利用MQ峰值切割,系统解耦和异步操作,在互联网行业中,分布式服务可用,MQ通常不会缺席。”

由Ali研究过的RocketMQ经历了多年的高水平并发挑战。 4.3.0版本引入了事务性消息的新功能。本文介绍了与RocketMQ 4.5.0事务消息相关的源代码跟踪。

交易信息解决了什么问题?

交易信息的实现原理及其设计亮点

01 - 要解决的问题

假设我的系统现在有这样的场景:

数据库事务在本地启动以执行借记操作。成功后,MQ消息将发送到库存中心进行交付。

有些人会想到打开mybatis事务实现,将本地事务和MQ消息放在一起是不够的?如果成功发送MQ,则提交事务,如果传输失败,则回滚事务,并且一次完成整个操作。

7564501-74957b9db7557bf1

似乎没有问题,但网络不可靠。

假设由于网络原因尚未收到MQ返回的响应,因此必须在面对不确定的MQ返回结果时回滚。但是,MQ消息仅丢失给客户端,因此结果是借记失败并且装运成功。

7564501-75091651bdc02908

由于无法使用本地事务写入MQ消息的发送,我们如何确保整体需求是原子的?答案是我们今天介绍的主角:交易新闻。

02-概述

7564501-ac7b76efcd47a66e

主线

定时任务发送过程:发送半消息(半消息),执行本地事务,发送事务执行结果

定时任务审核流程:MQ服务器检查本地事务并发送事务执行结果

源代码主线分析

03 - 源代码分析

半消息发送过程

本地应用程序(客户端)

在本地应用程序中发送事务消息的核心类是TransactionMQProducer。该类通过继承DefaultMQProducer来重用与发送消息相关的大多数逻?U飧隼嗟拇敕浅P。挥?100行。以下是此类的sendMessageTransaction方法。 >

这种方法做了两件事,

检查transactionListener是否存在

调用父类来执行事务性消息传递

TransactionListener在事务消息流中起着至关重要的作用,看看这个接口

7564501-fae7a3d799789563

界面评论非常清楚。使用上面的概述视图,executeLocalTransaction方法对应于本地事务操作的执行,checkLocalTransaction对应于本地事务操作。

下面是DefaultMQProducer类

sendMessageInTransaction方法源代码

7564501-6026ddc1be6e194e

为了使源代码的逻辑更直观,作者简化了核心代码。 sendMessageInTransaction方法主要执行以下操作

消息标记有事务消息,MQ服务器使用该消息来区分正常消息和事务消息。

发送半信息

如果传输成功,则由transactionListener

执行本地事务

执行endTransaction方法。如果半消息未能发送或本地事务无法告诉服务器删除半消息,则成功发送半消息并成功执行本地事务。

发送半消息进程,客户端代码几乎在这里完成,然后看看如何处理RocketMQ服务器

RocketMQ服务器

服务器收到消息后,会执行一些域对象转换,是否支持事务消息的权限检查。它对理解交易消息没什么用。这里省略了旁路部分的介绍。以下是用于处理半消息的transactionalMessageBridge类的源代码

7564501-b5e8b3c3a6153ae5

这两种方法主要做以下几点:

7564501-9cd30dbc7b4109f7

将消息的主题queueId放入消息体的自己的缓存地图

将消息主题设置为“RMQ_SYS_TRANS_OP_HALF_TOPIC”并将queueId设置为0

将消息写入磁盘以保持持久性

通过区分主题,您可以看到所有事务半消息将被放置在同一主题的同一队列中,从而避免消费者将消息传递给

服务器保留半消息,然后将结果发送到本地应用程序。此时,完成服务器端的半消息处理,然后是计划任务的首次亮相。

定时任务审核流程

RocketMQ服务器

计划任务是一个名为TransactionalMessageService类的线程。以下是此类的检查方法

7564501-d6dca9ade9307256

检查该部分的一半消息。

最有趣的是putBackHalfMsgQueue方法,因为每次将半个消息从磁盘提取到内存中进行处理时,其属性都会更改(例如,TRANSACTION_CHECK_TIMES,这是丢弃事务消息的关键信息)。

因此,您需要在发送核对消息之前将半消息再次放在磁盘上。

RocketMQ采用的方法是基于最新的物理偏移重写,而不是修改原始的半消息。 RocketMQ的存储设计的目的是按顺序写入。如果修改消息,则无法实现高性能。

检查消息。

7564501-d667816d5b869250

本地应用程序(客户端)

以下是DefaultMQProducerImpl的checkTransactionState方法,它是核对消息的本地应用程序的处理逻辑

7564501-ab36dfebc04f4c80

在简化代码逻辑后,您可以清楚地看到

打开一个线程来执行核对的逻辑

执行transactionListener的checkListTransaction方法以获取本地事务执行的结果

RocketMQ服务器

RocketMQ服务器将接收客户端发送的Commit消息。

读取半信息 - >恢复原始信息主体信息,如主题 - >再写入磁盘,如正常信息 - >删除上半部分信息

如果是回滚消息,请直接删除上一半消息

RocketMQ事务消息的调用链结束

04-思维

1.分布式事务是否等于事务消息?

两者无所谓,事务消息只保证本地事务和MQ消息被发送以形成整体原子性,并且在传递到MQ服务器之后,消费者是否能够肯定消费购买是不能保证的。

2.源代码设计的亮点是什么?

对链接源的学习和理解发现仍有许多亮点

服务器端返回检查消息,客户端检查消息逻辑处理,并且异步提交客户端提交/回滚消息。可以说异步的地方可以使用异步,通过异步+重试模式。这确保了即使分布式环境中的短期网络状况也不会影响整体逻辑。

TransactionListener的介绍,开放和关闭的原理以及依赖性反转的原理,面向接口的编程。整体可扩展性非常好,用户只需要编写自己的Listener来发送事务消息,非常方便

TransactionMQProducer通过继承DefaultMQProducer

极大地复用了与发送消息相关的逻辑

3.源代码设计是否缺乏?

RocketMQ是一个非常成功的消息传递中间件。找到缺点并不容易。作者谈到了一些要点

与事务相关的方法(如sendMessageIntransaction)分为DefaultMQProducer。从内聚的角度来看,这是一种与事务相关的发送消息的方法,应该分为TransactionMQProducer。

在整个链接中多次写入半消息。如果并发性很大并且大多数消息是事务性消息,则可靠性可能会有问题。

最后,分享一本采访书[Java核心知识点整理]涵盖JVM,锁定,高并发,反射,Spring原理,微服务,Zookeeper,数据库,数据结构等“,以及Java208面试问题(带答案)加入我的粉丝团(Java Fill Road :)免费获得它!掌握这些知识点,你可以在面试中赢得很多候选人,暴击9999分。机会留给准备人员,只有充分的准备,可以让自己在候选人中脱颖而出。