1. 程式人生 > >利用python在linux下分散式任務管理

利用python在linux下分散式任務管理

本人是新手,還請各位大神指教,覺得可以的話記得點贊!!!

我們先建一個python檔案

import random, time, queue,uuid

from multiprocessing.managers import BaseManager
import threading
import pymysql
def settask(n):
    connect = pymysql.Connect(  
            host='填自己外網地址',  
            port=3306,  
            user='使用者名稱',  
            passwd='密碼',  
            db='資料庫名',  
            charset='utf8'  
            )  

    cursor = connect.cursor()
    uid = str(uuid.uuid1())
    sql = "INSERT INTO task (rq,name,pc,canshu,uid) VALUES ('%s','%s','%s','%s','%s')" #task是建在資料庫的表名
    rq =  time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) 
    name ='pc1name'
    pc = 'pc1'
    canshu = str(n)
    data = (rq,name,pc,canshu,uid)
    print(sql%data)
    
    cursor.execute(sql%data)  
    connect.commit()
    connect.commit() 
    cursor.close()  
    connect.close()
def readtask():
    connect = pymysql.Connect(  
            host='外網地址',  
            port=3306,  
            user='使用者名稱',  
            passwd='密碼',  
            db='資料庫名',  
            charset='utf8'  
            )  
    cursor = connect.cursor()
    sql  ='select * from  task where ip is null order by rq desc '
    print(sql)
    cursor.execute(sql)
    row = cursor.fetchone()
    description=cursor.description  #列名
    tbhnames= list(map(lambda x:x[0]  ,description)) #列名 
    tbhtypes= list(map(lambda x:x[1]  ,description))#資料型別
    tbhsizes= list(map(lambda x:x[3]  ,description))#資料定義的長度
    rq=row[tbhnames.index('rq')]
    name=row[tbhnames.index('name')] 
    pc=row[tbhnames.index('pc')]
    canshu=row[tbhnames.index('canshu')]

    
            
    connect.commit() 
    cursor.close()  
    connect.close()
    return(str(row))
def dotask():
    settask(random.random() * 10)
    task.put(readtask())
 
    global timer
    timer = threading.Timer(5.5, dotask)
    timer.start()


# 傳送任務的佇列:
task_queue = queue.Queue()
# 接收結果的佇列:
result_queue = queue.Queue()


# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass


# 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 繫結埠5000, 設定驗證碼'abc':
manager = QueueManager(address=('172.18.83.9', 8100), authkey=b'abc')
# 啟動Queue:
manager.start()
# 獲得通過網路訪問的Queue物件:
task = manager.get_task_queue()
result = manager.get_result_queue()


timer = threading.Timer(1, dotask)

timer.start()

再新建一個客戶端的python檔案

import time, sys, queue
from multiprocessing.managers import BaseManager
import threading
# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass


def mktask():
    connect = pymysql.Connect(  
            host='外網地址',  
            port=3306,  
            user='使用者名稱',  
            passwd='密碼',  
            db='資料庫名',  
            charset='utf8'  
            )  
    cursor = connect.cursor()
    sql  ='update task set ip= from  task where ip is null '
    print(sql)
    cursor.execute(sql)
    row = cursor.fetchone()
    description=cursor.description  #列名
    tbhnames= list(map(lambda x:x[0]  ,description)) #列名 
    tbhtypes= list(map(lambda x:x[1]  ,description))#資料型別
    tbhsizes= list(map(lambda x:x[3]  ,description))#資料定義的長度
    rq=row[tbhnames.index('rq')]
    name=row[tbhnames.index('name')] 
    pc=row[tbhnames.index('pc')]
    canshu=row[tbhnames.index('canshu')]

    
            
    connect.commit() 
    cursor.close()  
    connect.close()
    return(str(row))
def gettask():
    print('get',task.get(timeout = 1))
    global timer
    timer = threading.Timer(3, gettask)
    timer.start()


try:
# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
    
    # 連線到伺服器,也就是執行task_master.py的機器:
    server_addr = '內網地址
'
    print('Connect to server %s...' % server_addr)
    # 埠和驗證碼注意保持與task_master.py設定的完全一致:
    m = QueueManager(address=(server_addr, 8100), authkey=b'abc')
    # 從網路連線:
    m.connect()
    # 獲取Queue的物件:
    task = m.get_task_queue()
    result = m.get_result_queue()
    timer = threading.Timer(1, gettask)
    timer.start()
except Exception as e:
    print(e)
finally:
    pass