1. 程式人生 > >《Python核心程式設計》之資料庫程式設計快速入門與專案實戰

《Python核心程式設計》之資料庫程式設計快速入門與專案實戰

資料庫儲存是一種持久化儲存,因此先從持久化儲存說起吧。

1.持久化儲存

(1).在任何應用中,都需要持久化儲存。一般有3種基礎的儲存機制:檔案、資料庫系統和一些混合型別。這種混合型別包括現有現有系統上的API、ORM、檔案管理器、電子表格、配置檔案等。
(2).檔案或簡單的持久化儲存可以滿足一些小應用的需求,而大型資料庫或高資料容量的應用則需要更加成熟的資料庫系統。

2.資料庫基本操作和SQL

(1).資料庫的底層儲存
資料庫通常使用檔案系統作為基本的持久化儲存,它可以是普通的作業系統檔案、專用的作業系統檔案,甚至是原始的磁碟分割槽。
(2).使用者介面
大多數資料庫系統提供了命令列工具,可以用其執行SQL語句或查詢。此外,還有一些GUI工具,使用命令列客戶端或資料庫客戶端庫,向用戶提供更加便捷的介面。
(3).模式
資料庫儲存可以抽象為一張表。每行欄位都有一些欄位對應於資料庫的列。每一列的表定義的集合以及每個表的資料型別放到一起定義了資料庫的模式。
(4).SQL
資料庫命令和查詢操作是通過SQL語句提交給資料庫的。雖然並非所有資料庫都使用SQL,但是大多數關係資料庫使用。注意,大部分資料庫都是不區分大小寫的,尤其是針對資料庫命令而言。一般來說,對資料庫關鍵字使用大寫字母是最為廣泛接受的風格。
SQL操作包括:
[1].對資料庫的操作(CREATE, DROP)
[2].對錶的操作(CREATE,DROP)
[3].對錶中資料欄位的操作(INSERT, DELETE, UPDATE, SELECT)
[4].對許可權進行分配(GRANT)

3.資料庫和Python

訪問資料庫包括直接通過資料庫介面訪問和使用ORM訪問兩種方式。其中,使用ORM訪問的方式不需要顯示給出SQL命令,但也能完成相同的任務。
在Python中,資料庫是通過介面卡的方式進行訪問的。介面卡是一個Python模組,使用它可以與關係資料庫的客戶端庫(通常用C語言編寫)介面相連。一般情況下,會推薦所有的Python介面卡應當符合Python資料庫興趣小組的API標準。

4.Python的DB-API
目的是,為不同的關係資料庫提供一致性的介面,使不同資料庫間移植程式碼變得更加簡單。
DB-API標準要求必須提供下文列出的功能和屬性。一個相容DB-API的模組必須定義apilevel、threadsafety、paramstyle和connect()四個全域性屬性。

模組屬性
(1).資料屬性
【1】apilevel:該字串(注意,不是浮點型)指明瞭模組需要相容的DB-API最高版本,比如,1.0、2.0等。
【2】threadsafety:這是一個整型值,可選值如下:
0:不支援執行緒安全。執行緒間不能共享模組。
1:最小化執行緒安全支援:執行緒間可以共享模組,但是不能共享連線。
2:適度的執行緒安全支援:執行緒間可以共享模組和連線,但是不能共享遊標。
3:完整的執行緒安全支援:執行緒間可以共享模組、連線和遊標。
【3】paramstyle:DB-API支援以不同的方式指明如何將引數與SQL語句進行整合,並最終傳遞給伺服器中執行。該引數是一個字串,用於指定構建查詢行或命令時使用的字串替代形式。
資料庫引數風格paramstyle有:
numeric            數值位置風格            WHERE name=:1
named            命名風格                WHERE name=:name
pyformat        Python字典printf()格式轉換    WHERE name=%(name)s
qmark            問號風格                WHERE name=?
format             ANSIC的printf()格式轉換        WHERE name=%s

