1. 程式人生 > 其它 >分散式事務一站式解決方案與實現

分散式事務一站式解決方案與實現

1 本地事務

1.1 事務的概述

事務指邏輯上的一組操作,組成這組操作的各個單元,要麼全部成功,要麼全部不成功。從而確保了資料的準確與
安全。

1.2 事務的四大特性

  • 原子性(Atomicity)
    原子性是指事務是一個不可分割的工作單位,事務中的操作要麼都發生,要麼都不發生。
  • 一致性(Consistency)
    事務必須使資料庫從一個一致性狀態變換到另外一個一致性狀態。
    例如轉賬前A有1000,B有1000。轉賬後A+B也得是2000。
  • 隔離性(Isolation)
    事務的隔離性是多個使用者併發訪問資料庫時,資料庫為每一個使用者開啟的事務,每個事務不能被其他事務的運算元
    據所幹擾,多個併發事務之間要相互隔離。
  • 永續性(Durability)
    永續性是指一個事務一旦被提交,它對資料庫中資料的改變就是永久性的,接下來即使資料庫發生故障也不應該對
    其有任何影響。

1.3 事務的隔離級別

不考慮事務隔離級別會在資料併發讀寫的時候出現以下情況(以下情況全是錯誤):
髒讀:一個執行緒中的事務讀到了另一個執行緒中未提交的資料;

不可重複讀:一個執行緒中的事務讀到了另外一個執行緒中已經提交的update的資料(前後內容不一樣)

虛讀(幻讀):一個執行緒中的事務讀到了另外一個執行緒中已經提交的insert的資料(前後條數不一樣)

資料庫共定義了四種隔離級別

  • Serializable(序列化)
    提供嚴格的事務隔離。它要求事務序列化執行,事務只能一個接著一個地執行,但不能併發執行。如果僅僅通過“行級鎖”是無法實現事務序列化的,必須通過其他機制保證新插入的資料不會被剛執行查詢操作的事務訪問到。序列化是最高的事務隔離級別,同時代價也花費最高,效能很低,一般很少使用,在該級別下,事務順序執行,不僅可以避免髒讀、不可重複讀,還避免了幻像讀。
    隔離級別最高
  • Repeatable read(可重複讀)
    可重複讀是指在一個事務內,多次讀同一資料。在這個事務還沒有結束時,另外一個事務也訪問該同一資料。那麼,在第一個事務中的兩次讀資料之間,即使第二個事務對資料進行修改,第一個事務兩次讀到的的資料是一樣的。這樣就發生了在一個事務內兩次讀到的資料是一樣的,因此稱為是可重複讀。讀取資料的事務將會禁止寫事務(但允許讀事務),寫事務則禁止任何其他事務。這樣避免了不可重複讀取和髒讀,但是有時可能出現幻象讀。(讀取資料的事務)這可以通過“共享讀鎖”和“排他寫鎖”實現。
    隔離級別第二
  • Read committed(讀已提交)
    讀取資料的事務允許其他事務繼續訪問該行資料,但是未提交的寫事務將會禁止其他事務訪問該行。該隔離級別避免了髒讀,但是卻可能出現不可重複讀。事務A事先讀取了資料,事務B緊接了更新了資料,並提交了事務,而事務A再次讀取該資料時,資料已經發生了改變。
    隔離級別第三
  • Read uncommitted(讀未提交)
    如果一個事務已經開始寫資料,則另外一個事務則不允許同時進行寫操作,但允許其他事務讀此行資料。該隔離級別可以通過“排他寫鎖”實現。這樣就避免了更新丟失,卻可能出現髒讀。以上情況均無法保證。(讀未提交) 最低 。
    隔離最低級別

注意:級別依次升高,效率依次降低
MySQL的預設隔離級別是:REPEATABLE READ。(Oracle的預設是:READ COMMITTED)

2 分散式事務

2.1 分散式事務概念

