1. 程式人生 > >進程、線程、協程

進程、線程、協程

等於 pool 信息 global 沖突 join 時間輪 擁有 multi

進程
什麽是進程?
程序就是一堆放在磁盤上的代碼,進程是一段程序的運行過程
正規點說,進程一般由程序、數據集、進程控制塊三部分組成

什麽進程切換?
進程切換是,一個正在運行的進程被中斷,操作系統指定另一個進程為運行態,並把CPU執行權交給這個進程。由操作系統控制調度,如單線程遇到io或執行時間過長就會被迫交出cpu執行權限
1 )如果沒有遇到io操作,操作系統會按照一個固定的時間段去切換進程,幾百毫秒(時間輪詢切換)。進程每一次切換的時間都是微不足道的,但要命的是其切換次數。
2)遇到io操作,則主動交出cpu使用權。
3)最後是遇到一個更高優先級的進程,被操作系統的調度算法強行換下來

進程並發(切換)如何實現?

進程並發的實現在於,硬件中斷一個正在運行的進程,把此時進程運行的所有狀態保存下來,為此,操作系統維護一張表格,即進程表(process table),
每個進程占用一個進程表項(這些表項也稱為進程控制塊)該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配狀況、所有打開文件的狀態、帳號和調度信息,以及其他在進程由運行態轉為就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啟動時,就像從未被中斷過一樣。

擴展知識:進程切換保存的上下文信息存到哪裏去?別跟我說內存,太慢了

像進程切換所要保存的數據都會存儲到寄存器上。
1.寄存器的產生原因是由於CPU運行速度,遠遠快於從內存讀取數據,程序慢就是慢取這些指令,而不是運行這些指令。
2.寄存器緊貼cpu,如今大概有1k左右大小。設計出來的初衷是讀取速度要配合cpu的高速運轉速度,即讀取速度較快
3.寄存器存儲非常關鍵的數據,並且要求取數據速度快的數據,例如常存儲進程切換的狀態

進程有哪些狀態?
進程的三種狀態
運行、阻塞(掛起)、就緒
其中阻塞和就緒的差別就是有沒有被操作系統考慮調度切換。阻塞就一直晾在一邊,不考慮

什麽是並發與並行?
並發:是指系統有處理多個任務(動作)的能力
   只要1核的cpu,不可能存在兩個進程同時運行,而不用任何切換機制。
並行,是指系統具有‘同時’處理多個任務的能力
   這是多核cpu特有的,可以並行少於等於其核數的進程

進程相關代碼

Process([group [, target [, name [, args [, kwargs]]]]])

  group # 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target#
要執行的方法;   name #進程名;   args/kwargs # 要傳入方法的參數。 實例方法:   is_alive() #返回進程是否在運行。   join([timeout]) #阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。   start()#進程準備就緒,等待CPU調度   run() #strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。   terminate()#不管任務是否完成,立即停止工作進程   p.getpid()#當前進程id   p.getppid() #父進程id 屬性:   daemon#和線程的setDeamon功能一樣,不過變了屬性,設置守護進程的屬性   name#進程名字。   pid  #進程號

進程鎖
進程爭同一個資源時,若要求要有先後順序,則鎖住相應的資源相關代碼
例如:鎖住屏幕,則讓print上鎖

from multiprocessing import Process,Lock

def prt(i,lock):
    lock.acquire()
    print(love is ,i)
    lock.release()

if __name__ ==__main__:
    l=Lock()
    for i in range(1,11):
        p = Process(target=prt,args=(i,l))
        p.start()


進程池
需求:比如有100塊磚,自己一個人搬,需要100趟。當然也可以開100線程!,但是無論開100個線程還是進程,開銷都很大。進程更是,相當於開100個內存獨立空間
所以最好的辦法是一般花點雇幾個人幫你搬,並且搬完一趟之後繼續搬,直至100塊磚搬完

from  multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(1)
    print(i)
    return i+100

def Bar(arg):

    print(os.getpid())
    print(os.getppid())
    print(logger:,arg)

pool = Pool(5) #如果不傳參,默認為電腦核數
Bar(1)
for i in range(10):
    #pool.apply(func=Foo, args=(i,))  #同步
    #pool.apply_async(func=Foo, args=(i,)) #異步
    pool.apply_async(func=Foo, args=(i,),callback=Bar) #回調函數,要有且僅有一個參數。進程池有個回調函數參數,回調函數是子進程執行完一次任務後,由主進程執行的函數。

pool.close()
pool.join() #註意一下,這裏格式是死的,只要用進程詞,close()後再join()一下,兩者去其一,要麽報錯要麽不執行
print(end)

進程通信
1)進程隊列
2)管道(類似於socket)
3)Manager(數據集共享)
Queue和Pipe只是實現了數據交互,卻沒有實現數據共享,即一個進程可以修改另一進程的數據

