hadoop源码_hdfs启动流程_1_NameNode

database

执行start-dfs.sh脚本后,集群是如何启动的?

本文阅读并注释了start-dfs脚本,以及namenode和datanode的启动主要流程流程源码。

阅读源码前准备

源码获取

  1. 拉取Apache Hadoop官方源码

    https://github.com/apache/hadoop

  2. 用idea打开...

  3. 切换到想看的版本...

    这里用的最新版本3.3.1

阅读目标

​ 本篇的阅读目标是搞明白hadoop中的start-dfs.sh启动脚本执行后都做了什么,hadoop中的NameNode,DataNode启动过程等大致流程,不会细追细节。

start-dfs.sh 干了什么

hdfs集群的启动命令为:start-dfs.sh, 脚本的位置在下图中:

![image-

​ 脚本中大致分位两块内容,第一部分是调用hdfs-config.sh脚本配置hdfs以及hadoop的参数以及环境等,第二部分是启动datanode、namenode以及secondary namenode等等。我们的重点是看第二部分的启动流程。

hdfs-config 简述

​ start-dfs.sh中启动hdfs-config.sh的代码如下:

# let"s locate libexec...

if [[ -n "${HADOOP_HOME}" ]]; then

HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"

else

HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"

fi

HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"

# shellcheck disable=SC2034

HADOOP_NEW_CONFIG=true

if [[ -f "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh" ]]; then

. "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh"

else

echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hdfs-config.sh." 2>&1

exit 1

fi

​ 在hdfs-config.sh脚本中会尝试启动hdfs-evn.sh脚本(如果存在)

​ 之后会检查以及设置HDFS的各种参数,例如:

  # turn on the defaults

export HDFS_AUDIT_LOGGER=${HDFS_AUDIT_LOGGER:-INFO,NullAppender}

export HDFS_NAMENODE_OPTS=${HDFS_NAMENODE_OPTS:-"-Dhadoop.security.logger=INFO,RFAS"}

export HDFS_SECONDARYNAMENODE_OPTS=${HDFS_SECONDARYNAMENODE_OPTS:-"-Dhadoop.security.logger=INFO,RFAS"}

export HDFS_DATANODE_OPTS=${HDFS_DATANODE_OPTS:-"-Dhadoop.security.logger=ERROR,RFAS"}

export HDFS_PORTMAP_OPTS=${HDFS_PORTMAP_OPTS:-"-Xmx512m"}

# depending upon what is being used to start Java, these may need to be

# set empty. (thus no colon)

export HDFS_DATANODE_SECURE_EXTRA_OPTS=${HDFS_DATANODE_SECURE_EXTRA_OPTS-"-jvm server"}

export HDFS_NFS3_SECURE_EXTRA_OPTS=${HDFS_NFS3_SECURE_EXTRA_OPTS-"-jvm server"}

​ 再之后会启动hadoop-config.sh脚本:

# shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh

if [[ -n "${HADOOP_COMMON_HOME}" ]] &&

[[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then

. "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"

elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then

. "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"

elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then

. "${HADOOP_HOME}/libexec/hadoop-config.sh"

else

echo "ERROR: Hadoop common not found." 2>&1

exit 1

fi

hadoop-config.sh是最基本的、公用的环境变量配置脚本,会再调用etc/hadoop/hadoop-env.sh脚本。主要是配置java启动项相关参数,比如java执行环境的classpath等。

hdfs-config.sh一系列脚本的整体功能就是保证启动hdfs集群前对hdfs和hadoop的各种环境变量进行配置。

start-dfs.sh后续就是逐步启动各个节点(namenodes,datanodes,secondary namenodes,quorumjournal nodes,quorumjournal nodes),如果是ha集群还会启动ZK Failover controllers

NameNode 启动流程

脚本代码

start-dfs.sh中启动namenode的代码:

#---------------------------------------------------------

# namenodes

# 找到配置中(如果没有配置取当前节点为)的namenode节点

NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)

if [[ -z "${NAMENODES}" ]]; then

NAMENODES=$(hostname)

fi

# 执行启动命令

echo "Starting namenodes on [${NAMENODES}]"

hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs"

--workers

--config "${HADOOP_CONF_DIR}"

--hostnames "${NAMENODES}"

--daemon start

namenode ${nameStartOpt}

HADOOP_JUMBO_RETCOUNTER=$?

hadoop-hdfs > src > mian > bin > hdfs中查看namenode命令:

# 命令描述:用于启动namenode

hadoop_add_subcommand "namenode" daemon "run the DFS namenode"

# 命令处理程序

namenode)

HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"

HADOOP_CLASSNAME="org.apache.hadoop.hdfs.server.namenode.NameNode"

hadoop_add_param HADOOP_OPTS hdfs.audit.logger "-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER}"

;;

# 执行函数命令

# everything is in globals at this point, so call the generic handler

hadoop_generic_java_subcmd_handler

这里就定位到了具体的处理类org.apache.hadoop.hdfs.server.namenode.NameNode。继续跟进到hadoop_generic_java_subcmd_handler函数定义的地方-脚本hdfs

## @description Handle subcommands from main program entries

## @audience private

## @stability evolving

## @replaceable yes

function hadoop_generic_java_subcmd_handler

{

# ...... 省略

# do the hard work of launching a daemon or just executing our interactive

# java class

if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then

if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then

hadoop_secure_daemon_handler

"${HADOOP_DAEMON_MODE}"

"${HADOOP_SUBCMD}"

"${HADOOP_SECURE_CLASSNAME}"

"${daemon_pidfile}"

"${daemon_outfile}"

"${priv_pidfile}"

"${priv_outfile}"

"${priv_errfile}"

"${HADOOP_SUBCMD_ARGS[@]}"

else

hadoop_daemon_handler

"${HADOOP_DAEMON_MODE}"

"${HADOOP_SUBCMD}"

"${HADOOP_CLASSNAME}"

"${daemon_pidfile}"

"${daemon_outfile}"

"${HADOOP_SUBCMD_ARGS[@]}"

fi

exit $?

else

hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"

fi

}

function hadoop_java_exec

{

# run a java command. this is used for

# non-daemons

local command=$1

local class=$2

shift 2

hadoop_debug "Final CLASSPATH: ${CLASSPATH}"

hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"

hadoop_debug "Final JAVA_HOME: ${JAVA_HOME}"

hadoop_debug "java: ${JAVA}"

hadoop_debug "Class name: ${class}"

hadoop_debug "Command line options: $*"

export CLASSPATH

#shellcheck disable=SC2086

exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"

}

可以看到最终还是利用java命令来执行该类。

NameNode 源码

​ 在源码中定位到org.apache.hadoop.hdfs.server.namenode.NameNode类。按照类的加载顺序来看NameNode启动流程:

静态代码块

  static{

HdfsConfiguration.init();

}

// 继续跟进代码,进入HdfsConfiguration类中:

@InterfaceAudience.Private

public class HdfsConfiguration extends Configuration {

static {

addDeprecatedKeys();

// 加载默认的配置文件

Configuration.addDefaultResource("hdfs-default.xml");

Configuration.addDefaultResource("hdfs-rbf-default.xml");

Configuration.addDefaultResource("hdfs-site.xml");

Configuration.addDefaultResource("hdfs-rbf-site.xml");

}

/**

* 这个方法在这里,这样当调用HdfsConfiguration时,如果它之前还没有被加载,它将被类加载。

* 在加载类时,将执行上面的静态初始化块来添加已弃用的键并添加默认资源。

* 这个方法被多次调用是安全的,因为静态初始化器块只会被调用一次。

* 这取代了以前其他类直接调用Configuration.addDefaultResource("hdfs-default.xml")而不首先加载该类的危险做法,从而跳过了弃用键。

*/

public static void init() {

}

mian方法

  public static void main(String argv[]) throws Exception {

//分析传入的参数,是否是帮助参数

if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {

System.exit(0);

}

try {

//打印一些启动日志信息

StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);

//创建namenode

NameNode namenode = createNameNode(argv, null);

if (namenode != null) {

//加入集群,HA,联邦集群都是有多个NameNode

namenode.join();

}

} catch (Throwable e) {

LOG.error("Failed to start namenode.", e);

terminate(1, e);

}

}

需要关注的是NameNode namenode = createNameNode(argv, null);

  public static NameNode createNameNode(String argv[], Configuration conf)

throws IOException {

// 日志信息

LOG.info("createNameNode " + Arrays.asList(argv));

if (conf == null)

// 准备配置文件对象

conf = new HdfsConfiguration();

// 将一些通用参数解析到配置中。

GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);

argv = hParser.getRemainingArgs();

// 解析其余的NameNode特定参数,放到配置中。

StartupOption startOpt = parseArguments(argv);

if (startOpt == null) {

printUsage(System.err);

return null;

}

setStartupOption(conf, startOpt);

boolean aborted = false;

// 针对startup对象进行switch case 选择

switch (startOpt) {

case FORMAT:

// 格式化

// 安装hadoop后第一次启动之前要执行的格式化命令 hadoop namenode -format

aborted = format(conf, startOpt.getForceFormat(),

startOpt.getInteractiveFormat());

terminate(aborted ? 1 : 0);

return null; // avoid javac warning

case GENCLUSTERID:

String clusterID = NNStorage.newClusterID();

LOG.info("Generated new cluster id: {}", clusterID);

terminate(0);

return null;

case ROLLBACK:

aborted = doRollback(conf, true);

terminate(aborted ? 1 : 0);

return null; // avoid warning

case BOOTSTRAPSTANDBY:

String[] toolArgs = Arrays.copyOfRange(argv, 1, argv.length);

int rc = BootstrapStandby.run(toolArgs, conf);

terminate(rc);

return null; // avoid warning

case INITIALIZESHAREDEDITS:

aborted = initializeSharedEdits(conf,

startOpt.getForceFormat(),

startOpt.getInteractiveFormat());

terminate(aborted ? 1 : 0);

return null; // avoid warning

case BACKUP:

case CHECKPOINT:

NamenodeRole role = startOpt.toNodeRole();

DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));