分散式事務是由本地事務演變而來的。分散式事務即組成事務的各個單元處於不同資料庫伺服器上。比如,電商系統中的生成訂單,賬戶扣款,減少庫存,增加會員積分等等,他們就是組成事務的各個單元,它們要麼全部發生,要麼全部不發生,從而保證最終一致,資料準確。

2.2 分散式事務解決方案分類

剛性事務
剛性事務指的就是遵循本地事務四大特性(ACID)的強一致性事務。它的特點就是強一致性,要求組成事務的各個 單元馬上提交或者馬上回滾,沒有時間彈性,要求以同步的方式執行。通常在單體架構專案中應用較多,一般都是 企業級應用(或者區域網應用)。例如:生成合同時記錄日誌,付款成功後生成憑據等等。但是,在時下流行的互 聯網專案中,這種剛性事務來解決分散式事務就會有很多的弊端。其中最明顯的或者說最致命的就是效能問題。如 下圖所示:

因為某個參與者不能自己提交事務,必須等待所有參與者執行OK了之後,一起提交事務,那麼事務鎖住的時間就 變得非常長,從而導致效能非常低下。

柔性事務
柔性事務是針對剛性事務而說的,我們剛才分析了剛性事務,它有兩個特點,第一個強一致性,第二個近實時性 (NRT)。而柔性事務的特點是不需要立刻馬上執行(同步性),且不需要強一致性。它只要滿足基本可用和最終一致就可以了。要想真正的明白,需要從BASE理論和CAP理論說起。

剛性事務和柔性事務比較

事務型別 時間要求 一致性要求 應用型別 應用場景
剛性事務 立即 強一致性 企業級應用(單體架構) 訂單/訂單項/日誌
柔性事務 有時間彈性 最終一致性 網際網路應用(分散式架構) 訂單/支付/庫存

3 分散式事務解決方案

3.1 二階段提交(2PC)

將一個分散式的事務過程拆分成兩個階段: 準備階段事務提交 。為了讓整個資料庫叢集能夠正常的執行,該協議指定了一個 協調者 單點,用於協調整個資料庫叢集各節點的執行。為了簡化描述,我們將資料庫叢集中的各個節點稱為 參與者,三階段提交協議中同樣包含協調者和參與者這兩個角色定義。

如果第一階段執行失敗進入回滾,如下圖:

兩階段提交缺點:事務協調者存在單點故障;第一階段存在同步阻塞,當事務參與者過多效率較低;

3.2 三階段提交協議(3PC)

針對兩階段提交存在的問題,三階段提交協議通過引入一個 預詢盤 階段,以及超時策略來減少整個叢集的阻塞時間,提升系統性能。三階段提交的三個階段分別為:預詢盤(can_commit)預提交(pre_commit),以及事務提交(do_commit)

如果事務執行失敗進入回滾,如下圖;

兩階段提交協議中所存在的長時間阻塞狀態發生的機率還是非常低的,所以雖然三階段提交協議相對於兩階段提交協議對於資料強一致性更有保障,但是因為效率問題,兩階段提交協議在實際系統中反而更加受寵。

3.3 TCC+補償型方案(3PC)

TCC分別指的是Try,Confirm,Cancel。它是補償型分散式事務解決方案。何為補償呢?其實我們把TCC這3個部 分分別做什麼捋清楚,就很容理解了。首先,我們先來看下它們的主要作用:
Try 階段主要是對業務系統做檢測及資源預留。
Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,預設 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
在Try階段進行嘗試提交事務,當Try執行OK了,Confirm執行,且預設認為它一定 成功。但是當Try提交失敗了,則由Cancel處理回滾和資源釋放。

TCC事務的處理流程與2PC兩階段提交做比較,首先TCC是柔性事務,只要符合最終一致性即可。而2PC是剛性事 務,它是強一致性的,在任何一個分散式階段沒有返回執行成功或失敗的結果時,其事務一直會處於等待狀態。並 且2PC是利用DTP模型和XA規範,要求資料庫支援XA規範,且通常都是在跨庫的DB層面。

