nutch原始碼crawldb類原始碼分析
package org.apache.nutch.crawl;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
// Commons Logging imports
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
//這是個MapReduce程式,crawldb繼承Configured實現Tool的run方法
public class CrawlDb extends Configured implements Tool {
//下面是日誌和一些常量
public static final Logger LOG = LoggerFactory.getLogger(CrawlDb.class);
public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed";
public static final String CRAWLDB_PURGE_404 = "db.update.purge.404";
public static final String CURRENT_NAME = "current";
public static final String LOCK_NAME = ".locked";
//空的建構函式
public CrawlDb() {}
//帶引數的建構函式,引數是配置檔案
public CrawlDb(Configuration conf) {
setConf(conf);
}
//這個方法,檢視crawldb和segments的路徑是否存在這兩個檔案
public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter) throws IOException {
boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
update(crawlDb, segments, normalize, filter, additionsAllowed, false);
}
public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter, boolean additionsAllowed, boolean force) throws IOException {
//根據配置資訊(執行crawldb時後面的引數)建立一個檔案系統
FileSystem fs = FileSystem.get(getConf());
//建立crawldb檔案路徑
Path lock = new Path(crawlDb, LOCK_NAME);
//這個類主要是判斷crawldb檔案能不能建立的,看一下這個類的方法
// public static void createLockFile(FileSystem fs, Path lockFile, boolean accept) throws IOException {
//因為輸入的時候第二個引數是crawldb的目錄位置,還因為ntuch的容錯功能,所以他判斷你這輸入的是不是個檔案,在判斷這目錄是不是有crawldb這個檔案
//不滿足條件直接退出,IO異常
// if (fs.exists(lockFile)) {
// if(!accept)
// throw new IOException("lock file " + lockFile + " already exists.");
// if (fs.getFileStatus(lockFile).isDir())
// throw new IOException("lock file " + lockFile + " already exists and is a directory.");
// // do nothing - the file already exists.
// } else {
//到這,你輸入的crawldb路徑是個目錄,同時目錄下面沒有crawldb這個目錄,那麼你可以建立了
// // make sure parents exist
// fs.mkdirs(lockFile.getParent());
// fs.createNewFile(lockFile);
// }
LockUtil.createLockFile(fs, lock, force);
//這是定了一下時間格式,因為他要輸出資訊,所以需要在輸出資訊的時候對時間有要求一起輸出
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//這就獲取了系統的當前時間了,也就是建立crawldb的時間
long start = System.currentTimeMillis();
//驅動,他先是把建立crawldb這個目錄的作業建立了
JobConf job = CrawlDb.createJob(getConf(), crawlDb);
//或許你不知道這下面幾句是啥東西,那麼看一下你就so easy了
// public class CrawlDbFilter implements Mapper<Text, CrawlDatum, Text, CrawlDatum> {
// public static final String URL_FILTERING = "crawldb.url.filters";
//
// public static final String URL_NORMALIZING = "crawldb.url.normalizers";
//
// public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope";
//這個CrawlDbFilter是個map,總的來說這地方的功能就是咱們執行crawldb * * * *以後在控制檯上看見的那些輸出(當然你要是IOException了那就看不到了)
job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
//這主要是檔案更新的作用,因為nutch的核心就是他們把抓取結果存起來(俗稱更新),通過更新他可以知道哪些他爬過了,哪些是下一次需要爬的等,當然這才啟動哪有記錄啊,直接false過去了
boolean url404Purging = job.getBoolean(CRAWLDB_PURGE_404, false);
//標準被控制檯輸出
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb update: starting at " + sdf.format(start));
LOG.info("CrawlDb update: db: " + crawlDb);
LOG.info("CrawlDb update: segments: " + Arrays.asList(segments));
LOG.info("CrawlDb update: additions allowed: " + additionsAllowed);
LOG.info("CrawlDb update: URL normalizing: " + normalize);
LOG.info("CrawlDb update: URL filtering: " + filter);
LOG.info("CrawlDb update: 404 purging: " + url404Purging);
}
//這步驟是建立crawldb同級目錄下面fetch和parse目錄
for (int i = 0; i < segments.length; i++) {
Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME);
//如果都存在了,那以後的資料可就往裡倒了啊
if (fs.exists(fetch) && fs.exists(parse)) {
FileInputFormat.addInputPath(job, fetch);
FileInputFormat.addInputPath(job, parse);
} else {
//fetch和segment目錄有存在的了,那就跳過去了,不建立了,這就是待遇,crawldb只要在就不行,fetch和segments就可以
LOG.info(" - skipping invalid segment " + segments[i]);
}
}
//這就行了,該有的路徑都有了
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb update: Merging segment data into db.");
}
try {
//這就是要開始執行剛才說的那些作業了,能執行就執行,不行就IO異常,這階段也只有IO異常
JobClient.runJob(job);
} catch (IOException e) {
//這塊也看一下程式碼吧
// public static boolean removeLockFile(FileSystem fs, Path lockFile) throws IOException {
// if (!fs.exists(lockFile)) return false;
// if (fs.getFileStatus(lockFile).isDir())
// throw new IOException("lock file " + lockFile + " exists but is a directory!");
// return fs.delete(lockFile, false);
// }
//可以看出,;判斷的意思就是crawldb這目錄要是不存在(還沒建立),話說也太嚴密了,不管了
//crawldb要是建立了,同時判斷crawldb的目錄要是存在,把crawldb刪了[這裡可以看出,如果到這出問題了,是建立再刪的]
LockUtil.removeLockFile(fs, lock);
//記下了這個作業的輸出路徑
Path outPath = FileOutputFormat.getOutputPath(job);
//至於麼......如果這個job的輸出路徑被建立了,那麼把他刪了。(因為map會生成本地檔案)
if (fs.exists(outPath) ) fs.delete(outPath, true);
throw e;
}
//這回終於沒事了,檔案也都好了,install?原始碼在下面,先粘過來
// public static void install(JobConf job, Path crawlDb) throws IOException {
//把job屬性裡的db.preserve.backup打開了,並接受返回值(成功沒)
// boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
//這裡做的就是我們爬取結束後看到的crawldb裡面的兩個檔案current和old的建立過程
// Path newCrawlDb = FileOutputFormat.getOutputPath(job);
// FileSystem fs = new JobClient(job).getFs();
// Path old = new Path(crawlDb, "old");
// Path current = new Path(crawlDb, CURRENT_NAME);
// 這裡,如果存在current了,同時old還存在了,那就把old刪了,把current變為old,(更新過程)
// if (fs.exists(current)) {
// if (fs.exists(old)) fs.delete(old, true);
// fs.rename(current, old);
// }
// fs.mkdirs(crawlDb);
// fs.rename(newCrawlDb, current);
// if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
// Path lock = new Path(crawlDb, LOCK_NAME);
// LockUtil.removeLockFile(fs, lock);
// }
CrawlDb.install(job, crawlDb);
//記錄一下這些建立資料夾成功後的時間,輸出到控制檯
long end = System.currentTimeMillis();
LOG.info("CrawlDb update: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
/*
* Configure a new CrawlDb in a temp folder at crawlDb/<rand>
*/
//驅動
public static JobConf createJob(Configuration config, Path crawlDb)
throws IOException {
//一個臨時的Crawldb路徑
Path newCrawlDb =
new Path(crawlDb,
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
//建立job,載入配置檔案
JobConf job = new NutchJob(config);
//job名字
job.setJobName("crawldb " + crawlDb);
//crawldb目錄下的current目錄路徑
Path current = new Path(crawlDb, CURRENT_NAME);
if (FileSystem.get(job).exists(current)) {
//如果存在了current,那麼建立輸入流
FileInputFormat.addInputPath(job, current);
}
//輸入方式二進位制輸入
job.setInputFormat(SequenceFileInputFormat.class);
//指定Map的類和Reduce的類
job.setMapperClass(CrawlDbFilter.class);
job.setReducerClass(CrawlDbReducer.class);
//建立輸出流
FileOutputFormat.setOutputPath(job, newCrawlDb);
//輸出方式,這個MapFileOutputFormat有些特殊
//MapFile是基於SequenceFile開發,可以說是帶索引版的SequenceFile。MapFile由兩部分組成:data和index,均由SequenceFile實現。其中data會按照鍵值對的方式儲存資料,
//index儲存索引,主要記錄key值和每個記錄的偏移值。資料訪問時,會先將索引檔案載入到記憶體中,根據對映關係定位檔案位置。
// 所以,MapFile是全域性排序。MapFileOutputFormat實際上使用的仍是MapFile的reader。所以會根據MapFile的索引檔案保證的順序。
// 由於MapFile是一個排序的檔案,典型的場景例如合併多個小檔案,將小檔案根據key值排序合併成大檔案。
job.setOutputFormat(MapFileOutputFormat.class);
//輸出key檔案型別Text和Values的型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
// https://issues.apache.org/jira/browse/NUTCH-1110
job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
return job;
}
//這個地方上邊有說過了,就是建立current和old的路徑
public static void install(JobConf job, Path crawlDb) throws IOException {
boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
Path newCrawlDb = FileOutputFormat.getOutputPath(job);
FileSystem fs = new JobClient(job).getFs();
Path old = new Path(crawlDb, "old");
Path current = new Path(crawlDb, CURRENT_NAME);
if (fs.exists(current)) {
if (fs.exists(old)) fs.delete(old, true);
fs.rename(current, old);
}
fs.mkdirs(crawlDb);
fs.rename(newCrawlDb, current);
if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
Path lock = new Path(crawlDb, LOCK_NAME);
LockUtil.removeLockFile(fs, lock);
}
//主函式,通過ToolRunner的run方法內部呼叫GenericOptionsParser,用於解釋hadoop命令
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
//這裡主要是引數輸入異常時,控制檯上輸出的資訊
if (args.length < 1) {
System.err.println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]");
System.err.println("\tcrawldb\tCrawlDb to update");
System.err.println("\t-dir segments\tparent directory containing all segments to update from");
System.err.println("\tseg1 seg2 ...\tlist of segment names to update from");
System.err.println("\t-force\tforce update even if CrawlDb appears to be locked (CAUTION advised)");
System.err.println("\t-normalize\tuse URLNormalizer on urls in CrawlDb and segment (usually not needed)");
System.err.println("\t-filter\tuse URLFilters on urls in CrawlDb and segment");
System.err.println("\t-noAdditions\tonly update already existing URLs, don't add any newly discovered URLs");
return -1;
}
//getConf()裝的是hadoop的配置資訊,把這幾個配置資訊填上值
boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING, false);
boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false);
boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
boolean force = false;
final FileSystem fs = FileSystem.get(getConf());
HashSet<Path> dirs = new HashSet<Path>();
//這是判斷爬取命令裡面輸入的引數是不是含有這幾個,上面它賦上了預設值,但是給了可以修改的機會,如果下面幾個引數,預設值就會變成與原來相反的
for (int i = 1; i < args.length; i++) {
if (args[i].equals("-normalize")) {
normalize = true;
} else if (args[i].equals("-filter")) {
filter = true;
} else if (args[i].equals("-force")) {
force = true;
} else if (args[i].equals("-noAdditions")) {
additionsAllowed = false;
} else if (args[i].equals("-dir")) {
//這裡是各個檔案路徑,看一下HadoopFSUtil的原始碼部分
// public static PathFilter getPassDirectoriesFilter(final FileSystem fs) {
// return new PathFilter() {
// public boolean accept(final Path path) {
// try {
//返回建立指定引數的目錄PathFilter型別
// return fs.getFileStatus(path).isDir();
// } catch (IOException ioe) {
// return false;
// }
//這裡就是根據-dir建立指定的路徑的集合
FileStatus[] paths = fs.listStatus(new Path(args[++i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
//把這些路徑放到set集合中
dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
} else {
//如果以上的引數都不包含,簡單的加入我們的輸入的引數地址就好了
dirs.add(new Path(args[i]));
}
}
try {
//從這裡就可以看出任務流程了
update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize, filter, additionsAllowed, force);
return 0;
} catch (Exception e) {
LOG.error("CrawlDb update: " + StringUtils.stringifyException(e));
return -1;
}
}
}
//該程式的執行過程main方法呼叫重寫的run函式,在run函式裡面呼叫update,加上標記呼叫另一個update方法,在這個方法裡面呼叫JobConf()和install()方法
//crawldb這個類主要的功能就是輸入命令和引數後,建立目錄,判斷目錄
相關推薦
nutch原始碼crawldb類原始碼分析
package org.apache.nutch.crawl; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; // Commons Logging impor
Netflix Eureka原始碼分析(3)——listener(EurekaBootStrap監聽類)分析
web.xml中的listener: <listener> <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class> </listener>
caffe Layer基類原始碼分析
建構函式 //標頭檔案 include/caffe/layer.hpp //實現檔案 src/caffe/layer.cpp // src/caffe/layer.cu /*
java中List介面的實現類 ArrayList,LinkedList,Vector 的區別 list實現類原始碼分析
java面試中經常被問到list常用的類以及內部實現機制,平時開發也經常用到list集合類,因此做一個原始碼級別的分析和比較之間的差異。 首先看一下List介面的的繼承關係: list介面繼承Col
JDK集合類原始碼分析
本文總結一下JDK中集合類的實現。首先看下集合類的繼承圖: 可看出,介面主要有Collection和Map兩大主線,其中Collection又有List和Set兩個分支。List是一個有序的佇
利用Idea生成的類圖分析框架原始碼
1.前提 最近在研究Spring原始碼,藉助spring官方提供的spring doc,和一些優秀博主的部落格。但是儘管如此,在龐大的原始碼庫中,使用Ctrl+Alt+B和Ctrl+←,還是
java Character類原始碼分析
一、使用 構建Character物件: 1 public class CharTest { 2 public static void main(String[] args) { 3 Character c1 = new Character('A'); 4
Java原始碼 - Exceotion異常類的基類Throwable分析
常用的異常有:Error、Exception,這兩個類的基類都是Throwable。 其中Error是用來表示編譯時和系統的錯誤,這一類問題,基本不需要我們關心。 Exception就是我們常見的異常。由原始碼可知,Exception類自身並沒有什麼重要的東西,它只是Throwable類的一
Java容器類原始碼-Vector的最全的原始碼分析(四)
(31) public synchronized boolean retainAll(Collection<?> c) 原始碼解釋: 將陣列中不是c中包含的元素全部移除。呼叫AbstractCollection的實現,程式碼也很簡單,不贅
Java容器類原始碼-Vector的最全的原始碼分析(三)
(16) public synchronized void removeElementAt(int index) 原始碼解釋: 獲取到index位置後有多少個元素,並將index位置後面的元素複製到index位置前的後面,再將index位置置空。複製
Java容器類原始碼-Vector的最全的原始碼分析(一)
一、概述 我們都知道,在Java的Collections包含了List和Set,而List裡面有ArrayList、LinkedList、還有Vector,對於很多Java初學者來說,前面兩個比較常用,ArrayList查詢效率比較高(底層是陣列實現),
Java容器類原始碼-Vector的最全的原始碼分析(二)
三、原始碼解讀 1. 繼承、實現 extends:AbstractList<E> implements:List<E>, RandomAccess, Cloneable, java.io.Serializable 2.
tomcat原始碼解析(三)——Digester類原始碼解析及Rule分析
在這篇文章中主要針對tomcat原始碼中Rule部分的解析;這部分功能主要涉及server.xml檔案載入和tomcat容器中各元件初始化的過程。在我之前的文章中《tomcat原始碼解析(一)——Bootstrap和Catalina啟動部分》和《tomcat原始碼解析(二)
java.lang包中的包裝類原始碼分析
八個基本資料型別byte,char,short,int,long,double,float,boolean,對應的包裝類位於java.lang包下面。只有對資料型別更好的瞭解,才能更高效的使用,更得心應手。本文通過整體分析來了解八個包裝類和一個字串類String,分析類設計共
【原始碼】主成分分析(PCA)與獨立分量分析(ICA)MATLAB工具箱
本MATLAB工具箱包含PCA和ICA實現的多個函式,並且包括多個演示示例。 在主成分分析中,多維資料被投影到最大奇異值相對應的奇異向量上,該操作有效地將輸入訊號分解成在資料中最大方差方向上的正交分量。因此,PCA常用於維數降低的應用中,通過執行PCA產生資料的低維表示,同時,該低維表
Java 中使用 google.zxing 快捷生成二維碼(附工具類原始碼)
移動網際網路時代,基於手機端的各種活動掃碼和收付款碼層出不窮;那我們如何在Java中生成自己想要的二維碼呢?下面就來講講在Java開發中使用 google.zxing 生成二維碼。 一般情況下,Java生成二維碼的方式有三種,一種是基於 google.zxing ,是google公司出的;一種
【go原始碼分析】go原始碼之slice原始碼分析
Go 語言切片是對陣列的抽象。 Go 陣列的長度不可改變,與陣列相比切片的長度是不固定的,可以追加元素,在追加時可能使切片的容量增大。 len() 和 cap() 函式 切片是可索引的,並且可以由 len() 方法獲取長度。
【go原始碼分析】go原始碼之list原始碼分析
本文針對go 1.11版本,路徑src/container/list/list.go 資料結構 Element結構體 Value 前驅 後繼 // Element is an element of a linked list. type Element st
Java集合類原始碼解析:AbstractMap
目錄 引言 原始碼解析 抽象函式entrySet() 兩個集合檢視 操作方法 兩個子類 參考: 引言 今天學習一個Java集合的一個抽象類 AbstractMap ,AbstractMap 是Map介面的 實現類之一,也是HashMap、T
Java集合類原始碼解析:HashMap (基於JDK1.8)
目錄 前言 HashMap的資料結構 深入原始碼 兩個引數 成員變數 四個構造方法 插入資料的方法:put() 雜湊函式:hash() 動態擴容:resize() 節點樹化、紅黑樹的拆分 節點樹化