(2).函式屬性
相容模組必須實現connect()函式,該函式建立並返回一個Connection物件。下面是connect()的引數。
user        使用者名稱
password    密碼
host        主機名
database    資料庫名
dsn            資料來源名
可以使用包含多個引數的字串(DSN)來傳遞資料庫連線資訊,也可以按照位置傳遞每個引數,或者是使用關鍵字引數的形式傳入。
使用DSN還是獨立引數主要基於所連線的系統。比如,如果你使用的是像ODBC或JDBC的API,則需要使用DSN;而如果你直接使用資料庫,則更傾向於使用獨立的登入引數。另一個使用獨立引數的原因是很多資料庫介面卡並沒有實現對DSN的支援。

(3).異常
異常同樣需要包含在相容的模組中。DB-API類如下:
Warning            警告異常基類
Error            錯誤異常基類
    InterfaceError    資料庫介面錯誤
    DatabaseError    資料庫錯誤
        DataError    處理資料時出現問題
        OperationError    資料庫操作執行期間出現錯誤
        IntegrityError    資料庫關係完整性錯誤
        InternalError    資料庫內部錯誤
        ProgrammingError    SQL命令執行失敗
        NotSupportedError    出現不支援的操作

5.Connection物件

應用與資料庫之間進行通訊需要建立資料庫連線。它是最基本的機制,只有通過資料庫連線才能把命令傳遞到伺服器,並得到返回的結果。當一個連線(或一個連線池)建立後,可以建立一個遊標,向資料庫傳送請求,然後從資料庫中接收回應。
Connection物件方法:不需要包含任何資料屬性,不過應當定義如下幾個方法:
close()            關閉資料庫連線
commit()        提交當前事務
rollback()        取消當前事務
cursor()        使用該連線建立並返回一個遊標或類遊標的物件
errorhandler()    作為給定連線的遊標的處理程式

6.Cursor物件

如果建立一個數據庫介面卡,還必須要實現cursor物件,原因:這樣,無論你將資料庫切換到支援到遊標的資料庫還是不支援遊標的資料庫,都能保持Python的一致性。
遊標物件最重要的屬性是execute*()和fetch*()方法,所有針對資料庫的服務請求都是通過它們執行的。arraysize資料屬性在為fetchmany()設定預設大小時非常有用。當然,在不需要時關閉遊標是個好主意,而如果你的資料庫系統支援儲存過程,可能會用到callproc()。
arraysize        使用fetchmany()方法時,一次取出的結果行數,預設為1。
connection        建立此遊標的連線(可選)
description        返回遊標活動狀態(7項元組):(name,type_code, display_size, display_size, internal_size, precision, scale, null_ok),只有name和type_code是必需的。
lastrowid        上次修改行的行ID
rowcount        上次execute*()方法處理或影響的行數
callproc(func[,args])        呼叫儲存過程
close()            關閉遊標
execute(op[,args])        執行資料庫查詢或命令
executemany(op, args)    類似execute()和map()的結合,為給定的所有引數準備並執行資料庫查詢或命令
fetchone()        獲取查詢結果的下一行
fetchmany([size=cursor.arraysize])        獲取查詢結果的下面size行
fetchall()        獲取查詢結果的所有(剩餘)行
__iter__()        為遊標建立迭代器物件
messages        遊標執行後從資料庫中獲得的訊息列表
next()            被迭代器用於獲取查詢結果的下一行(可選,類似fetchone(), 參考__iter__())
nextset()        移動到下一個結果集合(如果支援)
rownumber        當前結果集中游標的索引(以行為單位,從0開始,可選)
 

7.專案實戰

在下面的專案中,我將從蘇州吳中區行政處罰和行政許可公示中:爬取資料,並將資料儲存在工作站上,當然,如果覺得資料插入的部分比較麻煩,可以刪除掉一些欄位。

以下為專案程式碼:

爬取行政處罰公示資訊大致分為兩步,

首先先爬取各處罰的名稱和url,

然後根據這些url爬取詳情頁的資訊。

# http://61.155.216.26:8011/publicity/gs/queryHistory post
import re
import requests
from bs4 import BeautifulSoup
import common
import chardet
import time
import json
import pymysql
from punishment.functions import normalize_string

conn = pymysql.connect(host='192.168.1.204', db='wuzhong', user='root', password='123')
cursor = conn.cursor()


