1. 程式人生 > 其它 >本地事務與分散式事務

本地事務與分散式事務

本地事務與分散式事務

一、本地事務

1、事務的基本性質

資料庫事務的幾個特性:原子性(Atomicity )、一致性( Consistency )、隔離性或獨立性( Isolation)和永續性(Durabilily),簡稱就是 ACID;

  • 原子性:一系列的操作整體不可拆分,要麼同時成功,要麼同時失敗

  • 一致性:資料在事務的前後,業務整體一致。

    • 轉賬 A:1000 B:1000 轉 200 事務成功 A:800 B:1200
  • 隔離性:事務之間互相隔離。

  • 永續性:一旦事務成功,資料一定會落盤在資料庫。

在以往的單體應用中,我們多個業務操作使用同一條連線操作不同的資料表,一旦有異常, 我們可以很容易的整體回滾;

Business:我們具體的業務程式碼

Storage:庫存業務程式碼;扣庫存

Order:訂單業務程式碼;儲存訂單

Account:賬號業務程式碼;減賬戶餘額

比如買東西業務,扣庫存,下訂單,賬戶扣款,是一個整體;必須同時成功或者失敗

一個事務開始,代表以下的所有操作都在同一個連線裡面;

2、事務的隔離級別

  • lREAD UNCOMMITTED(讀未提交)

    該隔離級別的事務會讀到其它未提交事務的資料 ,此現象也稱之為髒讀。

  • lREAD COMMITTED(讀已提交)

    一個事務可以讀取另一個已提交的事務,多次讀取會造成不一樣的結果,此現象稱為不可重複讀問題,Oracle 和 SQL Server 的預設隔離級別。

  • lREPEATABLE READ(可重複讀)

    該隔離級別是 MySQL 預設的隔離級別,在同一個事務裡,select 的結果是事務開始時時間點的狀態,因此,同樣的 select 操作讀到的結果會是一致的,但是,會有幻讀現象。MySQL 的 InnoDB 引擎可以通過 next-key locks 機制(參考下文"行鎖的演算法"一節)來避免幻讀。

  • lSERIALIZABLE(序列化)

    在該隔離級別下事務都是序列順序執行的,MySQL 資料庫的 InnoDB 引擎會給讀操作隱式 加一把讀共享鎖,從而避免了髒讀、不可重讀復讀和幻讀問題。

3、事務的傳播行為

  • 1、PROPAGATION_REQUIRED:如果當前沒有事務,就建立一個新事務,如果當前存在事務, 就加入該事務,該設定是最常用的設定。

  • 2、PROPAGATION_SUPPORTS:支援當前事務,如果當前存在事務,就加入該事務,如果當 前不存在事務,就以非事務執行。

  • 3、PROPAGATION_MANDATORY:支援當前事務,如果當前存在事務,就加入該事務,如果 當前不存在事務,就丟擲異常。

  • 4、PROPAGATION_REQUIRES_NEW:建立新事務,無論當前存不存在事務,都建立新事務。

  • 5、PROPAGATION_NOT_SUPPORTED:以非事務方式執行操作,如果當前存在事務,就把當 前事務掛起。

  • 6、PROPAGATION_NEVER:以非事務方式執行,如果當前存在事務,則丟擲異常。

  • 7、PROPAGATION_NESTED:如果當前存在事務,則在巢狀事務內執行。如果當前沒有事務, 則執行與 PROPAGATION_REQUIRED 類似的操作。

4、SpringBoot 事務關鍵點

1、事務的自動配置

TransactionAutoConfiguration

2、事務的坑

在同一個類裡面,編寫兩個方法,內部呼叫的時候,會導致事務設定失效。原因是沒有用到代理物件的緣故。 解決:

  • 0)、匯入 spring-boot-starter-aop

  • 1)、@EnableTransactionManagement(proxyTargetClass = true) 2)、@EnableAspectJAutoProxy(exposeProxy=true)

  • 3)、AopContext.currentProxy() 呼叫方法

二、分散式事務

1、為什麼有分散式事務

分散式系統經常出現的異常

機器宕機、網路異常、訊息丟失、訊息亂序、資料錯誤、不可靠的 TCP、儲存資料丟失...

