1. 程式人生 > >並發編程 進程基礎

並發編程 進程基礎

管道 not 應該 += input 任務 相同 ipc ces

操作系統

  • 多道 、分時、實時

  • 同步異步

    • 同步:一件事情完成後再做另一件事
    • 異步:同時做多件事
  • 阻塞和非阻塞

    • 阻塞:recv,accept,recvfrom
      • 會讓整個進程進入阻塞隊列
    • 非阻塞:進程只會在就緒和 運行狀態中切換
  • 進程三狀態:就緒 運行 阻塞

  • 並發並行

    • 並發是包含並行的
    • 並發:宏觀上多個程序同時運行,實際是同一時間只運運行了一次
    • 並行:微觀上多個程序同時運行
  • 子進程和主進程

    • pid ppid
  • 多並發的tcp服務端

    • import socket
      from multiprocessing import Process
      def communicate(conn):
          while True:
              conn.send("hello".encode("utf-8"))
              print(conn.recv(1024))
      if __name__ == ‘__main__‘:
          sk = socket.socket()
          sk.bind((‘127.0.0.1‘,9001))
          sk.listen()
          while True:
              conn,addr = sk.accept()
              Process(target=communicate,args=(conn,)).start()
      import socket
      sk = socket.socket()
      sk.connect((‘127.0.0.1‘,9001))
      while True:
          print(sk.recv(1024))
          mv = input(">>>>>>>>>>:").strip()
          sk.send(mv.encode("utf-8"))
  • 進程是操作系統中最小的資源分配單位

  • 進程

    • multiprocessing
    • multiprocessing.Process
    • 如何開啟一個子進程
  • Process 開啟子進程

    • 第二種開啟子進程的方式

      • def func(index):
            time.sleep(random.random())
            print(‘第%s個郵件已經發送完畢‘%index)
        if __name__ == ‘__main__‘:
            p_lst = []
            for i in range(10):
                p = Process(target=func,args=(i,))
                p.start()
                p_lst.append(p)
            for p in p_lst:
                p.join()
            print(‘全部發送完畢‘)
    • join控制子進程

      • #子進程同步,執行完畢後才執行主程序後面的程序
        # import time
        # from multiprocessing import Process
        # def f(name):
        #     print("hello",name)
        #     time.sleep(1)
        # if __name__ == ‘__main__‘:
        #     p_list = []
        #     for i in range(5):
        #         p = Process(target=f,args=(i,))
        #         p.start()
        #         p_list.append(p)
        #         p.join()      #阻塞,
        #     print("主進程執行")
        
        #子程序異步執行,執行完了阻塞結束
        import time
        from multiprocessing import Process
        def f(name):
            print("hello",name)
            time.sleep(1)
        if __name__ == ‘__main__‘:
            p_list = []
            for i in range(10):
                p = Process(target=f,args=(i,))
                p.start()
                p_list.append(p)
            for i in p_list:
                i.join()
            print("主進程執行完畢")
    • 守護進程 daemon

      • 守護進程會隨著主進程代碼執行完畢而結束

      • 守護進程內無法再開啟子進程,否則會拋出異常

      • 註意:進程之間是相互獨立的,主進程代碼運行結束,守護進程也會隨即終止

      • import time
        from multiprocessing import Process
        def func1():
            count = 1
            while True:
                time.sleep(0.5)
                print(count*"*")
                count += 1
        def func2():
            print("func strat")
            time.sleep(5)
            print("func2 end")
        if __name__ == ‘__main__‘:
            p1 = Process(target=func1)
            p1.daemon = True      #定義為守護進程
            p1.start()          #執行
            Process(target=func2).start()
            time.sleep(3)
            print("主進程")
        #輸出
        # func strat
        # *
        # **
        # ***
        # ****
        # *****
        # 主進程
        # func2 end

        如果主進程執行完畢那麽守護進程也會結束,但是其他子進程如果沒執行完還會繼續執行

    • 作業:在進程之間保證數據安全性

    • from multiprocessing import Process,Lock

    • lock= Lock()實例對象

    • lock.acquire() 取鑰匙開門

    • lock.release() 關門放鑰匙

    • 例題 模擬搶票

    • import time
      import json
      from multiprocessing import Process,Lock
      def search(person):         #查票
          with open("ticket") as f:   #文件中保存著一個字典{"count":4}
              dic = json.load(f)   #讀出文件中的字典
          time.sleep(0.2)
          print("%s查詢余票"%person,dic["count"])
      def get_ticket(person):         #搶票
          with open("ticket") as f:
              dic = json.load(f)
          time.sleep(0.2)             #模擬延遲
          if dic["count"] >0:
              print("%s買到票了"%person)
              dic["count"] -= 1
              time.sleep(0.2)
              with open("ticket","w") as f:
                  json.dump(dic,f)    #寫回文件
          else:
              print("%s沒買到票"%person)
      def ticket(person,lock):
          search(person)
          lock.acquire()      #開門,一次只能進一個
          get_ticket(person)
          lock.release()      #關門
      if __name__ == ‘__main__‘:
          lock = Lock()
          for i in range(10):
              p = Process(target=ticket,args=("person%s"%i,lock))
              p.start()

      為了保證數據的安全,在異步的情況下,多個進程又可能同時修改同一份數據的時候,需要給這個數據上鎖

    • 加鎖的作用

      • 降低了程序的效率,讓原來能夠同時執行的代碼編程順序執行了,異步變同步的過程,保證了數據的安全
  • 同步控制

    • import time
      from multiprocessing import Process,Lock
      def func(num,lock):
          time.sleep(1)
          print("異步執行",num)
          lock.acquire()
          time.sleep(0.5)
          print("同步執行",num)
          lock.release()      #同步執行是依次執行,間隔0.5秒
      if __name__ == ‘__main__‘:
          lock = Lock()
          for i in range(10):
              p = Process(target=func,args=(i,lock))
              p.start()
  • 信號量 機制:計數器+鎖實現的 Semaphore

    • 主程序控制一定數量的子程序同時執行,這些數量的子程序執行完一個就會有下一個子程序補充進來

    • import time
      import random
      from multiprocessing import Process,Semaphore
      def ktv(person,sem):
          sem.acquire()       #進
          print("%s走進KTV"%person)
          time.sleep(random.randint(1,3))     #隨機延遲一到三秒
          print("%s走出ktv"%person)
          sem.release()       #出
      if __name__ == ‘__main__‘:
          sem = Semaphore(4)      #信號量為4,默認為1
          for i in range(10):
              Process(target=ktv,args=(i,sem)).start()
  • 事件 Event

    • 阻塞事件 wait() 方法

      • wait 是否阻塞是看event對象你不的一個屬性
    • 控制這個屬性的值

      • set()將這個屬性的值改成True

      • clear() 將這個屬性的值改成False

      • is_set() 判斷當前屬性是否為True

      • #模擬紅綠燈,只有全部車通過後才停止
        import time
        import random
        from multiprocessing import Process,Event
        def traffic_light(e):
            print("紅燈亮")
            while True:
                if e.is_set():
                    time.sleep(2)
                    print("紅燈亮")
                    e.clear()
                else:
                    time.sleep(2)
                    print("綠燈亮")
                    e.set()
        def car(e,i):
            if not e.is_set():
                print("car%s在等待"%i)
                e.wait()
            print("car%s通過了"%i)
        if __name__ == ‘__main__‘:
            e = Event()
            p = Process(target=traffic_light,args=(e,))
            p.daemon =True    #變成守護進程
            p.start()
            p_list = []
            for i in range(10):
                time.sleep(random.randrange(0,3,2))
                p = Process(target=car,args=(e,i))
                p.start()
                p_list.append(p)
            for p in p_list:p.join()
  • 進程之間的通信(IPC)
    • 多個進程之間有一些固定的通信內容

    • socket給予文件家族通信

    • 進程之間雖然內存不共享,但是可以通信,

    • 進程隊列 Queue
      • 進程之間數據是安全的
    • 隊列是基於管道實現的

    • 管道是基於socket實現的

    • 隊列 + 鎖 簡便的IPC機制 使得進程之間的數據安全

    • def consume(q):
          print(‘son-->‘,q.get())
          q.put(‘abc‘)
      if __name__ == ‘__main__‘:
          q = Queue()
          p = Process(target=consume,args=(q,))
          p.start()
          q.put({‘123‘:123})
          p.join()
          print(‘Foo-->‘,q.get())

      簡單的生產消費模型

      def consume(q):
          print(‘son-->‘,q.get())
          q.put(‘abc‘)
      if __name__ == ‘__main__‘:
          q = Queue()
          p = Process(target=consume,args=(q,))
          p.start()
          q.put({‘123‘:123})
          p.join()
          print(‘Foo-->‘,q.get())
      • 相同的原理 JoinableQueue

      • task_done 通知隊列已經有一個數據被處理了

      • q.join() 阻塞直到放入隊列中所有的數據都被處理掉(有多少個數據就接受到多少taskdone)

      • import time
        import random
        from multiprocessing import Process,JoinableQueue
        def consumer(q,name):
            while True:
                food = q.get()
                time.sleep(random.uniform(0.3,0.8))
                print("%s吃了一個%s"%(name,food))
                q.task_done()
        def producer(q,name,food):
            for i in range(10):
                time.sleep(random.uniform(0.3,0.8))
                print("%s生產了%s%s"%(name,food,i))
                q.put(food+str(i))
        if __name__ == ‘__main__‘:
            jq = JoinableQueue()
            c1 = Process(target=consumer,args=(jq,"alex"))
            c1.daemon = True
            p1 = Process(target=producer,args=(jq,"libai","包子"))
            c1.start()
            p1.start()
            p1.join()
            jq.join()
  • 管道 進程之間數據不安全 且存取數據復雜

  • 進程池

    • 開啟過多的進程並不能提高你的效率,反而會降低效率

    • 計算密集型 充分占用CPU 多進程可以充分利用多核 適合開啟多進程,但是不適合開啟很多多進程

    • IO密集型 大部分時間都在阻塞隊列,而不是在運行狀態 根本不太適合開啟多進程

    • 提交任務:

      • 同步提交 apply

        • 返回值:子進程對應函數的返回值

        • 一個一個順序執行的,並沒有任何的並發效果

        • # import os
          # import time
          # from multiprocessing import Process,Pool
          # def task(num):
          #     time.sleep(0.5)
          #     print("%s: %s"%(num,os.getpid()))
          #     return num ** 2
          # if __name__ == ‘__main__‘:
          #     p = Pool(4)
          #     for i in range(20):
          #         res = p.apply(task,args=(i,)) #apply   提交任務方法,同步提交
          #         print("--->",res)
          #四個任務依次執行,輪換
      • 異步提交 apply_async

        • 沒有返回值,要想所有任務能夠順利的執行完畢

          • p.close()
          • p.join() 必須先close在join,阻塞直到進程池中所有任務都執行完畢
        • 有返回值的情況下

          • res.get() #get不能再提交任務之後立刻執行,應該是先提交所有的任務再通過get獲取結果

          • map()方法

            • 異步提交的簡化版本
            • 自帶close和join方法
          • import os
            import time
            from multiprocessing import Pool
            def task(num):
                time.sleep(1)
                print("%s: %s"%(num,os.getpid()))
                return num **2
            if __name__ == ‘__main__‘:
                p = Pool(4)
                for i in range(20):
                    res = p.apply_async(task,args=(i,))     #apply_async   異步提交
                p.close()
                p.join()
            #輸出結果同時四個認識執行

並發編程 進程基礎