1. 程式人生 > 其它 >Python -- 多程序、多執行緒 的基本使用

Python -- 多程序、多執行緒 的基本使用

單程序單執行緒

import time


def production():
    """
    間隔一秒,模擬一秒生產一個任務,生產10個任務
    :return: 生產完畢,返回需要消費的任務
    """
    _tasks = 0
    while _tasks < 10:
        time.sleep(1)
        _tasks += 1
        print(f"生產任務:{_tasks}")

    return _tasks


def consumption(_tasks, _count):
    """
    間隔一秒,模擬一秒消費完一個任務
    :param _tasks: 需要消費的任務數
    :param _count: 已經消費的任務數
    :return:
    """
    while _tasks:
        time.sleep(1)
        _tasks -= 1
        _count += 1
        print(f"完成任務:{_count} \t\t\t 剩餘任務:{_tasks}")


if __name__ == '__main__':
    # 已經消費的任務數
    finishedTasks = 0

    # 生產任務
    numberOfTasks = production()
    # 消費任務
    consumption(numberOfTasks, finishedTasks)



單程序多執行緒

import time
from queue import Queue
from threading import Thread, current_thread


def production(_task_queue: Queue):
    """
    間隔一秒,模擬一秒生產一個任務
    一直生產任務
    :param: _task_queue
    """
    while True:
        time.sleep(1)
        _task_queue.put(1)
        print(f"生產任務:{_task_queue.qsize()} \t\t\t 執行緒名稱:{current_thread().name}\n")


def consumption(_task_queue: Queue, _count_queue: Queue):
    """
    間隔一秒,模擬一秒消費完一個任務
    只要任務佇列中有任務就一直消費
    :param _task_queue:  需要消費的任務數
    :param _count_queue: 已經消費的任務數
    """
    while True:
        if _task_queue.qsize():
            time.sleep(1)
            _task_queue.get()
            _count_queue.put(1)
            print(f"完成任務:{_count_queue.qsize()} \t\t\t 剩餘任務:{_task_queue.qsize()} \t\t\t 執行緒名稱:{current_thread().name}\n")
        else:
            # 佇列中沒任務的時候,間隔一段時間後,再繼續
            print(f'所有任務消費完畢,休息一段時間,等待生產 \t\t 執行緒名稱:{current_thread().name}\n')
            time.sleep(10)


if __name__ == '__main__':
    """
    執行緒間通訊 用 queue.Queue 佇列
    傳參 args 傳遞的是元組
    只傳一個引數時:args = (引數1,)
    """

    # 生產 佇列
    tasksQueue = Queue()
    # 已經執行的任務數
    ftQueue = Queue()

    # 3個執行緒生產任務
    for i in range(1, 4):
        t = Thread(target=production, args=(tasksQueue,), name=f'生產執行緒{i}')
        t.start()

    # 5個執行緒消費任務
    for i in range(1, 6):
        t = Thread(target=consumption, args=(tasksQueue, ftQueue), name=f'消費執行緒{i}')
        t.start()



多程序單執行緒

import time
from multiprocessing import Process, current_process, Queue


def production(_task_queue: Queue):
    """
    間隔一秒,模擬一秒生產一個任務
    一直生產任務
    :param: _task_queue
    """
    while True:
        time.sleep(1)
        _task_queue.put(1)
        print(f"生產任務:{_task_queue.qsize()} \t\t\t 程序名稱:{current_process().name}\n")


def consumption(_task_queue: Queue, _count_queue: Queue):
    """
    間隔一秒,模擬一秒消費完一個任務
    只要任務佇列中有任務就一直消費
    :param _task_queue:  需要消費的任務數
    :param _count_queue: 已經消費的任務數
    """
    while True:
        if _task_queue.qsize():
            time.sleep(1)
            _task_queue.get()
            _count_queue.put(1)
            print(
                f"完成任務:{_count_queue.qsize()} \t\t\t 剩餘任務:{_task_queue.qsize()} \t\t\t 程序名稱:{current_process().name}\n")
        else:
            # 佇列中沒任務的時候,間隔一段時間後,再繼續
            print(f'所有任務消費完畢,休息一段時間,等待生產 \t\t 程序名稱:{current_process().name}\n')
            time.sleep(10)


if __name__ == '__main__':
    """
    程序間通訊 用 multiprocessing.Queue 佇列
    傳參 args 傳遞的是元組
    只傳一個引數時:args = (引數1,)
    """

    # 生產 佇列
    tasksQueue = Queue()
    # 已經執行的任務數
    ftQueue = Queue()

    # 2個程序生產任務
    for i in range(1, 3):
        p = Process(target=production, args=(tasksQueue,), name=f'生產程序{i}')
        p.start()

    # 3個程序消費任務
    for i in range(1, 4):
        p = Process(target=consumption, args=(tasksQueue, ftQueue), name=f'消費程序{i}')
        p.start()



多程序多執行緒

import time
from threading import Thread, current_thread
from multiprocessing import Process, current_process, Queue


def production(_task_queue: Queue):
    """
    一個程序中,開多個執行緒進行生產
    :param _task_queue: 生產任務 佇列
    """

    def production_individually():
        """
        間隔一秒,模擬一秒生產一個任務
        一直生產任務
        """
        while True:
            time.sleep(1)
            _task_queue.put(1)
            print(f"生產:{_task_queue.qsize()} \t 程序:{current_process().name} \t 執行緒:{current_thread().name}\n")

    # 每個程序中,3個執行緒生產
    for n in range(1, 4):
        t = Thread(target=production_individually, name=f'生產執行緒{n}')
        t.start()


def consumption(_task_queue: Queue, _count_queue: Queue):
    """
    一個程序中,開多個執行緒進行消費
    :param _task_queue:  需要消費的任務數
    :param _count_queue: 已經消費的任務數
    """

    def consumption_individually():
        """
        間隔一秒,模擬一秒消費完一個任務
        只要任務佇列中有任務就一直消費
        """
        while True:
            if _task_queue.qsize():
                time.sleep(1)
                _task_queue.get()
                _count_queue.put(1)
                print(
                    f"完成:{_count_queue.qsize()} \t 剩餘:{_task_queue.qsize()} \t "
                    f"程序:{current_process().name} \t 執行緒:{current_thread().name}\n"
                )
            else:
                # 佇列中沒任務的時候,間隔一段時間後,再繼續
                print(f'所有任務消費完畢,休息一段時間,等待生產 \t 程序:{current_process().name} \t 執行緒:{current_thread().name}\n')
                time.sleep(10)

    # 每個程序中,2個執行緒消費
    for n in range(1, 3):
        t = Thread(target=consumption_individually, name=f'消費執行緒{n}')
        t.start()


if __name__ == '__main__':
    """
    程序間通訊 用 multiprocessing.Queue 佇列
    傳參 args 傳遞的是元組
    只傳一個引數時:args = (引數1,)
    """

    # 生產 佇列
    tasksQueue = Queue()
    # 已經執行的任務數
    ftQueue = Queue()

    # 2個程序生產任務
    for i in range(1, 3):
        p = Process(target=production, args=(tasksQueue,), name=f'生產程序{i}')
        p.start()

    # 3個程序消費任務
    for i in range(1, 4):
        p = Process(target=consumption, args=(tasksQueue, ftQueue), name=f'消費程序{i}')
        p.start()







路漫漫其修遠兮,吾將上下而求索

本文來自部落格園,作者:三個零,轉載請註明原文連結:https://www.cnblogs.com/jiyu-hlzy/p/15948408.html