from multiprocess import Queue,Pipe,Manager
進程隊列
q = multiprocess.Queue()

管道,類似於socket的用法
son_pipe,parent_pipe = Pipe()
son_pipe.send([love])
data=parent_pip.recv()
son_pipe.close()

Manager
with Manager() as manager
    d=manager.dict()
    l=manager.list(range(6))
#使用時註意:是將d,l等對象當做進程函數參數傳過去




線程
線程的出現是為了降低上下文切換的消耗,提高系統的並發性,並突破一個進程只能幹一樣事的缺陷,使到進程內並發成為可能。
一個程序運行起來,最少有一個進程。一個進程裏面至少一個線程。
進程相當於為線程提供資源的容器,有了線程概念以後,真正切換和跑的是線程,線程無法脫離進程而存在,因為線程要共享進程裏面的內存。

線程和進程的區別?
1)每個進程都有自己獨立的內存空間。例如QQ的內存空間和360的內存空間沒有任何關系。
2)一個進程中的線程可以共享該進程中的所有資源

什麽是多線程?多線程對比多進程有什麽好處?

由於多線程的運行與多進程的運行類似,是cpu在多個線程之間的快速切換。
進一步說,python的線程屬於內核級別的(雖說是這樣,但是由於GIL存在,表現得跟用戶級線程差不多)
1)多線程共享一個進程的地址空間,數據共享更加容易,並且當處理的操作基本一樣時,可以節省內存
  如果一個數據集在磁盤上,使用多進程讀入,則會出現多個相同的內存,如果采用多線程,則是多個線程共享一份讀入的數據集內存
2)線程比進程更輕量級,線程比進程更容易創建可撤銷,在許多操作系統中,
  創建一個線程比創建一個進程要快10-100倍,在有大量線程需要動態和快速修改時,這一特性很有用
3)線程切換的消耗更小
  多線程也是要做切換的,不過由於其單位更小,掛起恢復不大。
  而進程牽扯到獨立的內存空間等其他,掛起恢復消耗的資源大

GIL
1)GIL的存在使得python不能利用多核cpu,因為GIL鎖cpython解釋器上。
2)在解釋器層面,解釋器線程鎖(GIL)保證了內存管理的線程安全,例如保證解釋器垃圾線程回收時不沖突
3)相比java的jvm,JVM 沒有 GIL ,但是也有鎖,只是鎖的粒度細小得多,一般不會整個線程給你鎖上

進程與線程有什麽區別?

進程是最小的資源單位,操作系統分資源只能分到進程
進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位

線程最小執行單位,CPU調度分派的基本單位。(這個調度可以理解為,一段時間校長代表學校,一段時間由副校長代表學校)
線程自己基本不擁有系統資源,嚴格來說,只擁有一點運行起來必不可少的資源(程序計數器,棧,寄存器)
程序計數器是類似於看書的頁碼功能,也是一個寄存器,保存一個地址

守護進程和守護線程是什麽?
其實兩種概念差不多,旨在同時退出
守護線程:子線程會守護著主線程。即主線程和守護線程同時退出
若主線程退出,守護線程也退出,即使還沒運行完。
若守護線程執行完了,守護線程也不退,直到主線程也執行完一同退出。

守護進程:主進程代碼運行結束,守護進程隨即終止,不過守護進程內無法再開啟子進程,否則拋出異常(未驗證)
註意:操作系統層面的守護進程是一直在後臺運行的進程

線程相關代碼

t.join()等用於waitforsingleobject,在子線程執行完之前,父線程會一直阻塞,註意只是父線程。
t.setDaemeon(True) 設為守護線程,在t.start()之前調用
t.start(),因為線程一被創建默認被掛起,這個是用來啟動的(其實也沒有真正跑起來,只是讓線程處於就緒狀態)
t.isAlive() 判斷線程是否還在運行。

t.getname、t.setname,設置線程名字,如果不設的話有默認的名字
threading.active_count() 此時此刻到底有幾個線程在活動。 
threading.current_thread() 自己所在線程的名字

#常用:主線程等待子線程結束
#等待所有線程運行完畢
def wait_allcomplete(self):
    for item in self.threads:
        if item.isAlive():
            item.join()

通過繼承的方式運行線程:
#總結來說:繼承threading.Thread ,覆寫run(self),創建對象後t.start()
import threading
import time

class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)  # 等同於 super().__init__()
        self.num = num

    def run(self):#定義每個線程要運行的函數
        print("running on number:%s" %self.num)
        time.sleep(3)

if __name__ == __main__:

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
    
    print("ending......")

線程安全

threadlocal
  threading對象裏面的不同屬性用於被不同線程讀寫(相當於美化版的dict,不用dict[thread_id]那麽狼狽), 且互相不幹擾。保證各個子線程的數據安全。
例如:Threading.local最大的用處就是HTTP請求時綁定用戶的信息,這樣每個用戶線程可以非常方便訪問各自的資源而互不幹擾

