328人参与 • 2024-08-06 • Bootstrap
dfs.namenode.support.allow.format
是否允许格式化 NameNode,默认值为 true。生产环境建议配置为 false,防止误操作格式化了已有数据。
命令用法:hdfs namenode [-bootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck]]
# 常用的命令
hdfs namenode -bootstrapStandby
入口是 org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby#run 方法(下文代码片段中的标识符在转载时被统一转成了小写)。
此步骤做了如下操作:
/**
 * Entry point of the bootstrap-standby command (article excerpt; identifier
 * casing was lost when the article was extracted).
 *
 * Parses the arguments, switches off in-progress edit tailing in the
 * configuration, locates the other NameNode(s), calls the allow-format check,
 * logs in through securityutil, and finally executes dorun() through
 * securityutil.doasloginuserorfatal.
 *
 * @param args command-line arguments, handed to parseargs
 * @return the value produced by dorun()
 * @throws exception declared by the signature; an ioexception raised inside
 *         dorun() surfaces wrapped in a runtimeexception
 */
public int run(string[] args) throws exception {
// Parse the command-line arguments.
parseargs(args);
// Disable using the RPC tailing mechanism for bootstrapping the standby
// since it is less efficient in this case; see HDFS-14806.
conf.setboolean(dfsconfigkeys.dfs_ha_tailedits_inprogress_key, false);
// Parse the configuration, gather the cluster information and find the remote NameNode(s).
parseconfandfindothernn();
// NOTE(review): presumably aborts when formatting is disallowed by configuration
// (dfs.namenode.support.allow.format) — confirm in namenode.checkallowformat.
namenode.checkallowformat(conf);
inetsocketaddress myaddr = dfsutilclient.getnnaddress(conf);
// Log in using the NameNode keytab/principal configuration keys and this node's host name.
securityutil.login(conf, dfs_namenode_keytab_file_key,
dfs_namenode_kerberos_principal_key, myaddr.gethostname());
return securityutil.doasloginuserorfatal(new privilegedaction<integer>() {
@override
public integer run() {
try {
// Perform the actual metadata synchronization.
return dorun();
} catch (ioexception e) {
// This run() declares no checked exceptions, so wrap the failure and keep the cause.
throw new runtimeexception(e);
}
}
});
}
执行 dorun 的时候,里面集成了整个流程,主要做了如下事项:
/**
 * Runs the whole bootstrap flow (article excerpt; parts are elided by the
 * author and identifier casing was lost during extraction).
 *
 * Steps visible here: find the first remote NameNode that answers a version
 * request, verify the layout version, print a summary of the cluster,
 * format the local storage, then download the fsimage.
 *
 * @return err_code_failed_connect when no remote NameNode could be queried,
 *         err_code_invalid_version when the layout check fails,
 *         err_code_already_formatted when format(...) returns false, or the
 *         non-zero result of downloadimage; the final return statement is
 *         not part of this excerpt
 * @throws ioexception declared by the signature
 */
