1. 程式人生 > >程序和執行緒——Python中的實現

程序和執行緒——Python中的實現

一、程序(Process

    程序是一個實體。每一個程序都有它自己的地址空間,一般情況下,包括文字區域(text region)、資料區域(data region)和堆疊(stack region)。文字區域儲存處理器執行的程式碼;資料區域儲存變數和程序執行期間使用的動態分配的記憶體;堆疊區域儲存著活動過程呼叫的指令和本地變數。

    程序是一個“執行中的程式”。程式是一個沒有生命的實體,只有處理器賦予程式生命時,它才能成為一個活動的實體,我們稱其為程序。

在Python中呼叫第三方模組multiprocessing,其過程就三步:

# 第一步:初始化multiprocess
# 第二步:start
# 第三步:join

import time
import multiprocessing
def process_run(params):
    time.sleep(2)
    print(params)
    time.sleep(2)
    print(params)
    time.sleep(2)
    print(params)


if __name__ == '__main__':
    params = 'old food sleepy is going to sleep'
    
    p = multiprocessing.Process(target=process_run, args=(params,))   # 例項化物件
    p.start()   # 開啟程序
    print('lyb is lyb')
    p.join()   # 關閉
    print('lyb is lyb')

程序池:

    使用multiprocessing模組提供的Pool方法.

    初始化Pool時,可以指定一個最大程序數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來執行,請看下面的例項:

import multiprocessing
import time

def process_run(params):
    time.sleep(2)
    print('tomato' + params)
    time.sleep(2)
if __name__ == '__main__':
    # 第一步
    pool = multiprocessing.Pool(2)
    for i in range(0, 10):
        pool.apply_async(process_run, ('My English is very good' + str(i),))
    # 第二步
    pool.close()
    # 第三步
    pool.join()

二、執行緒(Thread)

    通常在一個程序中可以包含若干個執行緒,當然一個程序中至少有一個執行緒,不然沒有存在的意義。執行緒可以利用程序所擁有的資源,在引入執行緒的作業系統中,通常都是把程序作為分配資源的基本單位,而把執行緒作為獨立執行和獨立排程的基本單位,由於執行緒比程序更小,基本上不擁有系統資源,故對它的排程所付出的開銷就會小得多,能更高效的提高系統多個程式間併發執行的程度。

    執行緒有自己的堆疊和區域性變數,但執行緒之間沒有單獨的地址空間,一個執行緒死掉就等於整個程序死掉所以多程序的程式要比多執行緒的程式健壯。

在Python中開啟執行緒用模組threading裡面的Thread類,也是三步走:

# 第一步:建立 threading的類
# 第一步:執行start
# 第一步:等待結束 join
import threading
import time

def th_run(params):
    time.sleep(2)
    print(params)
    time.sleep(2)

if __name__ =='__main__':
    params = 'black potato is a good boy'
    # 建立Thread類, 傳入兩個引數
    t = threading.Thread(target=th_run, args=(params,))
    # 執行
    t.start()
    print('sunyang is good at swimming')
    time.sleep(2)
    print('sunyang is good at swimming')
    time.sleep(2)
    #等待結束
    t.join()

執行緒池:

Python中執行緒池的建立用第三方庫threadpool裡面的ThreadPool類:(感覺不怎麼常用)

import threadpool
import time

def thread_run(params, cuolechileni):
    time.sleep(2)
    print('tomato' + params + cuolechileni)
    time.sleep(2)

# 有多少條thread
pool = threadpool.ThreadPool(2)

params = [(['我用java','aaa'], None),(['我用php', 'bbb'],None), (['我學區塊鏈','ccc'], None), (['我學大資料', 'ddd'], None)]

# 建立函式和引數的類
requests = threadpool.makeRequests(thread_run, params)

for req in requests:
    pool.putRequest(req)
    print('lala 我們在這了,' + str(req))

pool.wait()
print('我才是最後的boss')

下面給出一個程序、程序池的應用例項:(爬取喜馬拉雅音訊檔案),具體分析略

import requests
from lxml import etree
from urllib import request
import time, os
from multiprocessing import Queue, Pool, Process


def paqu(q):
    page = 1
    while page <= 4:
        url1 = 'https://www.ximalaya.com/xiangsheng/9742789/p{}/'.format(page)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'}
        
        response = requests.get(url1, headers=headers)
        res_html = response.text
        
        res_lxml = etree.HTML(res_html)
        res_href = res_lxml.xpath('//ul[@class="dOi2"]/li/div[2]/a/@href')  # ['/xiangsheng/9742789/46430555', '/x0
        # print(res_href)
        for href1 in res_href:
            href_one = request.urljoin(url1, href1)  # https://www.ximalaya.com/xiangsheng/9742789/46105777
            # print(href_one)
            
            href_last = href_one.split('/')[-1]  # 46430555
            # print(href_last)
            url2 = 'https://www.ximalaya.com/revision/play/tracks?trackIds={}'.format(href_last)
            response_one = requests.get(url2, headers=headers)
            json_dict = response_one.json()
            # print(json_dict)
            
            src_str = json_dict['data']['tracksForAudioPlay'][0]['src']
            # print(src_str)
            if os.path.exists('Download_mp3'):
                pass
            else:
                os.mkdir('Download_mp3')
                
            trackName = 'Download_mp3/' + json_dict['data']['tracksForAudioPlay'][0]['trackName']
            # print(trackName)
            
            q.put((src_str, trackName))   # ----------------
            
            # request.urlretrieve(src_str, trackName + '.m4a')
            print(page)
        print('我是訊息佇列:', q.qsize())
        page += 1


def write_mp3(mp3_url_referer_url):
    time.sleep(5)
    (src_str, trackName) = mp3_url_referer_url
    print('開始下載', src_str, trackName)

    # request.urlretrieve(src_str, trackName + '.m4a')
    
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=paqu, args=(q,))
    p.start()
    

    # 開啟的程序數量
    download_pool = Pool(5)
    while True:
        try:
            mp3_url_referer_url = q.get()   # 可以設定超時時間
            print(mp3_url_referer_url)
            download_pool.apply_async(write_mp3, (mp3_url_referer_url,))
        except:
            
            print('Queue is Empty!')
            break
    download_pool.close()
    download_pool.join()
    # 程式最後退出前進行join
    p.join()