1. 程式人生 > >python之進程

python之進程

arch elf 鎖定 什麽 pid utf-8 信號 lock 問題

1、開啟進程的兩種方式

方式一

def task(name):
    print(" %s start..." % name)


if __name__ == ‘__main__‘:
    p = Process(target=task, args=("sb",))
    p.start()

方式二

class Piao(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(‘%s start..‘ % self.name)


if __name__ == ‘__main__‘:
    p1 = Piao("sb")
    p1.start()

terminate和is_alive

from multiprocessing import Process
import time


def task(name):
    time.sleep(1)
    print("%s done.." % name)
  
    
if __name__ == ‘__main__‘:
    p1 = Process(target=task, args=("sb",))
    p1.start()
    p1.terminate()  # 發送關閉進程命令
    print(p1.is_alive())  # 查看進程是否活動
    # True
    print("主")
    time.sleep(1)

name與pid

from multiprocessing import Process
import os


def task(name):
    print("%s start..." % name)
   
    
if __name__ == ‘__main__‘:
    p = Process(target=task, args=("sb",), name="子進程1")  # 可以用關鍵參數來指定進程名
    p.start()
    print(p.name, p.pid, os.getppid())  # p.ppid 報錯
    print(os.getpid())  # p.pid==os.getpid()

守護進程

一、守護進程在主進程執行結束終止

二、守護進程內無法開啟子進程。

from multiprocessing import Process

import time


def foo():  # 不執行
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    print(456)
    time.sleep(3)
    print("end456")


if __name__ == ‘__main__‘:
    p1 = Process(target=foo)
    p2 = Process(target=bar)

    p1.daemon = True
    p1.start()
    p2.start()
    print("main-------")

互斥鎖

前戲:進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或者打開同一個打印終端,共享帶來競爭。

互斥鎖:互斥鎖的意思就是互相排斥。保證了數據安全不錯亂。

from multiprocessing import Process
from multiprocessing import Lock
import time


def task(name, mutex):
    mutex.acquire()
    print("%s 1" % name)
    time.sleep(1)
    print("%s 2" % name)
    mutex.release()
   
    
if __name__ == ‘__main__‘:
    mutex = Lock()
    for i in range(2):
        p = Process(target=task, args=("進程%s" % i, mutex))
        p.start()

互斥鎖和join的區別

互斥鎖是讓加鎖部分變成串性,join是讓整段代碼都變成串行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__:JasonLIN
from multiprocessing import Process, Lock
import time
import json


def search(name):
    time.sleep(1)
    data = json.load(open("data.txt", "r"))
    print("<%s>剩余票數[%s]" % (name, data["count"]))
        
        
def buy(name):
    data = json.load(open("data.txt", "r"))
    if int(data["count"]) > 0:
        data["count"] -= 1
        time.sleep(2)
        json.dump(data, open("data.txt", "w"))
        print("%s 購票成功" % name)
  
    
def task(name, mutex):
    search(name)
    mutex.acquire()
    buy(name)
    mutex.release()
    
    
if __name__ == ‘__main__‘:
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=("路人%s" % i, mutex))
        p.start()

join方法,search和buy都變成串行,效率更低。

from multiprocessing import Process
import time
import json


def search(name):
    data = json.load(open("data.txt", "r"))
    time.sleep(1)
    print("<%s>剩余票數[%s]" % (name, data["count"]))


def buy(name):
    data = json.load(open("data.txt", "r"))
    if int(data["count"]) > 0:
        data["count"] -= 1
        time.sleep(2)
        json.dump(data, open("data.txt", "w"))
        print("%s 購票成功" % name)
    

def task(name):
    search(name)
    buy(name)


if __name__ == ‘__main__‘:
 
    for i in range(10):
        p = Process(target=task, args=("路人%s" % i,))
        p.start()
        p.join()

隊列

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。

創建隊列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞

Queue([maxsize]):是隊列中允許最大項數,省略則無大小限制
q.put方法用以插入數據到隊列中。
q.get方法可以從隊列讀取並且刪除一個元素。

代碼實例

from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #滿了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了

生產者消費者模型

為什麽要使用生產者消費者模型?

生產者指的是生產數據的任務,消費者指的是處理數據的任務,在並發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麽是生產者和消費者模式?

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊。

這個阻塞隊列就是用來給生產者和消費者解耦的

轉自:
https://www.luffycity.com/python-book/di-7-zhang-bing-fa-bian-cheng/72-bing-fa-bian-cheng-zhi-duo-jin-cheng/727-sheng-chan-zhe-xiao-fei-zhe-mo-xing.html

from multiprocessing import Process, Queue
import time
import random


def producer(q):
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生產了%s" % res)
        q.put(res)
    

def consume(q):
    while True:
        res = q.get()
        if not res:
            break
        time.sleep(random.randint(1, 3))
        print("吃了%s" % res)
    
    
if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print("zhu")from multiprocessing import Process, Queue
import time
import random


def producer(q):
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生產了%s" % res)
        q.put(res)
    

def consume(q):
    while True:
        res = q.get()
        if not res:
            break
        time.sleep(random.randint(1, 3))
        print("吃了%s" % res)
    
    
if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print("zhu")
JoinableQueue([maxsize])
from multiprocessing import Process, JoinableQueue
import time
import random


def producer(q):
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生產了%s" % res)
        q.put(res)
    q.join()  # 等消費者把所有數據取走之後,生產者才結束
    

def consume(q):
    while True:
        res = q.get()
        if not res:
            break
        time.sleep(random.randint(1, 3))
        print("吃了%s" % res)
        q.task_done()  # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢


if __name__ == ‘__main__‘:
    q = JoinableQueue()
    
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    c1.daemon = True
    c2.daemon = True
    
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    
    p1.join()
    p2.join()
    p3.join()
    
    print("zhu")

python之進程