return new BackupNode(conf, role);

case RECOVER:

NameNode.doRecovery(startOpt, conf);

return null;

case METADATAVERSION:

printMetadataVersion(conf);

terminate(0);

return null; // avoid javac warning

case UPGRADEONLY:

DefaultMetricsSystem.initialize("NameNode");

new NameNode(conf);

terminate(0);

return null;

default:

// 正常启动的时候就会走到这里

// metrics:度量系统记录详细运行信息

DefaultMetricsSystem.initialize("NameNode");

// 初始化NameNode

return new NameNode(conf);

}

}

NameNode构造方法

	public NameNode(Configuration conf) throws IOException {

// 默认为正常的NameNode

this(conf, NamenodeRole.NAMENODE);

}

protected NameNode(Configuration conf, NamenodeRole role)

throws IOException {

// 将配置文件赋值到父类的静态变量中

super(conf);

// 初始化Tracer

// 在“进程”中使用一个Tracer实例来收集和分发它的跟踪范围。Tracer实例是所有跟踪的一站式商店。

this.tracer = new Tracer.Builder("NameNode").

conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).

build();

// TracerConfigurationManager类提供了通过RPC协议在运行时管理跟踪器配置的函数。

this.tracerConfigurationManager =

new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);

this.role = role;

// clientNamenodeAddress : 客户端将用来访问这个namenode或名称服务的namenode地址。对于使用逻辑URI的HA配置,它将是逻辑地址。