def get_latest_time():
    sql = 'SELECT MAX(cf_date) FROM punishment'
    cursor.execute(sql)
    result = cursor.fetchone()
    if result:
        return result[0]
    else:
        return 0


url_name_times = []
urls = []
for index in range(1, 30):
    print('正在爬取第{}頁資料'.format(index))
    url = 'http://61.155.216.26:8011/publicity/gs/queryHistory'
    headers = common.get_user_agent()
    headers['Cookie'] = 'JSESSIONID=5FE868E99AC3C2D14DFE457C0469CEB5'
    print('index:', index)
    post_data = {
        'endTime': "",
        'gsType': '1',
        'page': index,
        'startTime': "",
        'userId': "",
        'xkrName': "",
        'userId': '001018',
    }

    response = requests.post(url, headers=headers, data=post_data, timeout=10)
    if response.status_code == 200:
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, 'lxml')
        dicts = json.loads(response.text)
        datas_list = dicts['obj']['list']
        print(datas_list)
        for data in datas_list:
            company = normalize_string(data.get('cfXdrMc'))
            punish_title = data.get('cfAjmc')
            msecs = data.get('cfJdrq')
            punish_time = time.strftime("%Y-%m-%d", time.localtime(msecs/1000))
            latest_time = str(get_latest_time())
            id = data.get('id')
            page_url = 'http://61.155.216.26:8011/publicity/xzcf/queryDetail?id={}&xzType=1'.format(id)
            if page_url not in urls and punish_time > latest_time:
                url_name_times.append((page_url, punish_title))
                urls.append(page_url)
    print(len(url_name_times))
    time.sleep(3)


try:
    for each in url_name_times:
        sql = 'INSERT INTO punishment(url, title, flag, province_id, city_id)' \
              ' VALUES("%s", "%s", "%s", "%s", "%s")' % (each[0], each[1], 0, 10, 79)
        cursor.execute(query=sql)
        conn.commit()
except Exception as e:
    print(e)
    conn.rollback()


# -------------------之後將flag為0的url進行更新-----------------------


def get_id_urls(cursor):
    try:
        sql = 'SELECT id, url FROM punishment WHERE flag = 0;'
        cursor.execute(sql)
        result = cursor.fetchall()
    except Exception as e:
        print(e)

    return result


def save_data(cursor, id, data):
    update_time = time.strftime("%Y-%m-%d", time.localtime())
    sql = 'UPDATE punishment' \
          ' SET cf_number = "%s", reason="%s", basis="%s", category="%s", xz_name="%s", ' \
          'credit_number="%s", legal_person_name="%s", id_card="%s", cf_result="%s", update_time="%s",' \
          'cf_date="%s", cf_department="%s", note="%s", source="%s", flag="%s"' \
          'WHERE id = "%s"' % (
        data.get('行政處罰決定書文號'), data.get('處罰事由'), data.get('處罰依據'), data.get('處罰類別1') + " " + data.get('處罰類別2'), data.get('行政相對人名稱'),
        data.get('統一社會信用程式碼'), data.get('法定代表人姓名'), data.get('居民身份證號'), data.get('處罰結果'), update_time,
        data.get('處罰決定日期').replace('年', '-').replace('月', '-').replace('日', ''), data.get('處罰機關'), data.get('備註'), '吳中區信用資訊公示', 1,
        id,
    )
    try:
        cursor.execute(sql)
        conn.commit()
        print('資料庫插入成功!')
    except Exception as e:
        print(e)
        conn.rollback()


id_urls = get_id_urls(cursor)
for each in id_urls:
    url = each[1]
    id = each[0]
    print(url)
    response = requests.get(url, headers=common.get_user_agent(), timeout=10)
    response.encoding = chardet.detect(response.content)['encoding']
    soup = BeautifulSoup(response.text, 'lxml')
    table = soup.select('#home1DetailTable')[0]
    trs = table.find_all(name='tr')

    data = {}
    for tr in trs:
        tds = tr.find_all(name='td')
        data[normalize_string(tds[0].text)] = normalize_string(tds[1].text)

    save_data(cursor, id, data)
    time.sleep(3)

cursor.close()
conn.close()