使用spring解決分散式事物
阿新 • • 發佈:2019-01-08
概述:提及分散式事務,各位可能都不陌生,在網際網路流量如此大的今天,可以說網站的搭建再也不是一臺伺服器就能搞定的,大量的伺服器叢集和資料庫叢集為網站的高壓力提供了支援,但是同時系統的複雜性,編碼中的需要考慮的問題也越來越多,單點故障怎麼辦,網路通訊延遲造成資料混亂怎麼解決,這些都讓當今的架構和編碼難度成倍的增加,今天就和大家聊一聊分散式架構中常見的分散式事務問題—多源資料庫事務的管理
我們藉助一個場景來說:
處理程式裡面有這一個這樣的步驟,它需要把訊息處理之後傳送的MQ,同時往日誌裡面插一條資料,這個過程很簡單,但是如果處理不得當會出現資料錯亂問題,在與DATA和MQ通訊的時候如果任何一方發生故障則會導致資料的不一致,這兩者顯然是具有事務性的。
理論:
1.強一致性
2.弱一致性
3.最終一致性
強一致性是最嚴格的標準,它要求資料的實時一致,拿上面的情況來說,如果我們往MQ中推送訊息成功那麼要求在DATA中也必須同時更新資料成功,如果第二個操作失敗則必須撤回第一個操作,資料要求實時一致
而弱一致性就要求沒那麼嚴格了,美團網的訂客房系統不知道大家有沒有體驗過,全國的客房資訊很多,美團顯然不可能實時拉取所有的客房資訊,他是每隔一個時間段來拉去一次,這個時候顯示的資料就可能和最新的資料不一樣,但是當你下單的時候會有一個二次驗證,對當前下單的客房進行一次拉取,這樣就避免了延遲資料的下單,這種一致性的解決就是弱一致性。它可能並不是實時的一致,會有延遲。
而最總一致性則是三種之中要求最弱的一種,它更像是一種不作為的處理方法,在系統出錯率低的情況下保證效率優先,通過後期的補救手段來將資料完成一致。
解決思路:
理論說了這麼多,現在說下實際處理的方法,我們先拿第一種方式來說:
思路:回想一下單資料庫的解決思路
上圖就是經典的資料庫提交過程,先是一次prepare,準備階段就像是一次試提交,試提交沒問題了然後才真正提交。如果試提交有問題就會重複的試提交直到連線超時。如果是多個數據源是不是也能有個試提交的過程呢,答案是顯然的
道理是一樣的,向兩個資料來源進行準備階段的試提交,如果這裡都OK了,然後開始第二階段的提交,試提交依然充當了一個排除員的角色,我們所擔心的一個成功一個失敗的情況顯然是能被排查掉的。
程式碼:
java提供了事務的介面規範JPA來規範各種需要事物的操作,spring使用template的模式提供了各種支援,我們這裡不討論底層的實現,以rabbitmq和mysql為例,來看看spring怎麼結合兩者實現同步事物的。
spring-mysql的配置
<!-- 配置連線池 -->
<bean id="dataSource" class="com.jolbox.bonecp.BoneCPDataSource" destroy-method="close">
<!-- 資料庫驅動 -->
<property name="driverClass" value="${jdbc.driver}" />
<!-- 相應驅動的jdbcUrl -->
<property name="jdbcUrl" value="${jdbc.url}" />
<!-- 資料庫的使用者名稱 -->
<property name="username" value="${jdbc.username}" />
<!-- 資料庫的密碼 -->
<property name="password" value="${jdbc.password}" />
<!-- 檢查資料庫連線池中空閒連線的間隔時間,單位是分,預設值:240,如果要取消則設定為0 -->
<property name="idleConnectionTestPeriod" value="60" />
<!-- 連線池中未使用的連結最大存活時間,單位是分,預設值:60,如果要永遠存活設定為0 -->
<property name="idleMaxAge" value="30" />
<!-- 每個分割槽最大的連線數 -->
<property name="maxConnectionsPerPartition" value="150" />
<!-- 每個分割槽最小的連線數 -->
<property name="minConnectionsPerPartition" value="5" />
</bean>
<!-- 定義事務管理器 -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- 定義事務策略 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<!--定義查詢方法都是隻讀的 -->
<tx:method name="query*" read-only="true" />
<tx:method name="find*" read-only="true" />
<tx:method name="get*" read-only="true" />
<tx:method name="select*" read-only="true" />
<!-- 主庫執行操作,事務傳播行為定義為預設行為 -->
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="delete*" propagation="REQUIRED" />
<!--其他方法使用預設事務策略 -->
<tx:method name="*" />
</tx:attributes>
</tx:advice>
<aop:config>
<!-- 定義切面,所有的service的所有方法 -->
<aop:pointcut id="txPointcut" expression="execution(* com.DF.service.*.*(..))" />
<!-- 應用事務策略到Service切面 -->
<aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/>
</aop:config>
rabbitmq配置
<!-- 連線配置 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" username="${rabbit.username}"
password="${rabbit.password}" port="${rabbit.port}" virtual-host="${rabbit.vhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- spring template宣告-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
**程式碼部分:**
public class FenbushiServiceImpl implements FenbushiService {
@Autowired
private MessageLogService messageLogService;
@Override
public Long saveLogAndSendMq(){
/*
* service邏輯:
* 1.查詢當天最大的messageid+1 作為當前的messageid
* 2.傳送到佇列
* 3.更新messageid
* 步驟二和三是可以調換的
* 這個過程是明顯具有事物性的 如果說步驟2成功 3失敗 資料將錯亂
* 這裡嘗試幾種解決的思路和辦法
*/
Long messageid;
try {
messageid = schemeOne();
} catch (Exception e) {
return null;
}
return messageid;
}
/*
*1.方案一 在程式中控制 採用失敗回滾的方式
*這種方案就是先進行步驟1和3 然後執行二 如果2失敗處理異常的時候進行3的回滾
*這種主要是利用了spring事物的管理 把資料庫的事物依然交給spring管理 而異構
*資料來源的事物交給程式碼管理
*/
Long schemeOne(){
//步驟1
long messageid = messageLogService.selectMaxIdToday();
//步驟2
messageLogService.savelog(new Message_Log(messageid, 1, 1, "在程式中嘗試解決事物",
null, null, null, null, null, null, null, null, null, null, null, null, null));
try {
//步驟3
EventMessage message = new EventMessage(null,ParsePropUtil.getProp(ConfigurtionManager.RABBITMQ_ENTERE_ROATINGKEY)
, ParsePropUtil.getProp(ConfigurtionManager.RABBIT_ENTER_EXCHANGE)
, "在程式中嘗試解決事物".getBytes());
int i = 1/0; // 模擬步驟3出現問題
MessageSender.sendMessage2Exchange(message);
} catch (Exception e) {
//spring中宣告式事物的管理 所以這裡丟擲執行時異常 spring會捕捉 並回滾service
throw new RuntimeException();//rollback!
}
return messageid;
}
看下封裝的MessageSender類:
@Service
public class MessageSender {
// 傳送模板 靜態變數需要set方法注入
private static RabbitTemplate amqpTemplate ;
@Autowired
public void setAmqpTemplate(RabbitTemplate amqpTemplate) {
MessageSender.amqpTemplate = amqpTemplate;
}
// 將訊息傳送到交換機
public static void sendMessage2Exchange(final EventMessage eventMessage){
amqpTemplate.convertAndSend(eventMessage.getExchangeName(), eventMessage.getRoatingKey(),
eventMessage.getEventData(),new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(eventMessage.getProrityLevel());
return message;
}
});
}
在同一個service中,spring通過template的方式使兩者具有事物,資料具有實時一致性,但是很明顯由於service中的方法沒有執行完成時,事物是始終掛起的,這將導致程式的效率大大降低,所以這種方法雖然讓資料實時一致,但在當今的IT領域採用率並不高。
實際上,在絕大多數情況下,操作一和操作二是不容易出現異常資料的,在很低錯誤率的情況下更多的企業希望採用方式三來解決問題。在最終一致的前提下,我們不需要考慮對操作一和操作二的成功與否,這樣效率大大提高,我們只需要在定期的排查異常,發現異常的時候後期處理異常的資料,保證資料最終的一致。