1. 程式人生 > 其它 >大資料流處理框架介紹

大資料流處理框架介紹

從python2.4版本開始,可以用subprocess這個模組來產生子程序,並連線到子程序的標準輸入/輸出/錯誤中去,還可以得到子程序的返回值。

subprocess意在替代其他幾個老的模組或者函式,比如:os.system os.spawn* os.popen* popen2.* commands.*

可以執行shell命令的相關模組和函式有:

  • os.system
  • os.spawn*
  • os.popen* --廢棄
  • popen2.* --廢棄
  • commands.* --廢棄,3.x中被移除

以上執行shell命令的相關的模組和函式的功能均在 subprocess 模組中實現,並提供了更豐富的功能。

以下常見的類和方法: 參考https://www.jb51.net/article/177149.htm

一、subprocess.Popen

subprocess模組定義了一個類: Popen

1 2 3 4 5 6 7 8 9 10 11 12 13 14 class subprocess.Popen( args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env
=None, universal_newlines=False, startupinfo=None, creationflags=0)

各引數含義如下:

args:

args引數。可以是一個字串,可以是一個包含程式引數的列表。要執行的程式一般就是這個列表的第一項,或者是字串本身。

1 2 subprocess.Popen(["cat","test.txt"]) subprocess.Popen("cat test.txt")

這兩個之中,後者將不會工作。因為如果是一個字串的話,必須是程式的路徑才可以。(考慮unix的api函式exec,接受的是字串列表)但是下面的可以工作

1 subprocess.Popen("cat test.txt", shell=True)

這是因為它相當於

1 subprocess.Popen(["/bin/sh", "-c", "cat test.txt"])

在*nix下,當shell=False(預設)時,Popen使用os.execvp()來執行子程式。args一般要是一個【列表】。如果args是個字串的話,會被當做是可執行檔案的路徑,這樣就不能傳入任何引數了。

注意:

shlex.split()可以被用於序列化複雜的命令引數,比如:

1 2 3 4 5 6 7 8 9 >>> shlex.split('ls ps top grep pkill') ['ls', 'ps', 'top', 'grep', 'pkill'] >>>import shlex, subprocess >>>command_line = raw_input() /bin/cat -input test.txt -output "diege.txt" -cmd "echo '$MONEY'" >>>args = shlex.split(command_line) >>> print args ['/bin/cat', '-input', 'test.txt', '-output', 'diege.txt', '-cmd', "echo '$MONEY'"] >>>p=subprocess.Popen(args)

可以看到,空格分隔的選項(如-input)和引數(如test.txt)會被分割為列表裡獨立的項,但引號裡的或者轉義過的空格不在此列。這也有點像大多數shell的行為。

在linux下,當shell=True時,如果arg是個字串,就使用shell來解釋執行這個字串。如果args是個列表,則第一項被視為命令,其餘的都視為是給shell本身的引數。也就是說,等效於:

1 subprocess.Popen(['/bin/sh', '-c', args[0], args[1], ...])

在Windows下,下面的卻又是可以工作的

1 2 subprocess.Popen(["notepad.exe", "test.txt"]) subprocess.Popen("notepad.exe test.txt")

這是由於windows下的api函式CreateProcess接受的是一個字串。即使是列表形式的引數,也需要先合併成字串再傳遞給api函式

1 subprocess.Popen("notepad.exe test.txt" shell=True)

等同於

