1. 程式人生 > 其它 >[實戰篇] Python 運維中使用併發

[實戰篇] Python 運維中使用併發

今天從大哥手裡接了一個需求:

驗證一下新的 Docker 映象倉庫(Docker Registry)是否遷移成功了

簡單粗暴的方法就是拿到老倉庫中的映象列表(Image List),在新倉庫模擬使用者重新拉取(pull)一遍來驗證,我們開始


subprocess

如果我們用 Shell 來寫,執行 Docker 命令很容易,直接寫就是了,但是對結果的判斷就不那麼友好了(Shell 大神忽略),那麼 Python 呢,如何優雅的執行 Linux 命令呢?這裡我們用到了一個 Python 標準庫(standard module) :

import subprocess

我們都知道,命令執行過程中會有標準輸出(stdout)和標準錯誤(stderror):

def run_cmd(cmd):
    return subprocess.Popen(cmd, 
                  stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE).communicate()

上面程式碼封裝了一個方法,它會啟動一個子程序執行命令,並將標準輸出和標準錯誤通過管道(程序間通訊最常用的方式)收集

管道其實就是檔案描述符,子程序會繼承父程序中的所有檔案描述符

最後,通過序列解包

stdout, stderr = run_cmd('uname -a')

獲取標準輸出和標準錯誤,這個方法我們後面要用到好多

我拿到映象列表檔案了,先使用

cat imagelist | wc -l

查看了一下行數(映象數量),4254 個,還行,不算太多


思路:

  • 拉取列表中的映象,拉取成功後將其刪除並標記為成功
  • 拉取失敗就標記為失敗和並記錄錯誤
  • 如果拉取超時,就標記超時

如何標記呢,因為我們將會使用多程序,多個程序間通訊還是蠻麻煩的,這裡偷個懶:直接使用 append 模式直接將結果寫入檔案

with open('timeout_image.txt','a') as timeout_file:
    timeout_file.write(image)

我們先寫出如何驗證一個映象的邏輯:

def pull_worker(image):
    # ?

公眾號程式碼支援太差了,可以去文末的點選閱讀原文檢視

後面就僅僅是併發的問題了


sys

首先我們想控制併發數量,最簡單是使用 sys 模組

if len(sys.argv) == 4:
    pass
else:
    print “Need three params
    return
    
# 這裡同樣使用了序列解包,第一個引數是指令碼名字,忽略掉
_, file, coreNum, poolNum = sys.argv

這樣的程式執行起來像這樣:

python check_images.py imagelist 8 5

gevent

然後是實現,我們使用的這個模組需要安裝,它是大名鼎鼎的 gevent,為什麼使用它,因為我們的任務是 I/O密集 型的,gevent 擅長處理這類任務(有興趣可以去了解下猴子補丁)

pip install gevent

我們看匯入模組的程式碼:

import gevent.pool
import gevent.monkey
from gevent import Timeout
gevent.monkey.patch_all() # 猴子補丁
from multiprocessing import Process

最後一行也是使用了 Python 的標準庫,多程序模組:multiprocessing

不要和我說什麼Python 有全域性直譯器鎖(GIL),多程序沒有 GIL,多程序沒有 GIL,多程序沒有 GIL

如何併發呢:

  1. 啟動和核數相等的程序(跑滿機器,儘快完成任務為目的)
  2. 每個程序裡面 docker pull 的併發為 5(gevent 協程池)

所以我們總的併發數就是 40,這樣就完成了可控制併發的指令碼

程式碼如下:

def each_process(task_object_list):
    pool = gevent.pool.Pool(int(poolNum))
    pool.map(pull_worker, task_object_list)
    stop = time.time()
    elapsed = stop - start
    print "End precess with {0} s".format(elapsed)

with open(file) as f:     
    for line in f:
        line = line.strip()
        all_task_list.append(line)

print "All task: {0}".format(len(all_task_list))
for sliced_task_list in slice_list(all_task_list, int(coreNum)):
     print "Start process with tasks: {0}".format(len(sliced_task_list))
     p = Process(target=each_process, args=(sliced_task_list,))
     p.start()

這裡需要注意的一點是,4254 個映象,是按照核心數量分組(slice_list),然後交給不同的程序處理的。