it编程 > 前端脚本 > Bootstrap

【大数据Hadoop】HDFS-Namenode-bootstrapStandby同步元数据的源码步骤分析

328人参与 2024-08-06 Bootstrap

流程

  1. 根据配置项获取 nameserviceId、namenodeId。
  2. 获取其他 NameNode 的信息,建立 RPC 通信。
  3. 检查配置项 dfs.namenode.support.allow.format 是否允许格式化。生产环境一般建议将其设置为 false,防止误操作格式化已有数据;注意 bootstrapStandby 同样会执行该检查,因此设置为 false 时需要临时放开才能同步元数据。
  4. 获取待格式化的目录(fsimage 和 edits 的存储目录,以及 sharedEditsDirs 配置)。
  5. 格式化目录:创建 current 目录,写入 VERSION 文件和 seen_txid 文件。
  6. 从 QJM 中校验上一次 checkpoint 的 txid 到最新 curTxId 之间的 editlog 是否完整存在。
  7. 通过 HTTP 从远端 NameNode 下载最近一次 checkpoint 产生的 fsimage 文件。
  8. 整个同步过程完成。

同步元数据命令

hdfs namenode [-bootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck] ]
# 注意:-force、-nonInteractive、-skipSharedEditsCheck 这几个子选项区分大小写


# 常用的命令
hdfs namenode -bootstrapStandby

源码解读

配置解析

入口为 org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby#run 方法,
此步骤主要做了如下操作(见代码注释):

  /**
   * Entry point of {@code hdfs namenode -bootstrapStandby}.
   *
   * <p>Parses the command-line sub-options, resolves this NameNode and the other NameNodes of
   * the nameservice from the configuration, checks that formatting is allowed, logs in with the
   * NameNode keytab/principal, and then runs {@code doRun()} as the login user.
   *
   * @param args the sub-options that follow {@code -bootstrapStandby}
   * @return the status code returned by {@code doRun()}
   * @throws Exception if argument parsing, configuration resolution, the format check or the
   *     login fails
   */
  public int run(String[] args) throws Exception {
    // Parse the command-line arguments.
    parseArgs(args);
    // Disable using the RPC tailing mechanism for bootstrapping the standby
    // since it is less efficient in this case; see HDFS-14806.
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
    // Parse the configuration to obtain the cluster layout and locate the remote NameNodes.
    parseConfAndFindOtherNN();
    // Bootstrapping formats local storage, so it is subject to the same
    // "is formatting allowed" check as a regular format.
    NameNode.checkAllowFormat(conf);

    InetSocketAddress myAddr = DFSUtilClient.getNNAddress(conf);
    SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, myAddr.getHostName());

    return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
      @Override
      public Integer run() {
        try {
          // Perform the actual metadata synchronisation.
          return doRun();
        } catch (IOException e) {
          // PrivilegedAction.run() cannot throw checked exceptions, so wrap it.
          throw new RuntimeException(e);
        }
      }
    });
  }

同步元数据

doRun 方法串联了整个同步流程,主要做了如下事项(见代码注释):

/**
 * Performs the standby bootstrap: contacts a remote NameNode, validates its layout version,
 * formats the local storage and downloads the latest checkpoint image.
 *
 * <p>Note: this is an excerpt; the upgrade branch and the tail of the method are omitted.
 *
 * @return 0 on success, or one of the {@code ERR_CODE_*} status codes on failure
 * @throws IOException if formatting or downloading the image fails
 */
