使用golang理解mysql的兩階段提交
阿新 • • 發佈:2020-04-07
# 使用golang理解mysql的兩階段提交
文章源於一個問題:如果我們現在有兩個mysql例項,在我們要儘量簡單地完成分散式事務,怎麼處理?
# 場景重現
比如我們現在有兩個資料庫,mysql3306和mysql3307。這裡我們使用docker來建立這兩個例項:
```bash
# mysql3306建立命令
docker run -d -p 3306:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3306.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3306:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7
# msyql3306的配置:
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
# mysql3307建立命令
docker run -d -p 3307:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3307.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3307:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7
# msyql3307的配置:
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
server-id = 2
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
```
在mysql3306中
我們有一個user表
```sql
create table user (
id int,
name varchar(10),
score int
);
insert into user values(1, "foo", 10)
```
在mysql3307中,我們有一個wallet表。
```sql
create table wallet (
id int,
money float
);
insert into wallet values(1, 10.1)
```
我們可以看到,id為1的使用者初始分數(score)為10,而它的錢,在wallet中初始錢(money)為10.1。
現在假設我們有一個操作,需要對這個使用者進行操作:每次操作增加分數2,並且增加錢數1.2。
這個操作需要很強的一致性。
# 思考
## 兩階段提交
這裡是一個分散式事務的概念,我們可以使用2PC的方法進行保證事務
![20200331161038](http://tuchuang.funaio.cn/md/20200331161038.png)
2PC的概念如圖所示,引入一個資源協調者的概念,由這個資源協調者進行事務協調。
第一階段,由這個資源協調者對每個mysql例項呼叫prepare命令,讓所有的mysql例項準備好,如果其中由mysql例項沒有準備好,協調者就讓所有例項呼叫rollback命令進行回滾。如果所有mysql都prepare完成,那麼就進入第二階段。
第二階段,資源協調者讓每個mysql例項都呼叫commit方法,進行提交。
mysql裡面也提供了分散式事務的語句XA。
## 用單個例項的事務行不行
等等,這個兩階段提交和我們的事務感覺也差不多,都是進行一次開始,然後執行,最後commit,mysql為什麼還要專門定義一個xa的命令呢?於是我陷入了思考...
思考不如實操,於是我用golang寫了一個使用mysql的事務實現的“兩階段提交”:
```go
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連線
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連線
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
tx1, err := db1.Begin()
if err != nil {
panic(errors.WithStack(err))
}
tx2, err := db2.Begin()
if err != nil {
panic(errors.WithStack(err))
}
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
tx1.Rollback()
tx2.Rollback()
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// DML操作
if _, err = tx1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = tx2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("commit before error"))
// commit
fmt.Println("=== call commit ====")
err = tx1.Commit()
if err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("commit db2 before error"))
err = tx2.Commit()
if err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
```
我這裡已經非常小心地在defer中recover錯誤資訊,並且執行了rollback命令。
如果我在commit命令之前的任意一個地方呼叫了`panic(errors.New("commit before error"))` 那麼命令就會進入到了rollback這裡,就會把兩個例項的事務都進行回滾。
![20200331162451](http://tuchuang.funaio.cn/md/20200331162451.png)
通過結果我們可以看到,分數和錢數都沒有改變。這個是ok的。
但是如果我在db2的commit之前觸發了panic,那麼這個命令進入到了rollback中,但是db1已經commit了,db2還沒有commit,這個時候會出現什麼情況?
![20200331162723](http://tuchuang.funaio.cn/md/20200331162723.png)
非常可惜,我們看到了這裡的score增長了,但是money沒有增長,這個就說明無法做到事務一致性了。
## 回到mysql的xa
那麼還要回歸到2PC,mysql為2PC的實現增加了xa命令,那麼使用這個命令我們能不能避免這個問題呢?
同樣,我用golang寫了一個使用xa命令的程式碼
```go
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連線
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連線
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 啟動
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
```
首先看成功的情況:
![20200331164057](http://tuchuang.funaio.cn/md/20200331164057.png)
一切完美。
如果我們在prepare階段丟擲panic,那麼結果如下:
![20200331164425](http://tuchuang.funaio.cn/md/20200331164425.png)
證明在第一階段出現異常是可以回滾的。
但是如果我們在commit階段丟擲panic:
![20200331164533](http://tuchuang.funaio.cn/md/20200331164533.png)
我們發現,這裡的分數增加了,但是money卻沒有增加。
那麼這個xa和單個事務有什麼區別呢?我又陷入了深深的沉思...
## xa的用法不對
經過在技術群(全棧神盾局)請教,討論之後,發現這裡對2pc的兩個階段理解還沒到位,這裡之所以分為兩個階段,是強調的是每個階段都會持久化,就是第一個階段完成了之後,每個mysql例項就把第一個階段的請求例項化了,這個時候不管是mysql例項停止了還是其他問題,每次重啟的時候都會重新回覆這個commit。
我們把這個程式碼的rollback去掉,假設commit必須成功。
```go
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連線
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連線
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
// db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
// db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 啟動
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
```
![20200331165500](http://tuchuang.funaio.cn/md/20200331165500.png)
這個時候,我們停掉程式(停掉mysql的連結),使用`xa recover`可以發現,db2的xa事務還留在db2中了。
![20200331165622](http://tuchuang.funaio.cn/md/20200331165622.png)
我們在控制檯直接呼叫`xa commit '1585644880'` 還能繼續把這個xa事務進行提交。
![20200331165742](http://tuchuang.funaio.cn/md/20200331165742.png)
這下money就進行了提交,又恢復了一致性。
所以呢,我琢磨了一下,我們寫xa的程式碼應該如下:
```go
package main
import (
"database/sql"
"fmt"
"log"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連線
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連線
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 啟動
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
// TODO: 嘗試重新提交COMMIT
// TODO: 如果還失敗,記錄xid,進入資料恢復邏輯,等待資料庫恢復重新提交
log.Println("xid:" + xid)
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
log.Println("xid:" + xid)
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
```
就是第二階段的commit,我們必須設定它一定會“成功”,如果有不成功的情況,那麼就需要記錄下不成功的xid,有一個數據恢復邏輯,重新commit這個xid。來保證最終一致性。
## binlog
其實我們使用binlog也能看出一些端倪
```
# 這裡的mysql-bin.0003替換成為你當前的log
SHOW BINLOG EVENTS in 'mysql-bin.000003';
```
```
## XA的binlog
| mysql-bin.000003 | 1967 | Anonymous_Gtid | 1 | 2032 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2032 | Query | 1 | 2138 | XA START X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2138 | Table_map | 1 | 2190 | table_id: 108 (hade1.user) |
| mysql-bin.000003 | 2190 | Update_rows | 1 | 2252 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000003 | 2252 | Query | 1 | 2356 | XA END X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2356 | XA_prepare | 1 | 2402 | XA PREPARE X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2402 | Anonymous_Gtid | 1 | 2467 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2467 | Query | 1 | 2574 | XA COMMIT X'31353835363338363233',X'',1
## 非xa的事務
| mysql-bin.000003 | 2574 | Anonymous_Gtid | 1 | 2639 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2639 | Query | 1 | 2712 | BEGIN |
| mysql-bin.000003 | 2712 | Table_map | 1 | 2764 | table_id: 108 (hade1.user) |
| mysql-bin.000003 | 2764 | Update_rows | 1 | 2826 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000003 | 2826 | Xid | 1 | 2857 | COMMIT /* xid=67 */
```
我們很明顯可以看到兩階段提交中是有兩個GTID的,生成一個GTID就代表內部生成一個事務,所以第一個階段prepare結束之後,第二個階段commit的時候就持久化了第一個階段的內容,並且生成了第二個事務。當commit失敗的時候,最多就是第二個事務丟失,第一個事務實際上已經儲存起來了了(只是還沒commit)。
而非xa的事務,只有一個GTID,在commit之前任意一個階段出現問題,整個事務就全部丟失,無法找回了。所以這就是mysql xa命令的機制。
# 總結
看了一些資料,原來mysql從5.7之後才真正實現了兩階段的xa。當然這個兩階段方式在真實的工程中的使用其實很少的,xa的第一定律是避免使用xa。工程中會有很多方式來避免這種分庫的事務情況。
不過,不妨礙掌握了mysql的xa,在一些特定的場合,我們也能完美解決