1. 程式人生 > >用rabbitmq 寫一個多機分散式爬蟲

用rabbitmq 寫一個多機分散式爬蟲


                     其實說是爬蟲也只是個幌子。換成其他的耗資源的程式也成,耗CPU,耗網路。                        

                     1,要有被用於爬取的url,用一個client生成url,在server中宣告一個url_queue,並把他放在server中,如果要持久化的花,記得加上durable。

                     2,各個爬蟲位於各個機子上,從 url_queue中取出url,成功處理爬取網頁之後反饋給server,
        
        
        

將訊息分配給Client有兩種方式,推和拉。

          推呢,是S把訊息給C,主動權掌握在S中,S不給C 的話,C就一直無事地在等。有任務要做的時候,S會把這寫任務分配給C。但是,如果C掛掉了的話,S也會分配給C的,S怎麼知道C掛了。所以問題,就變成S怎麼去了解C了,rabbitmq預設no_ack=False,不是沒有道理的,這要求C在完成S給的任務之後告訴S,‘ok了’。同時我們還可以設定

channel.basic_qos(prefetch_count=1)

也就是說,C得閒的時候S才分配任務給他。這個閒不閒的意思就是  ,C告訴S ‘ok了’,S就認為C現在閒了,否則S認為C還在忙。

         而拉呢,就是C主動,整天沒事就大個電話問,‘嘿,S,有沒有事做啊’,所以容易導致的問題就是浪費CPU資源,輪詢。

爬蟲一般我是用PyQuery來提取網頁資訊。pyquery的按照很麻煩,在windows先就跟麻煩了。所以我是把這個downloadweb的爬蟲放到windos,把download下頁面處理成string傳送到rabbitmq伺服器,再用我的ubuntu和centos(都按照了pyquery)來解析。這種小場景用rabbitmq感覺有點大材小用啊

#coding=utf8

import pika,urllib2,urllib,cookielib,random,time
import threading 
from Queue import Queue
q=Queue()#用於從伺服器接收要下載url
webBuff=Queue()#快取下載的網頁
proxy_suport=urllib2.ProxyHandler({'http':'http://127.0.0.1:8087'})
cookie_suport=urllib2.HTTPCookieProcessor(cookielib.CookieJar())
opener_proxy=urllib2.build_opener(proxy_suport,cookie_suport,urllib2.HTTPHandler)
opener_normal=urllib2.build_opener(cookie_suport,urllib2.HTTPHandler)

class downloadWeb(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        global q
        while True:
            while not q.empty():
                url=q.get()
                retF=''
                print 'Downloading %s' % url
                flag=False
                for i in range(3):
                    try:
                        retF=opener_normal.open(url,timeout=5).read()
                        flag=True
                        break
                    except Exception,e:
                        print 'error'
                        time.sleep(i)
                if not flag:
                    try:
                        retF=opener_proxy.open(url,timeout=5).read()
                        flag=True
                    except Exception,e:
                        print 'error'
                if flag and webBuff.qsize()<100:
                    webBuff.put(retF)
                while webBuff.qsize()>=100:
                    time.sleep(1)
               

threadList=[]
threadNum=2
def cleanWebBuff(ch):
    while True:
        while not webBuff.empty():
            s=webBuff.get()
            ch.basic_publish(exchange='',routing_key='webstring',body=s)
try:
    c=pika.ConnectionParameters(host='192.168.1.113')
    conn=pika.BlockingConnection(c)
    channel=conn.channel()
    channel.queue_declare(queue='appurl')
    channel.queue_declare(queue='webstring')
    threading.Thread(target=cleanWebBuff,args=(channel,)).start()
    for i in range(threadNum):
        threadList.append(downloadWeb())
    for one in threadList:
        one.start()
    def callback(cn,method,pro,body):
        url=str(body)
        print 'Received %s' %url
        q.put(url)
        while q.qsize()>3:
            time.sleep(1)
        cn.basic_ack(delivery_tag=method.delivery_tag)
        

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,queue='appurl')
    print 'start...'
    channel.start_consuming()
except Exception,e:
    channel.start_consuming()

一開始寫的時候問題挺多的,

