Getting Started with JTA
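1. Introduction
1.1. Definition
JTA (Java Transaction API) allows an application to perform distributed transactions, that is, to access and update data on two or more networked computer resources within a single transaction. JTA support in a JDBC driver greatly extends its data access capabilities.
1.2. The XA protocol
XA is a specification for distributed transactions from X/Open (now The Open Group). It belongs to the DTP (Distributed Transaction Processing) model, which defines three components:
(1) Application Program (AP): the application, i.e. the business layer. It defines the transaction boundaries and the specific operations that make up the transaction.
(2) Resource Manager (RM): the resource manager, which can be thought of as a DBMS or a message server.
(3) Transaction Manager (TM): the transaction manager, responsible for coordinating and managing transactions.
Relationships: the AP talks to each RM through the API that the RM provides. When a distributed transaction is needed, the AP asks the TM to start a global transaction. The TM talks to the RMs through the XA interface; it manages the connections to the RMs and implements two-phase commit.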
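The TM-to-RM side of this model can be sketched with the XA types that ship with the JDK (`javax.transaction.xa`). The following is only an in-memory illustration under stated assumptions, not how Atomikos or a MySQL driver is implemented: `InMemoryRm`, `SimpleXid`, `write` and `run` are hypothetical names introduced for this example, and the RM keeps its rows in a plain list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class XaInterfaceSketch {
    public static void main(String[] args) {
        InMemoryRm rm = new InMemoryRm(false);
        System.out.println("committed: " + run(rm, "row-1") + ", rows: " + rm.committed);
    }

    /** Plays the TM role for a single branch; returns true if it was committed. */
    static boolean run(InMemoryRm rm, String row) {
        Xid xid = new SimpleXid((byte) 1);
        try {
            rm.start(xid, XAResource.TMNOFLAGS); // associate the work with the global transaction
            rm.write(xid, row);                  // the AP's work, done through the RM's own API
            rm.end(xid, XAResource.TMSUCCESS);   // the work on this branch is finished
            int vote = rm.prepare(xid);          // phase 1: a "no" vote surfaces as an XAException
            if (vote == XAResource.XA_OK) {
                rm.commit(xid, false);           // phase 2: false = not a one-phase commit
            }
            return true;
        } catch (XAException e) {
            rm.rollback(xid);                    // harmless no-op if the branch is already gone
            return false;
        }
    }
}

/** Hypothetical resource manager that stages rows per transaction branch. */
class InMemoryRm implements XAResource {
    final List<String> committed = new ArrayList<>();
    private final Map<Xid, List<String>> pending = new HashMap<>();
    private final boolean refuseVote;

    InMemoryRm(boolean refuseVote) { this.refuseVote = refuseVote; }

    void write(Xid xid, String row) { pending.get(xid).add(row); }

    @Override public void start(Xid xid, int flags) { pending.put(xid, new ArrayList<>()); }
    @Override public void end(Xid xid, int flags) { }
    @Override public int prepare(Xid xid) throws XAException {
        if (refuseVote) {
            pending.remove(xid); // the RM rolls the branch back itself
            throw new XAException(XAException.XA_RBROLLBACK);
        }
        return XAResource.XA_OK;
    }
    @Override public void commit(Xid xid, boolean onePhase) throws XAException {
        List<String> rows = pending.remove(xid);
        if (rows == null) {
            throw new XAException(XAException.XAER_NOTA); // unknown branch
        }
        committed.addAll(rows);
    }
    @Override public void rollback(Xid xid) { pending.remove(xid); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

/** Minimal Xid; the map above relies on using the same instance throughout. */
class SimpleXid implements Xid {
    private final byte id;
    SimpleXid(byte id) { this.id = id; }
    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return new byte[] {id}; }
    @Override public byte[] getBranchQualifier() { return new byte[] {id}; }
}
```

In a real setup the application never calls these methods itself; the transaction manager does so on its behalf for every enlisted resource.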
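1.3. Two-phase commit (2PC)
(1) The two phases:
1. Request phase (commit-request phase, also called the voting phase)
In the request phase, the coordinator asks the transaction participants to prepare to commit or abort the transaction, and then voting begins. Each participant tells the coordinator its decision: agree (its local work succeeded) or abort (its local work failed).
2. Commit phase
In this phase, the coordinator decides, based on the votes from the first phase, whether to commit or abort. Only if every participant agreed does the coordinator tell all participants to commit; otherwise it tells all of them to abort. Each participant performs the corresponding operation once it receives the coordinator's message.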
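The two phases can be sketched as a small in-memory simulation. This is a simplified model for illustration only: `Participant`, `coordinate` and the `localWorkSucceeded` flag are hypothetical names, and there is no network, logging or recovery.

```java
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        Participant db1 = new Participant("db1", true);
        Participant db2 = new Participant("db2", false); // its local work failed
        boolean committed = coordinate(Arrays.asList(db1, db2));
        System.out.println("committed=" + committed + " db1=" + db1.state + " db2=" + db2.state);
    }

    /** Coordinator: returns true if the transaction was committed on every participant. */
    static boolean coordinate(List<Participant> participants) {
        // Phase 1 (voting): ask every participant whether it can commit.
        boolean allAgreed = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allAgreed = false;
            }
        }
        // Phase 2 (commit): the same global decision goes to every participant.
        for (Participant p : participants) {
            if (allAgreed) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allAgreed;
    }
}

/** Hypothetical participant whose vote depends on whether its local work succeeded. */
class Participant {
    enum State { INIT, PREPARED, COMMITTED, ROLLED_BACK }

    final String name;
    private final boolean localWorkSucceeded;
    State state = State.INIT;

    Participant(String name, boolean localWorkSucceeded) {
        this.name = name;
        this.localWorkSucceeded = localWorkSucceeded;
    }

    /** Votes yes only if the local work succeeded. */
    boolean prepare() {
        if (localWorkSucceeded) {
            state = State.PREPARED;
        }
        return localWorkSucceeded;
    }

    void commit() { state = State.COMMITTED; }

    void rollback() { state = State.ROLLED_BACK; }
}
```

A single "no" vote is enough to roll back every participant, including those that voted yes.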
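(2) Drawbacks:
① Synchronous blocking: while the participants hold shared resources, any other node that needs those resources has to block.
② Single point of failure: if the coordinator fails, the participants block indefinitely. This is worst in the second phase, when every participant is still holding locks on its transactional resources and cannot finish the transaction.
③ Data inconsistency: in phase two, if a local network fault occurs after the coordinator starts sending commit requests, or the coordinator itself fails while sending them, only some of the participants receive the commit request. Those participants commit, while the others, which never received the request, cannot. The distributed system as a whole is then in an inconsistent state.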
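Drawbacks ② and ③ can be made concrete with a small simulation of a coordinator that crashes partway through phase two. This is a sketch under simplifying assumptions: `Node`, `sendCommits` and `deliveredBeforeCrash` are hypothetical names, every node is assumed to have already voted yes, and no recovery protocol is modelled.

```java
import java.util.Arrays;
import java.util.List;

class PartialCommitSketch {
    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node("db1"), new Node("db2"), new Node("db3"));
        sendCommits(nodes, 1); // the coordinator crashes after one commit message
        for (Node n : nodes) {
            System.out.println(n.name + ": " + n.state);
        }
        System.out.println("consistent: " + isConsistent(nodes));
    }

    /**
     * Phase 2 with a coordinator that crashes after delivering
     * deliveredBeforeCrash commit messages.
     */
    static void sendCommits(List<Node> nodes, int deliveredBeforeCrash) {
        for (Node n : nodes) {
            n.state = Node.State.PREPARED; // phase 1 is assumed to have succeeded everywhere
        }
        int sent = 0;
        for (Node n : nodes) {
            if (sent == deliveredBeforeCrash) {
                return; // coordinator is gone; remaining nodes stay PREPARED and keep their locks
            }
            n.state = Node.State.COMMITTED;
            sent++;
        }
    }

    /** True if every node ended up in the same state. */
    static boolean isConsistent(List<Node> nodes) {
        for (Node n : nodes) {
            if (n.state != nodes.get(0).state) {
                return false;
            }
        }
        return true;
    }
}

/** Hypothetical participant that only tracks its transaction state. */
class Node {
    enum State { PREPARED, COMMITTED }

    final String name;
    State state = State.PREPARED;

    Node(String name) { this.name = name; }
}
```

The nodes left in PREPARED illustrate the blocking problem, and the mix of COMMITTED and PREPARED nodes illustrates the inconsistency.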
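2. Usage in Spring Boot
Atomikos acts as the transaction manager (TM) in XA. In a Spring Boot application, Atomikos makes it easy to introduce distributed transactions.
Steps:
2.1. Table creation scripts
Create the book table in the first database, db1 (named school in the application.yml below):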
CREATE TABLE `book` (
  `id` int(255) NOT NULL AUTO_INCREMENT,
  `book_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `isbn` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `create_date` datetime(0) NULL DEFAULT NULL,
  `price` decimal(10, 0) NULL DEFAULT NULL,
  `quantity` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 112 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
Create the book_order table in the second database, db2 (named vue in the application.yml below):
CREATE TABLE `book_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `order_num` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `status` varchar(2) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `comments` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `total` decimal(10, 0) NULL DEFAULT NULL,
  `create_date` datetime(0) NULL DEFAULT NULL,
  `book_id` int(11) NULL DEFAULT NULL,
  `quantity` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 16 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
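2.2. Add the dependency
Add the JTA dependency separately (no version is given here, so it is assumed to be managed by the Spring Boot parent POM):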
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
2.3. Configure application.yml
Configure the data sources for the two databases:
spring:
  datasource:
    test1:
      jdbcurl: jdbc:mysql://localhost:3306/school?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
      username: root
      password:
      initial-size: 1
      min-idle: 1
      max-active: 20
      test-on-borrow: true
      driver-class-name: com.mysql.jdbc.Driver
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
    test2:
      jdbcurl: jdbc:mysql://localhost:3306/vue?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
      username: root
      password:
      driver-class-name: com.mysql.jdbc.Driver
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
mybatis:
  configuration:
    map-underscore-to-camel-case: true
  type-aliases-package: com.xxx.proj.jtademo
2.4. Data source property classes
(1)DBConfig1:
package com.xxx.proj.jtademo.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "spring.datasource.test1") // must match the prefix used in application.yml
public class DBConfig1 {
    // Each field is bound from the property of the same name under the prefix,
    // e.g. spring.datasource.test1.username = root
    private String jdbcurl;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}
(2)DBConfig2:
package com.xxx.proj.jtademo.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "spring.datasource.test2") // must match the prefix used in application.yml
public class DBConfig2 {
    // Each field is bound from the property of the same name under the prefix,
    // e.g. spring.datasource.test2.username = root
    private String jdbcurl;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}
2.5. JTA configuration classes
(1)DbConfig1Atomikos:
package com.xxx.proj.jtademo.config;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import com.xxx.proj.jtademo.properties.DBConfig1;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.sql.SQLException;

@Configuration
@MapperScan(basePackages = "com.xxx.proj.jtademo.mapper1", sqlSessionTemplateRef = "test1SqlSessionTemplate")
public class DbConfig1Atomikos {

    @Bean(name = "test1DataSource")
    public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
        // XA-capable MySQL data source for the first database
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // Wrap it in an Atomikos bean so that its connections take part in Atomikos global transactions
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test1DataSource");
        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test1SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test1SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
(2)DbConfig2Atomikos:
package com.xxx.proj.jtademo.config;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import com.xxx.proj.jtademo.properties.DBConfig2;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.sql.SQLException;

@Configuration
@MapperScan(basePackages = "com.xxx.proj.jtademo.mapper2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class DbConfig2Atomikos {

    @Bean(name = "test2DataSource")
    public DataSource testDataSource(DBConfig2 testConfig) throws SQLException {
        // XA-capable MySQL data source for the second database
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // Wrap it in an Atomikos bean so that its connections take part in Atomikos global transactions
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test2DataSource");
        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test2SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test2SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
2.6. Entity classes
(1)Book
package com.xxx.proj.jtademo.entity;

import lombok.Data;
import java.util.Date;

@Data
public class Book {
    private Integer id;
    private String bookName;
    private String isbn;
    private Date createDate;
    private Double price;
    private Integer quantity;
}
(2)BookOrder
package com.xxx.proj.jtademo.entity;

import lombok.Data;
import java.util.Date;

@Data
public class BookOrder {
    private Integer id;
    private String orderNum;
    private String status;
    private String comments;
    private String total;
    private Integer bookId;
    private String quantity;
    private Date createDate;
}
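2.7. Persistence layer
Note that the two mappers live in different packages (mapper1 and mapper2), so that each @MapperScan picks up only the mappers for its own data source.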
(1)BookMapper
package com.xxx.proj.jtademo.mapper1;

import com.xxx.proj.jtademo.entity.Book;

public interface BookMapper {
    void insert(Book book);
}
(2)BookMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxx.proj.jtademo.mapper1.BookMapper">
    <insert id="insert" parameterType="com.xxx.proj.jtademo.entity.Book">
        insert into book(book_name) values(#{bookName})
    </insert>
</mapper>
(3)BookOrderMapper
package com.xxx.proj.jtademo.mapper2;

import com.xxx.proj.jtademo.entity.BookOrder;

public interface BookOrderMapper {
    void insert(BookOrder bookOrder);
}
(4)BookOrderMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxx.proj.jtademo.mapper2.BookOrderMapper">
    <insert id="insert" parameterType="com.xxx.proj.jtademo.entity.BookOrder">
        insert into book_order(comments) values(#{comments})
    </insert>
</mapper>
2.8. Service layer
(1)OrderService
package com.xxx.proj.jtademo.service;

public interface OrderService {
    void saveSuccess();

    void saveFail();
}
(2)OrderServiceImpl
package com.xxx.proj.jtademo.service.impl;

import com.xxx.proj.jtademo.entity.Book;
import com.xxx.proj.jtademo.entity.BookOrder;
import com.xxx.proj.jtademo.mapper1.BookMapper;
import com.xxx.proj.jtademo.mapper2.BookOrderMapper;
import com.xxx.proj.jtademo.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private BookMapper bookMapper;
    @Autowired
    private BookOrderMapper bookOrderMapper;

    // Writes one row to each database inside a single JTA transaction.
    @Override
    @Transactional
    public void saveSuccess() {
        int x = (int) (Math.random() * 100);
        Book book = new Book();
        book.setBookName("Book " + x);
        BookOrder bookOrder = new BookOrder();
        bookOrder.setComments("Purchase of book " + x);
        bookMapper.insert(book);
        bookOrderMapper.insert(bookOrder);
    }

    // Same as saveSuccess(), but fails after both inserts so that both are rolled back.
    @Override
    @Transactional
    public void saveFail() {
        int x = (int) (Math.random() * 100);
        Book book = new Book();
        book.setBookName("Book " + x);
        BookOrder bookOrder = new BookOrder();
        bookOrder.setComments("Purchase of book " + x);
        bookMapper.insert(book);
        bookOrderMapper.insert(bookOrder);
        int y = 1 / 0; // deliberately throws ArithmeticException to trigger the rollback
    }
}
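Why saveFail() leaves both tables unchanged can be illustrated with a simplified model of the rollback rule: an unchecked exception that escapes a transactional method causes the transaction to be rolled back instead of committed. The sketch below is not how Spring or Atomikos is implemented; `RollbackSketch`, `Work` and `inTransaction` are hypothetical names, and two in-memory lists stand in for the book and book_order tables.

```java
import java.util.ArrayList;
import java.util.List;

class RollbackSketch {
    final List<String> books = new ArrayList<>();      // stand-in for the book table
    final List<String> bookOrders = new ArrayList<>(); // stand-in for the book_order table

    /** Body of a simulated transactional method; it writes to staging lists. */
    interface Work {
        void run(List<String> books, List<String> bookOrders);
    }

    /** Runs the work and returns true on commit, false on rollback. */
    boolean inTransaction(Work work) {
        List<String> stagedBooks = new ArrayList<>();
        List<String> stagedOrders = new ArrayList<>();
        try {
            work.run(stagedBooks, stagedOrders);
        } catch (RuntimeException e) {
            return false; // staged rows are discarded: nothing reaches either table
        }
        // No exception escaped, so both tables are updated together.
        books.addAll(stagedBooks);
        bookOrders.addAll(stagedOrders);
        return true;
    }

    public static void main(String[] args) {
        RollbackSketch db = new RollbackSketch();
        db.inTransaction((b, o) -> { b.add("book 1"); o.add("order 1"); });
        db.inTransaction((b, o) -> { b.add("book 2"); o.add("order 2"); int y = 1 / 0; });
        System.out.println(db.books + " " + db.bookOrders);
    }
}
```

The second call mirrors saveFail(): both inserts run, the division by zero throws, and neither staged row becomes visible.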
2.9. Controller layer
OrderController
package com.xxx.proj.jtademo.controller;

import com.xxx.proj.jtademo.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;

    @RequestMapping("/s")
    public String success() {
        orderService.saveSuccess();
        return "success";
    }

    @RequestMapping("/f")
    public String fail() {
        orderService.saveFail(); // always throws, so the return below is never reached
        return "fail";
    }
}
2.10. Application class
package com.xxx.proj.jtademo;

import com.xxx.proj.jtademo.properties.DBConfig1;
import com.xxx.proj.jtademo.properties.DBConfig2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties(value = {DBConfig1.class, DBConfig2.class})
public class JtaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(JtaDemoApplication.class, args);
    }
}
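2.11. Testing with HTTP requests
Run the main method of JtaDemoApplication to start the server:
(1) Visit localhost:8080/s:
The book row is stored in the book table of the school database and the order row in the book_order table of the vue database, so both databases were written within one distributed transaction.
(2) Visit localhost:8080/f:
Check the rollback: saveFail() throws after both inserts, so the request fails with an error and neither table should contain a new row.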