1. 程式人生 > >使用spring解決分散式事物

使用spring解決分散式事物

概述:提及分散式事務,各位可能都不陌生,在網際網路流量如此大的今天,可以說網站的搭建再也不是一臺伺服器就能搞定的,大量的伺服器叢集和資料庫叢集為網站的高壓力提供了支援,但是同時系統的複雜性,編碼中的需要考慮的問題也越來越多,單點故障怎麼辦,網路通訊延遲造成資料混亂怎麼解決,這些都讓當今的架構和編碼難度成倍的增加,今天就和大家聊一聊分散式架構中常見的分散式事務問題—多源資料庫事務的管理
我們藉助一個場景來說:
這裡寫圖片描述
處理程式裡面有這一個這樣的步驟,它需要把訊息處理之後傳送的MQ,同時往日誌裡面插一條資料,這個過程很簡單,但是如果處理不得當會出現資料錯亂問題,在與DATA和MQ通訊的時候如果任何一方發生故障則會導致資料的不一致,這兩者顯然是具有事務性的。
理論:

從理論上說資料的一致性分為一下幾種:
1.強一致性
2.弱一致性
3.最終一致性
強一致性是最嚴格的標準,它要求資料的實時一致,拿上面的情況來說,如果我們往MQ中推送訊息成功那麼要求在DATA中也必須同時更新資料成功,如果第二個操作失敗則必須撤回第一個操作,資料要求實時一致
而弱一致性就要求沒那麼嚴格了,美團網的訂客房系統不知道大家有沒有體驗過,全國的客房資訊很多,美團顯然不可能實時拉取所有的客房資訊,他是每隔一個時間段來拉去一次,這個時候顯示的資料就可能和最新的資料不一樣,但是當你下單的時候會有一個二次驗證,對當前下單的客房進行一次拉取,這樣就避免了延遲資料的下單,這種一致性的解決就是弱一致性。它可能並不是實時的一致,會有延遲。
而最總一致性則是三種之中要求最弱的一種,它更像是一種不作為的處理方法,在系統出錯率低的情況下保證效率優先,通過後期的補救手段來將資料完成一致。
解決思路:

理論說了這麼多,現在說下實際處理的方法,我們先拿第一種方式來說:
思路:回想一下單資料庫的解決思路
這裡寫圖片描述
上圖就是經典的資料庫提交過程,先是一次prepare,準備階段就像是一次試提交,試提交沒問題了然後才真正提交。如果試提交有問題就會重複的試提交直到連線超時。如果是多個數據源是不是也能有個試提交的過程呢,答案是顯然的
這裡寫圖片描述
道理是一樣的,向兩個資料來源進行準備階段的試提交,如果這裡都OK了,然後開始第二階段的提交,試提交依然充當了一個排除員的角色,我們所擔心的一個成功一個失敗的情況顯然是能被排查掉的。
程式碼:
java提供了事務的介面規範JPA來規範各種需要事物的操作,spring使用template的模式提供了各種支援,我們這裡不討論底層的實現,以rabbitmq和mysql為例,來看看spring怎麼結合兩者實現同步事物的。
spring-mysql的配置

<!-- 配置連線池 -->
<bean id="dataSource" class="com.jolbox.bonecp.BoneCPDataSource" destroy-method="close">
    <!-- 資料庫驅動 -->
    <property name="driverClass" value="${jdbc.driver}" />
    <!-- 相應驅動的jdbcUrl -->
    <property name="jdbcUrl" value="${jdbc.url}" />
    <!-- 資料庫的使用者名稱 -->
    <property name="username" value="${jdbc.username}" />
    <!-- 資料庫的密碼 -->
    <property name="password" value="${jdbc.password}" />
    <!-- 檢查資料庫連線池中空閒連線的間隔時間,單位是分,預設值:240,如果要取消則設定為0 -->
    <property name="idleConnectionTestPeriod" value="60" />
    <!-- 連線池中未使用的連結最大存活時間,單位是分,預設值:60,如果要永遠存活設定為0 -->
    <property name="idleMaxAge" value="30" />
    <!-- 每個分割槽最大的連線數 -->
    <property name="maxConnectionsPerPartition" value="150" />
    <!-- 每個分割槽最小的連線數 -->
    <property name="minConnectionsPerPartition" value="5" />
</bean>

<!-- 定義事務管理器 -->
<bean id="transactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>  

