事务导致多数据源切换失败.md 6.6 KB

[TOC]

1、使用注解形式配置多数据源

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouter {

    /**
     * 分库分表字段
     */
    String key() default "UId";

    /**
     * 是否分表
     */
    boolean splitTable() default false;

}
// 使用切面进行处理
@Pointcut("@annotation(com.seamew.middleware.db.router.annotation.DBRouter)")
public void aopPoint() {
}

@Around("aopPoint() && @annotation(dbRouter)")
public Object doRouter(ProceedingJoinPoint jp, DBRouter dbRouter) throws Throwable {
    String dbKey = dbRouter.key();
    if (StringUtils.isBlank(dbKey)) {
        throw new RuntimeException("annotation DBRouter key is null!");
    }

    // 路由属性
    String dbKeyAttr = getAttrValue(dbKey, jp.getArgs());
    // 路由策略
    dbRouterStrategy.doRouter(dbKeyAttr);
    // 返回结果
    try {
        return jp.proceed();
    } finally {
        dbRouterStrategy.clear();
    }
}

2、使用@Transactional

在基于事务@Transactional情况下,当我们在执行事务方法时,会通过AOP机制先执行DataSourceTransactionManager的doBegin()方法,该方法进一步调用AbstractRoutingDataSource的getConnection()方法,再调用determineCurrentLookupKey()决定当前线程使用哪个数据源。

  • DataSourceTransactionManager
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 这里会获取connection
            Connection newCon = obtainDataSource().getConnection();
            // 忽略大部分代码 .....
        }
  • AbstractRoutingDataSource
@Override
public Connection getConnection() throws SQLException {
    return determineTargetDataSource().getConnection();
}
// 最后调用这个determineTargetDataSource方法获取数据源
// 需要注意此时并没有对自定义注解进行处理
protected DataSource determineTargetDataSource() {
    Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
    Object lookupKey = determineCurrentLookupKey();
    DataSource dataSource = this.resolvedDataSources.get(lookupKey);
    if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
        dataSource = this.resolvedDefaultDataSource;
    }
    if (dataSource == null) {
        throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
    }
    return dataSource;
}

3、造成失效原因

如果使用事务mybatis会执行一下代码

  1. 获取sqlsession
private class SqlSessionInterceptor implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 获取sqlsession
        SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
                                              SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
        try {
            Object result = method.invoke(sqlSession, args);
            if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
                // force commit even on non-dirty sessions because some databases require
                // a commit/rollback before calling close()
                sqlSession.commit(true);
            }
            return result;
  1. 反射执行方法
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
        // 这一步就是获取mysql connection
        Configuration configuration = ms.getConfiguration();
        StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
        stmt = prepareStatement(handler, ms.getStatementLog());
        return handler.query(stmt, resultHandler);
    } finally {
        closeStatement(stmt);
    }
}
  1. 获取sql connnection
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    // 直接获取缓存的connection
    Connection connection = getConnection(statementLog);
    stmt = handler.prepare(connection, transaction.getTimeout());
    handler.parameterize(stmt);
    return stmt;
  }

简单总结:

在开启事务后,便将默认连接放入缓存中,后续操作中直接复用了该连接,导致数据源切换失效。

4、解决方法

在事务开启前,实现数据源切换。改写决定数据源的核心方法determineCurrentLookupKey,先执行特定方法,将数据源放入ThreadLocal中,后续事务执行该方法中,从ThreadLocal中获取到要连接到数据源。

这里用编程式事务举例:

@Override
protected Result grabActivity(PartakeReq partake, ActivityBillVO bill) {
    try {
        dbRouter.doRouter(partake.getUId());
        return transactionTemplate.execute(status -> {
            try {
                // 执行sql 事务
                insert();
                updata();
                return Result.buildResult(Constants.ResponseCode.INDEX_DUP);
            }
            return Result.buildSuccessResult();
        });
    } finally {
        // 注意清理ThreadLocal缓存,不然会导致内存泄漏
        dbRouter.clear();
    }
}
@Override
public void doRouter(String dbKeyAttr) {
    int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();

    // 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
    int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));

    // 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
    int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
    int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);

    // 设置到 ThreadLocal
    DBContextHolder.setDBKey(String.format("%02d", dbIdx));
    DBContextHolder.setTBKey(String.format("%03d", tbIdx));
    log.debug("数据库路由 dbIdx:{} tbIdx:{}",  dbIdx, tbIdx);
}

5、参考文章

注意清理ThreadLocal缓存