而TCC則在應用層面的處理,需要通過自己編寫邏輯程式碼來實現補償。它的優勢在於,可以讓應用自己定義資料操作的粒度,使得降低鎖衝突、提高吞吐量成為可能。而不足之處則在於對應用的侵入性非常強,業務邏輯的每個分支都需要實現try、confirm、cancel三個操作。此外,其實現難度也比較大,需要按照網路狀態、系統故障等不同 的失敗原因實現不同的回滾策略。

3.4 最終一致性方案

3.4.1 本地訊息表

這種實現方式應該是業界使用最多的,其核心思想是將分散式事務拆分成本地事務進行處理,這種思路是來源於 ebay。它和MQ事務訊息的實現思路都是一樣的,都是利用MQ通知不同的服務實現事務的操作。不同的是,針對 訊息佇列的信任情況,分成了兩種不同的實現。本地訊息表它是對訊息佇列的穩定性處於不信任的態度,認為訊息 可能會出現丟失,或者訊息佇列的執行網路會出現阻塞,於是在資料庫中建立一張獨立的表,用於存放事務執行的 狀態,配合訊息佇列實現事務的控制。
優點: 一種非常經典的實現,避免了分散式事務,實現了最終一致性。
缺點: 訊息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理。

3.4.2 MQ事務訊息

有一些第三方的MQ是支援事務訊息的,比如RocketMQ,ActiveMQ,他們支援事務訊息的方式也是類似於採用的 二階段提交。但是有一些常用的MQ也不支援事務訊息,比如 RabbitMQ 和 Kafka 都不支援。
以阿里的 RocketMQ 中介軟體為例,其思路大致為: 第一階段Prepared訊息,會拿到訊息的地址。 第二階段執行本地事務。 第三階段通過第一階段拿到的地址去訪問訊息,並修改狀態。
也就是說在業務方法內要想訊息佇列提交兩次請求,一次傳送訊息和一次確認訊息。如果確認訊息傳送失敗了 RocketMQ會定期掃描訊息叢集中的事務訊息,這時候發現了Prepared訊息,它會向訊息傳送者確認,所以生產方 需要實現一個check介面,RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證 了訊息傳送與本地事務同時成功或同時失敗。下圖描述了它的工作原理:
正常情況

異常情況

本地訊息表與MQ訊息表對比

分類 共同點 優勢 弊端
本地訊息表 都需要自己寫業務補償 一種非常經典的實現,避免了分散式事務,實現了最終一致性。 訊息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理
MQ事務訊息 都需要自己寫業務補償 實現了最終一致性,不需要依賴 本地資料庫事務。 用訊息佇列的方式實現分散式事 務,效率較高 目前主流MQ中有ActiveMQ RocketMQ支援 事務訊息 實現難度較大,和業務耦合比較緊密

4 分散式事務實現

4.1 atomikos實現二階段提交

atomikos一套開源的,JTA規範的實現。它是基於二階段提交思想實現分散式事務的控制。是分散式剛性事務的一 種解決方案。在當下網際網路開發中選擇此種解決方案的場景也不是很多了。實現原理參考IBM社群
場景說明
一個企業級應用專案:進銷存系統。系統要對針對庫存記錄訪問日誌。並且,庫存系統資料庫和日誌資料庫不是同
一個數據庫。程式碼詳見GitHub
服務拓撲圖

測試程式碼

@Slf4j
@Service
@Transactional(rollbackOn = Exception.class)
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private LogMapper logMapper;

    @Override
    public void addOrder(OrderInfo orderInfo) {
        int insert = orderMapper.insertOrderInfo(orderInfo);
        log.info("order庫新增sql執行條數:{}",insert);

        //測試1 異常回滾
        int i=1/0;

        LogInfo logInfo=new LogInfo();
        logInfo.setId((new Random().nextInt()));
        logInfo.setCreateTime(new Date());
        logInfo.setContent(orderInfo.toString());
        int insert1 = logMapper.insertLogInfo(logInfo);
        log.info("logs庫新增sql執行條數:{}",insert1);

        //測試2 異常回滾
//        int i=1/0;
    }
}

