ShardingSphere源码解析(五)---结果归并

归并引擎看起来还是挺复杂的,ShardingSphere支持的结果归并从功能上分为遍历、排序、分组、分页和聚合5种类型,它们是组合而非互斥的关系。 从结构划分,可分为流式归并、内存归并和装饰者归并。流式归并和内存归并是互斥的,装饰者归并可以在流式归并和内存归并之上做进一步的处理。

我们以一条order by的sql语句为例走一遍代码,接着上一篇,我们回到ShardingPreparedStatement#mergeQuery方法中,我们看到MergeEngine,从这里开始就是属于结果集的归并

private MergedResult mergeQuery(final List<QueryResult> queryResults)throws SQLException {

ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();

MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());

return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());

}

1.归并最核心的代码在ShardingDQLResultMerger#build中,这里存在流失归并和内存怒归并,我们先进OrderByStreamMergedResult方法去看了流式归并

private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,

final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData)throws SQLException {

if (isNeedProcessGroupBy(selectStatementContext)) {

return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);

}

if (isNeedProcessDistinctRow(selectStatementContext)) {

setGroupByForDistinctRow(selectStatementContext);

return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);

}

if (isNeedProcessOrderBy(selectStatementContext)) {

returnnew OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);

}

returnnew IteratorStreamMergedResult(queryResults);

}

  1. 创建了PriorityQueue优先级队列,在orderResultSetsToQueue方法中把结果集放入队列中

publicOrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData)throws SQLException {

this.orderByItems = selectStatementContext.getOrderByContext().getItems();

this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());

orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);

isFirstNext = true;

}

  1. 最后一行代码,流式结果集归并,设置当前的流式归并结果集,这里存储是当前队列第一个元素结果集所以不会出现内存溢出问题

privatevoidorderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData)throws SQLException {

for (QueryResult each : queryResults) {

OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);

if (orderByValue.next()) {

orderByValuesQueue.offer(orderByValue);

}

}

setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());

}

4.看一下内存归并,进入MemoryMergedResult#MemoryMergedResult方法,这里可以看到需要把数据都加载到内存中

protectedMemoryMergedResult(final T rule, final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults)throws SQLException {

List<MemoryQueryResultRow> memoryQueryResultRowList = init(rule, schemaMetaData, sqlStatementContext, queryResults);

memoryResultSetRows = memoryQueryResultRowList.iterator();

if (!memoryQueryResultRowList.isEmpty()) {

currentResultSetRow = memoryQueryResultRowList.get(0);

}

}

  1. 最后看一下装饰归并,如果有分页就会用到,下面代码中,LimitDecoratorMergedResult就是装饰归并

private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult)throws SQLException {

PaginationContext paginationContext = selectStatementContext.getPaginationContext();

if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {

return mergedResult;

}

String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();

if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {

returnnew LimitDecoratorMergedResult(mergedResult, paginationContext);

}

if ("Oracle".equals(trunkDatabaseName)) {

returnnew RowNumberDecoratorMergedResult(mergedResult, paginationContext);

}

if ("SQLServer".equals(trunkDatabaseName)) {

returnnew TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);

}

return mergedResult;

}

以上是 ShardingSphere源码解析(五)---结果归并 的全部内容, 来源链接: utcz.com/a/19225.html

回到顶部