分散式事務是企業整合中的一個技術難點,也是每一個分散式系統架構中都會涉及到的一個 東西,特別是在微服務架構中,幾乎可以說是無法避免。

2、CAP 定理與 BASE 理論

1、CAP 定理

CAP 原則又稱 CAP 定理,指的是在一個分散式系統中

  • 一致性(Consistency):

    • 在分散式系統中的所有資料備份,在同一時刻是否同樣的值。(等同於所有節點訪問同一份最新的資料副本)
  • 可用性(Availability)

    • 在叢集中一部分節點故障後,叢集整體是否還能響應客戶端的讀寫請求。(對資料 更新具備高可用性)
  • 分割槽容錯性(Partition tolerance)

    • 大多數分散式系統都分佈在多個子網路。每個子網路就叫做一個區(partition)。 分割槽容錯的意思是,區間通訊可能失敗。比如,一臺伺服器放在中國,另一臺服務 器放在美國,這就是兩個區,它們之間可能無法通訊。

CAP 原則指的是,這三個要素最多隻能同時實現兩點,不可能三者兼顧

一般來說,分割槽容錯無法避免,因此可以認為 CAP 的 P 總是成立。CAP 定理告訴我們, 剩下的 C 和 A 無法同時做到。

分散式系統中實現一致性的 raft 演算法、paxos http://thesecretlivesofdata.com/raft/

2、面臨的問題

對於多數大型網際網路應用的場景,主機眾多、部署分散,而且現在的叢集規模越來越大,所 以節點故障、網路故障是常態,而且要保證服務可用性達到 99.99999%(N 個 9),即保證 P 和 A,捨棄 C。

3、BASE 理論

是對 CAP 理論的延伸,思想以採用適當的採取弱一致性是即使無法做到強一致性(CAP 的一致性就是強一致性),但可 ,即最終一致性

BASE 是指

  • 基本可用(Basically Available)

    • 基本可用是指分散式系統在出現故障的時候,允許損失部分可用性(例如響應時間、 功能上的可用性),允許損失部分可用性。需要注意的是,基本可用絕不等價於系 統不可用。
      • 響應時間上的損失:正常情況下搜尋引擎需要在 0.5 秒之內返回給使用者相應的 查詢結果,但由於出現故障(比如系統部分機房發生斷電或斷網故障),查詢 結果的響應時間增加到了 1~2 秒。
      • 功能上的損失:購物網站在購物高峰(如雙十一)時,為了保護系統的穩定性, 部分消費者可能會被引導到一個降級頁面。
  • 軟狀態( Soft State)

    • 軟狀態是指允許系統存在中間狀態,而該中間狀態不會影響系統整體可用性。分佈 式儲存中一般一份資料會有多個副本,允許不同副本同步的延時就是軟狀態的體 現。mysql replication 的非同步複製也是一種體現。
  • 最終一致性( Eventual Consistency)

    • 最終一致性是指系統中的所有資料副本經過一定時間後,最終能夠達到一致的狀 態。弱一致性和強一致性相反,最終一致性是弱一致性的一種特殊情況。

4、強一致性、弱一致性、最終一致性

從客戶端角度,多程序併發訪問時,更新過的資料在不同程序如何獲取的不同策略,決定了 不同的一致性。對於關係型資料庫,要求更新過的資料能被後續的訪問都能看到,這是強一 致性。如果能容忍後續的部分或者全部訪問不到,則是弱一致性。如果經過一段時間後要求 能訪問到更新後的資料,則是最終一致性

3、分散式事務幾種方案

1)、2PC 模式

資料庫支援的 2PC【2 phase commit 二階提交】,又叫做 XA Transactions。 MySQL 從 5.5 版本開始支援,SQL Server 2005 開始支援,Oracle 7 開始支援。 其中,XA 是一個兩階段提交協議,該協議分為以下兩個階段:

第一階段:事務協調器要求每個涉及到事務的資料庫預提交(precommit)此操作,並反映是 否可以提交.

第二階段:事務協調器要求每個資料庫提交資料。

