> [TOC] # 1、使用注解形式配置多数据源 ```java @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface DBRouter { /** * 分库分表字段 */ String key() default "UId"; /** * 是否分表 */ boolean splitTable() default false; } // 使用切面进行处理 @Pointcut("@annotation(com.seamew.middleware.db.router.annotation.DBRouter)") public void aopPoint() { } @Around("aopPoint() && @annotation(dbRouter)") public Object doRouter(ProceedingJoinPoint jp, DBRouter dbRouter) throws Throwable { String dbKey = dbRouter.key(); if (StringUtils.isBlank(dbKey)) { throw new RuntimeException("annotation DBRouter key is null!"); } // 路由属性 String dbKeyAttr = getAttrValue(dbKey, jp.getArgs()); // 路由策略 dbRouterStrategy.doRouter(dbKeyAttr); // 返回结果 try { return jp.proceed(); } finally { dbRouterStrategy.clear(); } } ``` # 2、使用@Transactional 在基于事务@Transactional情况下,当我们在执行事务方法时,会通过AOP机制先执行`DataSourceTransactionManager`的doBegin()方法,该方法进一步调用`AbstractRoutingDataSource`的getConnection()方法,再调用`determineCurrentLookupKey()`决定当前线程使用哪个数据源。 * DataSourceTransactionManager ```java @Override protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 这里会获取connection Connection newCon = obtainDataSource().getConnection(); // 忽略大部分代码 ..... } ``` * AbstractRoutingDataSource ```java @Override public Connection getConnection() throws SQLException { return determineTargetDataSource().getConnection(); } // 最后调用这个determineTargetDataSource方法获取数据源 // 需要注意此时并没有对自定义注解进行处理 protected DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } return dataSource; } ``` # 3、造成失效原因 如果使用事务mybatis会执行一下代码 1. 获取sqlsession ```java private class SqlSessionInterceptor implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 获取sqlsession SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator); try { Object result = method.invoke(sqlSession, args); if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) { // force commit even on non-dirty sessions because some databases require // a commit/rollback before calling close() sqlSession.commit(true); } return result; ``` 2. 反射执行方法 ```java @Override public List doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { Statement stmt = null; try { // 这一步就是获取mysql connection Configuration configuration = ms.getConfiguration(); StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql); stmt = prepareStatement(handler, ms.getStatementLog()); return handler.query(stmt, resultHandler); } finally { closeStatement(stmt); } } ``` 3. 获取sql connnection ```java private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Statement stmt; // 直接获取缓存的connection Connection connection = getConnection(statementLog); stmt = handler.prepare(connection, transaction.getTimeout()); handler.parameterize(stmt); return stmt; } ``` **简单总结:** > 在开启事务后,便将默认连接放入缓存中,后续操作中直接复用了该连接,导致数据源切换失效。 # 4、解决方法 在事务开启前,实现数据源切换。改写决定数据源的核心方法`determineCurrentLookupKey`,先执行特定方法,将数据源放入`ThreadLocal`中,后续事务执行该方法中,从ThreadLocal中获取到要连接到数据源。 这里用编程式事务举例: ```java @Override protected Result grabActivity(PartakeReq partake, ActivityBillVO bill) { try { dbRouter.doRouter(partake.getUId()); return transactionTemplate.execute(status -> { try { // 执行sql 事务 insert(); updata(); return Result.buildResult(Constants.ResponseCode.INDEX_DUP); } return Result.buildSuccessResult(); }); } finally { // 注意清理ThreadLocal缓存,不然会导致内存泄漏 dbRouter.clear(); } } ``` ```java @Override public void doRouter(String dbKeyAttr) { int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount(); // 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。 int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16)); // 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号 int dbIdx = idx / dbRouterConfig.getTbCount() + 1; int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1); // 设置到 ThreadLocal DBContextHolder.setDBKey(String.format("%02d", dbIdx)); DBContextHolder.setTBKey(String.format("%03d", tbIdx)); log.debug("数据库路由 dbIdx:{} tbIdx:{}", dbIdx, tbIdx); } ``` # 5、参考文章 [注意清理ThreadLocal缓存](../Java/JAVA高阶/JUC编程/Threadlocal.md)