import threading

# 創建全局ThreadLocal對象:
local_school = threading.local()

def process_student():
    print Hello, %s (in %s) % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=(Alice,), name=Thread-A)
t2 = threading.Thread(target= process_thread, args=(Bob,), name=Thread-B)
t1.start()
t2.start()
t1.join()
t2.join()




同步鎖(互斥鎖)

數據安全問題是線程切換引起的。
那麽只要告訴操作系統,進入某代碼塊時跟操作系統說好了讓cpu不能切換即可。即加鎖,加鎖的代碼塊會變為串行,同一時間只有你在運行,其他運行到這部分會阻塞。

例如:我們執行三步過程中,跟操作系統說好了讓cpu不能切換。

lock.acquie()
temp=num #這一部分代碼變串行了
time.sleep(0.1)
temp=num-1
lock.release()

遞歸鎖
可以有效解決死鎖,死鎖是線程間誰都不願意釋放自己的資源(鎖),導致整體停滯不前。
常見於不同名鎖的遞歸操作。
在鎖裏面在上一次相同的鎖,因為鎖內部維護著一個計數器。只有內部計數器為0時才能被線程請求
直到一個線程所有的acquire(計數+1)都被release(計數-1),其他的線程才能獲得資源。

a=100
b = 0
lock =  threading.local()
lock.acquire()
lock.acquire()
a-=1
b+=2
lock.release()
lock.release()


同步條件
詳細解析
同步條件適用於讓不同種類的線程按次序執行。
例如:老板類線程先執行到一半之後,工人類線程才解開阻塞。

event =threading.Event()  #一個event對象可以用到多個線程裏面去
event.wait()  #如果沒有set設定的條件,該函數會一直阻塞,有的話無視該函數
event.set()  #設置同步條件
event.clear()  #清除同步條件
event.isSet() #檢查標誌是否有設定

信號量
信號量用來控制線程並發數的,即控制處理同一個被信號量鎖住的代碼塊的線程進入數目。
需要註意的是:進入的線程處理代碼塊是異步的。要考慮線程安全。

num = 100
def a():
    if semaphore.acquire():  ###
        global num
        num1=num
        time.sleep(0.1)
        num=num1-1
        semaphore.release()####
li =[]
if __name__ == __main__:

    semaphore=threading.Semaphore(5) ### 最多只能由五個線程同時處理處理
    for i in range(100):
        t = threading.Thread(target=a)
        t.start()
        li.append(t)
    for i in li:
        i.join()
    print(num)

隊列
隊列是線程安全的數據結構,操作它不需要加鎖

import queue
q = queue.Queue() #可以傳參,表示隊列大小
q.put(xx) #若隊列滿了,該函數會阻塞線程,若block參數為False則直接報錯
q.get()  #若隊列沒數據,則該函數會阻塞線程,若block參數為False則直接報錯
q.qsize()
q.empty() #是否為空?
q.full()    #是否滿了?
q.put_notwait()  #不阻塞滿直接報錯
q.get_nowait()    

q.task_done() #在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() #接收到由q.task_done函數發送的信號(隊列為空)後才繼續走,否則就一直阻塞

還有幾個不同種類的隊列:

q =queue.LifoQueue()   #先進後出
 q=queue.PriorityQueue()   #優先級隊列
#優先級高的先出來,壓進去的時候帶優先級,數據都是[優先級,數據]
#例如:q.put([10,‘info‘]),10是優先級

隊列應用:生產者與消費者
生產者與消費者模型是將同步轉化為異步的好例子。
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print(Producer %s has produced %s baozi.. %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print(\033[32;1mConsumer %s has eat %s baozi...\033[0m %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=(A,))
c1 = threading.Thread(target=Consumer, args=(B,))
# c2 = threading.Thread(target=Consumer, args=(‘C‘,))
# c3 = threading.Thread(target=Consumer, args=(‘D‘,))
p1.start()
c1.start()
# c2.start()
# c3.start()



協程

協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。、
協程是協作式的,而線程和進程都是搶占式的。
協程是程序員自己規定調度,即在哪裏切換,切換之前保護起來,哪裏換回來。目的就是要使得cpu不能閑置

協程的原理是什麽?
 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率,!!!非io操作的切換會降低效率。協程面對的是主要也是io操作
  具體地說,對於單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統,讓其看到該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給我們的線程。協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。

協程對比線程優點是什麽?
優點
1)協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
2)單線程內就可以實現並發的效果,最大限度地利用cpu
3)修改共享數據不需加鎖

缺點
1)協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
2)協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程(調用time.sleep可是,所以打monkey補丁)。
3) 因為協程在線程裏面,所以只用協程也不能用多核,要配合多進程才行。


協程代碼
如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制)

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的
g2=gevent.spawn(func2)
gevent.joinall([g1,g2])
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值

進程、線程、協程