1,一個一個url的下載,很慢。瓶頸在於網路傳輸,so=====>多執行緒。

2,錯誤,偶爾會出現這個錯誤。

Traceback (most recent call last):
  File "C:\Users\YanisNet7\Documents\myCode\rabbitmq_client.py", line 76, in <module>
    channel.start_consuming()
  File "build\bdist.win32\egg\pika\adapters\blocking_connection.py", line 293, in start_consuming
    self.transport.connection.process_data_events()
  File "build\bdist.win32\egg\pika\adapters\blocking_connection.py", line 87, in process_data_events
    raise AMQPConnectionError
AMQPConnectionError
這個錯誤出現在,channel.start_consuming()。我給外層加一個異常控制

3,

Warning (from warnings module):
  File "C:\Python27\lib\site-packages\pika-0.9.5-py2.7.egg\pika\connection.py", line 642
UserWarning: Pika: Write buffer exceeded warning threshold at 95420 bytes and an estimated 12 frames behind

因為我採用了多執行緒,可能出現有多個執行緒同時處在
ch.basic_publish(exchange='',routing_key='webstring',body=s)
basic_publish的傳送通道的快取是有限的,解決方法可以是a,修改配置檔案;b,我自己建一個用於快取webString的webBuff(也要注意限制啊),一個一個發,區域網的網速難道還不會比外網快;c,多程序,一個管道的buff不夠,我就多個,不過這點我還沒做。

可以優化的方向:a,多程序啦,這個我還沒做

b,pyquery處理的時候,除了用多執行緒併發處理之外,我希望後期能夠實現非同步IO,即我就只管解析網頁,至於把解析的結果存檔這種事我就不想等了。非同步IO是下一步學習方向

c,搞大點的場景,這種小爬蟲沒意思。

相關推薦

rabbitmq 一個分散式爬蟲

                     其實說是爬蟲也只是個幌子。換成其他的耗資源的程式也成,耗CPU,耗網路。                                              1,要有被用於爬取的url,用一個client生成url,在ser

webmagic一個簡單的網路爬蟲

用webmagic寫一個網路爬蟲(不是註解) 引入的jar包: 具體程式碼如下: import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import

入門級Python一個簡單的網路爬蟲下載和獲取資料

學會如何使用API通過url(Uniform Resource Locator 統一資源定位符)連線網路,獲取網站的API獲取url儲存的API,request執行獲取的urlrequests.get(url) 定義一個變數,將API響應儲存在裡面,呼叫json將r儲存的ap

python一個豆瓣短評通用爬蟲(登入、爬取、視覺化)

>原創技術公眾號:`bigsai`,本文在1024釋出,祝大家節日快樂,心想事成。 @[TOC](文章結構) ## 前言 在本人上的一門課中,老師對每個小組有個任務要求,介紹和完成一個小模組、工具知識的使用。然而我所在的組剛好遇到的是python爬蟲的小課題。 心想這不是很簡單嘛,搞啥呢?想著

GoWorld – Golang一個分散式可擴充套件、可熱更的遊戲伺服器

Golang具有執行效率高、記憶體安全等優良特性,因此是非常適合用來進行伺服器開發。使用Golang開發遊戲伺服器有如下的優點: 執行效率遠高於各種指令碼語言,大幅度提升伺服器承載能力 記憶體安全,不會像C++伺服器那樣出現記憶體錯誤導致伺服器down機 Goroutine能夠很好地利用多核計算

Python一個簡單的爬蟲

和朋友都灰常懶,不想上下滾動頁面看價格,所以寫了一個爬蟲,用於儲存商品價格。 環境:macOS、python3.5 IDE:pycharm 使用的庫:BeautifulSoup、urllib BeautifulSoup:優秀的HTML/XML的解析

python一個簡單的爬蟲功能

iOS開發如果之前沒接觸過除了c和c++(c++太難了,不花個十來年基本不可能精通)的語言,第二門語言最好的選擇就是python.原因就是 1.語法簡單 2.庫太多,隨便想要什麼功能的庫都找得到,簡直程式設計界的哆啦A夢. 3.語法優美,不信?你去看看py

python一個簡單的爬蟲儲存在json檔案中