String nsId = getNameServiceId(conf);

String namenodeId = HAUtil.getNameNodeId(conf, nsId);

clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(

conf, nsId);

// 虚拟机中搭建集群启动日志打印为:

// 2021-07-07 17:45:46,560 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: Clients are to use wanglj01:9000 to access this namenode/service.

if (clientNamenodeAddress != null) {

LOG.info("Clients should use {} to access"

+ " this namenode/service.", clientNamenodeAddress);

}

// ha集群相关

this.haEnabled = HAUtil.isHAEnabled(conf, nsId);

state = createHAState(getStartupOption(conf));

this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);

this.haContext = createHAContext();

try {

// 给联邦模式下准备的,主要是设置联邦模式下namenode的地址和RPC地址

initializeGenericKeys(conf, nsId, namenodeId);

// 初始化namenode的核心方法

initialize(getConf());

// HA相关内容

state.prepareToEnterState(haContext);

try {

haContext.writeLock();

state.enterState(haContext);

} finally {

haContext.writeUnlock();

}

} catch (IOException e) {

this.stopAtException(e);

throw e;

} catch (HadoopIllegalArgumentException e) {

this.stopAtException(e);

throw e;

}

// 启动成功

notBecomeActiveInSafemode = conf.getBoolean(

DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE,

DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT);