1 subprocess.Popen("cmd.exe /C "+"notepad.exe test.txt" shell=True

bufsize引數:

如果指定了bufsize引數作用就和內建函式open()一樣:0表示不緩衝,1表示行緩衝,其他正數表示近似的緩衝區位元組數,負數表示使用系統預設值。預設是0。

executable引數:

指定要執行的程式。它很少會被用到:一般程式可以由args 引數指定。如果shell=True ,executable 可以用於指定用哪個shell來執行(比如bash、csh、zsh等)。*nix下,預設是 /bin/sh ,windows下,就是環境變數 COMSPEC 的值。windows下,只有當你要執行的命令確實是shell內建命令(比如dir ,copy 等)時,你才需要指定shell=True ,而當你要執行一個基於命令列的批處理指令碼的時候,不需要指定此項。

stdin stdout和stderr:

stdin stdout和stderr,分別表示子程式的標準輸入、標準輸出和標準錯誤。可選的值有PIPE或者一個有效的檔案描述符(其實是個正整數)或者一個檔案物件,還有None。如果是PIPE,則表示需要建立一個新的管道,如果是None,不會做任何重定向工作,子程序的檔案描述符會繼承父程序的。另外,stderr的值還可以是STDOUT,表示子程序的標準錯誤也輸出到標準輸出。

preexec_fn引數:

如果把preexec_fn設定為一個可呼叫的物件(比如函式),就會在子程序被執行前被呼叫。(僅限*nix)

close_fds引數:

如果把close_fds設定成True,*nix下會在開子程序前把除了0、1、2以外的檔案描述符都先關閉。在 Windows下也不會繼承其他檔案描述符。

shell引數:

如果把shell設定成True,指定的命令會在shell裡解釋執行。

cwd引數:

如果cwd不是None,則會把cwd做為子程式的當前目錄。注意,並不會把該目錄做為可執行檔案的搜尋目錄,所以不要把程式檔案所在目錄設定為cwd 。

env引數:

如果env不是None,則子程式的環境變數由env的值來設定,而不是預設那樣繼承父程序的環境變數。注意,即使你只在env裡定義了某一個環境變數的值,也會阻止子程式得到其他的父程序的環境變數(也就是說,如果env裡只有1項,那麼子程序的環境變數就只有1個了)。例如:

1 2 3 4 >>> subprocess.Popen('env', env={'test':'123', 'testtext':'zzz'}) test=123 <subprocess.Popen object at 0x2870ad2c> testtext=zzz

universal_newlines引數:

如果把universal_newlines 設定成True,則子程序的stdout和stderr被視為文字物件,並且不管是*nix的行結束符('/n'),還是老mac格式的行結束符('/r' ),還是windows 格式的行結束符('/r/n' )都將被視為 '/n' 。

startupinfo和creationflags引數:

如果指定了startupinfo和creationflags,將會被傳遞給後面的CreateProcess()函式,用於指定子程式的各種其他屬性,比如主視窗樣式或者是子程序的優先順序等。(僅限Windows)

二、subprocess.PIPE

1 subprocess.PIPE

一個可以被用於Popen的stdin 、stdout 和stderr 3個引數的特輸值,表示需要建立一個新的管道。

1 subprocess.STDOUT

一個可以被用於Popen的stderr引數的輸出值,表示子程式的標準錯誤匯合到標準輸出。

例項:

1 2 3 4 5 6 7 8 9 10 11 12 13 >>>p=subprocess.Popen("df -h",shell=True,stdout=subprocess.PIPE) >>>out=p.stdout.readlines() >>>out [b'Filesystem Size Used Avail Capacity Mounted on\n', b'/dev/ad0s1a 713M 313M 343M 48% /\n', b'devfs 1.0K 1.0K 0B 100% /dev\n', b'/dev/ad0s1e 514M 2.1M 471M 0% /tmp\n', b'/dev/ad0s1f 4.3G 2.5G 1.4G 64% /usr\n', b'/dev/ad0s1d 2.0G 121M 1.7G 6% /var\n' >>> for line in out: ... print line.strip() ... Filesystem Size Used Avail Capacity Mounted on /dev/ad0s1a 713M 313M 343M 48% / devfs 1.0K 1.0K 0B 100% /dev /dev/ad0s1e 514M 2.1M 471M 0% /tmp /dev/ad0s1f 4.3G 2.5G 1.4G 64% /usr /dev/ad0s1d 2.0G 121M 1.7G 6% /var

stdout可以使用read(),readline(),readlines()等方法

三、方便的函式

1、subprocess.call

1 subprocess.call (*popenargs , **kwargs )

執行命令,並等待命令結束,再返回子程序的返回值。引數同Popen,檢視/usr/lib/python2.7/subprocess.py

去掉文件,其實是這樣的:

1 2 3 def call(*popenargs, **kwargs): return Popen(*popenargs, **kwargs).wait() >>> subprocess.call('ifconfig',shell=True)

2、subprocess.check_call

1 subprocess.check_call (*popenargs , **kwargs )

執行上面的call命令,並檢查返回值,如果子程序返回非0,則會丟擲CalledProcessError異常,這個異常會有個returncode

屬性,記錄子程序的返回值。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def check_call(*popenargs, **kwargs): retcode = call(*popenargs, **kwargs) if retcode: cmd = kwargs.get("args") raise CalledProcessError(retcode, cmd) return 0 >>> subprocess.check_call('ifconfig') >>> subprocess.call('noifconfig') Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/subprocess.py", line 493, in call return Popen(*popenargs, **kwargs).wait() File "/usr/local/lib/python2.7/subprocess.py", line 679, in __init__ errread, errwrite) File "/usr/local/lib/python2.7/subprocess.py", line 1228, in _execute_child raise child_exception OSError: [Errno 2] No such file or directory

異常子程序裡丟擲的異常,會在父程序中再次丟擲。並且,異常會有個叫child_traceback的額外屬性,這是個包含子程序錯誤traceback資訊的字串。遇到最多的錯誤回是 OSError,比如執行了一個並不存在的子程式就會產生OSError。另外,如果使用錯誤的引數呼叫Popen,會丟擲ValueError。當子程式返回非0時,check_call()還會產生CalledProcessError 異常。

安全性不像其他的popen函式,本函式不會呼叫/bin/sh來解釋命令,也就是說,命令中的每一個字元都會被安全地傳遞到子程序裡。

3、check_output

1 2 3 4 5 6 7 8 9 10 check_output()執行程式,並返回其標準輸出. def check_output(*popenargs, **kwargs): process = Popen(*popenargs, stdout=PIPE, **kwargs) output, unused_err = process.communicate() retcode = process.poll() if retcode: cmd = kwargs.get("args") raise CalledProcessError(retcode, cmd, output=output) return output p=subprocess.check_output('ifconfig')

結果是所有行/n分割的一個字串可以直接print出來 這裡開始

4、Popen物件

產生物件

1 2 p=subprocess.Popen("df -h",shell=True,stdout=subprocess.PIPE) >>> dir(p)

Popen物件有以下方法:

1 Popen.poll()

檢查子程序是否已結束,設定並返回returncode屬性。

1 2 3 4 >>> p.poll() 0 Popen.wait()

等待子程序結束,設定並返回returncode屬性。

1 2 >>> p.wait() 0

注意: 如果子程序輸出了大量資料到stdout或者stderr的管道,並達到了系統pipe的快取大小的話,子程序會等待父程序讀取管道,而父程序此時正wait著的話,將會產生傳說中的死鎖,後果是非常嚴重滴。建議使用communicate() 來避免這種情況的發生。

1 Popen.communicate(input=None)

和子程序互動:傳送資料到stdin,並從stdout和stderr讀資料,直到收到EOF。等待子程序結束。可選的input如有有的話,要為字串型別。

此函式返回一個元組: (stdoutdata , stderrdata ) 。

注意,要給子程序的stdin傳送資料,則Popen的時候,stdin要為PIPE;同理,要可以接收資料的話,stdout或者stderr也要為PIPE。

1 2 p1=subprocess.Popen('cat /etc/passwd',shell=True,stdin=subprocess.PIPE,stdout=subprocess.PIPE) >>> p2=subprocess.Popen('grep 0:0',shell=True,stdin=p1.stdout,stdout=subprocess.PIPE)

注意:讀到的資料會被快取在記憶體裡,所以資料量非常大的時候要小心了。

1 2 3 4 >>> p.communicate() (b'Filesystem Size Used Avail Capacity Mounted on\n/dev/ad0s1a 713M 313M 343M 48% /\ndevfs 1.0K 1.0K 0B 100% /dev\n/dev/ad0s1e 514M 2.1M 471M 0% /tmp\n/dev/ad0s1f 4.3G 2.5G 1.4G 64% /usr\n/dev/ad0s1d 2.0G 121M 1.7G 6% /var\n', None) Popen.send_signal(signal)

給子程序傳送signal訊號。

注意:windows下目前只支援傳送SIGTERM,等效於下面的terminate() 。

1 Popen.terminate()

停止子程序。Posix下是傳送SIGTERM訊號。windows下是呼叫TerminateProcess()這個API。

1 Popen.kill()

殺死子程序。Posix下是傳送SIGKILL訊號。windows下和terminate() 無異。

1 Popen.stdin

如果stdin 引數是PIPE,此屬性就是一個檔案物件,否則為None 。

1 Popen.stdout

如果stdout引數是PIPE,此屬性就是一個檔案物件,否則為None 。

1 Popen.stderr

如果stderr 引數是PIPE,此屬性就是一個檔案物件,否則為None 。

1 Popen.pid

子程序的程序號。注意,如果shell 引數為True,這屬性指的是子shell的程序號。

1 2 3 4 >>> p.pid 22303 Popen.returncode

子程式的返回值,由poll()或者wait()設定,間接地也由communicate()設定。

如果為None,表示子程序還沒終止。

如果為負數-N的話,表示子程序被N號訊號終止。(僅限*nux)

用subprocess來代替其他函式都可以用subprocess來完成,我們假定是用 “from subprocess import *” 來匯入模組的:

代替shell命令:

1 p=`ls -l`

等效於

1 p=Popen(['ls','-l'],stdout=PIPE).communicate()[0]

代替shell管道:

1 p=`dmesg | grep cpu`

等效於

1 2 3 4 5 6 7 8 9 10 p1=Popen(['dmesg'],stdout=PIPE) p2=Popen(['grep','cpu'],stdin=p1.stdout,stdout=PIPE) output = p2.communicate()[0] output cpu0: <ACPI CPU> on acpi0\nacpi_throttle0: <ACPI CPU Throttling> on cpu0\n >>> p1=subprocess.Popen('cat /etc/passwd',shell=True,stdout=subprocess.PIPE) >>> p2=subprocess.Popen('grep 0:0',shell=True,stdin=p1.stdout,stdout=subprocess.PIPE) >>> p3=subprocess.Popen("cut -d ':' -f 7",shell=True,stdin=p2.stdout,stdout=subprocess.PIPE) >>> print p3.stdout.read()

代替os.system()

1 lsl = os.system('ls '+'-l')

這個是一個返回狀態

等效於

1 2 p=Popen('ls -l', shell=True) lsl=os.waitpid(p.pid,0)[1]

注意:

通常並不需要用shell來呼叫程式。用subprocess可以更方便地得到子程式的返回值。

其實,更真實的替換是:

1 2 3 4 5 6 7 8 try: retcode = call(“mycmd” + ” myarg”, shell=True) if retcode < 0: print >>sys.stderr, “Child was terminated by signal”, -retcode else: print >>sys.stderr, “Child returned”, retcode except OSError, e: print >>sys.stderr, “Execution failed:”, e

代替os.spawn系列

P_NOWAIT的例子

1 pid = os.spawnlp(os.P_NOWAIT, “/bin/mycmd”, “mycmd”, “myarg”)

等效於

1 pid = Popen(["/bin/mycmd", "myarg"]).pid

P_WAIT的例子

1 retcode = os.spawnlp(os.P_WAIT, “/bin/mycmd”, “mycmd”, “myarg”)

等效於

1 retcode = call(["/bin/mycmd", "myarg"])

返回值處理:

1 2 3 4 5 pipe = os.popen(“cmd”, ‘w') ... rc = pipe.close() if rc != None and rc % 256: print “There were some errors”

等效於