private int doRun() throws IOException {
  // Find a reachable remote NameNode.
  NamenodeProtocol proxy = null;
  NamespaceInfo nsInfo = null;
  boolean isUpgradeFinalized = false;
  RemoteNameNodeInfo proxyInfo = null;
  // Try each configured remote NN in turn and stop at the first one that answers.
  for (int i = 0; i < remoteNNs.size(); i++) {
    proxyInfo = remoteNNs.get(i);
    InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
    proxy = createNNProtocolProxy(otherIpcAddress);
    try {
      // Get the namespace from any active NN. If you just formatted the primary NN and are
      // bootstrapping the other NNs from that layout, it will only contact the single NN.
      // However, if the cluster is already running and you are adding a NN later (e.g.
      // replacing a failed NN), then this will bootstrap from any node in the cluster.
      nsInfo = proxy.versionRequest();
      isUpgradeFinalized = proxy.isUpgradeFinalized();
      break;
    } catch (IOException ioe) {
      // Not fatal: log it and fall through to the next remote NN.
      LOG.warn("unable to fetch namespace information from remote nn at " + otherIpcAddress
          + ": " + ioe.getMessage());
      if (LOG.isDebugEnabled()) {
        LOG.debug("full exception trace", ioe);
      }
    }
  }

  if (nsInfo == null) {
    LOG.error(
        "unable to fetch namespace information from any remote nn. possible namenodes: "
            + remoteNNs);
    return ERR_CODE_FAILED_CONNECT;
  }

  // The remote NN's layout version must match this build's NAMENODE_LAYOUT_VERSION
  // (e.g. -66 in the Hadoop release this excerpt was taken from).
  if (!checkLayoutVersion(nsInfo)) {
    LOG.error("layout version on remote node (" + nsInfo.getLayoutVersion()
        + ") does not match " + "this node's layout version ("
        + HdfsServerConstants.NAMENODE_LAYOUT_VERSION + ")");
    return ERR_CODE_INVALID_VERSION;
  }

  // Print a summary of the cluster we are about to bootstrap from.
  System.out.println(
      "=====================================================\n" +
      "about to bootstrap standby id " + nnId + " from:\n" +
      "           nameservice id: " + nsId + "\n" +
      "        other namenode id: " + proxyInfo.getNameNodeID() + "\n" +
      "  other nn's http address: " + proxyInfo.getHttpAddress() + "\n" +
      "  other nn's ipc  address: " + proxyInfo.getIpcAddress() + "\n" +
      "             namespace id: " + nsInfo.getNamespaceID() + "\n" +
      "            block pool id: " + nsInfo.getBlockPoolID() + "\n" +
      "               cluster id: " + nsInfo.getClusterID() + "\n" +
      "           layout version: " + nsInfo.getLayoutVersion() + "\n" +
      "       isupgradefinalized: " + isUpgradeFinalized + "\n" +
      "=====================================================");

  // Storage object covering the image/edits directories that will be formatted.
  NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);

  if (!isUpgradeFinalized) {
    // ... upgrade-related code omitted in this excerpt ...
  } else if (!format(storage, nsInfo)) {
    // format() prompts the user and creates the VERSION / seen_txid files;
    // a false return is reported as "already formatted".
    return ERR_CODE_ALREADY_FORMATTED;
  }

  // Download the fsimage from the remote NameNode over HTTP.
  int download = downloadImage(storage, proxy, proxyInfo);
  if (download != 0) {
    return download;
  }

  // ... remainder of the method omitted in this excerpt ...
}

下载 fsimage 文件(BootstrapStandby#downloadImage)

/**
 * Downloads the most recent checkpoint image from the remote NameNode into the freshly
 * formatted local storage.
 *
 * <p>Reads the remote NN's most recent checkpoint txid and current txid, then opens an edit
 * log for reading. Unless {@code skipSharedEditsCheck} is set, it verifies that the edits
 * between those two txids are available. It then downloads the image over HTTP, records the
 * image's MD5 digest and renames the image to its final name. Finally it writes
 * {@code seen_txid}.
 *
 * @param storage the newly formatted local storage
 * @param proxy RPC proxy to the remote NameNode
 * @param proxyInfo addresses of the remote NameNode (its HTTP address is used for the download)
 * @return 0 on success, or {@code ERR_CODE_LOGS_UNAVAILABLE} if the shared-edits check fails
 * @throws IOException if any RPC, download or storage operation fails
 */
private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
    throws IOException {
  // Transaction id of the most recent checkpoint on the remote NN (the image to download).
  final long imageTxId = proxy.getMostRecentCheckpointTxId();
  // Current (latest) transaction id on the remote NN.
  final long curTxId = proxy.getTransactionID();
  FSImage image = new FSImage(conf);
  try {
    // Copy the formatted storage's cluster/namespace info into the image's storage.
    image.getStorage().setStorageInfo(storage);
    // Load the newly formatted image, using all of the directories
    // (including shared edits), with the edit log opened for reading.
    image.initEditLog(StartupOption.REGULAR);
    assert image.getEditLog().isOpenForRead() :
        "expected edit log to be open for read";

    // Ensure that we have enough edits already in the shared directory to
    // start up from the last checkpoint on the active (imageTxId .. curTxId).
    if (!skipSharedEditsCheck &&
        !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
      return ERR_CODE_LOGS_UNAVAILABLE;
    }

    // Download that checkpoint over HTTP into our storage directories; it is written
    // under the temporary checkpoint file name (fsimage.ckpt_<txid>).
    MD5Hash hash = TransferFsImage.downloadImageToStorage(
        proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
    // Save the image's MD5 digest and rename the checkpoint file to its final name.
    image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
        hash);

    // Write seen_txid to the formatted image directories.
    storage.writeTransactionIdFileToStorage(imageTxId, NameNodeDirType.IMAGE);
  } finally {
    // Always release the temporary FSImage, on success and on failure.
    // (The redundant catch-and-rethrow of IOException was removed; exceptions
    // still propagate unchanged.)
    image.close();
  }
  return 0;
}

校验共享 edits log(shared edits)是否存在

先看 checkLogsAvailableForRead。
此步骤主要是从 QJM 中获取 imageTxId 到 curTxId 之间的 editlog 输入流,并检查其中是否存在缺口(gap)。
直接看重点:
org.apache.hadoop.hdfs.server.namenode.FSEditLog#selectInputStreams 方法

/**
 * Selects edit-log input streams from the journal set, starting at {@code fromTxId}, and
 * checks them for gaps up to {@code toAtLeastTxId}.
 *
 * @param fromTxId first transaction id the selected streams should start from
 * @param toAtLeastTxId the streams are checked for gaps up to this transaction id
 * @param recovery recovery context; when non-null a gap is only logged instead of thrown
 * @param inProgressOk whether in-progress segments may be selected
 * @param onlyDurableTxns passed through to the per-journal stream selection
 * @return the selected streams; they are returned open, so the caller must close them
 * @throws IOException if a gap is found and {@code recovery} is null (all selected streams are
 *     closed before rethrowing)
 * @throws IllegalStateException if the journal set is not open
 */
public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
    long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
    boolean onlyDurableTxns) throws IOException {

  List<EditLogInputStream> streams = new ArrayList<>();
  synchronized (journalSetLock) {
    Preconditions.checkState(journalSet.isOpen(), "cannot call " +
        "selectinputstreams() on closed fseditlog");
    // Collect the edit-log streams (e.g. from the shared QJM) into 'streams'.
    selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
  }

  try {
    // Verify the selected streams cover fromTxId..toAtLeastTxId without gaps.
    checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
  } catch (IOException e) {
    if (recovery != null) {
      // If recovery mode is enabled, continue loading even if we know we
      // can't load up to toAtLeastTxId.
      LOG.error("exception while selecting input streams", e);
    } else {
      // Do not leak the already-opened streams before propagating the failure.
      closeAllStreams(streams);
      throw e;
    }
  }
  return streams;
}