this.started.set(true);

}

NameNode#initialize

  protected void initialize(Configuration conf) throws IOException {

if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {

String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);

if (intervals != null) {

conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,

intervals);

}

}

// UserGroupInformation类作用:

// Hadoop的用户和组信息。该类封装了一个JAAS Subject,并提供了确定用户用户名和组的方法。它同时支持Windows、Unix和Kerberos登录模块。

// 方法简介:设置UGI的静态配置。特别是设置安全身份验证机制和组查找服务。

UserGroupInformation.setConfiguration(conf);

// 以NameNode配置的用户登录。

loginAsNameNodeUser(conf);

// 初始化namemode的度量系统

NameNode.initMetrics(conf, this.getRole());

StartupProgressMetrics.register(startupProgress);

// 初始化jvm监听的度量系统

// JvmPauseMonitor类的作用:

// 此类建立一个简单的线程。在此线程中,在循环中运行sleep一段时间方法,如果sleep花费的时间比传递给sleep方法的时间长,

// 就意味着JVM或者宿主机已经出现了停顿处理现象,可能会导致其它问题,如果这种停顿被监测出来,线程会打印一个消息。

pauseMonitor = new JvmPauseMonitor();

pauseMonitor.init(conf);

pauseMonitor.start();

metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

if (conf.getBoolean(DFS_NAMENODE_GC_TIME_MONITOR_ENABLE,

DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT)) {

long observationWindow = conf.getTimeDuration(

DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS,

DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT,

TimeUnit.MILLISECONDS);

long sleepInterval = conf.getTimeDuration(

DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS,

DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS_DEFAULT,

TimeUnit.MILLISECONDS);

gcTimeMonitor = new Builder().observationWindowMs(observationWindow)

.sleepIntervalMs(sleepInterval).build();

gcTimeMonitor.start();

metrics.getJvmMetrics().setGcTimeMonitor(gcTimeMonitor);

}

// 启动httpserver

if (NamenodeRole.NAMENODE == role) {

startHttpServer(conf);

}

// 启动nameNode时从磁盘加载fsimage以及edits文件,初始化FsNamesystem、FsDirectory、 LeaseManager等

loadNamesystem(conf);

startAliasMapServerIfNecessary(conf);

// 创建rpcserver,支持namenode与datanode,client进行通信的协议

// 封装了NameNodeRpcServer clientRpcServer,支持 ClientNamenodeProtocol、DatanodeProtocolPB等协议

// 啥是rpc看这里:https://www.zhihu.com/question/25536695

rpcServer = createRpcServer(conf);

initReconfigurableBackoffKey();

if (clientNamenodeAddress == null) {

// This is expected for MiniDFSCluster. Set it now using

// the RPC server"s bind address.

clientNamenodeAddress =

NetUtils.getHostPortString(getNameNodeAddress());

LOG.info("Clients are to use " + clientNamenodeAddress + " to access"

+ " this namenode/service.");

}

if (NamenodeRole.NAMENODE == role) {

httpServer.setNameNodeAddress(getNameNodeAddress());

httpServer.setFSImage(getFSImage());

if (levelDBAliasMapServer != null) {

httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());

}

}

// 启动常用到主备状态的服务

startCommonServices(conf);

// 启动一个计时器,定期将NameNode度量写入日志文件。可以通过配置禁用此行为。

startMetricsLogger(conf);

}

NameNode#startCommonServices

  /** Start the services common to active and standby states */

