1. 程式人生 > 其它 >python分散式版本二分法驗證

python分散式版本二分法驗證

  1. 思路1,master等待遠端返回結果

from multiprocessing import Pool
import os, time, random


# 在遠端執行的任務程序,,hang住等待返回
def long_time_task(ip,url):
    print('Run task %s-%s (%s)...' % (ip, url, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (ip, (end - start)))
    return True  # 版本驗證通過返回true


if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    url_list = [1,2,3,4,5,6,7,8]   # 版本list,stat[0],end[-1]
    p = Pool(4)  # 遠端執行的節點數
    # 根據節點數二分法取出節點數個版本待驗證
    for i in range(5):
        p.apply_async(long_time_task, args=(i,'url:1'))  # 傳入ip地址,和版本url
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()   # 所有程序都完成後,根據返回結果更新二分法的start和end
    print('All subprocesses done.')
  1. 思路2,QueueManager通訊
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random, time, queue
from multiprocessing.managers import BaseManager

# 傳送任務的佇列:
task_queue = queue.Queue()
# 接收結果的佇列:
result_queue = queue.Queue()


# 增加如下的函式以適應windows ,【PicklingError】
def task_queue_fun():
    return task_queue


def result_queue_fun():
    return result_queue
    # return queue.Queue()  # 這樣不行


# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass


if __name__ == '__main__':

    # 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:
    # 修改如下的函式,以適應windows環境
    # QueueManager.register('get_task_queue', callable=lambda: task_queue)
    # QueueManager.register('get_result_queue', callable=lambda: result_queue)
    QueueManager.register('get_task_queue', callable=task_queue_fun)
    QueueManager.register('get_result_queue', callable=result_queue_fun)
    # 繫結埠5000, 設定驗證碼'abc':
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    # 啟動Queue:
    manager.start()
    # 獲得通過網路訪問的Queue物件:
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # 放幾個任務進去:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # 從result佇列讀取結果:
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
    # 關閉:
    manager.shutdown()
    print('master exit.')