<!-- 定義事務策略 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <!--定義查詢方法都是隻讀的 -->
        <tx:method name="query*" read-only="true" />
        <tx:method name="find*" read-only="true" />
        <tx:method name="get*" read-only="true" />
        <tx:method name="select*" read-only="true" />

        <!-- 主庫執行操作,事務傳播行為定義為預設行為 -->
        <tx:method name="save*" propagation="REQUIRED" />
        <tx:method name="update*" propagation="REQUIRED" />
        <tx:method name="delete*" propagation="REQUIRED" />

        <!--其他方法使用預設事務策略 -->
        <tx:method name="*" />
    </tx:attributes>
</tx:advice>

<aop:config>
    <!-- 定義切面,所有的service的所有方法 -->
    <aop:pointcut id="txPointcut" expression="execution(* com.DF.service.*.*(..))" />
    <!-- 應用事務策略到Service切面 -->
    <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/>
</aop:config>

rabbitmq配置

<!-- 連線配置 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" username="${rabbit.username}" 
    password="${rabbit.password}" port="${rabbit.port}"  virtual-host="${rabbit.vhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>

<!-- spring template宣告-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

**程式碼部分:**

public class FenbushiServiceImpl implements FenbushiService {
@Autowired
private MessageLogService messageLogService;
@Override
public Long saveLogAndSendMq(){
    /*
     * service邏輯:
     * 1.查詢當天最大的messageid+1 作為當前的messageid 
     * 2.傳送到佇列
     * 3.更新messageid
     * 步驟二和三是可以調換的
     * 這個過程是明顯具有事物性的  如果說步驟2成功 3失敗  資料將錯亂 
     * 這裡嘗試幾種解決的思路和辦法
     */
    Long messageid;
    try {
        messageid = schemeOne();

    } catch (Exception e) {
        return null;
    }
    return messageid;
}

/*
 *1.方案一  在程式中控制  採用失敗回滾的方式 
 *這種方案就是先進行步驟1和3  然後執行二  如果2失敗處理異常的時候進行3的回滾
 *這種主要是利用了spring事物的管理  把資料庫的事物依然交給spring管理  而異構
 *資料來源的事物交給程式碼管理
 */
Long schemeOne(){

    //步驟1
    long messageid = messageLogService.selectMaxIdToday();

    //步驟2
    messageLogService.savelog(new Message_Log(messageid, 1, 1, "在程式中嘗試解決事物", 
            null, null, null, null, null, null, null, null, null, null, null, null, null));


    try {
        //步驟3
        EventMessage message = new EventMessage(null,ParsePropUtil.getProp(ConfigurtionManager.RABBITMQ_ENTERE_ROATINGKEY)
                , ParsePropUtil.getProp(ConfigurtionManager.RABBIT_ENTER_EXCHANGE)
                , "在程式中嘗試解決事物".getBytes());
        int i = 1/0; // 模擬步驟3出現問題
        MessageSender.sendMessage2Exchange(message);

    } catch (Exception e) {
        //spring中宣告式事物的管理  所以這裡丟擲執行時異常 spring會捕捉 並回滾service
        throw new RuntimeException();//rollback!
    }
    return messageid;
}

看下封裝的MessageSender類:

@Service
public class MessageSender {
// 傳送模板  靜態變數需要set方法注入
private static RabbitTemplate amqpTemplate ;    
@Autowired
public void setAmqpTemplate(RabbitTemplate amqpTemplate) {
    MessageSender.amqpTemplate = amqpTemplate;
}

// 將訊息傳送到交換機
public  static void sendMessage2Exchange(final EventMessage eventMessage){
    amqpTemplate.convertAndSend(eventMessage.getExchangeName(), eventMessage.getRoatingKey(),
            eventMessage.getEventData(),new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setPriority(eventMessage.getProrityLevel());
                    return message;
                }
            });
}

在同一個service中,spring通過template的方式使兩者具有事物,資料具有實時一致性,但是很明顯由於service中的方法沒有執行完成時,事物是始終掛起的,這將導致程式的效率大大降低,所以這種方法雖然讓資料實時一致,但在當今的IT領域採用率並不高。
實際上,在絕大多數情況下,操作一和操作二是不容易出現異常資料的,在很低錯誤率的情況下更多的企業希望採用方式三來解決問題。在最終一致的前提下,我們不需要考慮對操作一和操作二的成功與否,這樣效率大大提高,我們只需要在定期的排查異常,發現異常的時候後期處理異常的資料,保證資料最終的一致。