private void startCommonServices(Configuration conf) throws IOException {

// 创建NameNodeResourceChecker、激活BlockManager等

namesystem.startCommonServices(conf, haContext);

registerNNSMXBean();

// 非NamenodeRole.NAMENODE的角色在此处启动HttpServer

if (NamenodeRole.NAMENODE != role) {

startHttpServer(conf);

httpServer.setNameNodeAddress(getNameNodeAddress());

httpServer.setFSImage(getFSImage());

if (levelDBAliasMapServer != null) {

httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());

}

}

// 启动RPCServer

rpcServer.start();

// 启动各插件

try {

plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,

ServicePlugin.class);

} catch (RuntimeException e) {

String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);

LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +

pluginsValue, e);

throw e;

}

for (ServicePlugin p: plugins) {

try {

p.start(this);

} catch (Throwable t) {

LOG.warn("ServicePlugin " + p + " could not be started", t);

}

}

LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());

if (rpcServer.getServiceRpcAddress() != null) {

LOG.info(getRole() + " service RPC up at: "

+ rpcServer.getServiceRpcAddress());

}

}

FSNamesystem#startCommonServices

方法用于启动对主备状态都通用的服务:

  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {

this.registerMBean(); // register the MBean for the FSNamesystemState

writeLock();

this.haContext = haContext;

try {

// 创建NameNodeResourceChecker(资源检查线程),并立即检查一次

nnResourceChecker = new NameNodeResourceChecker(conf);

checkAvailableResources();

assert !blockManager.isPopulatingReplQueues();

// 设置一些启动过程中的信息

StartupProgress prog = NameNode.getStartupProgress();

prog.beginPhase(Phase.SAFEMODE);

long completeBlocksTotal = getCompleteBlocksTotal();

prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,

completeBlocksTotal);

// 激活blockManager: 保存与存储在Hadoop集群中的块相关的信息。

blockManager.activate(conf, completeBlocksTotal);

} finally {

writeUnlock("startCommonServices");

}

registerMXBean();

DefaultMetricsSystem.instance().register(this);

if (inodeAttributeProvider != null) {

inodeAttributeProvider.start();

dir.setINodeAttributeProvider(inodeAttributeProvider);

}

snapshotManager.registerMXBean();

InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);

this.nameNodeHostName = (serviceAddress != null) ?

serviceAddress.getHostName() : "";

}

BlockManager#activate

  public void activate(Configuration conf, long blockTotal) {

// 启动PendingReplicationBlocks,这个类主要是对数据块进行一些记账工作。类似于Block可能存放在那个Datanode上这种。

pendingReconstruction.start();

// 激活DatanodeManager:启动DecommissionManager--Monitor、HeartbeatManager-- Monitor

datanodeManager.activate(conf);

// 启动redundancyThread, 大致作用是:

// 计算可以在数据节点上调度的块复制和块失效工作。datanode将在下一个心跳时被告知这项工作

// 如果有任何重构请求超时,获取它们并将它们放回需要的重构队列中

// 重新扫描之前推迟的块列表

this.redundancyThread.setName("RedundancyMonitor");

this.redundancyThread.start();

// 启动storageInfoDefragmenterThread

// 它监视StorageInfo TreeSet的碎片,并在它低于某个阈值时压缩它。

storageInfoDefragmenterThread.setName("StorageInfoMonitor");

storageInfoDefragmenterThread.start();

//块汇报线程穹顶(心跳检测机制)

this.blockReportThread.start();

mxBeanName = MBeans.register("NameNode", "BlockStats", this);

// 初始化安全模式

bmSafeMode.activate(blockTotal);

}

namenode的主要责任是文件元信息与数据块映射的管理。相应的,namenode的启动流程需要关注

与客户端、datanode通信的工作线程,文件元信息的管理机制,数据块的管理机制等。其中,

RpcServer主要负责与客户端、datanode通信,FSDirectory主要负责管理文件元信息。

posted @

2021-07-09 14:49 

坐井 

阅读(0

评论(0

编辑 

收藏 

举报

刷新评论刷新页面返回顶部

Copyright © 2021 坐井


Powered by .NET 5.0 on Kubernetes

以上是 hadoop源码_hdfs启动流程_1_NameNode 的全部内容, 来源链接: utcz.com/z/535753.html

回到顶部