下载 fsimage(TransferFsImage#downloadImageToStorage)

/**
 * Downloads the fsimage checkpoint with transaction id {@code imageTxId} from a remote
 * NameNode's HTTP endpoint into the IMAGE directories of {@code dstStorage}.
 *
 * <p>The data is written under the checkpoint file name returned by
 * {@code NNStorage.getCheckpointImageFileName(imageTxId)}.
 *
 * @param fsName HTTP address of the remote NameNode
 * @param imageTxId transaction id of the image to download
 * @param dstStorage local storage whose IMAGE directories receive the file
 * @param needDigest whether the download should compute an MD5 digest
 * @param isBootstrapStandby whether the request is issued by bootstrapStandby
 * @return the MD5 digest returned by the download client (presumably null when
 *     {@code needDigest} is false — TODO confirm)
 * @throws IOException if {@code dstStorage} has no IMAGE targets or the download fails
 */
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
    Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
    throws IOException {
  String fileId = ImageServlet.getParamStringForImage(null,
      imageTxId, dstStorage, isBootstrapStandby);
  String fileName = NNStorage.getCheckpointImageFileName(imageTxId);

  // One destination file per IMAGE directory in the target storage.
  List<File> dstFiles = dstStorage.getFiles(
      NameNodeDirType.IMAGE, fileName);
  if (dstFiles.isEmpty()) {
    throw new IOException("no targets in destination storage!");
  }
  // Download the file and return its MD5 digest.
  MD5Hash hash = getFileClient(fsName, fileId, dstFiles, dstStorage, needDigest);
  LOG.info("downloaded file " + dstFiles.get(0).getName() + " size " +
      dstFiles.get(0).length() + " bytes.");
  return hash;
}

至此,元数据同步完成。

同步完成后,被引导的 Standby 节点的元数据目录下存放如下数据:

── current
    ├── fsimage_0000000000000000000
    ├── fsimage_0000000000000000000.md5
    ├── seen_txid
    └── VERSION

1 directory, 4 files

希望对正在查看文章的您有所帮助,记得关注、评论、收藏,谢谢您

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

offset Explorer连接kafka 一直报错unable to determine broker endpoints from zookeeper.one or more broker

08-06

【机器学习】集成学习:使用scikitLearn中的BaggingClassifier实现bagging和pasting策略

08-06

application.yaml与bootstrap.yaml的使用

08-23

docker-desktop闪退出现wsl问题解决方法

08-01

k8s部署 多master节点负载均衡以及集群高可用

07-31

EelasticSearch使用!!!

07-31

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论