1. 程式人生 > python多線程限制並發數示例

python多線程限制並發數示例

star 使用 str rtp 隊列 -- col bin append

#coding: utf-8
#!/usr/bin/env python
import Queue
import threading
import time

# Lock that serialises producer access to the shared taskidx counter
prolock = threading.Lock()

# Bounded task queue: at most 10 tasks may be waiting at any one time
queue = Queue.Queue(maxsize=10)

# Index of the next task to enqueue, and the index at which producers stop
taskidx = 0
maxidx = 100

# Build the task list
def taskList():
    """Return the list of task names "task0" through "task99"."""
    return ["task" + str(i) for i in range(100)]


# Put tasks into the queue
class Producer(threading.Thread):
    """Thread that enqueues tasks one at a time, sharing progress with peers.

    All producers advance the module-level ``taskidx`` counter under
    ``prolock`` and stop once it reaches ``maxidx``.

    Args:
        name: Label printed each time this producer wakes up.
        queue: Object with a ``put`` method that receives the tasks.
    """

    def __init__(self, name, queue):
        self.__name = name
        self.__queue = queue
        super(Producer, self).__init__()

    def run(self):
        """Enqueue the next pending task every 4 seconds until none remain."""
        # prolock and maxidx are only read, so only taskidx needs `global`.
        global taskidx
        # The task list never changes, so build it once rather than per loop.
        tasks = taskList()
        while True:
            time.sleep(4)
            # `with` guarantees the lock is released on every exit path,
            # including the break below and any exception raised by put().
            with prolock:
                print("Producer name: %s" % (self.__name,))
                # >= rather than == so the loop still stops if the counter
                # ever moves past maxidx.
                if taskidx >= maxidx:
                    break
                ip = tasks[taskidx]
                # put() runs while the lock is held, so tasks enter the
                # queue in index order across all producers.
                self.__queue.put(ip)
                taskidx = taskidx + 1
# Worker threads that process tasks
class Consumer(threading.Thread):
    """Thread that repeatedly takes a task from a queue and processes it.

    Args:
        name: Label printed for every task this consumer handles.
        queue: Object providing ``get`` and ``task_done``.
    """

    def __init__(self, name, queue):
        self.__name = name
        self.__queue = queue
        super(Consumer, self).__init__()

    def run(self):
        """Process tasks forever; the loop has no exit condition of its own."""
        while True:
            ip = self.__queue.get()
            try:
                print("Consumer name: %s" % (self.__name,))
                consumer_process(ip)
            finally:
                # Always acknowledge the task, otherwise a failure while
                # processing would leave queue.join() waiting forever.
                self.__queue.task_done()


def consumer_process(ip):
    """Handle one task: wait one second, then print the task."""
    time.sleep(1)
    print(ip)


def startProducer(thread_num):
    """Start ``thread_num`` daemon Producer threads and return them as a list."""
    t_produce = []
    for i in range(thread_num):
        p = Producer("producer" + str(i), queue)
        p.setDaemon(True)
        p.start()
        t_produce.append(p)
    return t_produce


def startConsumer(thread_num):
    """Start ``thread_num`` daemon Consumer threads and return them as a list."""
    t_consumer = []
    for i in range(thread_num):
        c = Consumer("Consumer" + str(i), queue)
        c.setDaemon(True)
        c.start()
        t_consumer.append(c)
    return t_consumer


def main():
    """Run 3 producers and 5 consumers until every task has been processed."""
    t_produce = startProducer(3)
    startConsumer(5)

    # Make sure every task has been generated
    for p in t_produce:
        p.join()

    # Wait until every queued task has been processed
    queue.join()


if __name__ == "__main__":
    main()
    print("------end-------")

一般生成任務都會比較快,可以使用單線程來生成任務,示例如下:

#coding: utf-8
#!/usr/bin/env python
import Queue
import threading
import time

# Bounded task queue: at most 3 tasks may be waiting at any one time
queue = Queue.Queue(maxsize=3)

# Build the task list
def taskList():
    """Return the list of task names "task0" through "task99"."""
    return ["task" + str(number) for number in range(100)]


# Put tasks into the queue
class Producer(threading.Thread):
    """Thread that feeds every task from taskList() into a queue.

    Args:
        name: Label stored on the instance.
        queue: Object with a ``put`` method that receives the tasks.
    """

    def __init__(self, name, queue):
        super(Producer, self).__init__()
        self.__queue = queue
        self.__name = name

    def run(self):
        """Enqueue every task in list order, one put() call per task."""
        pending = taskList()
        for task in pending:
            self.__queue.put(task)


# Worker thread that processes tasks
class Consumer(threading.Thread):
    """Thread that repeatedly takes a task from a queue and processes it.

    Args:
        name: Label printed for every task this consumer handles.
        queue: Object providing ``get`` and ``task_done``.
    """

    def __init__(self, name, queue):
        self.__name = name
        self.__queue = queue
        super(Consumer, self).__init__()

    def run(self):
        """Process tasks forever; the loop has no exit condition of its own."""
        while True:
            ip = self.__queue.get()
            try:
                # The label must be a quoted format string; the one-element
                # tuple keeps %-formatting correct for any name value.
                print("Consumer name: %s" % (self.__name,))
                consumer_process(ip)
            finally:
                # Always acknowledge the task, otherwise a failure while
                # processing would leave queue.join() waiting forever.
                self.__queue.task_done()

def consumer_process(ip):
    """Handle one task: wait one second, then print the task.

    Args:
        ip: The task value taken from the queue.
    """
    time.sleep(1)
    # Parenthesised single-argument print gives the same output on
    # Python 2 and Python 3; the bare statement form only parses on 2.
    print(ip)


def startConsumer(thread_num):
    """Start ``thread_num`` daemon Consumer threads on the shared queue.

    Each consumer receives its zero-based index as its name.

    Returns:
        The started Consumer threads, in creation order.
    """

    def _launch(index):
        # Create, mark as daemon, and start one worker.
        worker = Consumer(index, queue)
        worker.setDaemon(True)
        worker.start()
        return worker

    return [_launch(index) for index in range(thread_num)]

def main():
    """Run one producer and nine consumers until every task is processed."""
    producer = Producer("Producer task0", queue)
    producer.setDaemon(True)
    producer.start()
    startConsumer(9)

    # Make sure every task has been generated
    producer.join()

    # Wait until every queued task has been processed
    queue.join()



# Script entry point: run the demo, then print a marker once it finishes.
if __name__ == "__main__":
    main()
    print("------end-------")

python多線程限制並發數示例