測試結果
測試1:業務執行前出現異常,資料庫未進行插入操作,資料庫無資料。

測試2:業務執行後出現異常
異常發生前控制檯日誌顯示插入成功,但是資料庫中並沒有資料,當異常出現後,實際資料從頭到尾並沒有提交

測試3:無異常情況,logs和order資料庫表中均有資料

二階段提交總結
二階段提交作為早期分散式事務的解決方案,逐漸的淡出了主流方案的圈子。這裡面其最重要的原因就是它是剛性事務,即需要滿足強一致性。它的優點就是可以在多資料庫間實現事務控制,而擺脫單一資料庫使用事務的宿命。但是阻塞式這個缺點確是致命的,因為參與全域性事務的資料庫被動聽從事務管理器的命令,執行或放棄事務,如果執行事務管理器的機器宕機,那整個系統就不能用了。當然,在極端情況下還可能同時影響其他系統,如果事務管理器掛了,但是這個資料庫的表鎖還沒釋放,因為資料庫還在等待事務管理器的命令,因此,使用這個資料庫的其他應用也會收到影響。

4.2 RocketMQ實現分散式事務

事務訊息,它是訊息佇列中一種特殊的訊息型別,只不過不是所有的訊息佇列產品都支援事務訊息。目前支援事務 訊息的佇列一個是阿里的RocketMQ(已經被apache收錄),一個是ActiveMQ。
RocketMQ事務訊息(Transactional Message)是指應用本地事務和傳送訊息操作可以被定義到全域性事務中,要 麼同時成功,要麼同時失敗。RocketMQ的事務訊息提供類似 X/Open XA 的分佈事務功能,通過事務訊息能達到 分散式事務的最終一致。
此處我們需要明確一件事,分散式事務和事務訊息兩者並沒有關係,事務訊息僅僅保證本地事務和MQ訊息傳送形 成整體的原子性,而投遞到MQ伺服器後,消費者是否能一定消費成功是無法保證的

4.2.1 RocketMQ事務訊息執行流程

完整流程

事務執行正常流程

本地事務執行異常流程

事務回查

4.2.2 案例場景

以訂單、支付為例,在下單成功後,馬上緊跟著的就是需要付款。只有付款成功了之後,訂單的狀態才會改為已付款,進而繼續走出庫,發貨,物流等等的流程,而如果訂單遲遲不付款的話,超過一個時限之後就自動關閉了。下圖描述了場景的業務流程:

4.2.3 核心程式碼

支付服務
PayController

@RestController
public class PayController {

    @Resource
    private TransactionListener transactionListener;


    @RequestMapping(value = "/pay/updateOrder", method = RequestMethod.POST)
    public String payOrder(@RequestParam("payid") String id, @RequestParam("ispay") int ispay) {
        try {
            //1.建立訊息的生產者
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("txmessage_trans-client-group");
            //2.指定伺服器地址nameserver
            transactionMQProducer.setNamesrvAddr("127.0.0.1:9876");
            //3.設定訊息回查的監聽器
            transactionMQProducer.setTransactionListener(transactionListener);
            //4.建立訊息物件
            Message message = new Message("txmessage_topic", 
                                                             "txmessage_tags", 
                                                            "txmessage_keys", 
                                                            "txmessage_事務訊息".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //啟動生產者
            transactionMQProducer.start();
            //5.準備資料
            Map<String,Object> payArgs = new HashMap<>();
            payArgs.put("id",id);
            payArgs.put("ispay",ispay);
            //6.傳送訊息
            transactionMQProducer.sendMessageInTransaction(message,payArgs);
            //7.釋放資源(關閉訊息傳送者)
            transactionMQProducer.shutdown();
        }catch (Exception e){
            e.printStackTrace();
            return "傳送訊息給mq失敗";
        }
        //如果沒有問題,
        return "傳送訊息給mq成功";
    }
}

PayTransactionListener

@Component
public class PayTransactionListener implements TransactionListener {

