[TOC]
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouter {
/**
* 分库分表字段
*/
String key() default "UId";
/**
* 是否分表
*/
boolean splitTable() default false;
}
// 使用切面进行处理
@Pointcut("@annotation(com.seamew.middleware.db.router.annotation.DBRouter)")
public void aopPoint() {
}
@Around("aopPoint() && @annotation(dbRouter)")
public Object doRouter(ProceedingJoinPoint jp, DBRouter dbRouter) throws Throwable {
String dbKey = dbRouter.key();
if (StringUtils.isBlank(dbKey)) {
throw new RuntimeException("annotation DBRouter key is null!");
}
// 路由属性
String dbKeyAttr = getAttrValue(dbKey, jp.getArgs());
// 路由策略
dbRouterStrategy.doRouter(dbKeyAttr);
// 返回结果
try {
return jp.proceed();
} finally {
dbRouterStrategy.clear();
}
}
在基于事务@Transactional情况下,当我们在执行事务方法时,会通过AOP机制先执行DataSourceTransactionManager
的doBegin()方法,该方法进一步调用AbstractRoutingDataSource
的getConnection()方法,再调用determineCurrentLookupKey()
决定当前线程使用哪个数据源。
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 这里会获取connection
Connection newCon = obtainDataSource().getConnection();
// 忽略大部分代码 .....
}
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
// 最后调用这个determineTargetDataSource方法获取数据源
// 需要注意此时并没有对自定义注解进行处理
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
如果使用事务mybatis会执行一下代码
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 获取sqlsession
SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
// 这一步就是获取mysql connection
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
// 直接获取缓存的connection
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
简单总结:
在开启事务后,便将默认连接放入缓存中,后续操作中直接复用了该连接,导致数据源切换失效。
在事务开启前,实现数据源切换。改写决定数据源的核心方法determineCurrentLookupKey
,先执行特定方法,将数据源放入ThreadLocal
中,后续事务执行该方法中,从ThreadLocal中获取到要连接到数据源。
这里用编程式事务举例:
@Override
protected Result grabActivity(PartakeReq partake, ActivityBillVO bill) {
try {
dbRouter.doRouter(partake.getUId());
return transactionTemplate.execute(status -> {
try {
// 执行sql 事务
insert();
updata();
return Result.buildResult(Constants.ResponseCode.INDEX_DUP);
}
return Result.buildSuccessResult();
});
} finally {
// 注意清理ThreadLocal缓存,不然会导致内存泄漏
dbRouter.clear();
}
}
@Override
public void doRouter(String dbKeyAttr) {
int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();
// 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));
// 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);
// 设置到 ThreadLocal
DBContextHolder.setDBKey(String.format("%02d", dbIdx));
DBContextHolder.setTBKey(String.format("%03d", tbIdx));
log.debug("数据库路由 dbIdx:{} tbIdx:{}", dbIdx, tbIdx);
}