From Zero Series -- A Node Crawler That Writes Data Through a Process Pool
阿新 · Published: 2018-09-04
1. The main process
const http = require('http');
const cheerio = require('cheerio');
const makePool = require('./pooler');
const runJob = makePool('./worker');

var url = "http://xxx.com/articles/"; // the initial URL
let g; // will hold the generator that drives the crawl

function fetchPage(x) { // thin wrapper around the request
    console.log(x);
    if (!x || x === '') {
        g.next(); // skip empty URLs and move on
        return;
    }
    startRequest(x);
}

function startRequest(x) {
    // issue a GET request to the server via the http module
    return http.get(x, function (res) {
        var html = ''; // accumulates the full HTML of the page
        res.setEncoding('utf-8'); // avoid garbled Chinese characters
        // the data event delivers the response one chunk at a time
        res.on('data', function (chunk) {
            html += chunk;
        });
        // the end event fires once the whole page has been received
        res.on('end', function () {
            var $ = cheerio.load(html); // parse the HTML with cheerio
            var time = new Date();
            var p = $('.content p');
            p.each((index, item) => {
                if ($(item).find('strong').length) {
                    var fex_item = {
                        // the article title
                        title: $(item).find('strong').text().trim(),
                        // the time the article was saved
                        time: time,
                        // the URL of this article
                        link: $($(item).children('a').get(0)).attr('href'),
                        // strip child elements, keep the remaining text as the description
                        des: $(item).children().remove() && $(item).text(),
                        // i records how many articles have been collected
                        i: index + 1
                    };
                    // hand the article off to a pooled worker process
                    runJob(fex_item, (err, data) => {
                        if (err) return console.error('get link error');
                        console.log('get link ok');
                    });
                }
            });
            g.next(); // this page is done; resume the generator
        });
    }).on('error', function (err) {
        console.log(err);
        g.next(); // don't stall the crawl on a failed request
    });
}

function* gen(urls) {
    let len = urls.length;
    for (var i = 0; i < len; i++) {
        yield fetchPage(urls[i]); // one fetch per resumption
    }
}

function getUrl(x) {
    // fetch the index page and collect the article URLs
    http.get(x, function (res) {
        var html = '';
        res.setEncoding('utf-8');
        res.on('data', function (chunk) {
            html += chunk;
        });
        res.on('end', function () {
            var $ = cheerio.load(html);
            var lists = $('.articles .post-list li');
            var urls = [];
            lists.each(function (index, item) {
                if ($(item).find('a').length) {
                    var url = 'http://xxxx.com' + $($(item).children('a').get(0)).attr('href');
                    if (url) urls.push(url);
                }
            });
            // the crawl proper starts here
            g = gen(urls);
            g.next();
        });
    }).on('error', function (err) {
        console.log(err);
    });
}

getUrl(url);
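The generator-driven flow control is easy to lose among the scraping details, so here is the same pattern in isolation: each yield starts exactly one asynchronous fetch, and the fetch calls g.next() when it finishes, which starts the next one. This is a minimal sketch; mockFetch is a hypothetical stand-in for startRequest above:

// Minimal sketch of the generator-driven sequencing used above.
// mockFetch is illustrative only; it plays the role of startRequest.
let g;

function mockFetch(url) {
    setTimeout(function () {   // pretend this is the async HTTP request
        console.log('fetched', url);
        g.next();              // resume the generator once this page is done
    }, 100);
}

function* gen(urls) {
    for (const u of urls) {
        yield mockFetch(u);    // each yield starts exactly one fetch
    }
}

g = gen(['a', 'b', 'c']);
g.next();                      // kick off the first fetch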
2. Creating the process pool
const cp = require('child_process');
const cpus = require('os').cpus().length;

module.exports = function pooler(workModule) {
    let awaiting = [], readyPool = [], poolSize = 0;
    return function doWork(job, cb) {
        // queue the job if no child is idle and the pool is saturated
        if (!readyPool.length && poolSize > cpus)
            return awaiting.push([doWork, job, cb]);
        // reuse an idle child, or fork a new one
        let child = readyPool.length
            ? readyPool.shift()
            : (poolSize++, cp.fork(workModule));
        let cbTriggered = false;
        child.removeAllListeners()
            .once('error', function (err) {
                if (!cbTriggered) {
                    cb(err);
                    cbTriggered = true;
                }
                child.kill();
            })
            .once('exit', function (code) {
                if (!cbTriggered)
                    cb(new Error('child exited with code: ' + code));
                poolSize--;
                // make sure a dead child never lingers in the ready pool
                let childIdx = readyPool.indexOf(child);
                if (childIdx > -1) readyPool.splice(childIdx, 1);
            })
            .once('message', function (msg) {
                cb(null, msg);
                cbTriggered = true;
                readyPool.push(child);
                // drain one queued job, if any
                if (awaiting.length) setImmediate.apply(null, awaiting.shift());
            })
            .send(job);
    };
};
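Wiring the pool up from the caller's side is one call per job. A minimal usage sketch, assuming pooler.js and worker.js sit next to the calling script and the job object mirrors fex_item from step 1:

const makePool = require('./pooler');
const runJob = makePool('./worker');

// each call either reuses an idle child, forks a new one,
// or queues the job when the pool is saturated
runJob({ title: 'demo', link: 'http://xxx.com/a', des: 'test', time: new Date() },
    function (err, msg) {
        if (err) return console.error(err);
        console.log('worker replied:', msg); // 'finish'
    });

Note that the guard poolSize > cpus only queues a job once the pool already exceeds the CPU count, so the pool can grow to cpus + 1 children before jobs start waiting.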
3. The worker process receives messages and handles the content
const fs = require('fs');

process.on('message', function (job) {
    let _job = job;
    // format one article as plain text
    let x = 'TITLE:' + _job.title + '\n' +
        'LINK:' + _job.link + '\n DES:' + _job.des + '\n SAVE-TIME:' + _job.time;
    fs.writeFile('../xx/data/' + _job.title + '.txt', x, 'utf-8', function (err) {
        if (err) {
            console.log(err);
        }
        // reply only after the write has finished, so the pool
        // does not recycle this child before the file is on disk
        process.send('finish');
    });
});
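The worker can also be sanity-checked on its own, outside the pool. A minimal sketch, assuming the file above is saved as worker.js next to this script and the ../xx/data/ directory exists:

const cp = require('child_process');

const child = cp.fork('./worker');
child.on('message', function (msg) {
    console.log('worker replied:', msg); // 'finish' once the write completes
    child.kill();
});
// the job shape mirrors fex_item from step 1
child.send({ title: 'demo', link: 'http://xxx.com/a', des: 'test', time: new Date() });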