private int dorun() throws ioexception {
// find the active nn
namenodeprotocol proxy = null;
namespaceinfo nsinfo = null;
boolean isupgradefinalized = false;
remotenamenodeinfo proxyinfo = null;
// This whole section creates the NameNode proxy object: loop over the
// candidates and stop at the first one that answers successfully.
for (int i = 0; i < remotenns.size(); i++) {
proxyinfo = remotenns.get(i);
inetsocketaddress otheripcaddress = proxyinfo.getipcaddress();
proxy = creatennprotocolproxy(otheripcaddress);
try {
// get the namespace from any active nn. if you just formatted the primary nn and are
// bootstrapping the other nns from that layout, it will only contact the single nn.
// however, if there cluster is already running and you are adding a nn later (e.g.
// replacing a failed nn), then this will bootstrap from any node in the cluster.
nsinfo = proxy.versionrequest();
isupgradefinalized = proxy.isupgradefinalized();
break;
} catch (ioexception ioe) {
// A failing candidate is only logged; the loop moves on to the next remote NameNode.
log.warn("unable to fetch namespace information from remote nn at " + otheripcaddress
+ ": " + ioe.getmessage());
if (log.isdebugenabled()) {
log.debug("full exception trace", ioe);
}
}
}
// nsinfo stays null only if every candidate threw before versionrequest() returned.
if (nsinfo == null) {
log.error(
"unable to fetch namespace information from any remote nn. possible namenodes: "
+ remotenns);
return err_code_failed_connect;
}
// Check the layout version (per the article, currently -66).
if (!checklayoutversion(nsinfo)) {
log.error("layout version on remote node (" + nsinfo.getlayoutversion()
+ ") does not match " + "this node's layout version ("
+ hdfsserverconstants.namenode_layout_version + ")");
return err_code_invalid_version;
}
// Print the cluster information.
system.out.println(
"=====================================================\n" +
"about to bootstrap standby id " + nnid + " from:\n" +
" nameservice id: " + nsid + "\n" +
" other namenode id: " + proxyinfo.getnamenodeid() + "\n" +
" other nn's http address: " + proxyinfo.gethttpaddress() + "\n" +
" other nn's ipc address: " + proxyinfo.getipcaddress() + "\n" +
" namespace id: " + nsinfo.getnamespaceid() + "\n" +
" block pool id: " + nsinfo.getblockpoolid() + "\n" +
" cluster id: " + nsinfo.getclusterid() + "\n" +
" layout version: " + nsinfo.getlayoutversion() + "\n" +
" isupgradefinalized: " + isupgradefinalized + "\n" +
"=====================================================");
// Create the storage object that is about to be formatted.
nnstorage storage = new nnstorage(conf, dirstoformat, edituristoformat);
if (!isupgradefinalized) {
// ... upgrade-related code omitted in this excerpt
} else if (!format(storage, nsinfo)) { // prompt the user to format storage; per the article, this step creates the VERSION/seen_txid files
return err_code_already_formatted;
}
// download the fsimage from active namenode
// (per the article, the fsimage file is fetched from the remote NameNode over HTTP)
int download = downloadimage(storage, proxy, proxyinfo);
if (download != 0) {
return download;
}
// ... remaining code omitted in this excerpt
}
/**
 * Downloads the most recent checkpoint image from the remote NameNode into
 * the freshly formatted storage (article excerpt; identifier casing was
 * lost during extraction).
 *
 * @param storage   local storage; passed to the image and used as the download target
 * @param proxy     proxy to the remote NameNode, queried for the checkpoint
 *                  and current transaction ids
 * @param proxyinfo remote NameNode info, used here for its HTTP address
 * @return err_code_logs_unavailable when the shared-edits check is enabled
 *         and fails, otherwise 0
 * @throws ioexception propagated unchanged from the steps below; the image
 *         is closed in every case
 */
private int downloadimage(nnstorage storage, namenodeprotocol proxy, remotenamenodeinfo proxyinfo)
throws ioexception {
// load the newly formatted image, using all of the directories
// (including shared edits)
// Get the transaction id of the most recent checkpoint.
final long imagetxid = proxy.getmostrecentcheckpointtxid();
// Get the current transaction id.
final long curtxid = proxy.gettransactionid();
fsimage image = new fsimage(conf);
try {
// Copy the cluster (storage) information into the image.
image.getstorage().setstorageinfo(storage);
// Per the article: creates the journal set and puts the edit log into the
// open-for-reading state (the assert below checks exactly that state).
image.initeditlog(startupoption.regular);
assert image.geteditlog().isopenforread() :
"expected edit log to be open for read";
// ensure that we have enough edits already in the shared directory to
// start up from the last checkpoint on the active.
// Per the article: fetches the edit logs between imagetxid and curtxid from the shared QJM.
if (!skipsharededitscheck &&
!checklogsavailableforread(image, imagetxid, curtxid)) {
return err_code_logs_unavailable;
}
// Per the article: the fsimage is downloaded over HTTP as an fsimage.ckpt
// file and written into the storage directories.
// download that checkpoint into our storage directories.
md5hash hash = transferfsimage.downloadimagetostorage(
proxyinfo.gethttpaddress(), imagetxid, storage, true, true);
// Save the MD5 digest of the fsimage and rename the checkpoint image to
// its final name (without the ckpt marker).
image.savedigestandrenamecheckpointimage(namenodefile.image, imagetxid,
hash);
// Write seen_txid into the directories.
// write seen_txid to the formatted image directories.
storage.writetransactionidfiletostorage(imagetxid, namenodedirtype.image);
} catch (ioexception ioe) {
// Rethrown as-is; the finally block below still closes the image.
throw ioe;
} finally {
image.close();
}
return 0;
}
先看 checklogsavailableforread。
此步骤主要是从 QJM 中获取 imagetxid 到 curtxid 之间的 edit log 输入流。
直接看重点:org.apache.hadoop.hdfs.server.namenode.FSEditLog#selectInputStreams 方法。
/**
 * Collects edit-log input streams starting at fromtxid and verifies that
 * they cover the range up to toatleasttxid without gaps (article excerpt;
 * identifier casing was lost during extraction).
 *
 * @param fromtxid        first transaction id requested
 * @param toatleasttxid   transaction id the streams must reach, as checked by checkforgaps
 * @param recovery        when non-null, a failed gap check is only logged and
 *                        the streams are still returned
 * @param inprogressok    forwarded to the stream selection and to the gap check
 * @param onlydurabletxns forwarded to the stream selection
 * @return the selected streams
 * @throws ioexception when the gap check fails and recovery is null; the
 *         streams are closed before the exception is rethrown
 */
