hadoop 源码分析(二)HDFS nameNode 之 FSNamesystem初始化源码分析之加载fsImage 和 edits log
上一篇 讲解了nameNode启动的时候,NameNodeHttpServer的启动流程,其实简单来说就是基于hadoop自己实现的HttpServer2服务绑定一个InetSokcetAddress地址,也就是端口号,端口号哪来的?默认配置文件获取呗,最后在将HttpServer2中绑定一些servlet来处理url请求就完成了我们50070端口的请求处理。
那么本篇分析nameNode进程第二个比较核心的,应该说是最核心的组件 FSNamesystem,为什么说是最核心组件呢,因为元数据管理和block的管理都在这个里面进行操作。
直接进入正题,先回顾一下fsimage(全量快照) 文件 以及 edits log(增量事务记录) 文件,自己回忆下这两个文件的重要性。
找到FSNamesystem 的入口
找到之前的nameNode初始化的地方,里面的loadNamesystem(conf)方法,这个就是核心组件FSNamesystem 构建的入口
/** * Initialize name-node. * * @param conf the configuration */ protected void initialize(Configuration conf) throws IOException { //其实这些配置 就是对应着我们配置的 hdfs-site.xml 或者 core-default.xml或者其他文件中一些配置信息 //这些一般也不是很重要 if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) { String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY); if (intervals != null) { conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals); } } UserGroupInformation.setConfiguration(conf); loginAsNameNodeUser(conf); NameNode.initMetrics(conf, this.getRole()); StartupProgressMetrics.register(startupProgress); //如果是nameNode 启动一个httpServer if (NamenodeRole.NAMENODE == role) { startHttpServer(conf); } this.spanReceiverHost = SpanReceiverHost.getInstance(conf); // 初始化FSNameSystem 核心组件 loadNamesystem(conf); //初始化rpc server 组件 rpcServer = createRpcServer(conf); if (clientNamenodeAddress == null) { // This is expected for MiniDFSCluster. Set it now using // the RPC server's bind address. clientNamenodeAddress = NetUtils.getHostPortString(rpcServer.getRpcAddress()); LOG.info("Clients are to use " clientNamenodeAddress " to access" " this namenode/service."); } //如果是NameNode 设置NameNodeAddress 以及 FsImage if (NamenodeRole.NAMENODE == role) { httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); } pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); //一些公共服务的初始化 startCommonServices(conf); }
protected void loadNamesystem(Configuration conf) throws IOException { //创建和初始化FSNameSystem //nameNode启动的时候 会讲磁盘上的fsimange 文件 以及 edits log 文件读取到内存中进行合并, // 合并形成最新的元数据 //loadFromDisk 就是讲fsimage 和 edits log 文件读取到内存进行合并 //合并成最新的元数据 就会作为FSNamesystem存放在内存中 this.namesystem = FSNamesystem.loadFromDisk(conf); }
/** * Instantiates an FSNamesystem loaded from the image and edits * directories specified in the passed Configuration. * 实例化一个FSNamesystem,怎么实例化呢?加载配置文件中指定的 image 和 edits 文件目录 * 如果没有指定呢,hadoop肯定自己默认的文件目录 * * @param conf the Configuration which specifies the storage directories * from which to load * 包含了存储目录的Configuration,虽然有默认的,至少也要传递过来把,对吧,肯定是通过可识别方式,难不成用眼神? * * @return an FSNamesystem which contains the loaded namespace * 返回一个包含了加载了 namespace 元数据的 FSNamesystem实例 * * @throws IOException if loading fails */ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { checkConfiguration(conf); //构建FSImage,怎么构建?肯定是从磁盘加载 //FSImage 就是一个时间点的元数据快照信息,其实也就是元数据信息 //FSNamesystem.getNamespaceDirs(conf) 获取元数据的目录,肯定是找指定的配置信息,如果没有肯定就是默认值啦 // 默认值在hdfs-default.xml 以及 hadoop common 中的core-default.xml 文件中 //file://${hadoop.tmp.dir}/dfs/name ${hadoop.tmp.dir}:/tmp/hadoop-${user.name} //可以自己观察下启动的namenode进程的这个目录是否和这个匹配 FSImage fsImage = new FSImage(conf, FSNamesystem.getNamespaceDirs(conf), //FSNamesystem.getNamespaceEditsDirs(conf)) 获取edits log 的目录,和上面的一样对不对 //默认情况下 edis log 和namespace 是在同一个目录下,可以进去看下配置信息 FSNamesystem.getNamespaceEditsDirs(conf)); //实例化 FSnamesystem 对象,讲fsImage对象放入到了FSNamesystem中 FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false); StartupOption startOpt = NameNode.getStartupOption(conf); if (startOpt == StartupOption.RECOVER) { namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); } long loadStart = now(); try { //这里就是说通过FSNamesystem 将fsimage 以及 edits log 加载到内存中 //然后再内存中合并两个文件,形成新的fsImange 信息 //(注:默认情况下 每隔一段时间 就会有check point 将旧的fsimage 与 edits log // 就行合并形成新的fsimage文件,启动的时候肯定也需要合并才能形成新的fsimage文件 对吧) //最后再内存中持有一份完整的元数据信息 namesystem.loadFSImage(startOpt); } catch (IOException ioe) { LOG.warn("Encountered exception loading fsimage", ioe); fsImage.close(); throw ioe; } long timeTakenToLoadFSImage = now() - loadStart; LOG.info("Finished loading FSImage in " timeTakenToLoadFSImage " msecs"); NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics(); if (nnMetrics != null) { nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage); } return namesystem; }
1.构建了一个fsImage对象,注意传入的参数名称,可以发现传入了fsImage 以及edits log,这里大概就猜测出新的fsImage 对象肯定是通过两个合并成新的,但是这里并没有加载数据
构建FSNamesystem其实里面就是获取一些配置信息以及初始化一些选项,我们简单看一下,里面的内容很多我们主要看下注释 ,看到没,看到没
/** * Create an FSNamesystem associated with the specified image. * 通过指定的 image 元数据快照创建一个 FSNamesystem对象 * Note that this does not load any data off of disk -- if you would * like that behavior, use {@link #loadFromDisk(Configuration)} * 注意 这里不从磁盘加载任何的数据,如果想加载数据,请使用 loadFromDisk(Configuration) * 看到没,这里明确说了不加载数据对吧 * @param conf configuration * @param fsImage The FSImage to associate with * @param ignoreRetryCache Whether or not should ignore the retry cache setup * step. For Secondary NN this should be set to true. * @throws IOException on bad configuration */ FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache) throws IOException {
1.肯定就是加载 fsImage 文件 和 edits log 文件 进行合并为一份完整的内存元数据信息fsImage,
2.将新的fsImage 文件进行覆盖
3.那么edits log 文件呢,肯定也会重新刷新一份。
/*** 肯定就是加载 fsImage 文件 和 edits log 文件 进行合并为一份完整的内存元数据信息,在进行* 磁盘回写,同时注意哦,edits log 文件肯定也要重新来一份新的**/private void loadFSImage(StartupOption startOpt) throws IOException { final FSImage fsImage = getFSImage(); //刚启动的时候 namenode 读取fsimage 以及 edits log 两个文件 //然后再内存中合并形成一个新的fsiamge文件 包含了完整的最新的元数据信息 //然后重新存储在磁盘,替换旧的fsimage 文件,同时会打开一个edits log 文件, // format before starting up if requested //处理下数据, if (startOpt == StartupOption.FORMAT) { fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id startOpt = StartupOption.REGULAR; } boolean success = false; writeLock(); try { // We shouldn't be calling saveNamespace if we've come up in standby state. //不会存储元数据,如果我们启动了一个 standby 服务 MetaRecoveryContext recovery = startOpt.createRecoveryContext(); //进行数据加载和 合并 final boolean staleImage = fsImage.recoverTransitionRead(startOpt, this, recovery); if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) || RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) { rollingUpgradeInfo = null; } final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); LOG.info("Need to save fs image? " needToSave " (staleImage=" staleImage ", haEnabled=" haEnabled ", isRollingUpgrade=" isRollingUpgrade() ")"); //如果需要将fsImage 存储到磁盘 就调用 //其实里面就是将最新的 if (needToSave) { fsImage.saveNamespace(this); } else { updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(), startOpt); // No need to save, so mark the phase done. StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.SAVING_CHECKPOINT); prog.endPhase(Phase.SAVING_CHECKPOINT); } //打开一个新的edits log 文件 去进行写入 // This will start a new log segment and write to the seen_txid file, so // we shouldn't do it when coming up in standby state if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE) || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) { fsImage.openEditLogForWrite(); } success = true; } finally { if (!success) { fsImage.close(); } writeUnlock(); } imageLoadComplete(); }
我们可以看到除了一些常规的check 和 formate 那么比较重要的就是加载fsImage 以及 edits log 文件的方法入口
fsImage.recoverTransitionRead(startOpt, this, recovery);
大致想想里面会做什么操作?就是将fsImage 文件 和 edits log 文件加载并合并
/** * Analyze storage directories. * Recover from previous transitions if required. * Perform fs state transition if necessary depending on the namespace info. * Read storage info. * 分析存储的目录,什么存储目录,就是存储fsImage 以及edits log 的目录、 * 从以前的状态恢复 * 根据元信息 判断是否执行fs状态的转换 * 读取存储的信息 * 上面意思是什么呢,大概意思就是如果以前有fsImage 和 edits log 就从文件信息中加载出来 并进行恢复 * @throws IOException * @return true if the image needs to be saved or false otherwise */ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target, MetaRecoveryContext recovery) throws IOException { assert startOpt != StartupOption.FORMAT : "NameNode formatting should be performed before reading the image"; //获取fsImage 文件资源地址 其实也就是目录 Collection<URI> imageDirs = storage.getImageDirectories(); //获取edits log 目录 Collection<URI> editsDirs = editLog.getEditURIs(); // none of the data dirs exist if((imageDirs.size() == 0 || editsDirs.size() == 0) && startOpt != StartupOption.IMPORT) throw new IOException( "All specified directories are not accessible or do not exist."); // 1. For each data directory calculate its state and // check whether all is consistent before transitioning. //检查每个数据目录,判断是否状态一致性,什么意思呢, // 进行数据恢复 里面就是对一些之前停机的时候 更新 回滚 新增数据的恢复操作 Map<StorageDirectory, StorageState> dataDirStates = new HashMap<StorageDirectory, StorageState>(); boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates); if (LOG.isTraceEnabled()) { LOG.trace("Data dir states:\n " Joiner.on("\n ").withKeyValueSeparator(": ") .join(dataDirStates)); } if (!isFormatted && startOpt != StartupOption.ROLLBACK && startOpt != StartupOption.IMPORT) { throw new IOException("NameNode is not formatted."); } int layoutVersion = storage.getLayoutVersion(); if (startOpt == StartupOption.METADATAVERSION) { System.out.println("HDFS Image Version: " layoutVersion); System.out.println("Software format version: " HdfsConstants.NAMENODE_LAYOUT_VERSION); return false; } if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) { NNStorage.checkVersionUpgradable(storage.getLayoutVersion()); } if (startOpt != StartupOption.UPGRADE && startOpt != StartupOption.UPGRADEONLY && !RollingUpgradeStartupOption.STARTED.matches(startOpt) && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) { throw new IOException( "\nFile system image contains an old layout version " storage.getLayoutVersion() ".\nAn upgrade to version " HdfsConstants.NAMENODE_LAYOUT_VERSION " is required.\n" "Please restart NameNode with the \"" RollingUpgradeStartupOption.STARTED.getOptionString() "\" option if a rolling upgrade is already started;" " or restart NameNode with the \"" StartupOption.UPGRADE.getName() "\" option to start" " a new upgrade."); } //执行一些启动选项以及一些二更操作 storage.processStartupOptionsForUpgrade(startOpt, layoutVersion); // 2. Format unformatted dirs. for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); StorageState curState = dataDirStates.get(sd); switch(curState) { case NON_EXISTENT: throw new IOException(StorageState.NON_EXISTENT " state cannot be here"); case NOT_FORMATTED: LOG.info("Storage directory " sd.getRoot() " is not formatted."); LOG.info("Formatting ..."); sd.clearDirectory(); // create empty currrent dir break; default: break; } } // 3. Do transitions switch(startOpt) { case UPGRADE: case UPGRADEONLY: doUpgrade(target); return false; // upgrade saved image already case IMPORT: doImportCheckpoint(target); return false; // import checkpoint saved image already case ROLLBACK: throw new AssertionError("Rollback is now a standalone command, " "NameNode should not be starting with this option."); case REGULAR: default: // just load the image } //真正的加载fsImage 和 edits log 文件进行合并 return loadFSImage(target, startOpt, recovery); }
我们其实又看到了一个loadFSImage 操作,这个里面会是真的加载 数据进行合并?
/** * Choose latest image from one of the directories, * load it and merge with the edits. * * 选择最新的 image 全量快照 和 edits log 文件进行合并 * 哇,终于看到了 * * Saving and loading fsimage should never trigger symlink resolution. * The paths that are persisted do not have *intermediate* symlinks * because intermediate symlinks are resolved at the time files, * directories, and symlinks are created. All paths accessed while * loading or saving fsimage should therefore only see symlinks as * the final path component, and the functions called below do not * resolve symlinks that are the final path component. * * @return whether the image should be saved * @throws IOException */ private boolean loadFSImage(FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { final boolean rollingRollback = RollingUpgradeStartupOption.ROLLBACK.matches(startOpt); final EnumSet<NameNodeFile> nnfs; if (rollingRollback) { // if it is rollback of rolling upgrade, only load from the rollback image nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK); } else { // otherwise we can load from both IMAGE and IMAGE_ROLLBACK nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK); } final FSImageStorageInspector inspector = storage .readAndInspectDirs(nnfs, startOpt); isUpgradeFinalized = inspector.isUpgradeFinalized(); List<FSImageFile> imageFiles = inspector.getLatestImages(); StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.LOADING_FSIMAGE); File phaseFile = imageFiles.get(0).getFile(); prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath()); prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length()); boolean needToSave = inspector.needToSave(); Iterable<EditLogInputStream> editStreams = null; initEditLog(startOpt); if (NameNodeLayoutVersion.supports( LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { // If we're open for write, we're either non-HA or we're the active NN, so // we better be able to load all the edits. If we're the standby NN, it's // OK to not be able to read all of edits right now. // In the meanwhile, for HA upgrade, we will still write editlog thus need // this toAtLeastTxId to be set to the max-seen txid // For rollback in rolling upgrade, we need to set the toAtLeastTxId to // the txid right before the upgrade marker. long toAtLeastTxId = editLog.isOpenForWrite() ? inspector .getMaxSeenTxId() : 0; if (rollingRollback) { // note that the first image in imageFiles is the special checkpoint // for the rolling upgrade toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() 2; } ///加载edits log 文件 editStreams = editLog.selectInputStreams( imageFiles.get(0).getCheckpointTxId() 1, toAtLeastTxId, recovery, false); } else { editStreams = FSImagePreTransactionalStorageInspector .getEditLogStreams(storage); } int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT); for (EditLogInputStream elis : editStreams) { elis.setMaxOpSize(maxOpSize); } for (EditLogInputStream l : editStreams) { LOG.debug("Planning to load edit log stream: " l); } if (!editStreams.iterator().hasNext()) { LOG.info("No edit log streams selected."); } FSImageFile imageFile = null; for (int i = 0; i < imageFiles.size(); i ) { try { imageFile = imageFiles.get(i); //记载Image文件 loadFSImageFile(target, recovery, imageFile, startOpt); break; } catch (IOException ioe) { LOG.error("Failed to load image from " imageFile, ioe); target.clear(); imageFile = null; } } // Failed to load any images, error out if (imageFile == null) { FSEditLog.closeAllStreams(editStreams); throw new IOException("Failed to load an FSImage file!"); } prog.endPhase(Phase.LOADING_FSIMAGE); if (!rollingRollback) { long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery); needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), txnsAdvanced); if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) { // rename rollback image if it is downgrade renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE); } } else { // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals // to the last txid in rollback fsimage. rollingRollback(lastAppliedTxId 1, imageFiles.get(0).getCheckpointTxId()); needToSave = false; } editLog.setNextTxId(lastAppliedTxId 1); return needToSave; } /** rollback for rolling upgrade. */ private void rollingRollback(long discardSegmentTxId, long ckptId) throws IOException { // discard discard unnecessary editlog segments starting from the given id this.editLog.discardSegments(discardSegmentTxId); // rename the special checkpoint renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE, true); // purge all the checkpoints after the marker archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId); String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf); if (HAUtil.isHAEnabled(conf, nameserviceId)) { // close the editlog since it is currently open for write this.editLog.close(); // reopen the editlog for read this.editLog.initSharedJournalsForRead(); } }
进行loadFSImanageFile文件的加载,这里就比较枯燥了,为什么?因为我们明显要有大局观,首先我们肯定直到一定是加载文件,对吧?那么文件最后加载到哪里去进行存储?因为我们的FSNamesystem 是核心管理元数据的组件,大家肯定就也想到了,最终数据肯定是加载到其中啦,所以大家注意看这里对啊不,一直有将FSNamesystem 作为参数再进行传递
/** * 这里面就不仔细去详细的跟到文件加载了, * loadFSImage()方法就是最终加载文件的方法 */ void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery, FSImageFile imageFile, StartupOption startupOption) throws IOException { LOG.debug("Planning to load image :\n" imageFile); StorageDirectory sdForProperties = imageFile.sd; storage.readProperties(sdForProperties, startupOption); if (NameNodeLayoutVersion.supports( LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { // For txid-based layout, we should have a .md5 file // next to the image file boolean isRollingRollback = RollingUpgradeStartupOption.ROLLBACK .matches(startupOption); loadFSImage(imageFile.getFile(), target, recovery, isRollingRollback); } else if (NameNodeLayoutVersion.supports( LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) { // In 0.22, we have the checksum stored in the VERSION file. String md5 = storage.getDeprecatedProperty( NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY); if (md5 == null) { throw new InconsistentFSStateException(sdForProperties.getRoot(), "Message digest property " NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY " not set for storage directory " sdForProperties.getRoot()); } loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery, false); } else { // We don't have any record of the md5sum loadFSImage(imageFile.getFile(), null, target, recovery, false); } }
/** * Load in the filesystem image from file. It's a big list of * filenames and blocks. * 加载 fsimage文件, */ private void loadFSImage(File curFile, MD5Hash expectedMd5, FSNamesystem target, MetaRecoveryContext recovery, boolean requireSameLayoutVersion) throws IOException { // BlockPoolId is required when the FsImageLoader loads the rolling upgrade // information. Make sure the ID is properly set. target.setBlockPoolId(this.getBlockPoolID()); //一个持有FSNamesystem 以及 conf 对象的loader,加载器嘛,为什么持有FSNamesystem? //因为FSNamesystem 是管理元数据的核心组件,最终的元数据都是存储在FSNamesystem 中对把 FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target); loader.load(curFile, requireSameLayoutVersion); // Check that the image digest we loaded matches up with what // we expected MD5Hash readImageMd5 = loader.getLoadedImageMd5(); if (expectedMd5 != null && !expectedMd5.equals(readImageMd5)) { throw new IOException("Image file " curFile " is corrupt with MD5 checksum of " readImageMd5 " but expecting " expectedMd5); } long txId = loader.getLoadedImageTxId(); LOG.info("Loaded image for txid " txId " from " curFile); lastAppliedTxId = txId; storage.setMostRecentCheckpointInfo(txId, curFile.lastModified()); }
其实知道这大家也就知道了其实就是实例化了一个loader 组件去加载fsImage文件了,底层就是基于文件流加载咯
接下就是加载edits log 以及进行一个合并了入口就是
//执行加载edits log 其实里面也包含了合并 fsImage 与 edits log 的操作 long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
这里大家也可以发现其实也是将我们的editStream 目录 以及 FSNamesystem 作为了参数传入,底层说不定也是基于loader 加载edits log 然后放入到FSNamesystem中对吧,当然 肯定也会有一些其他的操作,比如检查,筛选对吧
/** * Load the specified list of edit files into the image. * 加载指定的edit files 文件到image中 * 怎么加载 就是再跑一边记录的指令咯 */ public long loadEdits(Iterable<EditLogInputStream> editStreams, FSNamesystem target) throws IOException { return loadEdits(editStreams, target, null, null); } private long loadEdits(Iterable<EditLogInputStream> editStreams, FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { LOG.debug("About to load edits:\n " Joiner.on("\n ").join(editStreams)); StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.LOADING_EDITS); long prevLastAppliedTxId = lastAppliedTxId; try { //实例化一个loader 加载组件,也是和加载FSImage 文件一样,持有了 FSNamesystem FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId); // Load latest edits //加载最新的edits log for (EditLogInputStream editIn : editStreams) { LOG.info("Reading " editIn " expecting start txid #" (lastAppliedTxId 1)); try { //进行文件流的加载对吧,底层不用说 肯定是基于文件流咯 loader.loadFSEdits(editIn, lastAppliedTxId 1, startOpt, recovery); } finally { // Update lastAppliedTxId even in case of error, since some ops may // have been successfully applied before the error. lastAppliedTxId = loader.getLastAppliedTxId(); } // If we are in recovery mode, we may have skipped over some txids. if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) { lastAppliedTxId = editIn.getLastTxId(); } } } finally { FSEditLog.closeAllStreams(editStreams); // update the counts updateCountForQuota(target.dir.rootDir); } prog.endPhase(Phase.LOADING_EDITS); return lastAppliedTxId - prevLastAppliedTxId; }
/** * Load an edit log, and apply the changes to the in-memory structure * This is where we apply edits that we've been writing to disk all * along. * 记载edit log,同时将一些更改变化加入到内存结构中,这里其实就是将我们的记录再 * edits log 中的操作指令再进行一次刷新 */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = createStartupProgressStep(edits); prog.beginStep(Phase.LOADING_EDITS, step); fsNamesys.writeLock(); try { long startTime = now(); FSImage.LOG.info("Start loading edits file " edits.getName()); long numEdits = loadEditRecords(edits, false, expectedStartingTxId, startOpt, recovery); FSImage.LOG.info("Edits file " edits.getName() " of size " edits.length() " edits # " numEdits " loaded in " (now()-startTime)/1000 " seconds"); return numEdits; } finally { edits.close(); fsNamesys.writeUnlock(); prog.endStep(Phase.LOADING_EDITS, step); } }
到这其实大家也就知道了逻辑了,最后的时候其实在返回一个needToSave ,做什么的我就不说了
小总结:FSNamesystem 初始化的步骤
1.从配置文件中获取fsimage 以及 edits log 目录
2.构建FSnamesystem,并传入 fsimage 以及 edits log 目录以及其他信息
3.loadfromdisk 加载 fsimage 以及edits log 文件到内存
4.将fsimage 和 edits log 文件进行合并成新的fsimage,也就是完整的元数据信息,在内存中持有一份
5.将新的完整的fsimage 文件替换 旧的fsimage 文件
6.开启一个新的edits log 文件记录增量事务