Sharding-JDBC讀寫分離探祕
前段時間寫了篇如何使用Sharding-JDBC進行分庫分表的例子,相信能夠感受到Sharding-JDBC的強大了,而且使用配置都非常乾淨。官方支援的功能還包括讀寫分離、分散式主鍵、強制路由等。這裡再介紹下如何在分庫分表的基礎上整合讀寫分離的功能。
讀寫分離的概念
就是為了緩解資料庫壓力,將寫入和讀取操作分離為不同資料來源,寫庫稱為主庫,讀庫稱為從庫,一主庫可配置多從庫。
設定主從庫後,第一個問題是如何進行主從的同步。官方不支援主從的同步,也不支援因為主從同步延遲導致的資料不一致問題。工程實踐上進行主從同步有很多做法,一種常用的做法是每天定時同步或者實時同步。這個話題太大,暫不展開。
讀寫分離快速入門
讀寫可以單獨使用,也可以配合分庫分表進行使用,由於上個分庫分表的例子是基於1.5.4.1
版本進行說明的,這裡為了緊跟官方的步伐,升級Sharding-JDBC到最新的2.0.0.M2
專案結構如下:
pom依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> </dependency> <!-- Sharding-JDBC核心依賴 --> <dependency> <groupId>io.shardingjdbc</groupId> <artifactId>sharding-jdbc-core</artifactId> </dependency> <!-- Sharding-JDBC Spring Boot Starter --> <dependency> <groupId>io.shardingjdbc</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
主從資料庫配置
在配置前,我們希望分庫分表規則和之前保持一致:
基於
t_user
表,根據city_id
進行分庫,如果city_id mod 2
為奇數則落在ds_master_1
庫,偶數則落在ds_master_0
庫;根據user_id
進行分表,如果user_id mod 2
為奇數則落在t_user_1
表,偶數則落在t_user_0
表
讀寫分離規則:
讀都落在從庫,寫落在主庫
因為使用Sharding-JDBC Spring Boot Starter
,所以只需要在properties配置檔案配置主從庫的資料來源即可:
spring.application.name=spring-boot-mybatis-sharding-jdbc-masterslave
server.context-path=/springboot
mybatis.config-location=classpath:mybatis-config.xml
# 所有主從庫
sharding.jdbc.datasource.names=ds_master_0,ds_master_1,ds_master_0_slave_0,ds_master_0_slave_1,ds_master_1_slave_0,ds_master_1_slave_1
# ds_master_0
sharding.jdbc.datasource.ds_master_0.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds_master_0.driverClassName=com.mysql.jdbc.Driver
sharding.jdbc.datasource.ds_master_0.url=jdbc:mysql://127.0.0.1:3306/ds_master_0?useSSL=false
sharding.jdbc.datasource.ds_master_0.username=travis
sharding.jdbc.datasource.ds_master_0.password=
# slave for ds_master_0
sharding.jdbc.datasource.ds_master_0_slave_0.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds_master_0_slave_0.driverClassName=com.mysql.jdbc.Driver
sharding.jdbc.datasource.ds_master_0_slave_0.url=jdbc:mysql://127.0.0.1:3306/ds_master_0_slave_0?useSSL=false
sharding.jdbc.datasource.ds_master_0_slave_0.username=travis
sharding.jdbc.datasource.ds_master_0_slave_0.password=
sharding.jdbc.datasource.ds_master_0_slave_1.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds_master_0_slave_1.driverClassName=com.mysql.jdbc.Driver
sharding.jdbc.datasource.ds_master_0_slave_1.url=jdbc:mysql://127.0.0.1:3306/ds_master_0_slave_1?useSSL=false
sharding.jdbc.datasource.ds_master_0_slave_1.username=travis
sharding.jdbc.datasource.ds_master_0_slave_1.password=
# ds_master_1
sharding.jdbc.datasource.ds_master_1.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds_master_1.driverClassName=com.mysql.jdbc.Driver
sharding.jdbc.datasource.ds_master_1.url=jdbc:mysql://127.0.0.1:3306/ds_master_1?useSSL=false
sharding.jdbc.datasource.ds_master_1.username=travis
sharding.jdbc.datasource.ds_master_1.password=
# slave for ds_master_1
sharding.jdbc.datasource.ds_master_1_slave_0.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds_master_1_slave_0.driverClassName=com.mysql.jdbc.Driver
sharding.jdbc.datasource.ds_master_1_slave_0.url=jdbc:mysql://127.0.0.1:3306/ds_master_1_slave_0?useSSL=false
sharding.jdbc.datasource.ds_master_1_slave_0.username=travis
sharding.jdbc.datasource.ds_master_1_slave_0.password=
sharding.jdbc.datasource.ds_master_1_slave_1.type=com.alibaba.druid.pool.DruidDataSource
sharding.jdbc.datasource.ds_master_1_slave_1.driverClassName=com.mysql.jdbc.Driver
sharding.jdbc.datasource.ds_master_1_slave_1.url=jdbc:mysql://127.0.0.1:3306/ds_master_1_slave_1?useSSL=false
sharding.jdbc.datasource.ds_master_1_slave_1.username=travis
sharding.jdbc.datasource.ds_master_1_slave_1.password=
# 分庫規則
sharding.jdbc.config.sharding.default-database-strategy.inline.sharding-column=city_id
sharding.jdbc.config.sharding.default-database-strategy.inline.algorithm-expression=ds_${city_id % 2}
# 分表規則
sharding.jdbc.config.sharding.tables.t_user.actualDataNodes=ds_${0..1}.t_user_${0..1}
sharding.jdbc.config.sharding.tables.t_user.tableStrategy.inline.shardingColumn=user_id
sharding.jdbc.config.sharding.tables.t_user.tableStrategy.inline.algorithmExpression=t_user_${user_id % 2}
# 使用user_id作為分散式主鍵
sharding.jdbc.config.sharding.tables.t_user.keyGeneratorColumnName=user_id
# 邏輯主從庫名和實際主從庫對映關係
sharding.jdbc.config.sharding.master-slave-rules.ds_0.masterDataSourceName=ds_master_0
sharding.jdbc.config.sharding.master-slave-rules.ds_0.slaveDataSourceNames=ds_master_0_slave_0, ds_master_0_slave_1
sharding.jdbc.config.sharding.master-slave-rules.ds_1.masterDataSourceName=ds_master_1
sharding.jdbc.config.sharding.master-slave-rules.ds_1.slaveDataSourceNames=ds_master_1_slave_0, ds_master_1_slave_1
Test
測試程式碼如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserMapperTest {
/** Logger */
private static Logger log = LoggerFactory.getLogger(UserMapperTest.class);
@Resource
private UserMapper userMapper;
@Before
public void setup() throws Exception {
create();
clear();
}
private void create() throws SQLException {
userMapper.createIfNotExistsTable();
}
private void clear() {
userMapper.truncateTable();
}
@Test
public void insert() throws Exception {
UserEntity user = new UserEntity();
user.setCityId(1);
user.setUserName("insertTest");
user.setAge(10);
user.setBirth(new Date());
assertTrue(userMapper.insert(user) > 0);
Long userId = user.getUserId();
log.info("Generated Key--userId:" + userId);
userMapper.delete(userId);
}
@Test
public void find() throws Exception {
UserEntity userEntity = userMapper.find(138734796783222784L);
log.info("user:{}", userEntity);
}
}
先執行insert
方法,插入一條資料後,獲取插入的user_id
為138734796783222784L
(每次執行會不一樣),由於city_id=1
,讀寫分離約定,會落在主庫,又根據分庫規則會落在ds_master_1
,再根據分表規則,會落在t_user_0
再執行find
方法,指定userId,你會發現查出來是空的,這是因為Sharding-JDBC不支援主從同步以及主從同步延遲造成的資料不一致。這裡我們顯然術語第一種,因為根本就沒有進行主從同步,那麼從從庫讀取肯定是空的。
我們可以反向推理下,假如開啟了主從同步,現在資料落在主庫ds_master_1
,這個主庫有兩個從庫:ds_master_1_slave_0
和ds_master_1_slave_1
,所以我們可以往這兩個主庫的t_user_0
表插入剛才的資料,語句如下:
INSERT INTO t_user_0(user_id,city_id,user_name,age,birth) values(138734796783222784,1,'insertTest',10,'2017-11-18 00:00:00');
先往ds_master_1_slave_0
的t_user_0
表插入該條資料,可以理解為主庫同步到從庫的資料。重新執行find
方法,發現返回的資料和主庫的一致,表明Sharding-JDBC從ds_master_1
的從庫ds_master_1_slave_0
的t_user_0
表查到了資料。
再刪掉ds_master_1_slave_0
的t_user_0
表的資料,往ds_master_1_slave_1
的t_user_0
表插入剛才那條資料,重新執行發現返回的結果為空,表明從ds_master_1
的從庫ds_master_1_slave_1
的t_user_0
表沒有查到資料。
最後往ds_master_1_slave_0
的t_user_0
表重新插入剛才的資料,再執行發現又返回了資料。
基於以上現象,可以推論選擇從庫查詢的時候經過了某種演算法得到訪問的從庫,然後在從庫根據分表規則查詢資料。
讀寫分離實現
這裡包括幾個問題:
- 讀寫分離的查詢流程?
- 如何做結果歸併?
- 如何路由到某個從庫進行查詢?
- 可以強制路由主庫進行讀操作嗎?
讀寫分離的流程
- 獲取主從庫配置規則,資料來源封裝成
MasterSlaveDataSource
- 根據路由計算,得到PreparedStatementUnit單元列表,合併每個PreparedStatementUnit的執行結果返回
- 執行每個PrepareStatementUnit的時候需要獲取連線,這裡根據輪詢負載均衡演算法
RoundRobinMasterSlaveLoadBalanceAlgorithm
得到從庫資料來源,拿到連線後就開始執行具體的SQL查詢了,這裡通過PreparedStatementExecutor.execute()
得到執行結果 - 結果歸併後返回
MasterSlaveDataSource:
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
private static final ThreadLocal<Boolean> DML_FLAG = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
// 主從配置關係
private MasterSlaveRule masterSlaveRule;
public MasterSlaveDataSource(final MasterSlaveRule masterSlaveRule) throws SQLException {
super(getAllDataSources(masterSlaveRule.getMasterDataSource(), masterSlaveRule.getSlaveDataSourceMap().values()));
this.masterSlaveRule = masterSlaveRule;
}
private static Collection<DataSource> getAllDataSources(final DataSource masterDataSource, final Collection<DataSource> slaveDataSources) {
Collection<DataSource> result = new LinkedList<>(slaveDataSources);
result.add(masterDataSource);
return result;
}
...省略部分程式碼
// 獲取資料來源
public NamedDataSource getDataSource(final SQLType sqlType) {
// 強制路由到主庫查詢
if (isMasterRoute(sqlType)) {
DML_FLAG.set(true);
return new NamedDataSource(masterSlaveRule.getMasterDataSourceName(), masterSlaveRule.getMasterDataSource());
}
// 獲取選中的從庫資料來源
String selectedSourceName = masterSlaveRule.getStrategy().getDataSource(masterSlaveRule.getName(),
masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceMap().keySet()));
DataSource selectedSource = selectedSourceName.equals(masterSlaveRule.getMasterDataSourceName())
? masterSlaveRule.getMasterDataSource() : masterSlaveRule.getSlaveDataSourceMap().get(selectedSourceName);
Preconditions.checkNotNull(selectedSource, "");
return new NamedDataSource(selectedSourceName, selectedSource);
}
MasterSlaveRule:
public final class MasterSlaveRule {
// 名稱(這裡是ds_0和ds_1)
private final String name;
// 主庫資料來源名稱(這裡是ds_master_0和ds_master_1)
private final String masterDataSourceName;
// 主庫資料來源
private final DataSource masterDataSource;
// 所屬從庫列表,key為從庫資料來源名稱,value是真實的資料來源
private final Map<String, DataSource> slaveDataSourceMap;
// 主從庫負載均衡演算法
private final MasterSlaveLoadBalanceAlgorithm strategy;
RoundRobinMasterSlaveLoadBalanceAlgorithm:
// 輪詢負載均衡策略,按照每個從節點訪問次數均衡
public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
private static final ConcurrentHashMap<String, AtomicInteger> COUNT_MAP = new ConcurrentHashMap<>();
@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
AtomicInteger count = COUNT_MAP.containsKey(name) ? COUNT_MAP.get(name) : new AtomicInteger(0);
COUNT_MAP.putIfAbsent(name, count);
count.compareAndSet(slaveDataSourceNames.size(), 0);
return slaveDataSourceNames.get(count.getAndIncrement() % slaveDataSourceNames.size());
}
}
DefaultResultSetHandler:
@Override
public List<Object> handleResultSets(Statement stmt) throws SQLException {
ErrorContext.instance().activity("handling results").object(mappedStatement.getId());
// 返回的結果集
final List<Object> multipleResults = new ArrayList<Object>();
int resultSetCount = 0;
ResultSetWrapper rsw = getFirstResultSet(stmt);
List<ResultMap> resultMaps = mappedStatement.getResultMaps();
int resultMapCount = resultMaps.size();
validateResultMapsCount(rsw, resultMapCount);
while (rsw != null && resultMapCount > resultSetCount) {
ResultMap resultMap = resultMaps.get(resultSetCount);
// 將ResultSetWrapper的結果集新增到multipleResults中
handleResultSet(rsw, resultMap, multipleResults, null);
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
String[] resultSets = mappedStatement.getResultSets();
if (resultSets != null) {
while (rsw != null && resultSetCount < resultSets.length) {
ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
if (parentMapping != null) {
String nestedResultMapId = parentMapping.getNestedResultMapId();
ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
handleResultSet(rsw, resultMap, null, parentMapping);
}
rsw = getNextResultSet(stmt);
cleanUpAfterHandlingResultSet();
resultSetCount++;
}
}
return collapseSingleResultList(multipleResults);
}
private void handleResultSet(ResultSetWrapper rsw, ResultMap resultMap, List<Object> multipleResults, ResultMapping parentMapping) throws SQLException {
try {
if (parentMapping != null) {
handleRowValues(rsw, resultMap, null, RowBounds.DEFAULT, parentMapping);
} else {
if (resultHandler == null) {
DefaultResultHandler defaultResultHandler = new DefaultResultHandler(objectFactory);
// 按照resultMap解析到defaultResultHandler中
handleRowValues(rsw, resultMap, defaultResultHandler, rowBounds, null);
// 最後的結果就是這裡加進去的
multipleResults.add(defaultResultHandler.getResultList());
} else {
handleRowValues(rsw, resultMap, resultHandler, rowBounds, null);
}
}
} finally {
// issue #228 (close resultsets)
closeResultSet(rsw.getResultSet());
}
}