其中,如果有任何一個數據庫否決此次提交,那麼所有資料庫都會被要求回滾它們在此事務 中的那部分資訊。

  • XA 協議比較簡單,而且一旦商業資料庫實現了 XA 協議,使用分散式事務的成本也比較 低。

  • XA 效能不理想,特別是在交易下單鏈路,往往併發量很高,XA 無法滿足高併發場景

  • XA 目前在商業資料庫支援的比較理想,在 mysql 資料庫中支援的不太理想,mysql 的XA 實現,沒有記錄 prepare 階段日誌,主備切換回導致主庫與備庫資料不一致。

  • 許多 nosql 也沒有支援 XA,這讓 XA 的應用場景變得非常狹隘。

  • 也有 3PC,引入了超時機制(無論協調者還是參與者,在向對方傳送請求後,若長時間 未收到迴應則做出相應處理)

2)、柔性事務-TCC 事務補償型方案

剛性事務: ACID 原則,強一致性。 柔性事務:遵遵循循 BASE 理論,最終一致性;

與剛性事務不同,柔性事務允許一定時間內,不同節點的資料不一致,但要求最終一致。

一階段 prepare 行為:呼叫 自定義 的 prepare 邏輯。

二階段 commit 行為:呼叫 自定義 的 commit 邏輯。

二階段 rollback 行為:呼叫 自定義 的 rollback 邏輯。

所謂 TCC 模式,是指支援把 自定義 的分支事務納入到全域性事務的管理中。

3)、柔性事務-最大努力通知型方案

按規律進行通知,不保證資料一定能通知成功,但會提供可查詢操作介面進行核對。這種 方案主要用在與第三方系統通訊時,比如:呼叫微信或支付寶支付後的支付結果通知。這種 方案也是結合 MQ 進行實現,例如:通過 MQ 傳送 http 請求,設定最大通知次數。達到通 知次數後即不再通知。

案例:銀行通知、商戶通知等(各大交易業務平臺間的商戶通知:多次通知、查詢校對、對 賬檔案),支付寶的支付成功非同步回撥

4)、柔性事務-可靠訊息+最終一致性方案(非同步確保型)

實現:業務處理服務在業務事務提交之前,向實時訊息服務請求傳送訊息,實時訊息服務只 記錄訊息資料,而不是真正的傳送。業務處理服務在業務事務提交之後,向實時訊息服務確 認傳送。只有在得到確認傳送指令後,實時訊息服務才會真正傳送。

防止訊息丟失:

1、做好訊息確認機制( pulisher, consumer【 手動 ack】 )
2、每一個傳送的訊息都在資料庫做好記錄。 定期將失敗的訊息再次傳送一遍