public collection<editloginputstream> selectinputstreams(long fromtxid,
long toatleasttxid, metarecoverycontext recovery, boolean inprogressok,
boolean onlydurabletxns) throws ioexception {
list<editloginputstream> streams = new arraylist<editloginputstream>();
// The journal set is only accessed while holding journalsetlock.
synchronized(journalsetlock) {
preconditions.checkstate(journalset.isopen(), "cannot call " +
"selectinputstreams() on closed fseditlog");
// Per the article: fetches the edit logs from the shared QJM and stores them in streams.
selectinputstreams(streams, fromtxid, inprogressok, onlydurabletxns);
}
try {
// Verify that there are no gaps in the selected streams.
checkforgaps(streams, fromtxid, toatleasttxid, inprogressok);
} catch (ioexception e) {
if (recovery != null) {
// if recovery mode is enabled, continue loading even if we know we
// can't load up to toatleasttxid.
log.error("exception while selecting input streams", e);
} else {
// Not in recovery mode: release the streams before propagating the failure.
closeallstreams(streams);
throw e;
}
}
return streams;
}
/**
 * Downloads the checkpoint image for imagetxid from the given NameNode URL
 * into the image directories of dststorage (article excerpt; identifier
 * casing was lost during extraction). This is the helper invoked by
 * downloadimage during bootstrap.
 *
 * @param fsname             URL of the NameNode to download from
 * @param imagetxid          transaction id of the image to fetch
 * @param dststorage         destination storage
 * @param needdigest         forwarded to getfileclient
 * @param isbootstrapstandby forwarded to the image servlet parameter string
 * @return the hash returned by getfileclient
 * @throws ioexception when the destination storage offers no target file,
 *         or as declared for the calls below
 */
public static md5hash downloadimagetostorage(url fsname, long imagetxid,
storage dststorage, boolean needdigest, boolean isbootstrapstandby)
throws ioexception {
// Request parameters identifying the image to fetch.
string fileid = imageservlet.getparamstringforimage(null,
imagetxid, dststorage, isbootstrapstandby);
// Local file name under which the checkpoint image is stored.
string filename = nnstorage.getcheckpointimagefilename(imagetxid);
list<file> dstfiles = dststorage.getfiles(
namenodedirtype.image, filename);
// Without at least one target file there is nowhere to write the download.
if (dstfiles.isempty()) {
throw new ioexception("no targets in destination storage!");
}
// Download the file and return its MD5 value.
md5hash hash = getfileclient(fsname, fileid, dstfiles, dststorage, needdigest);
// Only the first destination file is reported in the log line.
log.info("downloaded file " + dstfiles.get(0).getname() + " size " +
dstfiles.get(0).length() + " bytes.");
return hash;
}
执行完成后,被引导的 Standby NameNode 节点的元数据目录下存放如下数据:
└── current
    ├── fsimage_0000000000000000000
    ├── fsimage_0000000000000000000.md5
    ├── seen_txid
    └── VERSION
1 directory, 4 files
希望对正在查看文章的您有所帮助,记得关注、评论、收藏,谢谢您
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论