hadoop 源码分析(二)HDFS nameNode 之 FSNamesystem初始化源码分析之加载fsImage 和 edits log

上一篇 讲解了nameNode启动的时候,NameNodeHttpServer的启动流程,其实简单来说就是基于hadoop自己实现的HttpServer2服务绑定一个InetSokcetAddress地址,也就是端口号,端口号哪来的?默认配置文件获取呗,最后在将HttpServer2中绑定一些servlet来处理url请求就完成了我们50070端口的请求处理。

那么本篇分析nameNode进程第二个比较核心的,应该说是最核心的组件 FSNamesystem,为什么说是最核心组件呢,因为元数据管理和block的管理都在这个里面进行操作。

直接进入正题,先回顾一下fsimage(全量快照) 文件 以及 edits log(增量事务记录) 文件,自己回忆下这两个文件的重要性。

找到FSNamesystem 的入口

找到之前的nameNode初始化的地方,里面的loadNamesystem(conf)方法,这个就是核心组件FSNamesystem 构建的入口

 /**   * Initialize name-node.   *    * @param conf the configuration   */  protected void initialize(Configuration conf) throws IOException {    //其实这些配置 就是对应着我们配置的 hdfs-site.xml 或者 core-default.xml或者其他文件中一些配置信息    //这些一般也不是很重要    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);      if (intervals != null) {        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,          intervals);      }    }    UserGroupInformation.setConfiguration(conf);    loginAsNameNodeUser(conf);    NameNode.initMetrics(conf, this.getRole());    StartupProgressMetrics.register(startupProgress);    //如果是nameNode  启动一个httpServer    if (NamenodeRole.NAMENODE == role) {      startHttpServer(conf);    }    this.spanReceiverHost = SpanReceiverHost.getInstance(conf);    // 初始化FSNameSystem 核心组件    loadNamesystem(conf);    //初始化rpc server 组件    rpcServer = createRpcServer(conf);    if (clientNamenodeAddress == null) {      // This is expected for MiniDFSCluster. Set it now using       // the RPC server's bind address.      clientNamenodeAddress =           NetUtils.getHostPortString(rpcServer.getRpcAddress());      LOG.info("Clients are to use "   clientNamenodeAddress   " to access"            " this namenode/service.");    }    //如果是NameNode 设置NameNodeAddress 以及  FsImage    if (NamenodeRole.NAMENODE == role) {      httpServer.setNameNodeAddress(getNameNodeAddress());      httpServer.setFSImage(getFSImage());    }        pauseMonitor = new JvmPauseMonitor(conf);    pauseMonitor.start();    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);    //一些公共服务的初始化    startCommonServices(conf);  }

这里面会做一些什么操作呢,跟进去看一下

 protected void loadNamesystem(Configuration conf) throws IOException {    //创建和初始化FSNameSystem    //nameNode启动的时候 会讲磁盘上的fsimange 文件 以及 edits log 文件读取到内存中进行合并,    // 合并形成最新的元数据    //loadFromDisk 就是讲fsimage 和 edits log 文件读取到内存进行合并    //合并成最新的元数据 就会作为FSNamesystem存放在内存中    this.namesystem = FSNamesystem.loadFromDisk(conf);  }

还是原先那样,如果有注释,尽可能的把注释看懂,并且结合自己强大的技术底蕴,大胆的猜测里面做了什么操作。。。

 /**   * Instantiates an FSNamesystem loaded from the image and edits   * directories specified in the passed Configuration.   * 实例化一个FSNamesystem,怎么实例化呢?加载配置文件中指定的 image 和 edits 文件目录   * 如果没有指定呢,hadoop肯定自己默认的文件目录   *   * @param conf the Configuration which specifies the storage directories   *             from which to load   *  包含了存储目录的Configuration,虽然有默认的,至少也要传递过来把,对吧,肯定是通过可识别方式,难不成用眼神?   *   * @return an FSNamesystem which contains the loaded namespace   *  返回一个包含了加载了 namespace 元数据的 FSNamesystem实例   *   * @throws IOException if loading fails   */  static FSNamesystem loadFromDisk(Configuration conf) throws IOException {    checkConfiguration(conf);    //构建FSImage,怎么构建?肯定是从磁盘加载    //FSImage 就是一个时间点的元数据快照信息,其实也就是元数据信息    //FSNamesystem.getNamespaceDirs(conf) 获取元数据的目录,肯定是找指定的配置信息,如果没有肯定就是默认值啦    // 默认值在hdfs-default.xml 以及 hadoop common 中的core-default.xml 文件中    //file://${hadoop.tmp.dir}/dfs/name  ${hadoop.tmp.dir}:/tmp/hadoop-${user.name}    //可以自己观察下启动的namenode进程的这个目录是否和这个匹配    FSImage fsImage = new FSImage(conf,        FSNamesystem.getNamespaceDirs(conf),            //FSNamesystem.getNamespaceEditsDirs(conf)) 获取edits log 的目录,和上面的一样对不对            //默认情况下 edis log 和namespace 是在同一个目录下,可以进去看下配置信息        FSNamesystem.getNamespaceEditsDirs(conf));    //实例化 FSnamesystem 对象,讲fsImage对象放入到了FSNamesystem中    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);    StartupOption startOpt = NameNode.getStartupOption(conf);    if (startOpt == StartupOption.RECOVER) {      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);    }    long loadStart = now();    try {      //这里就是说通过FSNamesystem 将fsimage 以及 edits log 加载到内存中      //然后再内存中合并两个文件,形成新的fsImange 信息      //(注:默认情况下 每隔一段时间 就会有check point 将旧的fsimage 与 edits log      // 就行合并形成新的fsimage文件,启动的时候肯定也需要合并才能形成新的fsimage文件 对吧)      //最后再内存中持有一份完整的元数据信息      namesystem.loadFSImage(startOpt);    } catch (IOException ioe) {      LOG.warn("Encountered exception loading fsimage", ioe);      fsImage.close();      throw ioe;    }    long timeTakenToLoadFSImage = now() - loadStart;    LOG.info("Finished loading FSImage in "   timeTakenToLoadFSImage   " msecs");    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();    if (nnMetrics != null) {      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);    }    return namesystem;  }

大家可以看到里面做了什么操作呢?

1.构建了一个fsImage对象,注意传入的参数名称,可以发现传入了fsImage 以及edits log,这里大概就猜测出新的fsImage 对象肯定是通过两个合并成新的,但是这里并没有加载数据

2.将新的完整的fsImage元数据信息传递给了FSNamesystem

3.最后加载,意思就是前面只是在初始化一些环境以及配置选项信息以及检查,到这里loadFromDisk才是真正的加载合并

构建FSNamesystem其实里面就是获取一些配置信息以及初始化一些选项,我们简单看一下,里面的内容很多我们主要看下注释 ,看到没,看到没

注释明确说了,不会加载数据,只是构建,如果加载数据使用loadFromDisk,所以注意上面的关联

/**   * Create an FSNamesystem associated with the specified image.   * 通过指定的 image 元数据快照创建一个 FSNamesystem对象   * Note that this does not load any data off of disk -- if you would   * like that behavior, use {@link #loadFromDisk(Configuration)}   * 注意 这里不从磁盘加载任何的数据,如果想加载数据,请使用 loadFromDisk(Configuration)   * 看到没,这里明确说了不加载数据对吧   * @param conf configuration   * @param fsImage The FSImage to associate with   * @param ignoreRetryCache Whether or not should ignore the retry cache setup   *                         step. For Secondary NN this should be set to true.   * @throws IOException on bad configuration   */  FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)      throws IOException {

那么我们就直接看下核心的加载数据的方法

loadFSImage()方法

里面大概会有哪些操作?

1.肯定就是加载 fsImage 文件 和 edits log 文件 进行合并为一份完整的内存元数据信息fsImage,

2.将新的fsImage 文件进行覆盖

3.那么edits log 文件呢,肯定也会重新刷新一份。

/*** 肯定就是加载 fsImage 文件 和 edits log 文件 进行合并为一份完整的内存元数据信息,在进行* 磁盘回写,同时注意哦,edits log 文件肯定也要重新来一份新的**/private void loadFSImage(StartupOption startOpt) throws IOException {    final FSImage fsImage = getFSImage();    //刚启动的时候 namenode 读取fsimage 以及 edits log 两个文件    //然后再内存中合并形成一个新的fsiamge文件 包含了完整的最新的元数据信息    //然后重新存储在磁盘,替换旧的fsimage 文件,同时会打开一个edits log 文件,    // format before starting up if requested      //处理下数据,    if (startOpt == StartupOption.FORMAT) {            fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id      startOpt = StartupOption.REGULAR;    }    boolean success = false;    writeLock();    try {      // We shouldn't be calling saveNamespace if we've come up in standby state.        //不会存储元数据,如果我们启动了一个 standby 服务      MetaRecoveryContext recovery = startOpt.createRecoveryContext();      //进行数据加载和 合并      final boolean staleImage          = fsImage.recoverTransitionRead(startOpt, this, recovery);      if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||          RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {        rollingUpgradeInfo = null;      }      final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();       LOG.info("Need to save fs image? "   needToSave            " (staleImage="   staleImage   ", haEnabled="   haEnabled            ", isRollingUpgrade="   isRollingUpgrade()   ")");      //如果需要将fsImage 存储到磁盘 就调用        //其实里面就是将最新的      if (needToSave) {        fsImage.saveNamespace(this);      } else {        updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),            startOpt);        // No need to save, so mark the phase done.        StartupProgress prog = NameNode.getStartupProgress();        prog.beginPhase(Phase.SAVING_CHECKPOINT);        prog.endPhase(Phase.SAVING_CHECKPOINT);      }      //打开一个新的edits log 文件 去进行写入      // This will start a new log segment and write to the seen_txid file, so      // we shouldn't do it when coming up in standby state      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)          || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {        fsImage.openEditLogForWrite();      }      success = true;    } finally {      if (!success) {        fsImage.close();      }      writeUnlock();    }    imageLoadComplete();  }

我们可以看到除了一些常规的check 和 formate 那么比较重要的就是加载fsImage 以及 edits log 文件的方法入口

fsImage.recoverTransitionRead(startOpt, this, recovery);

大致想想里面会做什么操作?就是将fsImage 文件 和 edits log 文件加载并合并

 /**   * Analyze storage directories.   * Recover from previous transitions if required.    * Perform fs state transition if necessary depending on the namespace info.   * Read storage info.    * 分析存储的目录,什么存储目录,就是存储fsImage 以及edits log 的目录、   * 从以前的状态恢复   *  根据元信息 判断是否执行fs状态的转换   *  读取存储的信息   *  上面意思是什么呢,大概意思就是如果以前有fsImage 和 edits log 就从文件信息中加载出来 并进行恢复   * @throws IOException   * @return true if the image needs to be saved or false otherwise   */  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,      MetaRecoveryContext recovery)      throws IOException {    assert startOpt != StartupOption.FORMAT :       "NameNode formatting should be performed before reading the image";    //获取fsImage 文件资源地址 其实也就是目录    Collection<URI> imageDirs = storage.getImageDirectories();    //获取edits log 目录    Collection<URI> editsDirs = editLog.getEditURIs();    // none of the data dirs exist    if((imageDirs.size() == 0 || editsDirs.size() == 0)                              && startOpt != StartupOption.IMPORT)        throw new IOException(          "All specified directories are not accessible or do not exist.");        // 1. For each data directory calculate its state and     // check whether all is consistent before transitioning.    //检查每个数据目录,判断是否状态一致性,什么意思呢,      // 进行数据恢复 里面就是对一些之前停机的时候 更新 回滚 新增数据的恢复操作    Map<StorageDirectory, StorageState> dataDirStates =              new HashMap<StorageDirectory, StorageState>();      boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);    if (LOG.isTraceEnabled()) {      LOG.trace("Data dir states:\n  "          Joiner.on("\n  ").withKeyValueSeparator(": ")        .join(dataDirStates));    }        if (!isFormatted && startOpt != StartupOption.ROLLBACK                      && startOpt != StartupOption.IMPORT) {      throw new IOException("NameNode is not formatted.");          }    int layoutVersion = storage.getLayoutVersion();    if (startOpt == StartupOption.METADATAVERSION) {      System.out.println("HDFS Image Version: "   layoutVersion);      System.out.println("Software format version: "          HdfsConstants.NAMENODE_LAYOUT_VERSION);      return false;    }    if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {      NNStorage.checkVersionUpgradable(storage.getLayoutVersion());    }    if (startOpt != StartupOption.UPGRADE        && startOpt != StartupOption.UPGRADEONLY        && !RollingUpgradeStartupOption.STARTED.matches(startOpt)        && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION        && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {      throw new IOException(          "\nFile system image contains an old layout version "             storage.getLayoutVersion()   ".\nAn upgrade to version "            HdfsConstants.NAMENODE_LAYOUT_VERSION   " is required.\n"            "Please restart NameNode with the \""            RollingUpgradeStartupOption.STARTED.getOptionString()            "\" option if a rolling upgrade is already started;"            " or restart NameNode with the \""            StartupOption.UPGRADE.getName()   "\" option to start"            " a new upgrade.");    }    //执行一些启动选项以及一些二更操作    storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);    // 2. Format unformatted dirs.    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {      StorageDirectory sd = it.next();      StorageState curState = dataDirStates.get(sd);      switch(curState) {      case NON_EXISTENT:        throw new IOException(StorageState.NON_EXISTENT                                 " state cannot be here");      case NOT_FORMATTED:        LOG.info("Storage directory "   sd.getRoot()   " is not formatted.");        LOG.info("Formatting ...");        sd.clearDirectory(); // create empty currrent dir        break;      default:        break;      }    }    // 3. Do transitions    switch(startOpt) {    case UPGRADE:    case UPGRADEONLY:      doUpgrade(target);      return false; // upgrade saved image already    case IMPORT:      doImportCheckpoint(target);      return false; // import checkpoint saved image already    case ROLLBACK:      throw new AssertionError("Rollback is now a standalone command, "            "NameNode should not be starting with this option.");    case REGULAR:    default:      // just load the image    }    //真正的加载fsImage 和 edits log 文件进行合并    return loadFSImage(target, startOpt, recovery);  }

我们其实又看到了一个loadFSImage 操作,这个里面会是真的加载 数据进行合并?

/**   * Choose latest image from one of the directories,   * load it and merge with the edits.   *   * 选择最新的 image 全量快照 和 edits log 文件进行合并   * 哇,终于看到了   *    * Saving and loading fsimage should never trigger symlink resolution.    * The paths that are persisted do not have *intermediate* symlinks    * because intermediate symlinks are resolved at the time files,    * directories, and symlinks are created. All paths accessed while    * loading or saving fsimage should therefore only see symlinks as    * the final path component, and the functions called below do not   * resolve symlinks that are the final path component.   *   * @return whether the image should be saved   * @throws IOException   */  private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,      MetaRecoveryContext recovery)      throws IOException {    final boolean rollingRollback        = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);    final EnumSet<NameNodeFile> nnfs;    if (rollingRollback) {      // if it is rollback of rolling upgrade, only load from the rollback image      nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);    } else {      // otherwise we can load from both IMAGE and IMAGE_ROLLBACK      nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);    }    final FSImageStorageInspector inspector = storage        .readAndInspectDirs(nnfs, startOpt);    isUpgradeFinalized = inspector.isUpgradeFinalized();    List<FSImageFile> imageFiles = inspector.getLatestImages();    StartupProgress prog = NameNode.getStartupProgress();    prog.beginPhase(Phase.LOADING_FSIMAGE);    File phaseFile = imageFiles.get(0).getFile();    prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());    prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());    boolean needToSave = inspector.needToSave();    Iterable<EditLogInputStream> editStreams = null;    initEditLog(startOpt);    if (NameNodeLayoutVersion.supports(        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {      // If we're open for write, we're either non-HA or we're the active NN, so      // we better be able to load all the edits. If we're the standby NN, it's      // OK to not be able to read all of edits right now.      // In the meanwhile, for HA upgrade, we will still write editlog thus need      // this toAtLeastTxId to be set to the max-seen txid      // For rollback in rolling upgrade, we need to set the toAtLeastTxId to      // the txid right before the upgrade marker.        long toAtLeastTxId = editLog.isOpenForWrite() ? inspector          .getMaxSeenTxId() : 0;      if (rollingRollback) {        // note that the first image in imageFiles is the special checkpoint        // for the rolling upgrade        toAtLeastTxId = imageFiles.get(0).getCheckpointTxId()   2;      }      ///加载edits log 文件      editStreams = editLog.selectInputStreams(          imageFiles.get(0).getCheckpointTxId()   1,          toAtLeastTxId, recovery, false);    } else {      editStreams = FSImagePreTransactionalStorageInspector        .getEditLogStreams(storage);    }    int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,        DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);    for (EditLogInputStream elis : editStreams) {      elis.setMaxOpSize(maxOpSize);    }     for (EditLogInputStream l : editStreams) {      LOG.debug("Planning to load edit log stream: "   l);    }    if (!editStreams.iterator().hasNext()) {      LOG.info("No edit log streams selected.");    }        FSImageFile imageFile = null;    for (int i = 0; i < imageFiles.size(); i  ) {      try {        imageFile = imageFiles.get(i);        //记载Image文件        loadFSImageFile(target, recovery, imageFile, startOpt);        break;      } catch (IOException ioe) {        LOG.error("Failed to load image from "   imageFile, ioe);        target.clear();        imageFile = null;      }    }    // Failed to load any images, error out    if (imageFile == null) {      FSEditLog.closeAllStreams(editStreams);      throw new IOException("Failed to load an FSImage file!");    }    prog.endPhase(Phase.LOADING_FSIMAGE);        if (!rollingRollback) {      long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);      needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),          txnsAdvanced);      if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {        // rename rollback image if it is downgrade        renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);      }    } else {      // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals      // to the last txid in rollback fsimage.      rollingRollback(lastAppliedTxId   1, imageFiles.get(0).getCheckpointTxId());      needToSave = false;    }    editLog.setNextTxId(lastAppliedTxId   1);    return needToSave;  }  /** rollback for rolling upgrade. */  private void rollingRollback(long discardSegmentTxId, long ckptId)      throws IOException {    // discard discard unnecessary editlog segments starting from the given id    this.editLog.discardSegments(discardSegmentTxId);    // rename the special checkpoint    renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE,        true);    // purge all the checkpoints after the marker    archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);    if (HAUtil.isHAEnabled(conf, nameserviceId)) {      // close the editlog since it is currently open for write      this.editLog.close();      // reopen the editlog for read      this.editLog.initSharedJournalsForRead();    }  }

进行loadFSImanageFile文件的加载,这里就比较枯燥了,为什么?因为我们明显要有大局观,首先我们肯定直到一定是加载文件,对吧?那么文件最后加载到哪里去进行存储?因为我们的FSNamesystem 是核心管理元数据的组件,大家肯定就也想到了,最终数据肯定是加载到其中啦,所以大家注意看这里对啊不,一直有将FSNamesystem 作为参数再进行传递

/**   * 这里面就不仔细去详细的跟到文件加载了,   * loadFSImage()方法就是最终加载文件的方法   */  void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,      FSImageFile imageFile, StartupOption startupOption) throws IOException {    LOG.debug("Planning to load image :\n"   imageFile);    StorageDirectory sdForProperties = imageFile.sd;    storage.readProperties(sdForProperties, startupOption);    if (NameNodeLayoutVersion.supports(        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {      // For txid-based layout, we should have a .md5 file      // next to the image file      boolean isRollingRollback = RollingUpgradeStartupOption.ROLLBACK          .matches(startupOption);      loadFSImage(imageFile.getFile(), target, recovery, isRollingRollback);    } else if (NameNodeLayoutVersion.supports(        LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {      // In 0.22, we have the checksum stored in the VERSION file.      String md5 = storage.getDeprecatedProperty(          NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);      if (md5 == null) {        throw new InconsistentFSStateException(sdForProperties.getRoot(),            "Message digest property "              NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY              " not set for storage directory "   sdForProperties.getRoot());      }      loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery,          false);    } else {      // We don't have any record of the md5sum      loadFSImage(imageFile.getFile(), null, target, recovery, false);    }  }
/**   * Load in the filesystem image from file. It's a big list of   * filenames and blocks.   * 加载 fsimage文件,   */  private void loadFSImage(File curFile, MD5Hash expectedMd5,      FSNamesystem target, MetaRecoveryContext recovery,      boolean requireSameLayoutVersion) throws IOException {    // BlockPoolId is required when the FsImageLoader loads the rolling upgrade    // information. Make sure the ID is properly set.    target.setBlockPoolId(this.getBlockPoolID());    //一个持有FSNamesystem 以及 conf 对象的loader,加载器嘛,为什么持有FSNamesystem?    //因为FSNamesystem 是管理元数据的核心组件,最终的元数据都是存储在FSNamesystem 中对把    FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);    loader.load(curFile, requireSameLayoutVersion);    // Check that the image digest we loaded matches up with what    // we expected    MD5Hash readImageMd5 = loader.getLoadedImageMd5();    if (expectedMd5 != null &&        !expectedMd5.equals(readImageMd5)) {      throw new IOException("Image file "   curFile            " is corrupt with MD5 checksum of "   readImageMd5            " but expecting "   expectedMd5);    }    long txId = loader.getLoadedImageTxId();    LOG.info("Loaded image for txid "   txId   " from "   curFile);    lastAppliedTxId = txId;    storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());  }

其实知道这大家也就知道了其实就是实例化了一个loader 组件去加载fsImage文件了,底层就是基于文件流加载咯

接下就是加载edits log 以及进行一个合并了入口就是

 //执行加载edits log 其实里面也包含了合并 fsImage 与 edits log 的操作      long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);

这里大家也可以发现其实也是将我们的editStream 目录 以及 FSNamesystem 作为了参数传入,底层说不定也是基于loader 加载edits log 然后放入到FSNamesystem中对吧,当然 肯定也会有一些其他的操作,比如检查,筛选对吧

  /**   * Load the specified list of edit files into the image.   * 加载指定的edit files 文件到image中   * 怎么加载 就是再跑一边记录的指令咯   */  public long loadEdits(Iterable<EditLogInputStream> editStreams,      FSNamesystem target) throws IOException {    return loadEdits(editStreams, target, null, null);  }  private long loadEdits(Iterable<EditLogInputStream> editStreams,      FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)      throws IOException {    LOG.debug("About to load edits:\n  "   Joiner.on("\n  ").join(editStreams));    StartupProgress prog = NameNode.getStartupProgress();    prog.beginPhase(Phase.LOADING_EDITS);    long prevLastAppliedTxId = lastAppliedTxId;    try {      //实例化一个loader 加载组件,也是和加载FSImage 文件一样,持有了 FSNamesystem      FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);      // Load latest edits      //加载最新的edits log      for (EditLogInputStream editIn : editStreams) {        LOG.info("Reading "   editIn   " expecting start txid #"                (lastAppliedTxId   1));        try {          //进行文件流的加载对吧,底层不用说 肯定是基于文件流咯          loader.loadFSEdits(editIn, lastAppliedTxId   1, startOpt, recovery);        } finally {          // Update lastAppliedTxId even in case of error, since some ops may          // have been successfully applied before the error.          lastAppliedTxId = loader.getLastAppliedTxId();        }        // If we are in recovery mode, we may have skipped over some txids.        if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {          lastAppliedTxId = editIn.getLastTxId();        }      }    } finally {      FSEditLog.closeAllStreams(editStreams);      // update the counts      updateCountForQuota(target.dir.rootDir);    }    prog.endPhase(Phase.LOADING_EDITS);    return lastAppliedTxId - prevLastAppliedTxId;  }
/**   * Load an edit log, and apply the changes to the in-memory structure   * This is where we apply edits that we've been writing to disk all   * along.   * 记载edit log,同时将一些更改变化加入到内存结构中,这里其实就是将我们的记录再   * edits log 中的操作指令再进行一次刷新   */  long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,      StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {    StartupProgress prog = NameNode.getStartupProgress();    Step step = createStartupProgressStep(edits);    prog.beginStep(Phase.LOADING_EDITS, step);    fsNamesys.writeLock();    try {      long startTime = now();      FSImage.LOG.info("Start loading edits file "   edits.getName());      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,          startOpt, recovery);      FSImage.LOG.info("Edits file "   edits.getName()             " of size "   edits.length()   " edits # "   numEdits             " loaded in "   (now()-startTime)/1000   " seconds");      return numEdits;    } finally {      edits.close();      fsNamesys.writeUnlock();      prog.endStep(Phase.LOADING_EDITS, step);    }  }

到这其实大家也就知道了逻辑了,最后的时候其实在返回一个needToSave ,做什么的我就不说了

小总结:FSNamesystem 初始化的步骤

1.从配置文件中获取fsimage 以及 edits log 目录

2.构建FSnamesystem,并传入 fsimage 以及 edits log 目录以及其他信息

3.loadfromdisk 加载 fsimage 以及edits log 文件到内存

4.将fsimage 和 edits log 文件进行合并成新的fsimage,也就是完整的元数据信息,在内存中持有一份

5.将新的完整的fsimage 文件替换 旧的fsimage 文件

6.开启一个新的edits log 文件记录增量事务

来源:https://www.icode9.com/content-1-823401.html

(0)

相关推荐