程式人生 > Python 3 中的 urllib 例項

Python 3 中的 urllib 例項

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
import time
import sys
import gzip
import socket
import urllib.request, urllib.parse, urllib.error
import http.cookiejar
 
class HttpTester:
    '''Small convenience wrapper around a ``urllib.request`` opener.

    Provides GET, POST and file download, plus optional cookie, proxy and
    HTTP basic-auth handlers. Creating an instance has process-wide side
    effects: it sets the default socket timeout and installs the instance's
    opener as the default opener of ``urllib.request``.
    '''

    def __init__(self, timeout=10, addHeaders=True):
        '''Build the opener and install it globally.

        timeout    -- value passed to socket.setdefaulttimeout() (seconds).
        addHeaders -- when true, attach the browser-like default headers.
        '''
        socket.setdefaulttimeout(timeout)   # applies to every new socket in the process

        self.__opener = urllib.request.build_opener()
        urllib.request.install_opener(self.__opener)

        if addHeaders:
            self.__addHeaders()

    def __error(self, e):
        '''Report an error; currently it is only printed.'''
        print(e)

    def __addHeaders(self):
        '''Install the default request headers on the opener.'''
        self.__opener.addheaders = [
            ('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0'),
            ('Connection', 'keep-alive'),
            ('Cache-Control', 'no-cache'),
            # The header name must not contain a colon: http.client rejects
            # such names with ValueError('Invalid header name').
            ('Accept-Language', 'zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3'),
            ('Accept-Encoding', 'gzip, deflate'),
            ('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'),
        ]

    def __decode(self, webPage, charset):
        '''Return webPage (bytes) as text, gunzipping it first if needed.

        Gzip data is recognised by its two magic bytes. NOTE: only gzip is
        handled; a deflate-compressed body is passed to bytes.decode()
        unchanged even though the default headers advertise deflate.
        '''
        if webPage.startswith(b'\x1f\x8b'):
            webPage = gzip.decompress(webPage)
        return webPage.decode(charset)

    def __fetch(self, request, headers, charset):
        '''Send request through this instance's opener and decode the body.

        headers are added to this request only. Returns the decoded text, or
        None after reporting the error when the server answers with an HTTP
        error status. Other exceptions (URLError, timeouts, decode errors)
        propagate to the caller.
        '''
        for name, value in (headers or {}).items():
            request.add_header(name, value)

        try:
            # Use our own opener rather than urllib.request.urlopen(): the
            # global opener may belong to a more recently created instance.
            response = self.__opener.open(request)
        except urllib.error.HTTPError as e:
            self.__error(e)
            return None

        with response:      # make sure the connection is released
            body = response.read()
        return self.__decode(body, charset)

    def addCookiejar(self):
        '''Add a cookie handler backed by a fresh CookieJar to the opener.'''
        cj = http.cookiejar.CookieJar()
        self.__opener.add_handler(urllib.request.HTTPCookieProcessor(cj))

    def addProxy(self, host, type='http'):
        '''Route requests of the given scheme (type) through proxy host.'''
        proxy = urllib.request.ProxyHandler({type: host})
        self.__opener.add_handler(proxy)

    def addAuth(self, url, user, pwd):
        '''Add HTTP basic authentication credentials for url.'''
        pwdMsg = urllib.request.HTTPPasswordMgrWithDefaultRealm()
        pwdMsg.add_password(None, url, user, pwd)
        auth = urllib.request.HTTPBasicAuthHandler(pwdMsg)
        self.__opener.add_handler(auth)

    def get(self, url, params=None, headers=None, charset='UTF-8'):
        '''Perform an HTTP GET and return the decoded response body.

        params  -- optional query parameters, url-encoded and appended to url.
        headers -- optional mapping of extra headers for this request only.
        charset -- encoding used to decode the response body.

        Returns None if the server responds with an HTTP error status.
        '''
        if params:
            # Extend an existing query string instead of adding a second '?'.
            separator = '&' if '?' in url else '?'
            url += separator + urllib.parse.urlencode(params)
        return self.__fetch(urllib.request.Request(url), headers, charset)

    def post(self, url, params=None, headers=None, charset='UTF-8'):
        '''Perform an HTTP POST and return the decoded response body.

        params  -- optional form fields, url-encoded into the request body.
        headers -- optional mapping of extra headers for this request only.
        charset -- encoding used for the request body and the response.

        Returns None if the server responds with an HTTP error status.
        '''
        payload = urllib.parse.urlencode(params or {}).encode(charset)
        # A Request that carries data is sent as POST.
        request = urllib.request.Request(url, data=payload)
        return self.__fetch(request, headers, charset)

    def download(self, url, savefile):
        '''Download url to the local path savefile, printing progress.

        The Accept-Encoding header is withheld for the duration of the
        download so the payload is stored uncompressed, and the opener's
        headers are restored afterwards. HTTP error statuses are reported
        and swallowed; other exceptions propagate.
        '''
        opener = self.__opener
        saved_headers = list(opener.addheaders)
        # Build a filtered list instead of removing while iterating.
        opener.addheaders[:] = [header for header in saved_headers
                                if header[0].lower() != 'accept-encoding']

        shown = 0   # width of the progress text currently on screen

        def reporthook(blocks, block_size, total_size):
            '''urlretrieve callback: blocks transferred so far, size of one
            block in bytes, total size of the remote file (-1 if unknown).'''
            nonlocal shown
            if total_size > 1000000:    # only report progress for large files
                percent = min(100.0, 100.0 * blocks * block_size / total_size)
                text = '{:.2f}%'.format(percent)
                print('\b' * shown, text, end='')   # overwrite the previous value
                sys.stdout.flush()
                shown = len(text) + 1   # +1 for the separator print() inserts

        print('--> {}\t'.format(url), end='')
        try:
            # urlretrieve always goes through the globally installed opener,
            # so make sure that is this instance's opener.
            urllib.request.install_opener(opener)
            urllib.request.urlretrieve(url, savefile, reporthook)
        except urllib.error.HTTPError as e:
            self.__error(e)
        finally:
            # Restore exactly what was there before, including its order.
            opener.addheaders[:] = saved_headers
            print()