利用python在linux下分散式任務管理
阿新 • • 發佈:2019-01-24
本人是新手,還請各位大神指教,覺得可以的話記得點贊!!!
我們先建一個python檔案
import random, time, queue,uuid
from multiprocessing.managers import BaseManagerimport threading
import pymysql
def settask(n):
connect = pymysql.Connect(
host='填自己外網地址',
port=3306,
user='使用者名稱',
passwd='密碼',
db='資料庫名',
charset='utf8'
)
cursor = connect.cursor()
uid = str(uuid.uuid1())
sql = "INSERT INTO task (rq,name,pc,canshu,uid) VALUES ('%s','%s','%s','%s','%s')" #task是建在資料庫的表名
rq = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
name ='pc1name'
pc = 'pc1'
canshu = str(n)
data = (rq,name,pc,canshu,uid)
print(sql%data)
cursor.execute(sql%data)
connect.commit()
connect.commit()
cursor.close()
connect.close()
def readtask():
connect = pymysql.Connect(
host='外網地址',
port=3306,
user='使用者名稱',
passwd='密碼',
db='資料庫名',
charset='utf8'
)
cursor = connect.cursor()
sql ='select * from task where ip is null order by rq desc '
print(sql)
cursor.execute(sql)
row = cursor.fetchone()
description=cursor.description #列名
tbhnames= list(map(lambda x:x[0] ,description)) #列名
tbhtypes= list(map(lambda x:x[1] ,description))#資料型別
tbhsizes= list(map(lambda x:x[3] ,description))#資料定義的長度
rq=row[tbhnames.index('rq')]
name=row[tbhnames.index('name')]
pc=row[tbhnames.index('pc')]
canshu=row[tbhnames.index('canshu')]
connect.commit()
cursor.close()
connect.close()
return(str(row))
def dotask():
settask(random.random() * 10)
task.put(readtask())
global timer
timer = threading.Timer(5.5, dotask)
timer.start()
# 傳送任務的佇列:
task_queue = queue.Queue()
# 接收結果的佇列:
result_queue = queue.Queue()
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
# 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 繫結埠5000, 設定驗證碼'abc':
manager = QueueManager(address=('172.18.83.9', 8100), authkey=b'abc')
# 啟動Queue:
manager.start()
# 獲得通過網路訪問的Queue物件:
task = manager.get_task_queue()
result = manager.get_result_queue()
timer = threading.Timer(1, dotask)
timer.start()
再新建一個客戶端的python檔案
import time, sys, queuefrom multiprocessing.managers import BaseManager
import threading
# 建立類似的QueueManager:
class QueueManager(BaseManager):
pass
def mktask():
connect = pymysql.Connect(
host='外網地址',
port=3306,
user='使用者名稱',
passwd='密碼',
db='資料庫名',
charset='utf8'
)
cursor = connect.cursor()
sql ='update task set ip= from task where ip is null '
print(sql)
cursor.execute(sql)
row = cursor.fetchone()
description=cursor.description #列名
tbhnames= list(map(lambda x:x[0] ,description)) #列名
tbhtypes= list(map(lambda x:x[1] ,description))#資料型別
tbhsizes= list(map(lambda x:x[3] ,description))#資料定義的長度
rq=row[tbhnames.index('rq')]
name=row[tbhnames.index('name')]
pc=row[tbhnames.index('pc')]
canshu=row[tbhnames.index('canshu')]
connect.commit()
cursor.close()
connect.close()
return(str(row))
def gettask():
print('get',task.get(timeout = 1))
global timer
timer = threading.Timer(3, gettask)
timer.start()
try:
# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 連線到伺服器,也就是執行task_master.py的機器:
server_addr = '內網地址
print('Connect to server %s...' % server_addr)
# 埠和驗證碼注意保持與task_master.py設定的完全一致:
m = QueueManager(address=(server_addr, 8100), authkey=b'abc')
# 從網路連線:
m.connect()
# 獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
timer = threading.Timer(1, gettask)
timer.start()
except Exception as e:
print(e)
finally:
pass