1. 程式人生 > >使用golang理解mysql的兩階段提交

使用golang理解mysql的兩階段提交

# 使用golang理解mysql的兩階段提交 文章源於一個問題:如果我們現在有兩個mysql例項,在我們要儘量簡單地完成分散式事務,怎麼處理? # 場景重現 比如我們現在有兩個資料庫,mysql3306和mysql3307。這裡我們使用docker來建立這兩個例項: ```bash # mysql3306建立命令 docker run -d -p 3306:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3306.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3306:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7 # msyql3306的配置: [mysqld] pid-file = /var/run/mysqld/mysqld.pid socket = /var/run/mysqld/mysqld.sock datadir = /var/lib/mysql server-id = 1 log_bin = mysql-bin binlog_format = ROW expire_logs_days = 30 # mysql3307建立命令 docker run -d -p 3307:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3307.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3307:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7 # msyql3307的配置: [mysqld] pid-file = /var/run/mysqld/mysqld.pid socket = /var/run/mysqld/mysqld.sock datadir = /var/lib/mysql server-id = 2 log_bin = mysql-bin binlog_format = ROW expire_logs_days = 30 ``` 在mysql3306中 我們有一個user表 ```sql create table user ( id int, name varchar(10), score int ); insert into user values(1, "foo", 10) ``` 在mysql3307中,我們有一個wallet表。 ```sql create table wallet ( id int, money float ); insert into wallet values(1, 10.1) ``` 我們可以看到,id為1的使用者初始分數(score)為10,而它的錢,在wallet中初始錢(money)為10.1。 現在假設我們有一個操作,需要對這個使用者進行操作:每次操作增加分數2,並且增加錢數1.2。 這個操作需要很強的一致性。 # 思考 ## 兩階段提交 這裡是一個分散式事務的概念,我們可以使用2PC的方法進行保證事務 ![20200331161038](http://tuchuang.funaio.cn/md/20200331161038.png) 2PC的概念如圖所示,引入一個資源協調者的概念,由這個資源協調者進行事務協調。 第一階段,由這個資源協調者對每個mysql例項呼叫prepare命令,讓所有的mysql例項準備好,如果其中由mysql例項沒有準備好,協調者就讓所有例項呼叫rollback命令進行回滾。如果所有mysql都prepare完成,那麼就進入第二階段。 第二階段,資源協調者讓每個mysql例項都呼叫commit方法,進行提交。 mysql裡面也提供了分散式事務的語句XA。 ## 用單個例項的事務行不行 等等,這個兩階段提交和我們的事務感覺也差不多,都是進行一次開始,然後執行,最後commit,mysql為什麼還要專門定義一個xa的命令呢?於是我陷入了思考... 思考不如實操,於是我用golang寫了一個使用mysql的事務實現的“兩階段提交”: ```go package main import ( "database/sql" "fmt" _ "github.com/go-sql-driver/mysql" "github.com/pkg/errors" ) func main() { var err error // db1的連線 db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1") if err != nil { panic(err.Error()) } defer db1.Close() // db2的連線 db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2") if err != nil { panic(err.Error()) } defer db2.Close() // 開始前顯示 var score int db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) var money float64 db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) tx1, err := db1.Begin() if err != nil { panic(errors.WithStack(err)) } tx2, err := db2.Begin() if err != nil { panic(errors.WithStack(err)) } defer func() { if err := recover(); err != nil { fmt.Printf("%+v\n", err) fmt.Println("=== call rollback ====") tx1.Rollback() tx2.Rollback() } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) }() // DML操作 if _, err = tx1.Exec("update user set score=score+2 where id =1"); err != nil { panic(errors.WithStack(err)) } if _, err = tx2.Exec("update wallet set money=money+1.2 where id=1"); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("commit before error")) // commit fmt.Println("=== call commit ====") err = tx1.Commit() if err != nil { panic(errors.WithStack(err)) } // panic(errors.New("commit db2 before error")) err = tx2.Commit() if err != nil { panic(errors.WithStack(err)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) } ``` 我這裡已經非常小心地在defer中recover錯誤資訊,並且執行了rollback命令。 如果我在commit命令之前的任意一個地方呼叫了`panic(errors.New("commit before error"))` 那麼命令就會進入到了rollback這裡,就會把兩個例項的事務都進行回滾。 ![20200331162451](http://tuchuang.funaio.cn/md/20200331162451.png) 通過結果我們可以看到,分數和錢數都沒有改變。這個是ok的。 但是如果我在db2的commit之前觸發了panic,那麼這個命令進入到了rollback中,但是db1已經commit了,db2還沒有commit,這個時候會出現什麼情況? ![20200331162723](http://tuchuang.funaio.cn/md/20200331162723.png) 非常可惜,我們看到了這裡的score增長了,但是money沒有增長,這個就說明無法做到事務一致性了。 ## 回到mysql的xa 那麼還要回歸到2PC,mysql為2PC的實現增加了xa命令,那麼使用這個命令我們能不能避免這個問題呢? 同樣,我用golang寫了一個使用xa命令的程式碼 ```go package main import ( "database/sql" "fmt" "strconv" "time" _ "github.com/go-sql-driver/mysql" "github.com/pkg/errors" ) func main() { var err error // db1的連線 db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1") if err != nil { panic(err.Error()) } defer db1.Close() // db2的連線 db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2") if err != nil { panic(err.Error()) } defer db2.Close() // 開始前顯示 var score int db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) var money float64 db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) // 生成xid xid := strconv.FormatInt(time.Now().Unix(), 10) fmt.Println("=== xid:" + xid + " ====") defer func() { if err := recover(); err != nil { fmt.Printf("%+v\n", err) fmt.Println("=== call rollback ====") db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) }() // XA 啟動 fmt.Println("=== call start ====") if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // DML操作 if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil { panic(errors.WithStack(err)) } // XA end fmt.Println("=== call end ====") if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // prepare fmt.Println("=== call prepare ====") if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("db2 prepare error")) if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // commit fmt.Println("=== call commit ====") if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("db2 commit error")) if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { panic(errors.WithStack(err)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) } ``` 首先看成功的情況: ![20200331164057](http://tuchuang.funaio.cn/md/20200331164057.png) 一切完美。 如果我們在prepare階段丟擲panic,那麼結果如下: ![20200331164425](http://tuchuang.funaio.cn/md/20200331164425.png) 證明在第一階段出現異常是可以回滾的。 但是如果我們在commit階段丟擲panic: ![20200331164533](http://tuchuang.funaio.cn/md/20200331164533.png) 我們發現,這裡的分數增加了,但是money卻沒有增加。 那麼這個xa和單個事務有什麼區別呢?我又陷入了深深的沉思... ## xa的用法不對 經過在技術群(全棧神盾局)請教,討論之後,發現這裡對2pc的兩個階段理解還沒到位,這裡之所以分為兩個階段,是強調的是每個階段都會持久化,就是第一個階段完成了之後,每個mysql例項就把第一個階段的請求例項化了,這個時候不管是mysql例項停止了還是其他問題,每次重啟的時候都會重新回覆這個commit。 我們把這個程式碼的rollback去掉,假設commit必須成功。 ```go package main import ( "database/sql" "fmt" "strconv" "time" _ "github.com/go-sql-driver/mysql" "github.com/pkg/errors" ) func main() { var err error // db1的連線 db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1") if err != nil { panic(err.Error()) } defer db1.Close() // db2的連線 db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2") if err != nil { panic(err.Error()) } defer db2.Close() // 開始前顯示 var score int db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) var money float64 db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) // 生成xid xid := strconv.FormatInt(time.Now().Unix(), 10) fmt.Println("=== xid:" + xid + " ====") defer func() { if err := recover(); err != nil { fmt.Printf("%+v\n", err) fmt.Println("=== call rollback ====") // db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) // db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) }() // XA 啟動 fmt.Println("=== call start ====") if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // DML操作 if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil { panic(errors.WithStack(err)) } // XA end fmt.Println("=== call end ====") if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // prepare fmt.Println("=== call prepare ====") if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("db2 prepare error")) if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // commit fmt.Println("=== call commit ====") if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { panic(errors.WithStack(err)) } panic(errors.New("db2 commit error")) if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { panic(errors.WithStack(err)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) } ``` ![20200331165500](http://tuchuang.funaio.cn/md/20200331165500.png) 這個時候,我們停掉程式(停掉mysql的連結),使用`xa recover`可以發現,db2的xa事務還留在db2中了。 ![20200331165622](http://tuchuang.funaio.cn/md/20200331165622.png) 我們在控制檯直接呼叫`xa commit '1585644880'` 還能繼續把這個xa事務進行提交。 ![20200331165742](http://tuchuang.funaio.cn/md/20200331165742.png) 這下money就進行了提交,又恢復了一致性。 所以呢,我琢磨了一下,我們寫xa的程式碼應該如下: ```go package main import ( "database/sql" "fmt" "log" "strconv" "time" _ "github.com/go-sql-driver/mysql" "github.com/pkg/errors" ) func main() { var err error // db1的連線 db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1") if err != nil { panic(err.Error()) } defer db1.Close() // db2的連線 db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2") if err != nil { panic(err.Error()) } defer db2.Close() // 開始前顯示 var score int db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) var money float64 db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) // 生成xid xid := strconv.FormatInt(time.Now().Unix(), 10) fmt.Println("=== xid:" + xid + " ====") defer func() { if err := recover(); err != nil { fmt.Printf("%+v\n", err) fmt.Println("=== call rollback ====") db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid)) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) }() // XA 啟動 fmt.Println("=== call start ====") if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // DML操作 if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil { panic(errors.WithStack(err)) } // XA end fmt.Println("=== call end ====") if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // prepare fmt.Println("=== call prepare ====") if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // panic(errors.New("db2 prepare error")) if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil { panic(errors.WithStack(err)) } // commit fmt.Println("=== call commit ====") if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { // TODO: 嘗試重新提交COMMIT // TODO: 如果還失敗,記錄xid,進入資料恢復邏輯,等待資料庫恢復重新提交 log.Println("xid:" + xid) } // panic(errors.New("db2 commit error")) if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil { log.Println("xid:" + xid) } db1.QueryRow("select score from user where id = 1").Scan(&score) fmt.Println("user1 score:", score) db2.QueryRow("select money from wallet where id = 1").Scan(&money) fmt.Println("wallet1 money:", money) } ``` 就是第二階段的commit,我們必須設定它一定會“成功”,如果有不成功的情況,那麼就需要記錄下不成功的xid,有一個數據恢復邏輯,重新commit這個xid。來保證最終一致性。 ## binlog 其實我們使用binlog也能看出一些端倪 ``` # 這裡的mysql-bin.0003替換成為你當前的log SHOW BINLOG EVENTS in 'mysql-bin.000003'; ``` ``` ## XA的binlog | mysql-bin.000003 | 1967 | Anonymous_Gtid | 1 | 2032 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' | | mysql-bin.000003 | 2032 | Query | 1 | 2138 | XA START X'31353835363338363233',X'',1 | | mysql-bin.000003 | 2138 | Table_map | 1 | 2190 | table_id: 108 (hade1.user) | | mysql-bin.000003 | 2190 | Update_rows | 1 | 2252 | table_id: 108 flags: STMT_END_F | | mysql-bin.000003 | 2252 | Query | 1 | 2356 | XA END X'31353835363338363233',X'',1 | | mysql-bin.000003 | 2356 | XA_prepare | 1 | 2402 | XA PREPARE X'31353835363338363233',X'',1 | | mysql-bin.000003 | 2402 | Anonymous_Gtid | 1 | 2467 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' | | mysql-bin.000003 | 2467 | Query | 1 | 2574 | XA COMMIT X'31353835363338363233',X'',1 ## 非xa的事務 | mysql-bin.000003 | 2574 | Anonymous_Gtid | 1 | 2639 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' | | mysql-bin.000003 | 2639 | Query | 1 | 2712 | BEGIN | | mysql-bin.000003 | 2712 | Table_map | 1 | 2764 | table_id: 108 (hade1.user) | | mysql-bin.000003 | 2764 | Update_rows | 1 | 2826 | table_id: 108 flags: STMT_END_F | | mysql-bin.000003 | 2826 | Xid | 1 | 2857 | COMMIT /* xid=67 */ ``` 我們很明顯可以看到兩階段提交中是有兩個GTID的,生成一個GTID就代表內部生成一個事務,所以第一個階段prepare結束之後,第二個階段commit的時候就持久化了第一個階段的內容,並且生成了第二個事務。當commit失敗的時候,最多就是第二個事務丟失,第一個事務實際上已經儲存起來了了(只是還沒commit)。 而非xa的事務,只有一個GTID,在commit之前任意一個階段出現問題,整個事務就全部丟失,無法找回了。所以這就是mysql xa命令的機制。 # 總結 看了一些資料,原來mysql從5.7之後才真正實現了兩階段的xa。當然這個兩階段方式在真實的工程中的使用其實很少的,xa的第一定律是避免使用xa。工程中會有很多方式來避免這種分庫的事務情況。 不過,不妨礙掌握了mysql的xa,在一些特定的場合,我們也能完美解決