ShardingSphere源码解析(五)---结果归并
归并引擎看起来还是挺复杂的,ShardingSphere支持的结果归并从功能上分为遍历、排序、分组、分页和聚合5种类型,它们是组合而非互斥的关系。 从结构划分,可分为流式归并、内存归并和装饰者归并。流式归并和内存归并是互斥的,装饰者归并可以在流式归并和内存归并之上做进一步的处理。
我们以一条order by的sql语句为例走一遍代码,接着上一篇,我们回到ShardingPreparedStatement#mergeQuery方法中,我们看到MergeEngine,从这里开始就是属于结果集的归并
private MergedResult mergeQuery(final List<QueryResult> queryResults)throws SQLException {ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
}
1.归并最核心的代码在ShardingDQLResultMerger#build中,这里存在流失归并和内存怒归并,我们先进OrderByStreamMergedResult方法去看了流式归并
private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData)
throws SQLException {if (isNeedProcessGroupBy(selectStatementContext)) {
return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
}
if (isNeedProcessDistinctRow(selectStatementContext)) {
setGroupByForDistinctRow(selectStatementContext);
return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
}
if (isNeedProcessOrderBy(selectStatementContext)) {
returnnew OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
}
returnnew IteratorStreamMergedResult(queryResults);
}
- 创建了PriorityQueue优先级队列,在orderResultSetsToQueue方法中把结果集放入队列中
publicOrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData)throws SQLException {this.orderByItems = selectStatementContext.getOrderByContext().getItems();
this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
isFirstNext = true;
}
- 最后一行代码,流式结果集归并,设置当前的流式归并结果集,这里存储是当前队列第一个元素结果集所以不会出现内存溢出问题
privatevoidorderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData)throws SQLException {for (QueryResult each : queryResults) {
OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}
4.看一下内存归并,进入MemoryMergedResult#MemoryMergedResult方法,这里可以看到需要把数据都加载到内存中
protectedMemoryMergedResult(final T rule, final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults)throws SQLException {List<MemoryQueryResultRow> memoryQueryResultRowList = init(rule, schemaMetaData, sqlStatementContext, queryResults);
memoryResultSetRows = memoryQueryResultRowList.iterator();
if (!memoryQueryResultRowList.isEmpty()) {
currentResultSetRow = memoryQueryResultRowList.get(0);
}
}
- 最后看一下装饰归并,如果有分页就会用到,下面代码中,LimitDecoratorMergedResult就是装饰归并
private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult)throws SQLException {PaginationContext paginationContext = selectStatementContext.getPaginationContext();
if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
return mergedResult;
}
String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
returnnew LimitDecoratorMergedResult(mergedResult, paginationContext);
}
if ("Oracle".equals(trunkDatabaseName)) {
returnnew RowNumberDecoratorMergedResult(mergedResult, paginationContext);
}
if ("SQLServer".equals(trunkDatabaseName)) {
returnnew TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
}
return mergedResult;
}
以上是 ShardingSphere源码解析(五)---结果归并 的全部内容, 来源链接: utcz.com/a/19225.html