學習python沒多久,所以只能寫一個很簡單的爬蟲啦~~ 我使用annacada 自帶的spyder來寫爬蟲的,這次我們要爬取得網站是http://www.drugbank.ca/drugs, 主要是爬取裡面每種藥物的資訊到json檔案中,包括有 DrugBank ID, 

Python一個批量生成賬號的函數(戶控制數據長度、數據條數)

shuf open 小寫 長度 數據 ase 函數 用戶控制 app # 1、寫一個函數,批量生成一些註冊使用的賬號:[email protected]/* */,長度由用戶輸入,產生多少條也由用戶輸入,用戶名不能重復,用戶名必須由大寫字母、小寫字母、數字組成

集合一個簡單的隨機分組,以及集合內元素數量查詢

移除 以及 表示 元素 move spa color 查詢 println 12個人,隨機分為4組 public static void main(String[] args) { List list = new ArrayList();

node一個皖水公寓自動刷房源腳本

port 發件人 ges 開啟 pan sendemail transport tar 收件人 因為住的地方離公司太遠,每天上下班都要坐很久的班車,所以最近想搬到公司旁邊的皖水公寓住。去問了一下公寓的客服,客服說房源現在沒有了,只能等到別人退房,才能在網站上申請到。 如果

Python一個小遊戲

python 小腳本 剛學Python時間不長,但也知道了一點,看別人的參考寫了一個猜數字小遊戲,也算是禹學於樂吧。#!/usr/bin/env python #coding=utf-8

hive一個獲取本日期的季度初的sql

ble 等於 diff 需要 lock 2-0 lec select ediff 由於項目需要獲取(本季度的的發生額總和)/(本季度經歷的天數)的數據(還有月均,年均的數據)。 判斷季度的時候是一個難點,開始的時候寫了一堆case when 來判斷月份,後來寫著寫著發現,這

004.怎麽VMware新建一個虛擬

get cnblogs vmware 系統類型 電源 images html tps 版本 VMWare官網鏈接:https://www.vmware.com/cn.html 1.打開VMware X.0(X是版本),看到主界面,選擇“新建虛擬機”。 2.彈出新建虛擬機的

五:JAVA一個阿裏雲VPC Open API調程序

外部jar包 ef7 dac java 方式 命令 pro ng- 自動化管理 用JAVA寫一個阿裏雲VPC Open API調用程序 摘要:用JAVA拼出來Open API的URL 引言 VPC提供了豐富的API接口,讓網絡工程是可以通過API調用的方式

Struts2框架使用(一)之Struts2一個HelloWorld

同時 style test 框架 exceptio world utf-8 dispatch har 這裏記一下第一次Struts2框架的使用方法,首先學會使用Struts2寫一個HelloWorld。 首先導入Struts2所依賴得jar包 首先創建一個web工程

python一個簡單的excel表格獲取當時的linux系統信息

psutil 生成 之前 建立 set ces ext 流量 關閉 最近在學習excel表格的制作,順便結合之前學習的內容,利用python的兩個模板,分別是獲取系統信息的psutil,和生成excel表格的xlsxwriter。利用這兩個模板將生成一個簡單的excel表格

python一個九九乘法表-2月19日/2018

九九乘法 while -c pos ont 九九 pytho 九九乘法表 font first = 1 while first<=9:   sec=1   while sec<=first:     print(str(sec),"x",str(first),

shell一個簡易計算器,可以實現加、減、乘、除運算,假如腳本名字為1.sh,執行示例:./1.

a-z 依次 腳本 als 示例 內置 數位 特殊字符 使用 用shell寫一個簡易計算器,可以實現加、減、乘、除運算,假如腳本名字為1.sh,執行示例:./1.sh 1 + 2#!/bin/bash if [ $# -ne 3 ] then echo "參

Canvas一個簡單的遊戲--別踩白塊兒

來吧 ber -c [] for 輸入 itl event 內部   第一次寫博客也不知怎麽寫,反正就按照我自己的想法來吧!怎麽說呢?還是不要扯那些多余的話了,直接上正題吧! 第一次用canvas寫遊戲,所以挑個簡單實現點的來幹:別踩白塊兒,其他那些怎麽操作的那些就不用再扯