1. 程式人生 > >Lucene搜尋引擎+HDFS+MR完成垂直搜尋

Lucene搜尋引擎+HDFS+MR完成垂直搜尋

  1 package org.liky.sina.craw;
  2 
  3 import java.util.ArrayList;
  4 import java.util.HashMap;
  5 import java.util.HashSet;
  6 import java.util.List;
  7 import java.util.Map;
  8 import java.util.Set;
  9 
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.fs.FSDataOutputStream;
12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.jsoup.Jsoup; 15 import org.jsoup.nodes.Document; 16 import org.jsoup.nodes.Element; 17 import org.jsoup.select.Elements; 18 import org.liky.sina.dao.INewsDAO; 19 import org.liky.sina.dbc.DataBaseConnection;
20 import org.liky.sina.factory.DAOFactory; 21 import org.liky.sina.vo.News; 22 23 /** 24 * 爬蟲開始進行資料庫操作以及HDFS寫入 25 * 26 * @author k04 27 * 28 */ 29 public class URLDemo { 30 // 該物件的構造方法會預設載入hadoop中的兩個配置檔案,hdfs-site.xml和core-site.xml 31 // 這兩個檔案包含訪問hdfs所需的引數值 32 private static
Configuration conf = new Configuration(); 33 34 private static int id = 1; 35 36 private static FileSystem fs; 37 38 private static Path path; 39 40 // 等待爬取的url 41 private static List<String> allWaitUrl = new ArrayList<>(); 42 // 已經爬取的url 43 private static Set<String> allOverUrl = new HashSet<>(); 44 // 記錄所有url的深度,以便在addUrl方法內判斷 45 private static Map<String, Integer> allUrlDepth = new HashMap<>(); 46 // 爬取網頁的深度 47 private static int maxDepth = 5; 48 // 宣告object獨享幫助進行執行緒的等待操作 49 private static Object obj = new Object(); 50 // 設定匯流排程數 51 private static final int MAX_THREAD = 20; 52 // 記錄空閒的執行緒數 53 private static int count = 0; 54 55 // 宣告INewsDAO物件, 56 private static INewsDAO dao; 57 58 static { 59 dao = DAOFactory.getINewsDAOInstance(new DataBaseConnection()); 60 } 61 62 public static void main(String args[]) { 63 // 爬取的目標網址 64 String strUrl = "http://news.sina.com.cn/"; 65 66 // 爬取第一個輸入的url 67 addUrl(strUrl, 0); 68 // 建立多個執行緒 69 for (int i = 0; i < MAX_THREAD; i++) { 70 new URLDemo().new MyThread().start(); 71 } 72 73 // DataBaseConnection dc=new DataBaseConnection(); 74 // dc.getConnection(); 75 76 } 77 78 public static void parseUrl(String strUrl, int depth) { 79 // 先判斷當前url是否爬取過 80 // 判斷深度是否符合要求 81 if (!(allOverUrl.contains(strUrl) || depth > maxDepth)) { 82 System.out.println("當前執行的 " + Thread.currentThread().getName() 83 + " 爬蟲執行緒處理爬取: " + strUrl); 84 85 try { 86 // 用jsoup進行資料爬取 87 Document doc = Jsoup.connect(strUrl).get(); 88 // 通過doc接受返回的結果 89 // 提取有效的title和description 90 String title = doc.title(); 91 Element descE = doc.getElementsByAttributeValue("name", 92 "description").first(); 93 String desc = descE.attr("content"); 94 95 // System.out.println(title + " --> " + desc); 96 97 // 如果有效,則驚醒儲存 98 if (title != null && desc != null && !title.trim().equals("") 99 && !desc.trim().equals("")) { 100 // 需要生成一個id,以便放入資料庫中,因此id也要加入到HDFS中,便於後續索引 101 News news = new News(); 102 news.setId(id++); 103 news.setTitle(title); 104 news.setDescription(desc); 105 news.setUrl(strUrl); 106 // 新增到資料庫語句 107 dao.doCreate(news); 108 // 向HDFS儲存資料 109 path = new Path("hdfs://localhost:9000/sina_news_input/" 110 + System.currentTimeMillis() + ".txt"); 111 fs = path.getFileSystem(conf); 112 FSDataOutputStream os = fs.create(path); 113 // 進行內容輸出,此處需要用news.getId(),不然資料庫和HDFS的id會不相同,因為多執行緒的執行 114 os.writeUTF(news.getId() + "\r\n" + title + "\r\n" + desc); 115 os.close(); 116 117 // 解析所有超連結 118 Elements aEs = doc.getElementsByTag("a"); 119 // System.out.println(aEs); 120 if (aEs != null && aEs.size() > 0) { 121 for (Element aE : aEs) { 122 String href = aE.attr("href"); 123 System.out.println(href); 124 // 擷取網址,並給出篩選條件!!! 125 if ((href.startsWith("http:") || href 126 .startsWith("https:")) 127 && href.contains("news.sina.com.cn")) { 128 // 呼叫addUrl()方法 129 addUrl(href, depth + 1); 130 } 131 } 132 } 133 134 } 135 136 } catch (Exception e) { 137 138 } 139 // 吧當前爬完的url放入到偶爾中 140 allOverUrl.add(strUrl); 141 System.out.println(strUrl + "爬去完成,已經爬取的內容量為:" + allOverUrl.size() 142 + "剩餘爬取量為:" + allWaitUrl.size()); 143 144 // 判斷是否集合中海油其他的內容需要進行爬取,如果有,則進行執行緒的喚醒 145 if (allWaitUrl.size() > 0) { 146 synchronized (obj) { 147 obj.notify(); 148 } 149 } else { 150 System.out.println("爬取結束..."); 151 System.exit(0); 152 } 153 154 } 155 } 156 157 /** 158 * url加入到等待佇列中 並判斷是否已經放過,若沒有就放入allUrlDepth中 159 * 160 * @param href 161 * @param depth 162 */ 163 public static synchronized void addUrl(String href, int depth) { 164 // 將url放入佇列中 165 allWaitUrl.add(href); 166 // 判斷url是否已經存在 167 if (!allUrlDepth.containsKey(href)) { 168 allUrlDepth.put(href, depth + 1); 169 } 170 } 171 172 /** 173 * 獲取等待佇列下一個url,並從等待佇列中移除 174 * 175 * @return 176 */ 177 public static synchronized String getUrl() { 178 if (allWaitUrl.size() > 0) { 179 String nextUrl = allWaitUrl.get(0); 180 allWaitUrl.remove(0); 181 return nextUrl; 182 } 183 return null; 184 } 185 186 /** 187 * 用多執行緒進行url爬取 188 * 189 * @author k04 190 * 191 */ 192 public class MyThread extends Thread { 193 194 @Override 195 public void run() { 196 // 編寫一個死迴圈,以便執行緒可以一直存在 197 while (true) { 198 // 199 200 String url = getUrl(); 201 if (url != null) { 202 // 呼叫該方法爬取url的資料 203 parseUrl(url, allUrlDepth.get(url)); 204 } else { 205 System.out.println("當前執行緒準備就緒,等待連線爬取:" + this.getName()); 206 // 執行緒+1 207 count++; 208 // 建立一個物件,幫助執行緒進入等待狀態wait() 209 synchronized (obj) { 210 try { 211 obj.wait(); 212 } catch (Exception e) { 213 e.printStackTrace(); 214 } 215 // 執行緒-1 216 count--; 217 } 218 } 219 } 220 } 221 222 } 223 224 }