    /**
     * 對於MQ來說,它就3個狀態  1正在執行  2執行成功  3執行失敗
     * 只有在執行成功時(事務狀態是Commit時)才傳送訊息
     * 如果執行失敗了(事務狀態是rollback),就不再發送訊息,同時會把half(收取支付憑證)訊息刪除。
     */
    //用於儲存事務的唯一標識和事務的執行狀態
    private ConcurrentHashMap<String,Integer> transMap = new ConcurrentHashMap<>();
    
    @Autowired
    private PayService payService;

    /**
     * 獲取本地事務執行的狀態(執行本地事務,返回執行結果)
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //1.獲取當前事務訊息的唯一標識
        String transactionId = message.getTransactionId();
        //2.設定事務的執行狀態是正在執行
        transMap.put(transactionId,1);
        //3.獲取引數
        Map<String,Object> payArgs = (Map<String,Object>)o;
        String id = (String)payArgs.get("id");
        Integer ispay = (Integer)payArgs.get("ispay");

        try {
            //4.執行更新支付狀態
            System.out.println("支付狀態更新開始");
            Pay pay = new Pay();
            pay.setId(id);
            pay.setIspay(ispay);
            payService.update(pay);
            System.out.println("支付狀態更新成功");
//            int i=1/0;
            //5.記錄執行狀態
            transMap.put(transactionId,2);
        }catch (Exception e){
            e.printStackTrace();
            //設定事務狀態
            transMap.put(transactionId,3);
            //返回本地事務狀態為Rollback
            System.out.println("本地事務執行的結果是"+LocalTransactionState.ROLLBACK_MESSAGE);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        System.out.println("本地事務執行的結果是"+LocalTransactionState.COMMIT_MESSAGE);
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * 回查本地事務執行狀態
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        //1.獲取當前事務訊息的唯一標識
        String transactionId = messageExt.getTransactionId();
        //2.根據事務id,從map中獲取事務執行的狀態
        Integer state = transMap.get(transactionId);
        LocalTransactionState localTransactionState = null;
        //3.判斷事務狀態執行的結果
        switch (state){
            case 2:
                //執行成功
                System.out.println("本地事務執行的結果是"+LocalTransactionState.COMMIT_MESSAGE);
                localTransactionState = LocalTransactionState.COMMIT_MESSAGE;
                break;
            case 3:
                //執行失敗
                System.out.println("本地事務執行的結果是"+LocalTransactionState.ROLLBACK_MESSAGE);
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            case 1:
                //正在執行
                System.out.println("本地事務執行的結果是"+LocalTransactionState.UNKNOW);
                localTransactionState = LocalTransactionState.UNKNOW;
                break;
             default:
                 return null;
        }
        return localTransactionState;
    }
}

訂單服務

@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args)throws Exception {
        //啟動方法
        SpringApplication.run(OrderApplication.class);

        //1.建立事務訊息的消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txmessage_trans-client-group");
        //2.設定nameserver的地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //3.設定單次消費的訊息數量
        consumer.setConsumeMessageBatchMaxSize(5);
        //4.設定訊息消費順序
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //5.設定監聽訊息的資訊topic和tags
        consumer.subscribe("txmessage_topic","txmessage_tags");
        //6.編寫監聽訊息的監聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt  messageExt : msgs){
                        //取出訊息內容
                        String topic = messageExt.getTopic();
                        String tags = messageExt.getTags();
                        String keys = messageExt.getKeys();
                        String msg = new String(messageExt.getBody(),"UTF-8");
                        String transactionId = messageExt.getTransactionId();
                        System.out.println("獲取到的訊息:topic="+topic+",tags="+tags+",keys="+keys+",transactionId="+transactionId+",message="+msg);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啟動消費者
        System.out.println("啟動消費者");
        consumer.start();
    }
}

4.3 Seata實現分散式事務

由於篇幅有限,詳情檢視 《Seata實現分散式事務》