ShardingSphere Source Code Analysis

According to the official documentation, ShardingSphere (SS) connects to the databases in one of two modes: memory-limit mode (MEMORY_STRICTLY) and connection-limit mode (CONNECTION_STRICTLY).

  • Memory-limit mode: the premise of this mode is that ShardingSphere places no limit on the number of database connections consumed by one operation. If the SQL to be executed touches 200 tables in one database instance, a new connection is created for each table and the work is processed concurrently with multiple threads to maximize execution efficiency. When the SQL qualifies, streaming merge is preferred, to prevent memory overflow and avoid frequent garbage collection.
  • Connection-limit mode: the premise of this mode is that ShardingSphere strictly controls the number of database connections consumed by one operation. If the SQL to be executed touches 200 tables in one database instance, only a single connection is created and the 200 tables are processed serially. If the shards of one operation are spread across different databases, the different databases are still handled by multiple threads, but each operation on each database still uses only one connection. This prevents one request from holding too many database connections. This mode always uses in-memory merge.

The above is the official definition of the two connection modes. For every operation, SS automatically chooses between memory-limit and connection-limit mode. The maxConnectionSizePerQuery property configures the maximum number of connections one query may use per database. If it is greater than 1, then under connection-limit mode the SQL units are split into groups that are executed asynchronously; at most maxConnectionSizePerQuery groups (and therefore connections) are created per database, as shown in the configuration sketch below.
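As a quick reference, here is one way to set this property when building a sharding data source. This is a minimal sketch assuming ShardingSphere 4.x, where the property key is max.connections.size.per.query and the data source is built through ShardingDataSourceFactory; package names and signatures may differ slightly between versions, and dataSourceMap / ruleConfig are assumed to be prepared elsewhere.

import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;

public final class MaxConnectionsConfigSketch {

    // Builds a sharding data source that allows at most 3 connections per database for one query.
    public static DataSource build(final Map<String, DataSource> dataSourceMap,
                                   final ShardingRuleConfiguration ruleConfig) throws SQLException {
        Properties props = new Properties();
        // Assumed 4.x property key corresponding to maxConnectionSizePerQuery.
        props.setProperty("max.connections.size.per.query", "3");
        return ShardingDataSourceFactory.createDataSource(dataSourceMap, ruleConfig, props);
    }
}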

1. The previous two sections covered SQL parsing, routing, and rewriting; now we reach SQL execution. Going back to the original execute method, the SS execution engine has a preparation phase and an execution phase, corresponding to initPreparedStatementExecutor() and preparedStatementExecutor.execute() respectively. Let's step into initPreparedStatementExecutor.

public boolean execute() throws SQLException {
    try {
        clearPrevious();
        prepare();
        initPreparedStatementExecutor();
        return preparedStatementExecutor.execute();
    } finally {
        clearBatch();
    }
}

2. PreparedStatementExecutor#init: the executionContext parameter is the object instantiated after SQL rewriting finished; it carries the sqlStatementContext and the SQL execution units.

Our focus here is the obtainExecuteGroups method, which implements execution grouping.

public void init(final ExecutionContext executionContext) throws SQLException {
    setSqlStatementContext(executionContext.getSqlStatementContext());
    getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
    cacheStatements();
}

3. Here a callback is created. SQLExecutePrepareCallback declares two methods: getConnections, which obtains a collection of database connections, and createStatementExecuteUnit, which creates a statement execute unit. Keep them in mind; we do not yet know how they are used, so let's read on.

private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {

        @Override
        public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
            return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
        }

        @Override
        public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
            return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
        }
    });
}

4. getSQLUnitGroups groups the execution units by data source, putting units that target the same data source into the same bucket; then getSQLExecuteGroups is entered, which is where the connection mode is chosen.

private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
        final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
    Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);
    Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
    for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
        result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
    }
    return result;
}
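To make the grouping concrete, here is a small standalone sketch of the group-by-data-source idea. It uses plain (dataSourceName, sql) pairs as hypothetical stand-ins for ExecutionUnit/SQLUnit rather than the real ShardingSphere types.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public final class GroupByDataSourceDemo {

    public static void main(final String[] args) {
        // Hypothetical (dataSourceName, sql) pairs standing in for ExecutionUnit/SQLUnit.
        List<String[]> executionUnits = Arrays.asList(
                new String[] {"ds_0", "SELECT * FROM t_order_0"},
                new String[] {"ds_0", "SELECT * FROM t_order_1"},
                new String[] {"ds_1", "SELECT * FROM t_order_0"});
        // Same idea as getSQLUnitGroups: collect the SQL units of each data source into one bucket.
        Map<String, List<String>> sqlUnitGroups = new LinkedHashMap<>();
        for (String[] each : executionUnits) {
            sqlUnitGroups.computeIfAbsent(each[0], key -> new LinkedList<>()).add(each[1]);
        }
        System.out.println(sqlUnitGroups); // {ds_0=[SELECT * FROM t_order_0, SELECT * FROM t_order_1], ds_1=[SELECT * FROM t_order_0]}
    }
}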

5. Suppose we need to execute 5 SQL units and maxConnectionsSizePerQuery is configured as 3; the computed desiredPartitionSize is 2.

private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
        final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
    List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
    int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
    List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
    ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
    List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
    int count = 0;
    for (List<SQLUnit> each : sqlUnitPartitions) {
        result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
    }
    return result;
}

5.1 sqlUnitPartitions splits the SQL units according to the number of SQL units and desiredPartitionSize. With the numbers above, the 5 SQL units are split into ceil(5 / 2) = 3 partitions, and 3 is also the number of database connections that must be obtained.

5.2 ConnectionMode is decided as in the code above: our SQL count of 5 is greater than maxConnectionsSizePerQuery (3), so this query runs in connection-limit mode (CONNECTION_STRICTLY). The sketch below walks through these numbers.
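The following minimal sketch reproduces the same arithmetic outside of ShardingSphere, using Guava's Lists.partition and plain strings in place of SQLUnit objects.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.google.common.collect.Lists;

public final class PartitionDemo {

    public static void main(final String[] args) {
        int maxConnectionsSizePerQuery = 3;
        // Stand-ins for the 5 SQL units routed to one data source.
        List<String> sqlUnits = IntStream.rangeClosed(1, 5).mapToObj(i -> "sql-" + i).collect(Collectors.toList());
        // Same formula as getSQLExecuteGroups: ceil(5 / 3) = 2 SQL units per connection.
        int desiredPartitionSize = Math.max(
                0 == sqlUnits.size() % maxConnectionsSizePerQuery
                        ? sqlUnits.size() / maxConnectionsSizePerQuery
                        : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        List<List<String>> partitions = Lists.partition(sqlUnits, desiredPartitionSize);
        // 5 > 3, so this query uses the connection-limit mode.
        String connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? "CONNECTION_STRICTLY" : "MEMORY_STRICTLY";
        System.out.println("desiredPartitionSize = " + desiredPartitionSize); // 2
        System.out.println("partitions = " + partitions);                     // [[sql-1, sql-2], [sql-3, sql-4], [sql-5]] -> 3 connections
        System.out.println("connectionMode = " + connectionMode);             // CONNECTION_STRICTLY
    }
}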

6. Connections are obtained through the callback's getConnections method. PreparedStatementExecutor.super.getConnection() returns the ShardingConnection, and the actual getConnections logic runs in its parent class, AbstractConnectionAdapter.

public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
    return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}

7. cachedConnections is a LinkedHashMultimap. The method first checks whether the map already holds connections for the target data source; if not, new connections are created first. Let's look at the createConnections method next.

public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
    DataSource dataSource = getDataSourceMap().get(dataSourceName);
    Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
    Collection<Connection> connections;
    synchronized (cachedConnections) {
        connections = cachedConnections.get(dataSourceName);
    }
    List<Connection> result;
    if (connections.size() >= connectionSize) {
        result = new ArrayList<>(connections).subList(0, connectionSize);
    } else if (!connections.isEmpty()) {
        result = new ArrayList<>(connectionSize);
        result.addAll(connections);
        List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
        result.addAll(newConnections);
        synchronized (cachedConnections) {
            cachedConnections.putAll(dataSourceName, newConnections);
        }
    } else {
        result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
        synchronized (cachedConnections) {
            cachedConnections.putAll(dataSourceName, result);
        }
    }
    return result;
}
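If Guava's Multimap is unfamiliar, the small sketch below shows the semantics cachedConnections relies on, with strings standing in for real JDBC connections: one data source name can map to several connections, and get() returns a (possibly empty) view rather than null.

import java.util.Arrays;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;

public final class CachedConnectionsDemo {

    public static void main(final String[] args) {
        // One key ("data source name") can hold many values ("connections"), which a plain Map cannot do.
        Multimap<String, String> cachedConnections = LinkedHashMultimap.create();
        cachedConnections.put("ds_0", "connection-1");
        cachedConnections.putAll("ds_0", Arrays.asList("connection-2", "connection-3"));
        System.out.println(cachedConnections.get("ds_0")); // [connection-1, connection-2, connection-3]
        System.out.println(cachedConnections.get("ds_1")); // [] -- empty view, never null
    }
}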

8. In AbstractConnectionAdapter#createConnections() there is a small optimization around connection acquisition: in memory-limit mode the connections must be created inside a synchronized block, because a request in this mode may need to hold many connections from the same pool at once; if several threads each grab part of the connections they need, they can end up waiting on each other and deadlock, so the whole batch is acquired while holding the lock. Connection-limit mode takes only a small, bounded number of connections, so it returns without synchronizing.

if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
    return createConnections(dataSourceName, dataSource, connectionSize);
}
synchronized (dataSource) {
    return createConnections(dataSourceName, dataSource, connectionSize);
}

9. The connections are actually created in AbstractConnectionAdapter#createConnections(). createConnection is an abstract method whose implementation lives in the subclass ShardingConnection#createConnection: it checks whether the connection needs to take part in a transaction; if not, the connection is created directly via dataSource.getConnection(), and if a transaction is needed, the connection is obtained through the shardingTransactionManager.

private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException {
    List<Connection> result = new ArrayList<>(connectionSize);
    for (int i = 0; i < connectionSize; i++) {
        try {
            Connection connection = createConnection(dataSourceName, dataSource);
            replayMethodsInvocation(connection);
            result.add(connection);
            ...
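The decision described in step 9 can be condensed into a few lines. The sketch below only illustrates that branch, it is not the verbatim ShardingConnection source; the TransactionManager interface is a hypothetical stand-in for ShardingTransactionManager.

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

public final class TransactionalConnectionSketch {

    // Hypothetical stand-in for ShardingTransactionManager#getConnection.
    interface TransactionManager {
        Connection getConnection(String dataSourceName) throws SQLException;
    }

    // Sketch of the decision in step 9: with no active distributed transaction the connection
    // comes straight from the DataSource, otherwise from the transaction manager.
    static Connection createConnection(final boolean inShardingTransaction, final TransactionManager transactionManager,
                                       final String dataSourceName, final DataSource dataSource) throws SQLException {
        return inShardingTransaction ? transactionManager.getConnection(dataSourceName) : dataSource.getConnection();
    }
}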

10. Once the connections are created they are put into cachedConnections in a thread-safe way. Now back in SQLExecutePrepareTemplate#getSQLExecuteGroups, the getSQLExecuteGroup call creates the final execute units.

for (List<SQLUnit> each : sqlUnitPartitions) {
    result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
}

11. sqlUnitGroup is one of the partitions produced earlier, i.e. the SQL units that a single connection has to execute. In our test scenario its size is 2, so each connection executes 2 SQL statements. For each of them the callback's createStatementExecuteUnit is called to create a statement execute unit.

private InputGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection,
        final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
    List<StatementExecuteUnit> result = new LinkedList<>();
    for (SQLUnit each : sqlUnitGroup) {
        result.add(callback.createStatementExecuteUnit(connection, new ExecutionUnit(dataSourceName, each), connectionMode));
    }
    return new InputGroup<>(result);
}

12. Finally, PreparedStatementExecutor#createPreparedStatement obtains the PreparedStatement via connection.prepareStatement, and the resulting StatementExecuteUnit objects are wrapped into an InputGroup<StatementExecuteUnit> and returned.

private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
    return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
            : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}

13. Back in PreparedStatementExecutor#init the inputGroups are now fully prepared. Then, in ShardingPreparedStatement#initPreparedStatementExecutor, setParametersForStatements and replayMethodForStatements set the SQL parameters and replay the recorded statement properties.

public void init(final ExecutionContext executionContext) throws SQLException {
    setSqlStatementContext(executionContext.getSqlStatementContext());
    getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
    cacheStatements();
}
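Conceptually, setting the parameters means replaying each rewritten SQL unit's parameter list onto its own cached PreparedStatement. The sketch below is a hypothetical illustration of that idea, not the actual ShardingPreparedStatement code.

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public final class ReplayParametersSketch {

    // Hypothetical sketch: statements.get(i) is the cached PreparedStatement for the i-th SQL unit,
    // parameterSets.get(i) holds the rewritten parameter values belonging to that unit.
    static void replayParameters(final List<PreparedStatement> statements, final List<List<Object>> parameterSets) throws SQLException {
        for (int i = 0; i < statements.size(); i++) {
            List<Object> parameters = parameterSets.get(i);
            for (int index = 0; index < parameters.size(); index++) {
                // JDBC parameter indexes are 1-based.
                statements.get(i).setObject(index + 1, parameters.get(index));
            }
        }
    }
}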

That concludes the preparation phase of SQL execution. Next we move on to the execution phase.

