[ApacheDoris] Apache Doris Metadata Design and DDL Source Code Reading


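Metadata Design

As shown in the figure above, Doris metadata mainly stores four types of data:

  • User data information, including databases, table schemas, shard information, etc.
  • Job information of all kinds, such as load jobs, Clone jobs and SchemaChange jobs.
  • Users and privileges.
  • Cluster and node information.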

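Metadata Directory

The metadata directory is specified by the FE configuration item meta_dir.

The bdb/ directory holds the data of bdbje (Berkeley DB Java Edition).

The image/ directory holds the image files.

image.[logid] is the latest image file. The logid suffix is the id of the last log entry contained in the image.

image.ckpt is the image file currently being written. If the write succeeds, it is renamed to image.[logid] and replaces the old image file.

The VERSION file records the cluster_id, which uniquely identifies a Doris cluster. It is a random 32-bit integer generated when the leader starts for the first time. A cluster id can also be specified through the FE configuration item cluster_id.

The ROLE file records the FE's own role, which is either FOLLOWER or OBSERVER. FOLLOWER means the FE is an electable node. (Note: even the leader node has the role FOLLOWER.)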
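The image.ckpt to image.[logid] convention is a write-then-rename checkpoint. Below is a minimal sketch of that pattern. It is not Doris's checkpoint code: the class ImageDirSketch and its methods are hypothetical, and the image body is just a string. For simplicity, older images are deleted right after the swap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper illustrating the image.ckpt -> image.<logId> checkpoint pattern.
final class ImageDirSketch {
    private final Path imageDir;

    ImageDirSketch(Path imageDir) {
        this.imageDir = imageDir;
    }

    /** Writes an image covering logs up to lastLogId, then swaps it in. */
    void checkpoint(long lastLogId, String content) {
        Path ckpt = imageDir.resolve("image.ckpt");
        Path target = imageDir.resolve("image." + lastLogId);
        try {
            // 1. Write the complete image to image.ckpt first.
            Files.write(ckpt, content.getBytes(StandardCharsets.UTF_8));
            // 2. Rename atomically, so a reader never sees a half-written image.[logid].
            Files.move(ckpt, target, StandardCopyOption.ATOMIC_MOVE);
            // 3. Delete the older images that the new one replaces.
            for (Path p : listImages()) {
                long id = logIdOf(p);
                if (id >= 0 && id < lastLogId) {
                    Files.delete(p);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the logid of the newest complete image, if there is one. */
    OptionalLong latestLogId() {
        long best = -1;
        for (Path p : listImages()) {
            best = Math.max(best, logIdOf(p));
        }
        return best < 0 ? OptionalLong.empty() : OptionalLong.of(best);
    }

    private List<Path> listImages() {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(imageDir, "image.*")) {
            for (Path p : files) {
                result.add(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // "image.42" -> 42; "image.ckpt" (or anything non-numeric) -> -1
    private static long logIdOf(Path p) {
        String suffix = p.getFileName().toString().substring("image.".length());
        try {
            return Long.parseLong(suffix);
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```

Because image.[logid] only appears after the rename, a crash while image.ckpt is being written leaves the previous image untouched.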

Reading the DDL Source Code

Starting the MySQL Service

org.apache.doris.qe.QeService

if (nioEnabled) {
    mysqlServer = new NMysqlServer(port, scheduler);
} else {
    mysqlServer = new MysqlServer(port, scheduler);
}

DDL Call Path

org.apache.doris.qe.ConnectProcessor#dispatch: identifies the command type

switch (command) {
    case COM_INIT_DB:
        handleInitDb();
        break;
    case COM_QUIT:
        handleQuit();
        break;
    case COM_QUERY:
        handleQuery();
        break;
    case COM_FIELD_LIST:
        handleFieldList();
        break;
    case COM_PING:
        handlePing();
        break;
    default:
        ctx.getState().setError("Unsupported command(" + command + ")");
        LOG.warn("Unsupported command(" + command + ")");
        break;
}
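In the MySQL client/server protocol, the first byte of each command packet identifies the command. COM_QUIT is 0x01, COM_INIT_DB is 0x02, COM_QUERY is 0x03, COM_FIELD_LIST is 0x04 and COM_PING is 0x0e. The sketch below uses the hypothetical MysqlCommandSketch and CommandDispatcherSketch classes, not Doris's own MysqlCommand. It maps that byte to a command and dispatches the same way as the switch above, and it reports an error for any other byte.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum; the byte values follow the MySQL client/server protocol.
enum MysqlCommandSketch {
    COM_QUIT(0x01), COM_INIT_DB(0x02), COM_QUERY(0x03), COM_FIELD_LIST(0x04), COM_PING(0x0e);

    private static final Map<Integer, MysqlCommandSketch> BY_CODE = new HashMap<>();
    static {
        for (MysqlCommandSketch c : values()) {
            BY_CODE.put(c.code, c);
        }
    }

    final int code;

    MysqlCommandSketch(int code) {
        this.code = code;
    }

    /** Returns null when the byte is not one of the commands handled here. */
    static MysqlCommandSketch fromCode(int code) {
        return BY_CODE.get(code);
    }
}

final class CommandDispatcherSketch {
    /** Reads the command byte of a packet payload and returns the handler that would run. */
    static String dispatch(byte[] packet) {
        int code = packet[0] & 0xff; // treat the byte as unsigned
        MysqlCommandSketch command = MysqlCommandSketch.fromCode(code);
        if (command == null) {
            return "Unsupported command(" + code + ")";
        }
        switch (command) {
            case COM_INIT_DB:
                return "handleInitDb";
            case COM_QUIT:
                return "handleQuit";
            case COM_QUERY:
                return "handleQuery";
            case COM_FIELD_LIST:
                return "handleFieldList";
            case COM_PING:
                return "handlePing";
            default:
                return "Unsupported command(" + command + ")";
        }
    }
}
```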

org.apache.doris.qe.ConnectProcessor#analyze: lexical and syntactic parsing

// Parse statement with parser generated by CUP&FLEX
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);

The original statement string (originStmt) is read from the connection and fed to the generated scanner and parser.

Lexer definition file:

• fe/fe-core/src/main/jflex/sql_scanner.flex

Parser (grammar) definition file:

• fe/fe-core/src/main/cup/sql_parser.cup

All statement implementation classes:

StatementBase [vim org/apache/doris/analysis/StatementBase.java +33]
├── ExportStmt [vim org/apache/doris/analysis/ExportStmt.java +48]
├── ImportColumnsStmt [vim org/apache/doris/analysis/ImportColumnsStmt.java +21]
├── ImportDeleteOnStmt [vim org/apache/doris/analysis/ImportDeleteOnStmt.java +19]
├── ImportSequenceStmt [vim org/apache/doris/analysis/ImportSequenceStmt.java +19]
├── ImportWhereStmt [vim org/apache/doris/analysis/ImportWhereStmt.java +19]
├── KillStmt [vim org/apache/doris/analysis/KillStmt.java +19]
├── SetStmt [vim org/apache/doris/analysis/SetStmt.java +24]
├── UseStmt [vim org/apache/doris/analysis/UseStmt.java +33]
├── QueryStmt [vim org/apache/doris/analysis/QueryStmt.java +38]
│   ├── SelectStmt [vim org/apache/doris/analysis/SelectStmt.java +65]
│   └── SetOperationStmt [vim org/apache/doris/analysis/SetOperationStmt.java +36]
├── ShowStmt [vim org/apache/doris/analysis/ShowStmt.java +22]
│   ├── AdminShowConfigStmt [vim org/apache/doris/analysis/AdminShowConfigStmt.java +33]
│   ├── AdminShowDataSkewStmt [vim org/apache/doris/analysis/AdminShowDataSkewStmt.java +32]
│   ├── AdminShowReplicaDistributionStmt [vim org/apache/doris/analysis/AdminShowReplicaDistributionStmt.java +34]
│   ├── AdminShowReplicaStatusStmt [vim org/apache/doris/analysis/AdminShowReplicaStatusStmt.java +39]
│   ├── DescribeStmt [vim org/apache/doris/analysis/DescribeStmt.java +54]
│   ├── HelpStmt [vim org/apache/doris/analysis/HelpStmt.java +26]
│   ├── ShowAlterStmt [vim org/apache/doris/analysis/ShowAlterStmt.java +46]
│   ├── ShowAuthorStmt [vim org/apache/doris/analysis/ShowAuthorStmt.java +23]
│   ├── ShowBackendsStmt [vim org/apache/doris/analysis/ShowBackendsStmt.java +30]
│   ├── ShowBackupStmt [vim org/apache/doris/analysis/ShowBackupStmt.java +38]
│   ├── ShowBrokerStmt [vim org/apache/doris/analysis/ShowBrokerStmt.java +30]
│   ├── ShowCharsetStmt [vim org/apache/doris/analysis/ShowCharsetStmt.java +23]
│   ├── ShowClusterStmt [vim org/apache/doris/analysis/ShowClusterStmt.java +34]
│   ├── ShowCollationStmt [vim org/apache/doris/analysis/ShowCollationStmt.java +24]
│   ├── ShowColumnStatsStmt [vim org/apache/doris/analysis/ShowColumnStatsStmt.java +28]
│   ├── ShowColumnStmt [vim org/apache/doris/analysis/ShowColumnStmt.java +28]
│   ├── ShowCreateDbStmt [vim org/apache/doris/analysis/ShowCreateDbStmt.java +36]
│   ├── ShowCreateFunctionStmt [vim org/apache/doris/analysis/ShowCreateFunctionStmt.java +32]
│   ├── ShowCreateRoutineLoadStmt [vim org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java +24]
│   ├── ShowCreateTableStmt [vim org/apache/doris/analysis/ShowCreateTableStmt.java +29]
│   ├── ShowDataStmt [vim org/apache/doris/analysis/ShowDataStmt.java +56]
│   ├── ShowDbIdStmt [vim org/apache/doris/analysis/ShowDbIdStmt.java +29]
│   ├── ShowDbStmt [vim org/apache/doris/analysis/ShowDbStmt.java +27]
│   ├── ShowDeleteStmt [vim org/apache/doris/analysis/ShowDeleteStmt.java +31]
│   ├── ShowDynamicPartitionStmt [vim org/apache/doris/analysis/ShowDynamicPartitionStmt.java +29]
│   ├── ShowEncryptKeysStmt [vim org/apache/doris/analysis/ShowEncryptKeysStmt.java +32]
│   ├── ShowEnginesStmt [vim org/apache/doris/analysis/ShowEnginesStmt.java +23]
│   ├── ShowEventsStmt [vim org/apache/doris/analysis/ShowEventsStmt.java +23]
│   ├── ShowExportStmt [vim org/apache/doris/analysis/ShowExportStmt.java +40]
│   ├── ShowFrontendsStmt [vim org/apache/doris/analysis/ShowFrontendsStmt.java +30]
│   ├── ShowFunctionsStmt [vim org/apache/doris/analysis/ShowFunctionsStmt.java +32]
│   ├── ShowGrantsStmt [vim org/apache/doris/analysis/ShowGrantsStmt.java +32]
│   ├── ShowIndexStmt [vim org/apache/doris/analysis/ShowIndexStmt.java +32]
│   ├── ShowLoadProfileStmt [vim org/apache/doris/analysis/ShowLoadProfileStmt.java +27]
│   ├── ShowLoadStmt [vim org/apache/doris/analysis/ShowLoadStmt.java +42]
│   ├── ShowLoadWarningsStmt [vim org/apache/doris/analysis/ShowLoadWarningsStmt.java +36]
│   ├── ShowMigrationsStmt [vim org/apache/doris/analysis/ShowMigrationsStmt.java +31]
│   ├── ShowOpenTableStmt [vim org/apache/doris/analysis/ShowOpenTableStmt.java +23]
│   ├── ShowPartitionIdStmt [vim org/apache/doris/analysis/ShowPartitionIdStmt.java +29]
│   ├── ShowPartitionsStmt [vim org/apache/doris/analysis/ShowPartitionsStmt.java +49]
│   ├── ShowPluginsStmt [vim org/apache/doris/analysis/ShowPluginsStmt.java +23]
│   ├── ShowProcStmt [vim org/apache/doris/analysis/ShowProcStmt.java +32]
│   ├── ShowProcedureStmt [vim org/apache/doris/analysis/ShowProcedureStmt.java +23]
│   ├── ShowProcesslistStmt [vim org/apache/doris/analysis/ShowProcesslistStmt.java +24]
│   ├── ShowQueryProfileStmt [vim org/apache/doris/analysis/ShowQueryProfileStmt.java +27]
│   ├── ShowRepositoriesStmt [vim org/apache/doris/analysis/ShowRepositoriesStmt.java +25]
│   ├── ShowResourcesStmt [vim org/apache/doris/analysis/ShowResourcesStmt.java +37]
│   ├── ShowRestoreStmt [vim org/apache/doris/analysis/ShowRestoreStmt.java +38]
│   ├── ShowRolesStmt [vim org/apache/doris/analysis/ShowRolesStmt.java +29]
│   ├── ShowRollupStmt [vim org/apache/doris/analysis/ShowRollupStmt.java +28]
│   ├── ShowRoutineLoadStmt [vim org/apache/doris/analysis/ShowRoutineLoadStmt.java +34]
│   ├── ShowRoutineLoadTaskStmt [vim org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +32]
│   ├── ShowSmallFilesStmt [vim org/apache/doris/analysis/ShowSmallFilesStmt.java +32]
│   ├── ShowSnapshotStmt [vim org/apache/doris/analysis/ShowSnapshotStmt.java +29]
│   ├── ShowSqlBlockRuleStmt [vim org/apache/doris/analysis/ShowSqlBlockRuleStmt.java +31]
│   ├── ShowStatusStmt [vim org/apache/doris/analysis/ShowStatusStmt.java +23]
│   ├── ShowStreamLoadStmt [vim org/apache/doris/analysis/ShowStreamLoadStmt.java +39]
│   ├── ShowSyncJobStmt [vim org/apache/doris/analysis/ShowSyncJobStmt.java +33]
│   ├── ShowTableIdStmt [vim org/apache/doris/analysis/ShowTableIdStmt.java +30]
│   ├── ShowTableStatsStmt [vim org/apache/doris/analysis/ShowTableStatsStmt.java +32]
│   ├── ShowTableStatusStmt [vim org/apache/doris/analysis/ShowTableStatusStmt.java +35]
│   ├── ShowTableStmt [vim org/apache/doris/analysis/ShowTableStmt.java +34]
│   ├── ShowTabletStmt [vim org/apache/doris/analysis/ShowTabletStmt.java +39]
│   ├── ShowTransactionStmt [vim org/apache/doris/analysis/ShowTransactionStmt.java +35]
│   ├── ShowTrashDiskStmt [vim org/apache/doris/analysis/ShowTrashDiskStmt.java +33]
│   ├── ShowTrashStmt [vim org/apache/doris/analysis/ShowTrashStmt.java +36]
│   ├── ShowTriggersStmt [vim org/apache/doris/analysis/ShowTriggersStmt.java +23]
│   ├── ShowUserPropertyStmt [vim org/apache/doris/analysis/ShowUserPropertyStmt.java +42]
│   ├── ShowUserStmt [vim org/apache/doris/analysis/ShowUserStmt.java +25]
│   ├── ShowVariablesStmt [vim org/apache/doris/analysis/ShowVariablesStmt.java +29]
│   ├── ShowViewStmt [vim org/apache/doris/analysis/ShowViewStmt.java +39]
│   ├── ShowWarningStmt [vim org/apache/doris/analysis/ShowWarningStmt.java +23]
│   └── ShowWhiteListStmt [vim org/apache/doris/analysis/ShowWhiteListStmt.java +23]
├── TransactionStmt [vim org/apache/doris/analysis/TransactionStmt.java +22]
│   ├── TransactionBeginStmt [vim org/apache/doris/analysis/TransactionBeginStmt.java +24]
│   ├── TransactionCommitStmt [vim org/apache/doris/analysis/TransactionCommitStmt.java +19]
│   └── TransactionRollbackStmt [vim org/apache/doris/analysis/TransactionRollbackStmt.java +19]
├── UnsupportedStmt [vim org/apache/doris/analysis/UnsupportedStmt.java +22]
│   └── EmptyStmt [vim org/apache/doris/analysis/EmptyStmt.java +19]
└── DdlStmt [vim org/apache/doris/analysis/DdlStmt.java +19]
    ├── AdminCancelRepairTableStmt [vim org/apache/doris/analysis/AdminCancelRepairTableStmt.java +33]
    ├── AdminCheckTabletsStmt [vim org/apache/doris/analysis/AdminCheckTabletsStmt.java +33]
    ├── AdminCleanTrashStmt [vim org/apache/doris/analysis/AdminCleanTrashStmt.java +34]
    ├── AdminRepairTableStmt [vim org/apache/doris/analysis/AdminRepairTableStmt.java +33]
    ├── AdminSetConfigStmt [vim org/apache/doris/analysis/AdminSetConfigStmt.java +32]
    ├── AdminSetReplicaStatusStmt [vim org/apache/doris/analysis/AdminSetReplicaStatusStmt.java +30]
    ├── AlterClusterStmt [vim org/apache/doris/analysis/AlterClusterStmt.java +29]
    ├── AlterColumnStatsStmt [vim org/apache/doris/analysis/AlterColumnStatsStmt.java +33]
    ├── AlterDatabasePropertyStmt [vim org/apache/doris/analysis/AlterDatabasePropertyStmt.java +24]
    ├── AlterDatabaseQuotaStmt [vim org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +30]
    ├── AlterDatabaseRename [vim org/apache/doris/analysis/AlterDatabaseRename.java +34]
    ├── AlterRoutineLoadStmt [vim org/apache/doris/analysis/AlterRoutineLoadStmt.java +34]
    ├── AlterSqlBlockRuleStmt [vim org/apache/doris/analysis/AlterSqlBlockRuleStmt.java +31]
    ├── AlterSystemStmt [vim org/apache/doris/analysis/AlterSystemStmt.java +28]
    ├── AlterTableStatsStmt [vim org/apache/doris/analysis/AlterTableStatsStmt.java +33]
    ├── AlterTableStmt [vim org/apache/doris/analysis/AlterTableStmt.java +37]
    ├── CancelLoadStmt [vim org/apache/doris/analysis/CancelLoadStmt.java +26]
    ├── CreateClusterStmt [vim org/apache/doris/analysis/CreateClusterStmt.java +33]
    ├── CreateDataSyncJobStmt [vim org/apache/doris/analysis/CreateDataSyncJobStmt.java +36]
    ├── CreateDbStmt [vim org/apache/doris/analysis/CreateDbStmt.java +31]
    ├── CreateEncryptKeyStmt [vim org/apache/doris/analysis/CreateEncryptKeyStmt.java +30]
    ├── CreateFileStmt [vim org/apache/doris/analysis/CreateFileStmt.java +35]
    ├── CreateFunctionStmt [vim org/apache/doris/analysis/CreateFunctionStmt.java +47]
    ├── CreateMaterializedViewStmt [vim org/apache/doris/analysis/CreateMaterializedViewStmt.java +43]
    ├── CreateRepositoryStmt [vim org/apache/doris/analysis/CreateRepositoryStmt.java +28]
    ├── CreateResourceStmt [vim org/apache/doris/analysis/CreateResourceStmt.java +32]
    ├── CreateRoleStmt [vim org/apache/doris/analysis/CreateRoleStmt.java +28]
    ├── CreateRoutineLoadStmt [vim org/apache/doris/analysis/CreateRoutineLoadStmt.java +48]
    ├── CreateSqlBlockRuleStmt [vim org/apache/doris/analysis/CreateSqlBlockRuleStmt.java +37]
    ├── CreateTableAsSelectStmt [vim org/apache/doris/analysis/CreateTableAsSelectStmt.java +26]
    ├── CreateTableLikeStmt [vim org/apache/doris/analysis/CreateTableLikeStmt.java +31]
    ├── CreateTableStmt [vim org/apache/doris/analysis/CreateTableStmt.java +56]
    ├── CreateUserStmt [vim org/apache/doris/analysis/CreateUserStmt.java +36]
    ├── DeleteStmt [vim org/apache/doris/analysis/DeleteStmt.java +35]
    ├── DropClusterStmt [vim org/apache/doris/analysis/DropClusterStmt.java +31]
    ├── DropDbStmt [vim org/apache/doris/analysis/DropDbStmt.java +30]
    ├── DropEncryptKeyStmt [vim org/apache/doris/analysis/DropEncryptKeyStmt.java +28]
    ├── DropFileStmt [vim org/apache/doris/analysis/DropFileStmt.java +34]
    ├── DropFunctionStmt [vim org/apache/doris/analysis/DropFunctionStmt.java +27]
    ├── DropMaterializedViewStmt [vim org/apache/doris/analysis/DropMaterializedViewStmt.java +29]
    ├── DropRepositoryStmt [vim org/apache/doris/analysis/DropRepositoryStmt.java +27]
    ├── DropResourceStmt [vim org/apache/doris/analysis/DropResourceStmt.java +27]
    ├── DropRoleStmt [vim org/apache/doris/analysis/DropRoleStmt.java +28]
    ├── DropSqlBlockRuleStmt [vim org/apache/doris/analysis/DropSqlBlockRuleStmt.java +30]
    ├── DropTableStmt [vim org/apache/doris/analysis/DropTableStmt.java +28]
    ├── DropUserStmt [vim org/apache/doris/analysis/DropUserStmt.java +27]
    ├── EnterStmt [vim org/apache/doris/analysis/EnterStmt.java +25]
    ├── GrantStmt [vim org/apache/doris/analysis/GrantStmt.java +39]
    ├── InsertStmt [vim org/apache/doris/analysis/InsertStmt.java +66]
    ├── InstallPluginStmt [vim org/apache/doris/analysis/InstallPluginStmt.java +31]
    ├── LinkDbStmt [vim org/apache/doris/analysis/LinkDbStmt.java +31]
    ├── LoadStmt [vim org/apache/doris/analysis/LoadStmt.java +45]
    ├── MigrateDbStmt [vim org/apache/doris/analysis/MigrateDbStmt.java +29]
    ├── PauseRoutineLoadStmt [vim org/apache/doris/analysis/PauseRoutineLoadStmt.java +26]
    ├── PauseSyncJobStmt [vim org/apache/doris/analysis/PauseSyncJobStmt.java +22]
    ├── RecoverDbStmt [vim org/apache/doris/analysis/RecoverDbStmt.java +33]
    ├── RecoverPartitionStmt [vim org/apache/doris/analysis/RecoverPartitionStmt.java +32]
    ├── RecoverTableStmt [vim org/apache/doris/analysis/RecoverTableStmt.java +32]
    ├── ResumeRoutineLoadStmt [vim org/apache/doris/analysis/ResumeRoutineLoadStmt.java +26]
    ├── ResumeSyncJobStmt [vim org/apache/doris/analysis/ResumeSyncJobStmt.java +22]
    ├── RevokeStmt [vim org/apache/doris/analysis/RevokeStmt.java +32]
    ├── SetUserPropertyStmt [vim org/apache/doris/analysis/SetUserPropertyStmt.java +31]
    ├── StopRoutineLoadStmt [vim org/apache/doris/analysis/StopRoutineLoadStmt.java +23]
    ├── StopSyncJobStmt [vim org/apache/doris/analysis/StopSyncJobStmt.java +22]
    ├── SyncStmt [vim org/apache/doris/analysis/SyncStmt.java +22]
    ├── TruncateTableStmt [vim org/apache/doris/analysis/TruncateTableStmt.java +27]
    ├── UninstallPluginStmt [vim org/apache/doris/analysis/UninstallPluginStmt.java +28]
    ├── UpdateStmt [vim org/apache/doris/analysis/UpdateStmt.java +35]
    ├── AbstractBackupStmt [vim org/apache/doris/analysis/AbstractBackupStmt.java +36]
    │   ├── BackupStmt [vim org/apache/doris/analysis/BackupStmt.java +29]
    │   └── RestoreStmt [vim org/apache/doris/analysis/RestoreStmt.java +33]
    ├── BaseViewStmt [vim org/apache/doris/analysis/BaseViewStmt.java +39]
    │   ├── AlterViewStmt [vim org/apache/doris/analysis/AlterViewStmt.java +31]
    │   └── CreateViewStmt [vim org/apache/doris/analysis/CreateViewStmt.java +33]
    └── CancelStmt [vim org/apache/doris/analysis/CancelStmt.java +19]
        ├── CancelAlterSystemStmt [vim org/apache/doris/analysis/CancelAlterSystemStmt.java +28]
        ├── CancelAlterTableStmt [vim org/apache/doris/analysis/CancelAlterTableStmt.java +31]
        └── CancelBackupStmt [vim org/apache/doris/analysis/CancelBackupStmt.java +30]

org.apache.doris.qe.StmtExecutor#execute(TUniqueId)

analyze(context.getSessionVariable().toThrift()); // semantic analysis
if (isForwardToMaster()) {
    forwardToMaster(); // forward the statement to the Master
    if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
        context.setQueryId(masterOpExecutor.getQueryId());
    }
    return;
} else {
    LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
}

// Execute the command (the branches for other statement types are elided)
if (...) {
    ...
} else if (parsedStmt instanceof DdlStmt) {
    handleDdlStmt();
} else if (parsedStmt instanceof ShowStmt) {
    handleShow();
} else if (parsedStmt instanceof KillStmt) {
    handleKill();
}
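This handler selection relies on the statement class hierarchy listed earlier. A CREATE TABLE statement is a DdlStmt, a SHOW DATABASES statement is a ShowStmt, and so on. The self-contained sketch below uses a tiny, hypothetical hierarchy that only borrows the class names and none of their real fields or methods. It shows how the instanceof chain routes subclasses such as CreateTableStmt by their parent type.

```java
// Hypothetical, heavily simplified stand-ins for the real analysis classes.
abstract class StatementBase {}
abstract class DdlStmt extends StatementBase {}
abstract class ShowStmt extends StatementBase {}
final class KillStmt extends StatementBase {}
final class CreateTableStmt extends DdlStmt {}
final class ShowDbStmt extends ShowStmt {}
final class UseStmt extends StatementBase {}

final class StmtExecutorSketch {
    /** Mirrors the instanceof chain of StmtExecutor#execute; returns the handler name. */
    static String route(StatementBase parsedStmt) {
        if (parsedStmt instanceof DdlStmt) {
            return "handleDdlStmt";   // any DDL subclass, e.g. CreateTableStmt
        } else if (parsedStmt instanceof ShowStmt) {
            return "handleShow";      // any SHOW subclass
        } else if (parsedStmt instanceof KillStmt) {
            return "handleKill";
        }
        return "other";               // branches elided in the excerpt
    }
}
```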

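Semantic Analysis

Semantic analysis checks that the statement is meaningful. Taking CreateTableStmt#analyze as an example: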

@Override
public void analyze(Analyzer analyzer) throws UserException {
    super.analyze(analyzer);
    tableName.analyze(analyzer);
    // simplified: check that the current user has the CREATE privilege on the table
    checkTblPriv(ConnectContext.get(), tableName.getDb(),
            tableName.getTbl(), PrivPredicate.CREATE);
    analyzeEngineName();
    keysDesc.analyze(columnDefs);
    for (ColumnDef columnDef : columnDefs) {
        columnDef.analyze(engineName.equals("olap"));
    }
    partitionDesc.analyze(columnDefs, properties);
    distributionDesc.analyze(columnSet);
}

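These checks verify that:

  • the names are valid
  • the user has the required privileges
  • the partition definition is valid
  • the column types are valid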
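To make these checks concrete, here is a toy analyzer for a simplified CREATE TABLE description. Everything in it is an assumption made for illustration: the TableDef class, the name pattern and the whitelist of column types are ours, not Doris's actual rules. The same goes for the rule that partition columns must be declared columns, and the privilege check is reduced to a boolean.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical, simplified CREATE TABLE description.
final class TableDef {
    final String name;
    final List<String[]> columns; // each entry: {columnName, typeName}
    final List<String> partitionColumns;

    TableDef(String name, List<String[]> columns, List<String> partitionColumns) {
        this.name = name;
        this.columns = columns;
        this.partitionColumns = partitionColumns;
    }
}

final class CreateTableAnalyzerSketch {
    // Assumed naming rule: a letter followed by letters, digits or underscores.
    private static final Pattern NAME = Pattern.compile("[A-Za-z][A-Za-z0-9_]{0,63}");
    // Assumed whitelist of column types, for this sketch only.
    private static final Set<String> TYPES = Set.of("INT", "BIGINT", "VARCHAR", "DATE", "DECIMAL");

    /** Throws IllegalArgumentException describing the first problem found. */
    static void analyze(TableDef def, boolean hasCreatePriv) {
        if (!NAME.matcher(def.name).matches()) {
            throw new IllegalArgumentException("Incorrect table name: " + def.name);
        }
        if (!hasCreatePriv) {
            throw new IllegalArgumentException("CREATE command denied");
        }
        Set<String> declared = new HashSet<>();
        for (String[] col : def.columns) {
            if (!NAME.matcher(col[0]).matches() || !declared.add(col[0])) {
                throw new IllegalArgumentException("Invalid or duplicate column: " + col[0]);
            }
            if (!TYPES.contains(col[1])) {
                throw new IllegalArgumentException("Unsupported type " + col[1] + " for " + col[0]);
            }
        }
        for (String p : def.partitionColumns) {
            if (!declared.contains(p)) {
                throw new IllegalArgumentException("Partition column " + p + " is not defined");
            }
        }
    }
}
```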

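Forwarding

An FE node has one of three roles: Master, Follower or Observer.

Only the Master can modify metadata.

Every operation that modifies metadata must therefore be forwarded to the Master for execution.

Forwarding types:

  • FORWARD_NO_SYNC
  • FORWARD_WITH_SYNC
  • NO_FORWARD

DDL statements use FORWARD_WITH_SYNC.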
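Below is a minimal sketch of the forwarding decision. It assumes the semantics suggested by the names:

  • NO_FORWARD statements run locally.
  • FORWARD_NO_SYNC statements are sent to the Master, and the call returns as soon as the Master is done.
  • FORWARD_WITH_SYNC statements additionally make the non-master FE wait until it has replayed the Master's edit log up to the journal id the Master returns, so a follow-up read on that FE sees the change.

The ForwardType enum and ForwardSketch class, including the journal-id bookkeeping, are hypothetical.

```java
// Hypothetical model of the three forwarding types listed above.
enum ForwardType { NO_FORWARD, FORWARD_NO_SYNC, FORWARD_WITH_SYNC }

final class ForwardSketch {
    private final boolean isMaster;
    private long replayedJournalId; // how far this FE has replayed the edit log

    ForwardSketch(boolean isMaster, long replayedJournalId) {
        this.isMaster = isMaster;
        this.replayedJournalId = replayedJournalId;
    }

    boolean isForwardToMaster(ForwardType type) {
        // The Master executes everything itself; other FEs forward unless told not to.
        return !isMaster && type != ForwardType.NO_FORWARD;
    }

    /**
     * Simulates a forwarded statement. The Master reports the journal id of its
     * last write; with FORWARD_WITH_SYNC this FE "replays" until it has caught up.
     */
    long forward(ForwardType type, long masterJournalIdAfterExec) {
        if (type == ForwardType.FORWARD_WITH_SYNC) {
            while (replayedJournalId < masterJournalIdAfterExec) {
                replayedJournalId++; // stands in for replaying one log entry
            }
        }
        return replayedJournalId;
    }
}
```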

Command Execution

org.apache.doris.qe.DdlExecutor#execute()

// Call the method that matches the statement type
if (ddlStmt instanceof CreateTableStmt) {
    catalog.createTable((CreateTableStmt) ddlStmt);
}

Catalog#createTable supports several table types. Apart from OLAP tables, all of them are mapping tables over data stored in external systems:

if (engineName.equals("olap")) {
    createOlapTable(db, stmt);
    return;
} else if (engineName.equals("odbc")) {
    createOdbcTable(db, stmt);
    return;
} else if (engineName.equals("mysql")) {
    createMysqlTable(db, stmt);
    return;
} else if (engineName.equals("broker")) {
    createBrokerTable(db, stmt);
    return;
} else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
    createEsTable(db, stmt);
    return;
} else if (engineName.equalsIgnoreCase("hive")) {
    createHiveTable(db, stmt);
    return;
}
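The if/else chain above can also be read as a lookup from engine name to a table creator. The sketch below is a hypothetical alternative, not how Doris is written. It lower-cases the name for every engine, whereas the excerpt uses equals for olap/odbc/mysql/broker and equalsIgnoreCase only for elasticsearch/es/hive. It also treats "es" as an alias of "elasticsearch", and each creator just returns a description string.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry: engine name -> function that "creates" a table and describes it.
final class EngineRegistrySketch {
    private final Map<String, Function<String, String>> creators = new HashMap<>();

    EngineRegistrySketch() {
        creators.put("olap", t -> "OLAP table " + t + " (data stored in Doris)");
        for (String engine : new String[] {"odbc", "mysql", "broker", "elasticsearch", "hive"}) {
            creators.put(engine, t -> engine + " mapping table " + t);
        }
        creators.put("es", creators.get("elasticsearch")); // alias
    }

    String createTable(String engineName, String tableName) {
        Function<String, String> creator = creators.get(engineName.toLowerCase(Locale.ROOT));
        if (creator == null) {
            throw new IllegalArgumentException("Unknown engine: " + engineName);
        }
        return creator.apply(tableName);
    }
}
```

A map keeps the supported engines in one place, at the cost of hiding the per-engine special cases that the explicit chain makes visible.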

org.apache.doris.catalog.Catalog#createOlapTable

// Convert the statement (syntax) object into metadata objects
String tableName = stmt.getTableName();
LOG.debug("begin create olap table: {}", tableName);

// create columns
List<Column> baseSchema = stmt.getColumns();
validateColumns(baseSchema);

// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
PartitionInfo partitionInfo = null;
// ... (partitionInfo is initialized from partitionDesc; elided)

// Create the table object
long tableId = Catalog.getCurrentCatalog().getNextId();
OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo,
        distributionInfo, indexes);

// Create the Partition object
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
    // this is a 1-level partitioned table
    // use table name as partition name
    String partitionName = tableName;
    long partitionId = partitionNameToId.get(partitionName);
    // create partition
    Partition partition = createPartitionWithIndices(...);
    olapTable.addPartition(partition);
}

// Add the table to the metadata and persist it
Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());

org.apache.doris.catalog.Catalog#createPartitionWithIndices

A partition is the physical unit that holds a table's data.

// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);

// create partition with base index
Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);

// add to index map
Map<Long, MaterializedIndex> indexMap = new HashMap<>();
indexMap.put(baseIndexId, baseIndex);

// create rollup index if has
for (long indexId : indexIdToMeta.keySet()) {
    if (indexId == baseIndexId) {
        continue;
    }
    MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
    indexMap.put(indexId, rollup);
}

for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
    long indexId = entry.getKey();
    MaterializedIndex index = entry.getValue();
    MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);

    // create tablets
    int schemaHash = indexMeta.getSchemaHash();
    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
    createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, versionHash,
            replicaAlloc, tabletMeta, tabletIdSet);

    // add create replica task for olap
    short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
    TStorageType storageType = indexMeta.getStorageType();
    List<Column> schema = indexMeta.getSchema();
    KeysType keysType = indexMeta.getKeysType();
    int totalTaskNum = index.getTablets().size() * totalReplicaNum;
    MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
    AgentBatchTask batchTask = new AgentBatchTask();
    for (Tablet tablet : index.getTablets()) {
        long tabletId = tablet.getId();
        for (Replica replica : tablet.getReplicas()) {
            long backendId = replica.getBackendId();
            countDownLatch.addMark(backendId, tabletId);
            CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
                    partitionId, indexId, tabletId,
                    shortKeyColumnCount, schemaHash,
                    version, versionHash,
                    keysType,
                    storageType, storageMedium,
                    schema, bfColumns, bfFpp,
                    countDownLatch,
                    indexes,
                    isInMemory,
                    tabletType);
            task.setStorageFormat(storageFormat);
            batchTask.addTask(task);
            // add to AgentTaskQueue for handling finish report.
            // not for resending task
            AgentTaskQueue.addTask(task);
        }
    }
    AgentTaskExecutor.submit(batchTask);
}

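Overall flow:

  • Create the Partition object
  • Create the MaterializedIndex objects (the base index plus any rollups)
  • Create the Tablets for each MaterializedIndex
  • Create the Replicas and send the create-replica tasks to the BEs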
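The object hierarchy built here is Partition → MaterializedIndex → Tablet → Replica. The toy model below reproduces that hierarchy and the number of tasks the FE waits for. For each index, totalTaskNum = number of tablets × replicas per tablet, because there is one create-replica task per replica. The classes, the starting tablet id and the round-robin placement of replicas over BEs are all hypothetical; real replica placement is more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified metadata objects (ids only).
final class ReplicaSketch {
    final long backendId;
    ReplicaSketch(long backendId) { this.backendId = backendId; }
}

final class TabletSketch {
    final long tabletId;
    final List<ReplicaSketch> replicas = new ArrayList<>();
    TabletSketch(long tabletId) { this.tabletId = tabletId; }
}

final class IndexSketch {
    final long indexId;
    final List<TabletSketch> tablets = new ArrayList<>();
    IndexSketch(long indexId) { this.indexId = indexId; }
}

final class PartitionBuilderSketch {
    /**
     * Builds one index per id with `buckets` tablets each and `replicaNum` replicas
     * per tablet, placing replicas round-robin over the given backends.
     */
    static List<IndexSketch> build(long[] indexIds, int buckets, int replicaNum, long[] backends) {
        List<IndexSketch> indexes = new ArrayList<>();
        long nextTabletId = 1000;
        int cursor = 0;
        for (long indexId : indexIds) {
            IndexSketch index = new IndexSketch(indexId);
            for (int b = 0; b < buckets; b++) {
                TabletSketch tablet = new TabletSketch(nextTabletId++);
                for (int r = 0; r < replicaNum; r++) {
                    tablet.replicas.add(new ReplicaSketch(backends[cursor++ % backends.length]));
                }
                index.tablets.add(tablet);
            }
            indexes.add(index);
        }
        return indexes;
    }

    /** One create-replica task per replica, i.e. tablets * replicas per tablet. */
    static int totalTaskNum(IndexSketch index) {
        int n = 0;
        for (TabletSketch t : index.tablets) {
            n += t.replicas.size();
        }
        return n;
    }
}
```

With 4 buckets and 3 replicas, each index yields 12 tasks.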

// estimate timeout
long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
boolean ok;
try {
    ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    LOG.warn("InterruptedException: ", e);
    ok = false;
}

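The FE waits for the BEs to finish these tasks. The timeout grows with the number of tasks but is capped at max_create_table_timeout_second.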
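MarkedCountDownLatch lets the FE wait for one acknowledgement per (backendId, tabletId) mark. The sketch below approximates that idea on top of java.util.concurrent.CountDownLatch. A mark is counted down only once, so a duplicated finish report cannot release the waiter early. MarkedLatchSketch and its methods are hypothetical; only the timeout formula follows the excerpt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical approximation of a count-down latch keyed by (backendId, tabletId) marks.
final class MarkedLatchSketch {
    private final CountDownLatch latch;
    private final Map<String, Boolean> pendingMarks = new ConcurrentHashMap<>();

    MarkedLatchSketch(int count) {
        latch = new CountDownLatch(count);
    }

    void addMark(long backendId, long tabletId) {
        pendingMarks.put(backendId + "/" + tabletId, Boolean.TRUE);
    }

    /** Counts down only if the mark was still pending; returns whether it was. */
    boolean markedCountDown(long backendId, long tabletId) {
        if (pendingMarks.remove(backendId + "/" + tabletId) != null) {
            latch.countDown();
            return true;
        }
        return false;
    }

    /** Returns true if every mark was counted down before the timeout. */
    boolean await(long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Same shape as the excerpt: per-task timeout times task count, capped. */
    static long estimateTimeoutMs(int tabletCreateTimeoutSec, int maxCreateTableTimeoutSec, int totalTaskNum) {
        long timeout = tabletCreateTimeoutSec * 1000L * totalTaskNum;
        return Math.min(timeout, maxCreateTableTimeoutSec * 1000L);
    }
}
```

Take example values of 1 s per task and a 60 s cap. Twelve tasks then give a 12 000 ms timeout, while 500 tasks are capped at 60 000 ms.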

org.apache.doris.catalog.Database#createTableWithLock

idToTable.put(table.getId(), table);
nameToTable.put(table.getName(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
if (!isReplay) {
    // Write edit log
    CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
    Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
}
if (table.getType() == TableType.ELASTICSEARCH) {
    Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable) table);
}

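This method:

  • adds the table to the Database object
  • checks whether the call is a replay
  • if it is not a replay, writes the metadata (edit) log entry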
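The isReplay flag is what prevents a replayed operation from being logged a second time. The sketch below models that with an in-memory list standing in for the BDBJE-backed edit log. DatabaseSketch, its maps and the string log format are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical database object with an in-memory stand-in for the edit log.
final class DatabaseSketch {
    private final String fullName;
    private final Map<Long, String> idToTable = new HashMap<>();
    private final Map<String, Long> nameToTable = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<String> editLog; // shared stand-in for the journal

    DatabaseSketch(String fullName, List<String> editLog) {
        this.fullName = fullName;
        this.editLog = editLog;
    }

    /** Returns false if a table with that name already exists. */
    boolean createTableWithLock(long tableId, String tableName, boolean isReplay) {
        lock.writeLock().lock();
        try {
            if (nameToTable.containsKey(tableName)) {
                return false;
            }
            // Update the in-memory metadata first.
            idToTable.put(tableId, tableName);
            nameToTable.put(tableName, tableId);
            if (!isReplay) {
                // Only the original execution writes the log; a replay must not re-log.
                editLog.add("CREATE_TABLE " + fullName + " " + tableId + " " + tableName);
            }
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean hasTable(String tableName) {
        lock.readLock().lock();
        try {
            return nameToTable.containsKey(tableName);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```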

Flow summary:

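FE and BE Interaction

FE/BE interaction flow:

  1. The FE sends tasks.
  2. The BE executes them.
  3. The BE reports the execution results.
  4. The FE aggregates the results.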

On the FE side, the tasks are sent by the AgentBatchTask loop in createPartitionWithIndices shown above.

The main FE-side classes:

  • AgentBatchTask: collects the tasks and groups them by BE
  • AgentTaskExecutor: sends the AgentBatchTask
  • AgentTaskQueue: handles the reports of finished tasks
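Grouping by BE means each BE receives its tasks together rather than one request per task. The sketch below shows only the grouping step. AgentTaskSketch and AgentBatchTaskSketch are hypothetical stand-ins, and the assumption that each group becomes one request per BE is ours.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical task with just the fields needed for grouping.
final class AgentTaskSketch {
    final long backendId;
    final long signature; // e.g. the tablet id of a create-replica task

    AgentTaskSketch(long backendId, long signature) {
        this.backendId = backendId;
        this.signature = signature;
    }
}

final class AgentBatchTaskSketch {
    private final Map<Long, List<AgentTaskSketch>> backendIdToTasks = new LinkedHashMap<>();

    void addTask(AgentTaskSketch task) {
        backendIdToTasks.computeIfAbsent(task.backendId, id -> new ArrayList<>()).add(task);
    }

    /** One entry per BE: the tasks that would be sent to that BE together. */
    Map<Long, List<AgentTaskSketch>> batches() {
        return backendIdToTasks;
    }

    int getTaskNum() {
        int n = 0;
        for (List<AgentTaskSketch> tasks : backendIdToTasks.values()) {
            n += tasks.size();
        }
        return n;
    }
}
```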

Task Reception on the BE

be/src/agent/agent_server.cpp

Receiving tasks:

// resend request when something is wrong (BE may need some logic to guarantee idempotence)
void AgentServer::submit_tasks(TAgentResult& agent_result,
                               const std::vector<TAgentTaskRequest>& tasks) {
    Status ret_st;
    // TODO check master_info here if it is the same with that of heartbeat rpc
    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
        ret_st.to_thrift(&agent_result.status);
        return;
    }
    for (auto task : tasks) {
        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
        TTaskType::type task_type = task.task_type;
        int64_t signature = task.signature;

#define HANDLE_TYPE(t_task_type, work_pool, req_member)                         \
    case t_task_type:                                                           \
        if (task.__isset.req_member) {                                          \
            work_pool->submit_task(task);                                       \
        } else {                                                                \
            ret_st = Status::InvalidArgument(strings::Substitute(               \
                    "task(signature=$0) has wrong request member", signature)); \
        }                                                                       \
        break;

        switch (task_type) {
            // one HANDLE_TYPE(...) entry per task type
            ...
        }
    }
    ret_st.to_thrift(&agent_result.status);
}

The worker thread (here, the one that creates tablets):

while (_is_work) {
    TAgentTaskRequest agent_task_req;
    TCreateTabletReq create_tablet_req;
    {
        lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
        while (_is_work && _tasks.empty()) {
            _worker_thread_condition_variable.wait();
        }
        if (!_is_work) {
            return;
        }
        // Take a task from the queue
        agent_task_req = _tasks.front();
        create_tablet_req = agent_task_req.create_tablet_req;
        _tasks.pop_front();
    }

    // Execute it
    OLAPStatus create_status = _env->storage_engine()->create_tablet(create_tablet_req);
    // ... (task_status and finish_tablet_infos are derived from create_status; elided)

    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
    finish_task_request.__set_backend(_backend);
    finish_task_request.__set_report_version(_s_report_version);
    finish_task_request.__set_task_type(agent_task_req.task_type);
    finish_task_request.__set_signature(agent_task_req.signature);
    finish_task_request.__set_task_status(task_status);

    // Report the result to the FE
    _finish_task(finish_task_request);
}
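The BE worker is a classic producer/consumer loop. Submitting a task pushes it onto a queue and signals a condition variable. The worker waits while the queue is empty, takes a task under the lock, then runs and reports it outside the lock. Below is a Java rendering of the same pattern (the real code is C++). WorkerPoolSketch is hypothetical, the task is a plain string, and a callback stands in for _finish_task.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical Java rendering of the BE worker loop.
final class WorkerPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<String> tasks = new ArrayDeque<>();
    private volatile boolean isWork = true;      // also read outside the lock
    private final Consumer<String> finishTask;  // stands in for reporting to the FE

    WorkerPoolSketch(Consumer<String> finishTask) {
        this.finishTask = finishTask;
    }

    void submitTask(String task) {
        lock.lock();
        try {
            tasks.addLast(task);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    void stop() {
        lock.lock();
        try {
            isWork = false;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    void workerLoop() {
        while (isWork) {
            String task;
            lock.lock();
            try {
                while (isWork && tasks.isEmpty()) {
                    notEmpty.awaitUninterruptibly();
                }
                if (!isWork) {
                    return;
                }
                task = tasks.pollFirst(); // take the task while holding the lock
            } finally {
                lock.unlock();
            }
            // "Execute" outside the lock, then report the result.
            finishTask.accept("finished " + task);
        }
    }
}
```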

Handling Task Reports

org.apache.doris.service.FrontendServiceImpl#finishTask

org.apache.doris.master.MasterImpl#finishTask

The FE and BE communicate over the Thrift protocol.

Error Handling

org.apache.doris.task.AgentTaskQueue stores the tasks that are currently running.

org.apache.doris.master.ReportHandler#handleReport

org.apache.doris.master.ReportHandler#taskReport

The BE reports its tasks, OLAP tablets and disk state to the master FE.

The master FE processes these reports and resends tasks that have timed out.

private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
    ...
    // to avoid sending duplicate agent tasks to the BE
    if (task.shouldResend(taskReportTime)) {
        batchTask.addTask(task);
    }
    ...
}
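The check exists to avoid flooding a BE with duplicates. A minimal sketch of such a rule follows, under stated assumptions. Roughly, a task the FE still tracks but the BE no longer lists as running is a resend candidate. It is actually resent only if enough time has passed since it was last sent. The fixed resend wait time, the per-task last-send timestamp and both classes are hypothetical; the real shouldResend and its configuration may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical task record tracked on the FE (think of an AgentTaskQueue entry).
final class PendingTaskSketch {
    final long signature;
    long lastSendTimeMs;

    PendingTaskSketch(long signature, long lastSendTimeMs) {
        this.signature = signature;
        this.lastSendTimeMs = lastSendTimeMs;
    }

    /** Resend only after the wait time has passed since the last send. */
    boolean shouldResend(long reportTimeMs, long resendWaitMs) {
        return reportTimeMs - lastSendTimeMs > resendWaitMs;
    }
}

final class TaskReportSketch {
    /** Returns the signatures to resend and records the new send time for them. */
    static List<Long> tasksToResend(Map<Long, PendingTaskSketch> feTasks, Set<Long> beRunning,
                                    long reportTimeMs, long resendWaitMs) {
        List<Long> resend = new ArrayList<>();
        for (PendingTaskSketch task : feTasks.values()) {
            if (!beRunning.contains(task.signature) && task.shouldResend(reportTimeMs, resendWaitMs)) {
                task.lastSendTimeMs = reportTimeMs; // resent now
                resend.add(task.signature);
            }
        }
        return resend;
    }
}
```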

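Metadata Persistence

The edit log works like a WAL (write-ahead log).

BDBJE serves as the distributed KV store for the edit log.

Metadata persistence, in org.apache.doris.catalog.Database#createTableWithLock: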

public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
    ...
    // Update the in-memory metadata
    nameToTable.put(table.getName(), table);
    if (!isReplay) {
        // Write edit log
        // Build the metadata log entry
        CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
        // Write the metadata log entry
        Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
    }
    ...
}

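Metadata Replay

Metadata replay happens when the FE leader synchronizes metadata to the other FE nodes. Each node replays the log entries one by one to rebuild the metadata in memory.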

org.apache.doris.catalog.Catalog#replayCreateTable

public void replayCreateTable(String dbName, Table table) {
    Database db = this.fullNameToDb.get(dbName);
    db.createTableWithLock(table, true, false);
    ...
}
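Replay consumes the log in order and applies each entry with isReplay = true. Rebuilding therefore reproduces the writer's in-memory state without appending new entries. A follower can also resume from the last entry it applied. The sketch below uses a hypothetical string log format ("CREATE_TABLE <db> <tableId> <tableName>") and plain maps instead of Catalog/Database objects.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replayer: rebuilds db -> (tableName -> tableId) from an ordered log.
final class ReplaySketch {
    final Map<String, Map<String, Long>> dbs = new TreeMap<>();
    int lastReplayedId = -1; // index of the last applied entry

    void replay(List<String> journal) {
        for (int i = lastReplayedId + 1; i < journal.size(); i++) {
            String[] parts = journal.get(i).split(" ");
            switch (parts[0]) {
                case "CREATE_TABLE":
                    // Same effect as createTableWithLock(table, isReplay = true, ...): no new log entry.
                    dbs.computeIfAbsent(parts[1], k -> new TreeMap<>())
                            .put(parts[3], Long.parseLong(parts[2]));
                    break;
                default:
                    throw new IllegalStateException("unknown op: " + parts[0]);
            }
            lastReplayedId = i;
        }
    }
}
```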

How to Implement a New Statement

fe/fe-core/src/main/cup/sql_parser.cup (grammar file):

KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name
        LPAREN column_definition_list:columns COMMA index_definition_list:indexes RPAREN opt_engine:engineName
        opt_keys:keys
        opt_comment:tableComment
        opt_partition:partition
        opt_distribution:distribution
        opt_rollup:index
        opt_properties:tblProperties
        opt_ext_properties:extProperties
{:
    RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition,
            distribution, tblProperties, extProperties, tableComment, index);
:}

fe/fe-core/src/main/jflex/sql_scanner.flex (lexer file):

keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE));
keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS));
keywordMap.put("cube", new Integer(SqlParserSymbols.KW_CUBE));
keywordMap.put("current", new Integer(SqlParserSymbols.KW_CURRENT));
keywordMap.put("current_user", new Integer(SqlParserSymbols.KW_CURRENT_USER));
keywordMap.put("data", new Integer(SqlParserSymbols.KW_DATA));
keywordMap.put("database", new Integer(SqlParserSymbols.KW_DATABASE));
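keywordMap is what lets the scanner turn an identifier-shaped word into a keyword token. Below is a simplified sketch of that lookup. It assumes case-insensitive matching, uses made-up integer token ids instead of the generated SqlParserSymbols constants, and splits on whitespace instead of running a real JFlex lexer. KeywordScannerSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical token ids; the real ones are generated into SqlParserSymbols.
final class KeywordScannerSketch {
    static final int IDENT = 1, KW_CREATE = 2, KW_TABLE = 3, KW_DATABASE = 4;
    private static final Map<String, Integer> keywordMap = new HashMap<>();
    static {
        keywordMap.put("create", KW_CREATE);
        keywordMap.put("table", KW_TABLE);
        keywordMap.put("database", KW_DATABASE);
    }

    /** Splits on whitespace and maps each word to a keyword id, or IDENT. */
    static List<Integer> scan(String sql) {
        List<Integer> tokens = new ArrayList<>();
        for (String word : sql.trim().split("\\s+")) {
            Integer kw = keywordMap.get(word.toLowerCase(Locale.ROOT));
            tokens.add(kw != null ? kw : IDENT);
        }
        return tokens;
    }
}
```

Adding a new keyword to a statement therefore means adding both a keywordMap entry here and a terminal of the same name in the CUP grammar, which is why the two files change together.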

Generating the lexer and parser code:

cd fe/ && mvn clean install -DskipTests

This generates:

• SqlScanner.java
• SqlParser.java
• SqlParserSymbols.java

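Summary of the steps for implementing a new statement:

  1. Define the lexer and grammar rules.
  2. Implement the corresponding statement class, e.g. CreateTableStmt.
  3. Implement the method that modifies the metadata, e.g. Catalog.createTable().
  4. Define the metadata log class for the operation, e.g. CreateTableInfo.
  5. Write the metadata log entry.
  6. Implement the corresponding replay method, e.g. Catalog.replayCreateTable().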

Author: chaplinthink. Areas of interest: big data, infrastructure, system design. A big data engineer who loves learning and sharing.

That is the full content of "[ApacheDoris] Apache Doris Metadata Design and DDL Source Code Reading". Source link: utcz.com/z/536091.html
