[Apache Atlas] Atlas Architecture Design and a Brief Source Code Analysis


An analysis of the Apache Atlas architecture and source code, using Hive database creation as an example to walk through the main flow of metadata collection.

Apache Atlas architecture diagram

Atlas supports ingesting metadata from multiple sources: Hive, HBase, Storm, and more.

Type System

Type

Atlas defines a number of metadata types; the main class hierarchies are listed below, followed by a small type-registration sketch.

├── AtlasBaseTypeDef
│   ├── AtlasEnumDef
│   └── AtlasStructDef
│       ├── AtlasBusinessMetadataDef
│       ├── AtlasClassificationDef
│       ├── AtlasEntityDef
│       └── AtlasRelationshipDef
├── AtlasStructType
│   ├── AtlasBusinessMetadataType
│   ├── AtlasClassificationType
│   ├── AtlasRelationshipType
│   └── AtlasEntityType
│       └── AtlasRootEntityType
├── AtlasType
│   ├── AtlasArrayType
│   ├── AtlasBigDecimalType
│   ├── AtlasBigIntegerType
│   ├── AtlasByteType
│   ├── AtlasDateType
│   ├── AtlasDoubleType
│   ├── AtlasEnumType
│   ├── AtlasFloatType
│   ├── AtlasIntType
│   ├── AtlasLongType
│   ├── AtlasMapType
│   ├── AtlasObjectIdType
│   ├── AtlasShortType
│   ├── AtlasStringType
│   └── AtlasStructType
│       ├── AtlasBusinessMetadataType
│       ├── AtlasClassificationType
│       ├── AtlasEntityType
│       └── AtlasRelationshipType
├── AtlasTypeDefStore
│   └── AtlasTypeDefGraphStore
│       └── AtlasTypeDefGraphStoreV2
└── StructTypeDefinition
    └── HierarchicalTypeDefinition
        ├── ClassTypeDefinition
        └── TraitTypeDefinition
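To make the type system concrete, here is a minimal sketch of defining and registering a custom entity type through AtlasClientV2. The type name demo_db, its attribute, the endpoint, and the credentials are made up for illustration; the class names follow the hierarchy above, but exact helper signatures may differ across Atlas versions.

import java.util.Arrays;
import java.util.Collections;

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;

public class RegisterDemoType {
    public static void main(String[] args) throws Exception {
        // Hypothetical Atlas endpoint and basic-auth credentials
        AtlasClientV2 client = new AtlasClientV2(new String[]{"http://localhost:21000"},
                                                 new String[]{"admin", "admin"});

        // An entity type "demo_db" extending the built-in DataSet type
        AtlasEntityDef demoDb = new AtlasEntityDef("demo_db");
        demoDb.setSuperTypes(Collections.singleton("DataSet"));

        // One optional, indexable string attribute
        AtlasAttributeDef owner = new AtlasAttributeDef("owner", "string");
        owner.setIsOptional(true);
        owner.setIsIndexable(true);
        demoDb.addAttribute(owner);

        // Bundle the definition and register it with the Atlas server
        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setEntityDefs(Arrays.asList(demoDb));

        client.createAtlasTypeDefs(typesDef);
    }
}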

Entity

An Entity is a concrete instance of a type; the related classes are listed below, followed by a construction sketch.

AtlasEntity
├── AtlasEntityExtInfo
│   ├── AtlasEntitiesWithExtInfo
│   └── AtlasEntityWithExtInfo
├── AtlasEntityStore
│   └── AtlasEntityStoreV2
├── AtlasEntityStream
│   └── AtlasEntityStreamForImport
├── AtlasEntityType
│   └── AtlasRootEntityType
└── IAtlasEntityChangeNotifier
    ├── AtlasEntityChangeNotifier
    └── EntityChangeNotifierNop
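A minimal sketch of building an entity instance against a type, roughly what HiveMetaStoreBridge does for hive_db. The attribute values ("demo_db", "primary") are illustrative only; the attribute names follow the Hive model shipped with Atlas.

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;

public class BuildHiveDbEntity {
    public static AtlasEntityWithExtInfo build() {
        // An instance of the hive_db type
        AtlasEntity db = new AtlasEntity("hive_db");

        db.setAttribute("name", "demo_db");
        db.setAttribute("clusterName", "primary");
        // qualifiedName is the unique attribute inherited from Referenceable: dbName@metadataNamespace
        db.setAttribute("qualifiedName", "demo_db@primary");
        db.setAttribute("owner", "hive");

        // AtlasEntityWithExtInfo can also carry referred entities (e.g. columns for a table)
        return new AtlasEntityWithExtInfo(db);
    }
}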

Attributes

Attributes define the fields of a model (type).

AtlasAttributeDef
└── AtlasRelationshipAttributeDef

Fields of AtlasAttributeDef:

private String                   name;
private String                   typeName;
private boolean                  isOptional;
private Cardinality              cardinality;
private int                      valuesMinCount;
private int                      valuesMaxCount;
private boolean                  isUnique;
private boolean                  isIndexable;
private boolean                  includeInNotification;
private String                   defaultValue;
private String                   description;
private int                      searchWeight = DEFAULT_SEARCHWEIGHT;
private IndexType                indexType = null;
private List<AtlasConstraintDef> constraints;
private Map<String, String>      options;
private String                   displayName;

Concrete examples from the Hive model (a Java equivalent of the "db" attribute is sketched after the flag list below):

db:

"name":        "db",
"typeName":    "hive_db",
"isOptional":  false,
"isIndexable": true,
"isUnique":    false,
"cardinality": "SINGLE"

columns:

"name":        "columns",
"typeName":    "array<hive_column>",
"isOptional":  true,
"isIndexable": true,
"isUnique":    false,
"constraints": [ { "type": "ownedRef" } ]

  • isComposite - whether the attribute is a composite (owned/contained) reference
  • isIndexable - whether the attribute is indexed for search
  • isUnique - whether the attribute value must be unique
  • multiplicity - whether the attribute is required, optional, or multi-valued
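In Java, the "db" attribute shown in the JSON above corresponds roughly to the following AtlasAttributeDef setup. This is a sketch only; the setter names are inferred from the field list earlier in this section and may vary slightly between Atlas versions.

import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;

public class DbAttributeExample {
    // Equivalent of the "db" attribute shown above: a required, indexed,
    // non-unique, single-valued reference to a hive_db entity
    public static AtlasAttributeDef dbAttribute() {
        AtlasAttributeDef dbAttr = new AtlasAttributeDef("db", "hive_db");
        dbAttr.setIsOptional(false);
        dbAttr.setIsIndexable(true);
        dbAttr.setIsUnique(false);
        dbAttr.setCardinality(Cardinality.SINGLE);
        return dbAttr;
    }
}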

System specific types and their significance

Referenceable

This type represents all entities that can be searched for using a unique attribute called qualifiedName.

Referenceable
├── ReferenceableDeserializer
├── ReferenceableSerializer
└── V1SearchReferenceableSerializer
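Because every Referenceable carries a unique qualifiedName, an entity can be fetched by that attribute instead of by GUID. A minimal sketch using AtlasClientV2; the endpoint, credentials, and qualifiedName value are placeholders, and the exact getEntityByAttribute signature should be checked against your Atlas version.

import java.util.Collections;

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;

public class LookupByQualifiedName {
    public static void main(String[] args) throws Exception {
        AtlasClientV2 client = new AtlasClientV2(new String[]{"http://localhost:21000"},
                                                 new String[]{"admin", "admin"});

        // Look up a hive_db entity by its unique attribute qualifiedName (dbName@metadataNamespace)
        AtlasEntityWithExtInfo dbEntity = client.getEntityByAttribute(
                "hive_db",
                Collections.singletonMap("qualifiedName", "demo_db@primary"));

        System.out.println("guid=" + dbEntity.getEntity().getGuid());
    }
}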

Hooks

Taking Hive metadata collection as an example, let's analyze the collection process.

Full import

import-hive.sh

"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" 

org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS

importTables
└── importDatabases  [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +295]
    └── importHiveMetadata  [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +289]

The call flow then continues as:

importTables -> importTable -> registerInstances

AtlasEntitiesWithExtInfo ret = null;

EntityMutationResponse  response        = atlasClientV2.createEntities(entities);
List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

if (CollectionUtils.isNotEmpty(createdEntities)) {
    ret = new AtlasEntitiesWithExtInfo();

    for (AtlasEntityHeader createdEntity : createdEntities) {
        AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());

        ret.addEntity(entity.getEntity());

        if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
            for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
                ret.addReferredEntity(entry.getKey(), entry.getValue());
            }
        }

        LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
    }
}

The database and table metadata is pushed to Atlas through HTTP POST requests.

atlasClientV2 exposes many such HTTP interfaces; a usage sketch of the Atlas HTTP client API follows.
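A minimal standalone sketch of pushing one entity to Atlas with AtlasClientV2, mirroring what registerInstances does above. The endpoint, credentials, and attribute values are placeholders, and error handling is omitted.

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;

public class PushEntityExample {
    public static void main(String[] args) throws Exception {
        // The client wraps the Atlas REST API (entity creation is an HTTP POST under /api/atlas/v2)
        AtlasClientV2 client = new AtlasClientV2(new String[]{"http://localhost:21000"},
                                                 new String[]{"admin", "admin"});

        AtlasEntity db = new AtlasEntity("hive_db");
        db.setAttribute("name", "demo_db");
        db.setAttribute("clusterName", "primary");
        db.setAttribute("qualifiedName", "demo_db@primary");

        // POST the entity payload; the response carries the GUIDs assigned by Atlas
        EntityMutationResponse response = client.createEntity(new AtlasEntityWithExtInfo(db));
        System.out.println(response.getCreatedEntities());
    }
}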

Real-time capture

HiveHook implements ExecuteWithHookContext

ExecuteWithHookContext is a new interface that the Pre/Post Execute Hook can run with the HookContext.

HiveHook implements the run() method to process the relevant Hive events (the Hive-side configuration that loads the hook is sketched below).
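For context, the hook only fires if Hive is configured to load it; in a typical deployment this means adding the Atlas hook to hive-site.xml and placing atlas-application.properties (with the Kafka/Atlas endpoints) on the Hive classpath. A sketch of the relevant property:

<property>
  <name>hive.exec.post.hooks</name>
  <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>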

The Hive-related events:

BaseHiveEvent
├── AlterTableRename
├── CreateHiveProcess
├── DropDatabase
├── DropTable
├── CreateDatabase
│   └── AlterDatabase
└── CreateTable
    └── AlterTable
        └── AlterTableRenameCol

Taking CREATE DATABASE as an example, the flow is as follows:

// Build the hook context
AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects(), isSkipTempTables());

// Handle the create-database event and extract the database information
event = new CreateDatabase(context);

if (event != null) {
    final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();

    super.notifyEntities(ActiveEntityFilter.apply(event.getNotificationMessages()), ugi);
}

public enum HookNotificationType {
    TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE,
    ENTITY_CREATE_V2, ENTITY_PARTIAL_UPDATE_V2, ENTITY_FULL_UPDATE_V2, ENTITY_DELETE_V2
}
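For the create-database path, the event's notification messages end up as ENTITY_CREATE_V2 notifications. Below is a rough, simplified sketch of what CreateDatabase effectively produces; in the real code the entities are derived from the Hive Database object rather than hard-coded.

import java.util.Collections;
import java.util.List;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;

public class CreateDatabaseNotificationSketch {
    public static List<HookNotification> notificationMessages(String user) {
        AtlasEntity db = new AtlasEntity("hive_db");
        db.setAttribute("name", "demo_db");
        db.setAttribute("qualifiedName", "demo_db@primary");

        AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(db);

        // ENTITY_CREATE_V2: "create these entities", carried to Kafka by notifyEntities()
        return Collections.<HookNotification>singletonList(new EntityCreateRequestV2(user, entities));
    }
}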

// Determine the user performing the operation
if (context.isMetastoreHook()) {
    try {
        ugi = SecurityUtils.getUGI();
    } catch (Exception e) {
        // do nothing
    }
} else {
    ret = getHiveUserName();

    if (StringUtils.isEmpty(ret)) {
        ugi = getUgi();
    }
}

if (ugi != null) {
    ret = ugi.getShortUserName();
}

if (StringUtils.isEmpty(ret)) {
    try {
        ret = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException e) {
        LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e);
        ret = System.getProperty("user.name");
    }
}

In short: the hook extracts the entity information and passes along the hook message type and the operating user.

As notifyEntities shows, the hooks of other components, such as HBase and Impala, also call this method to send their messages:

public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
    if (executor == null) { // send synchronously
        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
    } else {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
            }
        });
    }
}

The notification framework:

NotificationInterface
├── AtlasFileSpool
└── AbstractNotification
    ├── KafkaNotification
    └── Spooler

The data is written to Kafka:

@Override
public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException {
    KafkaProducer producer = getOrCreateProducer(notificationType);

    sendInternalToProducer(producer, notificationType, messages);
}

Messages are written to the topic determined by the NotificationType:

private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
    {
        put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
        put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
    }
};

NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),

The data is written to two topics: ATLAS_ENTITIES and ATLAS_HOOK.

ATLAS_HOOK receives the hook event messages; the metadata for the create-database event is written to this topic (a consumer sketch follows).
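To observe what lands on these topics, any Kafka consumer can subscribe to them; Atlas itself consumes ATLAS_HOOK to ingest the metadata, while ATLAS_ENTITIES carries change notifications for downstream consumers. A minimal sketch, where the broker address and group id are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtlasHookTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "atlas-hook-debug");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Hook messages (e.g. ENTITY_CREATE_V2 for a new hive_db) arrive as JSON strings
            consumer.subscribe(Collections.singletonList("ATLAS_HOOK"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}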

How is a database uniquely identified?

public String getQualifiedName(Database db) {
    return getDatabaseName(db) + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace();
}

dbName@clusterName guarantees uniqueness; for example, a Hive database named demo_db in the cluster (metadata namespace) primary gets the qualifiedName demo_db@primary.

Extended applications

A use case that refreshes Impala metadata by building on the Hive hook:

AutoRefreshImpala: https://github.com/Observe-secretly/AutoRefreshImpala


Author: chaplinthink. Focus areas: big data, infrastructure, and system design. A big data engineer who loves learning and sharing.

