1. 程式人生 > 實用技巧 >分散式事務(三)mysql對XA協議的支援

分散式事務(三)mysql對XA協議的支援

引子

從Mysql5開始,innoDB引擎支援XA協議的分散式事務。DTP模型中,一個TM(事務管理器管理)管理多個RM(資源管理器),每個RM維護自己的事務分支。在看原始碼之前我們看一下底層DB mysql對XA事務的支援。

回到頂部

1. XA語法

官網:13.3.8.1XA Transaction SQL Syntax

 1 XA {START|BEGIN} xid [JOIN|RESUME]   開啟XA事務,使用begin才能使用join/resume,start不支援
 2 
 3 XA END xid [SUSPEND [FOR MIGRATE]]   不支援SUSPEND [FOR MIGRATE]
 4 
 5 XA PREPARE xid                       二階段提交的準備階段
 6 
 7 XA COMMIT xid [ONE PHASE]            二階段提交的提交階段,ONE PHASE代表一階段提交,如果只有一個rm參與者,那麼二階段提交優化為一階段提交
 8 
 9 XA ROLLBACK xid                      回滾
10 
11 XA RECOVER [CONVERT XID]             列出所有處於prepared狀態的事務

上面的語法中都有xid官方解釋如下:

xid: gtrid [, bqual [, formatID ]]

其中,

gtrid:全域性事務ID,不得超過64,建議使用十六進位制數。

bqual:分支限定符(branch qualifier),如果沒有提供bqual,那麼預設值為空字串'',長度不超過64,建議使用十六進位制數。

formatID:是一個無符號整數,用於標記gtrid和bqual值的格式,預設為1,長度不超過64.

對應java介面:

 1 public interface Xid {
 2     int MAXGTRIDSIZE = 64;
 3     int MAXBQUALSIZE = 64;
 4 
 5     int getFormatId();
 6 
 7     byte[] getGlobalTransactionId();
 8 
 9     byte[] getBranchQualifier();
10 }
回到頂部

2. XA狀態

官網:13.3.8.2XA Transaction States

XA事務的狀態,按照如下步驟進行展開

1.使用XA START來啟動一個XA事務,並把它置於ACTIVE狀態。

2.對於一個ACTIVE狀態的 XA事務,我們可以執行構成事務的SQL語句,然後釋出一個XA END語句。XA END使事務進入IDLE狀態。

3.對於一個IDLE狀態XA事務,可以執行一個XA PREPARE語句或一個XA COMMIT…ONE PHASE語句:

  • XA PREPARE把事務放入PREPARED狀態。在此點上的XA RECOVER語句將在其輸出中包括事務的xid值,因為XA RECOVER會列出處於PREPARED狀態的所有XA事務。

  • XA COMMIT…ONE PHASE(優化成一階段提交)用於預備和提交事務。xid值將不會被XA RECOVER列出,因為事務終止。

4.對於一個PREPARED狀態的 XA事務,執行XA COMMIT語句來提交和終止事務,或者執行XA ROLLBACK來回滾並終止事務。

注意:

同一個客戶端資料庫連線,XA事務和非XA事務(即本地事務)是互斥的。例如,已經執行了”XA START”命令來開啟一個XA事務,則本地事務不會被啟動,直到XA事務已經被提交或被 回滾為止。相反的,如果已經使用START TRANSACTION啟動一個本地事務,則XA語句不能被使用,直到該事務被提交或被 回滾為止。

回到頂部

3. 測試

 1 package study.xa;
 2 
 3 import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection;
 4 import com.mysql.jdbc.jdbc2.optional.MysqlXid;
 5 
 6 import javax.sql.XAConnection;
 7 import javax.transaction.xa.XAException;
 8 import javax.transaction.xa.XAResource;
 9 import javax.transaction.xa.Xid;
10 import java.sql.Connection;
11 import java.sql.DriverManager;
12 import java.sql.PreparedStatement;
13 import java.sql.SQLException;
14 
15 /***
16  * @Description mysql分散式事務XAConnection模擬
17  * @author denny
18  * @date 2019/4/3 上午9:15
19  */
20 public class MysqlXaConnectionTest {
21 
22     public static void main(String[] args) throws SQLException {
23         //true表示列印XA語句,,用於除錯
24         boolean logXaCommands = true;
25         // 獲得資源管理器操作介面例項 RM1
26         Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "12345");
27         XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn1, logXaCommands);
28         XAResource rm1 = xaConn1.getXAResource();
29 
30         // 獲得資源管理器操作介面例項 RM2
31         Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test2", "root", "12345");
32         XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection)conn2, logXaCommands);
33         XAResource rm2 = xaConn2.getXAResource();
34         // AP請求TM執行一個分散式事務,TM生成全域性事務id
35         byte[] gtrid = "g12345".getBytes();
36         int formatId = 1;
37         try {
38             // ==============分別執行RM1和RM2上的事務分支====================
39             // TM生成rm1上的事務分支id
40             byte[] bqual1 = "b00001".getBytes();
41             Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
42             // 執行rm1上的事務分支 One of TMNOFLAGS, TMJOIN, or TMRESUME.
43             rm1.start(xid1, XAResource.TMNOFLAGS);
44             // 業務1:插入user表
45             PreparedStatement ps1 = conn1.prepareStatement("INSERT into user VALUES ('99', 'user99')");
46             ps1.execute();
47             rm1.end(xid1, XAResource.TMSUCCESS);
48 
49             // TM生成rm2上的事務分支id
50             byte[] bqual2 = "b00002".getBytes();
51             Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
52             // 執行rm2上的事務分支
53             rm2.start(xid2, XAResource.TMNOFLAGS);
54             // 業務2:插入user_msg表
55             PreparedStatement ps2 = conn2.prepareStatement("INSERT into user_msg VALUES ('88', '99', 'user99的備註')");
56             ps2.execute();
57             rm2.end(xid2, XAResource.TMSUCCESS);
58 
59             // ===================兩階段提交================================
60             // phase1:詢問所有的RM 準備提交事務分支
61             int rm1Prepare = rm1.prepare(xid1);
62             int rm2Prepare = rm2.prepare(xid2);
63             // phase2:提交所有事務分支
64             boolean onePhase = false;
65             //TM判斷有2個事務分支,所以不能優化為一階段提交
66             if (rm1Prepare == XAResource.XA_OK
67                 && rm2Prepare == XAResource.XA_OK
68                 ) {
69                 //所有事務分支都prepare成功,提交所有事務分支
70                 rm1.commit(xid1, onePhase);
71                 rm2.commit(xid2, onePhase);
72             } else {
73                 //如果有事務分支沒有成功,則回滾
74                 rm1.rollback(xid1);
75                 rm1.rollback(xid2);
76             }
77         } catch (XAException e) {
78             // 如果出現異常,也要進行回滾
79             e.printStackTrace();
80         }
81     }
82 }

列印日誌:

1 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303031,0x1
2 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303031,0x1
3 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303032,0x1
4 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303032,0x1
5 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303031,0x1
6 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303032,0x1
7 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303031,0x1
8 Tue Jun 04 17:08:18 CST 2019 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303032,0x1

=====參考======

http://www.tianshouzhi.com/api/tutorials/distributed_transaction/384