CREATE TABLE `mq_message` (
`message_id` char(32) NOT NULL,
`content` text,
`to_exchane` varchar(255) DEFAULT NULL,
`routing_key` varchar(255) DEFAULT NULL,
`class_type` varchar(255) DEFAULT NULL,
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已傳送 2-錯誤抵達 3-已抵達',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

三、使用seata整合分散式事務(上述第一種方案2pc方式)

示例倉庫: seata-samples

2019 年 1 月,阿里巴巴中介軟體團隊發起了開源專案 Fescar(Fast & EaSy Commit And Rollback),和社群一起共建開源分散式事務解決方案。Fescar 的願景是讓分散式事務的使用像本地事務的使用一樣,簡單和高效,並逐步解決開發者們遇到的分散式事務方面的所有難題。

Fescar 開源後,螞蟻金服加入 Fescar 社群參與共建,並在 Fescar 0.4.0 版本中貢獻了 TCC 模式。

為了打造更中立、更開放、生態更加豐富的分散式事務開源社群,經過社群核心成員的投票,大家決定對 Fescar 進行品牌升級,並更名為 Seata,意為:Simple Extensible Autonomous Transaction Architecture,是一套一站式分散式事務解決方案。

Seata 融合了阿里巴巴和螞蟻金服在分散式事務技術上的積累,並沉澱了新零售、雲端計算和新金融等場景下豐富的實踐經驗,但要實現適用於所有的分散式事務場景的願景,仍有很長的路要走。因此,我們決定建立一個完全中立的分散式事務組織,希望更多的企業、開發者能夠加入我們,一起打造 Seata。

歷史:

Ant Financial

XTS:Extended Transaction Service,可擴充套件事務服務。螞蟻金服中介軟體團隊自2007年以來開發了分散式事務中介軟體,廣泛應用於Ant Financial,解決了跨資料庫和服務的資料一致性問題。

DTX:Distributed Transaction Extended。自2013年以來,XTS已在Ant Financial Cloud上釋出,名稱為DTX。

阿里巴巴

TXC:Taobao Transaction Constructor。阿里巴巴中介軟體團隊自2014年起啟動該專案,以解決因應用程式架構從微控制器改為微服務而導致的分散式事務問題。

GTS:Global Transaction Service。 TXC作為Aliyun中介軟體產品,新名稱GTS自2016年起釋出。

Fescar:我們從2019年開始基於TXC / GTS開源開源專案Fescar,以便在未來與社群密切合作。

Seata社群

Seata:簡單的可擴充套件自治交易架構。 Ant Financial加入Fescar,使其成為一個更加中立和開放的分散式服務社群,並將Fescar更名為Seata。

3.1. 結構

Seata有3個基本元件:

  • Transaction Coordinator(TC):事務協調器,維護全域性事務的執行狀態,負責協調並驅動全域性事務的提交或回滾。

  • Transaction Manager(TM):事務管理器,控制全域性事務的邊界,負責開啟一個全域性事務,並最終發起全域性提交或全域性回滾的決議。

  • Resource Manager(RM):資源管理器,控制分支事務,負責分支註冊、狀態彙報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。

全域性事務與分支事務:

a Distributed Transaction is a Global Transaction which is made up with a batch of Branch Transaction, and normally Branch Transaction is just Local Transaction.

Seata管理分散式事務的典型生命週期:

  • TM 向 TC 申請開啟一個全域性事務,全域性事務建立成功並生成一個全域性唯一的 XID。
  • XID 在微服務呼叫鏈路的上下文中傳播。
  • RM 向 TC 註冊分支事務,將其納入 XID 對應全域性事務的管轄。
  • TM 向 TC 發起針對 XID 的全域性提交或回滾決議。
  • TC 排程 XID 下管轄的全部分支事務完成提交或回滾請求。

至此,seata的協議機制總體上看與 XA 是一致的。但是是有差別的:

XA 方案的 RM 實際上是在資料庫層,RM 本質上就是資料庫自身(通過提供支援 XA 的驅動程式來供應用使用)。

而 Fescar 的 RM 是以二方包的形式作為中介軟體層部署在應用程式這一側的,不依賴於資料庫本身對協議的支援,當然也不需要資料庫支援 XA 協議。這點對於微服務化的架構來說是非常重要的:應用層不需要為本地事務和分散式事務兩類不同場景來適配兩套不同的資料庫驅動。

這個設計,剝離了分散式事務方案對資料庫在 協議支援 上的要求。

3.2. 快速入門

中文文件:https://seata.io/zh-cn/docs/user/quickstart.html

建立 UNDO_LOG 表

SEATA AT 模式需要 UNDO_LOG

-- 注意此處0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

下載、解壓、更改、啟動seata伺服器

https://github.com/seata/seata/releases,下載伺服器軟體包,將其解壓縮。

解壓後可以更改registry.conf註冊中心相關配置,指定註冊中心type=nacos,預設配置檔案是檔案來配,也可以指定用nacos配置中心,如:

雙擊seata-server.bat啟動

匯入依賴

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>

加入配置檔案

所有想要用到分散式事務的微服務使用seata DatasourceProxy代理自己的資料來源

package com.atguigu.gulimall.order.config;

import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;

/**
 * <p>Title: MySeataConfig</p>
 * Description:
 * date:2020/7/3 14:04
 */
@Configuration
public class MySeataConfig {

	@Bean
	public DataSource dataSource(DataSourceProperties dataSourceProperties){
		HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
		if(StringUtils.hasText(dataSourceProperties.getName())){
			dataSource.setPoolName(dataSourceProperties.getName());
		}
		return new DataSourceProxy(dataSource);
	}
}

匯入檔案

每個微服務,都必須匯入registry.cof file.conf(conf目錄下)

file.conf 的 service.vgroup_mapping 配置必須和spring.application.name一致

file.conf

更改後

org.springframework.cloud:spring-cloud-starter-alibaba-seataorg.springframework.cloud.alibaba.seata.GlobalTransactionAutoConfiguration 類中,預設會使用 ${spring.application.name}-fescar-service-group作為服務名註冊到 Seata Server上,如果和file.conf 中的配置不一致,會提示 no available server to connect錯誤

也可以通過配置 spring.cloud.alibaba.seata.tx-service-group修改後綴,但是必須和file.conf中的配置保持一致

給分散式大事務的方法標註@GlobalTransactional

每一個遠端的小事務用 @Transactional

四、使用RabbitMQ延時佇列整合分散式事務(上述第四種方案)

RabbitMQ的一些準備工作見RabbitMQ的部落格

建立交換機、佇列、繫結可以直接在配置檔案中直接匯入@bean的方式直接注入

實現如圖:

加入配置檔案

package com.atguigu.gulimall.order.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * <p>Title: MyMQConfig</p>
 * Description:容器中的所有bean都會自動建立到RabbitMQ中 [RabbitMQ沒有這個佇列、交換機、繫結]
 * date:2020/7/3 17:03
 */
@Configuration
public class MyMQConfig {

	@Value("${myRabbitmq.MQConfig.queues}")
	private String queues;

	@Value("${myRabbitmq.MQConfig.eventExchange}")
	private String eventExchange;

	@Value("${myRabbitmq.MQConfig.routingKey}")
	private String routingKey;

	@Value("${myRabbitmq.MQConfig.delayQueue}")
	private String delayQueue;

	@Value("${myRabbitmq.MQConfig.createOrder}")
	private String createOrder;

	@Value("${myRabbitmq.MQConfig.ReleaseOther}")
	private String ReleaseOther;

	@Value("${myRabbitmq.MQConfig.ReleaseOtherKey}")
	private String ReleaseOtherKey;

	@Value("${myRabbitmq.MQConfig.ttl}")
	private String ttl;

	/**
	 * String name, boolean durable, boolean exclusive, boolean autoDelete,  @Nullable Map<String, Object> arguments
	 */
	@Bean
	public Queue orderDelayQueue(){
		Map<String ,Object> arguments = new HashMap<>();
		arguments.put("x-dead-letter-exchange", eventExchange);
		arguments.put("x-dead-letter-routing-key", routingKey);
		arguments.put("x-message-ttl", ttl);
		Queue queue = new Queue(delayQueue, true, false, false, arguments);
		return queue;
	}

	@Bean
	public Queue orderReleaseOrderQueue(){
		Queue queue = new Queue(queues, true, false, false);
		return queue;
	}

	/**
	 * String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
	 * @return
	 */
	@Bean
	public Exchange orderEventExchange(){

		return new TopicExchange(eventExchange, true, false);
	}

	/**
	 * String destination, DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
	 */
	@Bean
	public Binding orderCreateOrderBinding(){

		return new Binding(delayQueue, Binding.DestinationType.QUEUE, eventExchange, createOrder, null);
	}

	@Bean
	public Binding orderReleaseOrderBinding(){

		return new Binding(queues, Binding.DestinationType.QUEUE, eventExchange, routingKey, null);
	}

	/**
	 * 訂單釋放直接和庫存釋放進行繫結
	 */
	@Bean
	public Binding orderReleaseOtherBinding(){

		return new Binding(ReleaseOther, Binding.DestinationType.QUEUE, eventExchange, ReleaseOtherKey + ".#", null);
	}

	@Bean
	public Queue orderSecKillQueue(){
		return new Queue("order.seckill.order.queue", true, false, false);
	}

	@Bean
	public Binding orderSecKillQueueBinding(){
		return new Binding("order.seckill.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.seckill.order", null);
	}
}

yaml

myRabbitmq:
  queue: mall-queue
  exchange: mall-exchange
  routeKey: mall
  MQConfig:
    # 訂單佇列
    createOrder: order.create.order	//延時佇列的路由
    delayQueue: order.delay.queue	//延時佇列佇列
    eventExchange: order-event-exchange	//交換機
    routingKey: order.release.order	//死信佇列的路由
    queues: order.release.order.queue	//死信佇列
    # 訂單自動過期時間 單位:(毫秒)
    ttl: 900000
    # 庫存解鎖佇列
    ReleaseOther: stock.release.stock.queue
    ReleaseOtherKey: order.release.other

根據業務進行處理......