資料庫讀寫分離-springboot事務配置篇
阿新 • • 發佈:2019-02-03
spring事務配置見:https://blog.csdn.net/andyzhaojianhui/article/details/74357100?locationNum=9&fps=1
根據這篇文章做了一些修改以適用於springboot專案,可能還有一些未知問題,目前使用中尚未發現,歡迎指正,不勝感激
注意:我們約定
配置檔案中的寫庫的連線資訊spring.datasource開頭,例如spring.datasource.url=
spring.read.datasource.name這項來確定有多少個讀庫,多個讀庫以英文逗號分隔
例如spring.read.datasource.name=read1,read2
讀庫連線資訊以spring. + 上面的name對應的讀庫名 + .datasource開頭,
例如
spring.read1.datasource.url=
spring.read2.datasource.url=
1.DataSourceConfiguration,例項化資料來源,事務管理等
import com.github.pagehelper.PageInterceptor; import org.apache.ibatis.plugin.Interceptor; import org.apache.ibatis.session.SqlSessionFactory; import org.apache.tomcat.jdbc.pool.PoolProperties; import org.mybatis.spring.SqlSessionFactoryBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.interceptor.TransactionInterceptor; import org.springframework.util.StringUtils; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * 初始化資料來源、事務管理器等 */ @Configuration public class DataSourceConfiguration { @Autowired private Environment env; @Bean public DataSource dataSource() { ReadWriteDataSource dataSource = new ReadWriteDataSource(); dataSource.setWriteDataSource(writeDataSource()); dataSource.setReadDataSourceMap(readDataSourceMap()); return dataSource; } @Autowired private TransactionInterceptor txAdvice; @Bean public ReadWriteDataSourceProcessor readWriteDataSourceTransactionProcessor() { ReadWriteDataSourceProcessor processor = new ReadWriteDataSourceProcessor(); processor.setForceChoiceReadWhenWrite(true); processor.postProcessAfterInitialization(txAdvice.getTransactionAttributeSource(), null); return processor; } //初始化寫資料來源 public DataSource writeDataSource() { PoolProperties properties = new PoolProperties(); properties.setUrl(env.getProperty("spring.datasource.url")); properties.setUsername(env.getProperty("spring.datasource.username")); properties.setPassword(env.getProperty("spring.datasource.password")); properties.setDriverClassName(env.getProperty("spring.datasource.jdbc.driver")); properties.setInitialSize(Integer.valueOf(env.getProperty("spring.datasource.tomcat.initial-size"))); properties.setMinIdle(Integer.valueOf(env.getProperty("spring.datasource.tomcat.min-idle"))); properties.setMaxActive(Integer.valueOf(env.getProperty("spring.datasource.tomcat.max-active"))); properties.setMaxIdle(Integer.valueOf(env.getProperty("spring.datasource.tomcat.max-idle"))); properties.setMaxWait(Integer.valueOf(env.getProperty("spring.datasource.tomcat.max-wait"))); properties.setValidationQuery(env.getProperty("spring.datasource.tomcat.validation-query")); properties.setTestWhileIdle(Boolean.valueOf(env.getProperty("spring.datasource.tomcat.test-while-idle"))); properties.setTimeBetweenEvictionRunsMillis(Integer.valueOf(env.getProperty("spring.datasource.tomcat.time-between-eviction-runs-millis"))); org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource(properties); return dataSource; } //初始化讀資料來源,這裡可以看出為什麼上面要進行那樣的約定 public Map<String, DataSource> readDataSourceMap() { String readCount = env.getProperty("spring.read.datasource.name"); if (!StringUtils.isEmpty(readCount)) { String[] split = readCount.split(","); if (split.length > 0) { Map<String, DataSource> dMap = new HashMap<>(split.length); for (String s : split) { PoolProperties properties = new PoolProperties(); properties.setDriverClassName(env.getProperty("spring.datasource.jdbc.driver")); properties.setInitialSize(Integer.valueOf(env.getProperty("spring.datasource.tomcat.initial-size"))); properties.setMinIdle(Integer.valueOf(env.getProperty("spring.datasource.tomcat.min-idle"))); properties.setMaxActive(Integer.valueOf(env.getProperty("spring.datasource.tomcat.max-active"))); properties.setMaxWait(Integer.valueOf(env.getProperty("spring.datasource.tomcat.max-wait"))); properties.setValidationQuery(env.getProperty("spring.datasource.tomcat.validation-query")); properties.setTestWhileIdle(Boolean.valueOf(env.getProperty("spring.datasource.tomcat.test-while-idle"))); properties.setMaxIdle(Integer.valueOf(env.getProperty("spring.datasource.tomcat.max-idle"))); properties.setTimeBetweenEvictionRunsMillis(Integer.valueOf(env.getProperty("spring.datasource.tomcat.time-between-eviction-runs-millis"))); properties.setUrl(env.getProperty("spring." + s + ".datasource.url")); properties.setUsername(env.getProperty("spring." + s + ".datasource.username")); properties.setPassword(env.getProperty("spring." + s + ".datasource.password")); dMap.put(s, new org.apache.tomcat.jdbc.pool.DataSource(properties)); } return dMap; } } return null; } @Bean public PlatformTransactionManager transactionManager() { return new DataSourceTransactionManager(dataSource()); } @Bean public SqlSessionFactory sqlSessionFactoryBean() throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSource()); PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); sqlSessionFactoryBean.setMapperLocations(resolver.getResources(env.getProperty("mybatis.mapper-locations"))); //mybatis的分頁外掛 sqlSessionFactoryBean.setPlugins(mybatisPlugins()); //自定義一些配置 sqlSessionFactoryBean.setConfiguration(myConfiguration()); return sqlSessionFactoryBean.getObject(); } private Interceptor[] mybatisPlugins() { PageInterceptor interceptor = new PageInterceptor(); Properties pageHelperProps = new Properties(); pageHelperProps.setProperty("helperDialect", "mysql"); pageHelperProps.setProperty("offsetAsPageNum", "true"); pageHelperProps.setProperty("pageSizeZero", "true"); pageHelperProps.setProperty("rowBoundsWithCount", "true"); interceptor.setProperties(pageHelperProps); Interceptor[] plugins = {interceptor}; return plugins; } private org.apache.ibatis.session.Configuration myConfiguration() { org.apache.ibatis.session.Configuration conf = new org.apache.ibatis.session.Configuration(); //是否啟用 資料中 a_column 自動對映 到 java類中駝峰命名的屬性。[預設:false] conf.setMapUnderscoreToCamelCase(true); return conf; } }
2.ReadWriteDataSource
借用本文開頭的文章連結中的ReadWriteDataSource類
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.datasource.AbstractDataSource; import org.springframework.util.CollectionUtils; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; /** * 讀/寫動態選擇資料庫實現 * 目前實現功能 * 一寫庫多讀庫選擇功能,請參考 * 預設按順序輪詢使用讀庫 * 預設選擇寫庫 * 已實現:一寫多讀、當寫時預設讀操作到寫庫、當寫時強制讀操作到讀庫 * 讀庫負載均衡、讀庫故障轉移 */ public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean { private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class); private DataSource writeDataSource; private Map<String, DataSource> readDataSourceMap; private String[] readDataSourceNames; private DataSource[] readDataSources; private int readDataSourceCount; private AtomicInteger counter = new AtomicInteger(1); /** * 設定讀庫(name, DataSource) */ public void setReadDataSourceMap(Map<String, DataSource> dMap) { this.readDataSourceMap = dMap; } /** * 配置寫庫 */ public void setWriteDataSource(DataSource dataSource) { this.writeDataSource = dataSource; } private DataSource determineDataSource() { if (ReadWriteDataSourceDecision.isChoiceWrite()) { return writeDataSource; } if (ReadWriteDataSourceDecision.isChoiceNone()) { return writeDataSource; } return determineReadDataSource(); } private DataSource determineReadDataSource() { //按照順序選擇讀庫 //演算法改進 int index = counter.incrementAndGet() % readDataSourceCount; if (index < 0) { index = -index; } return readDataSources[index]; } @Override public Connection getConnection() throws SQLException { return determineDataSource().getConnection(); } @Override public Connection getConnection(String username, String password) throws SQLException { return determineDataSource().getConnection(username, password); } @Override public void afterPropertiesSet() throws Exception { if (writeDataSource == null) { throw new IllegalArgumentException("property 'writeDataSource' is required"); } if (CollectionUtils.isEmpty(readDataSourceMap)) { throw new IllegalArgumentException("property 'readDataSourceMap' is required"); } readDataSourceCount = readDataSourceMap.size(); readDataSources = new DataSource[readDataSourceCount]; readDataSourceNames = new String[readDataSourceCount]; int i = 0; for (Entry<String, DataSource> e : readDataSourceMap.entrySet()) { readDataSources[i] = e.getValue(); readDataSourceNames[i] = e.getKey(); i++; } } }
3.ReadWriteDataSourceDecision
借用本文開頭的文章連結中的ReadWriteDataSourceDecision類
import org.springframework.context.annotation.Configuration;
/**
* 讀/寫動態資料庫 決策者
* 根據DataSourceType是write/read 來決定是使用讀/寫資料庫
* 通過ThreadLocal繫結實現選擇功能
*/
@Configuration
public class ReadWriteDataSourceDecision {
public enum DataSourceType {
write, read;
}
private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<>();
public static void markWrite() {
holder.set(DataSourceType.write);
}
public static void markRead() {
holder.set(DataSourceType.read);
}
public static void reset() {
holder.set(null);
}
public static boolean isChoiceNone() {
return null == holder.get();
}
public static boolean isChoiceWrite() {
return DataSourceType.write == holder.get();
}
public static boolean isChoiceRead() {
return DataSourceType.read == holder.get();
}
}
4.ReadWriteDataSourceProcessor
借用本文開頭的文章連結中的ReadWriteDataSourceProcessor類
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.NestedRuntimeException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
/**
* 此類實現了兩個職責(為了減少類的數量將兩個功能合併到一起了):
* 讀/寫動態資料庫選擇處理器
* 通過AOP切面實現讀/寫選擇
* <p>
* ★★讀/寫動態資料庫選擇處理器★★
* 1、首先讀取<tx:advice>事務屬性配置
* <p>
* 2、對於所有讀方法設定 read-only="true" 表示讀取操作(以此來判斷是選擇讀還是寫庫),其他操作都是走寫庫
* 如<tx:method name="×××" read-only="true"/>
* <p>
* 3、 forceChoiceReadOnWrite用於確定在如果目前是寫(即開啟了事務),下一步如果是讀,
* 是直接參與到寫庫進行讀,還是強制從讀庫讀<br/>
* forceChoiceReadOnWrite:true 表示目前是寫,下一步如果是讀,強制參與到寫事務(即從寫庫讀)
* 這樣可以避免寫的時候從讀庫讀不到資料
* <p>
* 通過設定事務傳播行為:SUPPORTS實現
* <p>
* forceChoiceReadOnWrite:false 表示不管當前事務是寫/讀,都強制從讀庫獲取資料
* 通過設定事務傳播行為:NOT_SUPPORTS實現(連線是儘快釋放)
* 『此處藉助了 NOT_SUPPORTS會掛起之前的事務進行操作 然後再恢復之前事務完成的』
* 4、配置方式
* <bean id="readWriteDataSourceTransactionProcessor" class="cn.javass.common.datasource.ReadWriteDataSourceProcessor">
* <property name="forceChoiceReadWhenWrite" value="false"/>
* </bean>
* <p>
* 5、目前只適用於<tx:advice>情況
* 支援@Transactional註解事務
* <p>
* ★★通過AOP切面實現讀/寫庫選擇★★
* <p>
* 1、首先將當前方法 與 根據之前【讀/寫動態資料庫選擇處理器】 提取的讀庫方法 進行匹配
* <p>
* 2、如果匹配,說明是讀取資料:
* 2.1、如果forceChoiceReadOnWrite:true,即強制走讀庫
* 2.2、如果之前是寫操作且forceChoiceReadOnWrite:false,將從寫庫進行讀取
* 2.3、否則,到讀庫進行讀取資料
* <p>
* 3、如果不匹配,說明預設將使用寫庫進行操作
* <p>
* 4、配置方式
* <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
* <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/>
* </aop:aspect>
* 4.1、此處order = Integer.MIN_VALUE 即最高的優先順序(請參考http://jinnianshilongnian.iteye.com/blog/1423489)
* 4.2、切入點:txPointcut 和 實施事務的切入點一樣
* 4.3、determineReadOrWriteDB方法用於決策是走讀/寫庫的,請參考
* @see cn.javass.common.datasource.ReadWriteDataSourceDecision
* @see cn.javass.common.datasource.ReadWriteDataSource
*/
@Aspect
public class ReadWriteDataSourceProcessor implements BeanPostProcessor {
private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class);
@Pointcut(TxAdviceInterceptor.AOP_POINTCUT_EXPRESSION)
public void txPointcut() {
}
private boolean forceChoiceReadWhenWrite = false;
private Map<String, Boolean> readMethodMap = new HashMap<>();
/**
* 當之前操作是寫的時候,是否強制從從庫讀 預設(false) 當之前操作是寫,預設強制從寫庫讀
*/
public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) {
this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
if (!(bean instanceof NameMatchTransactionAttributeSource)) {
return bean;
}
try {
NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource) bean;
Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
nameMapField.setAccessible(true);
@SuppressWarnings("unchecked")
Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource);
for (Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute) entry.getValue();
// 僅對read-only的處理
if (!attr.isReadOnly()) {
continue;
}
String methodName = entry.getKey();
Boolean isForceChoiceRead = Boolean.FALSE;
if (forceChoiceReadWhenWrite) {
// 不管之前操作是寫,預設強制從讀庫讀 (設定為NOT_SUPPORTED即可)
// NOT_SUPPORTED會掛起之前的事務
attr.setPropagationBehavior(Propagation.NOT_SUPPORTED
.value());
isForceChoiceRead = Boolean.TRUE;
} else {
// 否則 設定為SUPPORTS(這樣可以參與到寫事務)
attr.setPropagationBehavior(Propagation.SUPPORTS.value());
}
readMethodMap.put(methodName, isForceChoiceRead);
}
} catch (Exception e) {
throw new ReadWriteDataSourceTransactionException(
"process read/write transaction error", e);
}
return bean;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
private class ReadWriteDataSourceTransactionException extends NestedRuntimeException {
private static final long serialVersionUID = 7537763615924915804L;
public ReadWriteDataSourceTransactionException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* 確定選擇哪個資料來源(讀庫還是寫庫)
*
* @param pjp
* @return
* @throws Throwable
*/
@Around("txPointcut()")
public Object determineReadOrWriteDB(ProceedingJoinPoint pjp)
throws Throwable {
if (isChoiceReadDB(pjp.getSignature().getName())) {
ReadWriteDataSourceDecision.markRead();
} else {
ReadWriteDataSourceDecision.markWrite();
}
try {
return pjp.proceed();
} finally {
ReadWriteDataSourceDecision.reset();
}
}
/**
* 根據方法名確定是否選擇 讀庫
*
* @param methodName 方法名
* @return
*/
private boolean isChoiceReadDB(String methodName) {
String bestNameMatch = null;
for (String mappedName : this.readMethodMap.keySet()) {
if (isMatch(methodName, mappedName)) {
bestNameMatch = mappedName;
break;
}
}
Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch);
// 表示強制選擇 讀 庫
if (Objects.equals(isForceChoiceRead, Boolean.TRUE)) {
return true;
}
// 如果之前選擇了寫庫 現在還選擇 寫庫
if (ReadWriteDataSourceDecision.isChoiceWrite()) {
return false;
}
// 表示應該選擇讀庫
if (isForceChoiceRead != null) {
return true;
}
// 預設選擇 寫庫
return false;
}
protected boolean isMatch(String methodName, String mappedName) {
return PatternMatchUtils.simpleMatch(mappedName, methodName);
}
}
5.TxAdviceInterceptor
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* 用於配置事務處理
* requiredTx對應的是寫事務,走寫庫
* readOnlyTx對應的是讀事務,走讀庫
* 一個事務既有寫又有讀,走寫庫
*/
@Configuration
public class TxAdviceInterceptor {
private static final int TX_METHOD_TIMEOUT = 3000;
public static final String AOP_POINTCUT_EXPRESSION = "execution (* com.iclassmate.abel.service.*.*(..))";
@Autowired
private PlatformTransactionManager transactionManager;
@Bean
public TransactionInterceptor txAdvice() {
NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
/*只讀事務,不做更新操作*/
RuleBasedTransactionAttribute readOnlyTx = new RuleBasedTransactionAttribute();
readOnlyTx.setReadOnly(true);
readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
/*當前存在事務就使用當前事務,當前不存在事務就建立一個新的事務*/
RuleBasedTransactionAttribute requiredTx = new RuleBasedTransactionAttribute();
requiredTx.setRollbackRules(
Collections.singletonList(new RollbackRuleAttribute(Exception.class)));
requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
requiredTx.setTimeout(TX_METHOD_TIMEOUT);
Map<String, TransactionAttribute> txMap = new HashMap<>();
txMap.put("save*", requiredTx);
txMap.put("add*", requiredTx);
txMap.put("create*", requiredTx);
txMap.put("insert*", requiredTx);
txMap.put("update*", requiredTx);
txMap.put("delete*", requiredTx);
txMap.put("merge*", requiredTx);
txMap.put("remove*", requiredTx);
txMap.put("put*", requiredTx);
txMap.put("drop*", requiredTx);
txMap.put("sync*",requiredTx);
txMap.put("get*", readOnlyTx);
txMap.put("query*", readOnlyTx);
txMap.put("count*", readOnlyTx);
txMap.put("exist*", readOnlyTx);
txMap.put("find*", readOnlyTx);
txMap.put("list*", readOnlyTx);
txMap.put("translate*", readOnlyTx);
txMap.put("select*", readOnlyTx);
txMap.put("*", requiredTx);
source.setNameMap(txMap);
TransactionInterceptor txAdvice = new TransactionInterceptor(transactionManager, source);
return txAdvice;
}
@Bean
public Advisor txAdviceAdvisor() {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression(AOP_POINTCUT_EXPRESSION);
return new DefaultPointcutAdvisor(pointcut, txAdvice());
}
}