[Java][Android][Process] Process 創建+控制+分析 經驗淺談
不管是Android亦或者Java中或多或少須要調用底層的一些命令。運行一些參數;
此時我們須要用到Java的Process來創建一個子進程。之所以是子進程是由於此進程依賴於發起創建請求的進程,假設發起者被Kill那個子進程也將Kill。
對於Process相信使用過的朋友一定不會陌生,它具有例如以下特點:
1.創建簡單
2.控制難
3.easy導致無法創建子進程
4.假設是多線程那麽非常有可能造成內存溢出
以上現象假設你僅僅是偶爾使用一次,創建一個進程也許你什麽都沒有感覺到,可是假設你使用了多線程,進行了大量的創建,以上問題你都會遇到。
相關:[Android] ProcessBuilder與Runtime.getRuntime().exec分別創建進程的差別,[Android] [Java] 分享 Process 運行命令行封裝類
這兩個星期一直在研究上面的問題,我要做的軟件是在Android中進行TraceRoute,因為手機不可能全然Root所以不能採用JNI來發送ICMP請求的方式,終於僅僅能使用創建進程方式進行。詳細實現思路是:使用PING命令來PING百度等地址,在PING命令中增加TTL,得到每一次的IP地址。當IP地址與目標IP地址符合時退出。而且還須要單獨PING一次每一跳的延遲和丟包。
單線程:PING 百度 TTL=1 =》 得到IP,PING IP 得到延遲丟包。改變TTL,進行下一次PING。直到所得到的IP與目標(百度)一樣時停止。
依照上面的思路一次須要創建兩個子進程。一般到百度時TTL大約為12跳左右,所以就是2*12=24個子進程;假設是在單線程下簡單明了,可是速度慢,整個過程大約須要1分鐘左右。
多線程:同一時候發起3個線程進行3跳測試TTL=(1,2。3),測試完畢後測試下一批數據TTL=(4,5,6)。假設也是12跳的話。那麽也是24個子進程,可是總體耗時將會為1/3.可見此時效率較高。
可是多線程須要考慮的是線程的同步問題,以及得到數據後的寫入問題,這些贊不談。僅僅談進程問題。經過我的測試假如如今測試100個站點的TraceRoute數據,在上層控制一次測試4個站點,底層實現並發3個線程,此時在一定時間內將會同一時候存在3*4個進程。依照平均每一個站點12跳來算:12*2*100=240個子進程,須要的子線程為12*100=120個。
這個時候問題來了,假如如今程序子進程不正常了,遇到了一個一定的問題導致進程無法運行完畢。此時你的現象是:一個子進程卡住,隨後創建的全部子進程都卡住。
假如最上層線程做了任務時間限制。那麽到時間後將會嘗試銷毀,可是你會發現無法銷毀,所持有的線程也不會銷毀。
可是上層以為銷毀掉了,然後繼續進行下一批的數據測試。此時你的線程數量會逐漸添加,假設100任務下來你的線程也許會達到3*4*100=1200假設有前期沒有這種情況那個就是一半:600個線程左右,假設後期還有任務將會繼續添加可是卻永遠不會銷毀。可是我們知道JVM的內存是有限的。所以此時將會出現內存溢出。
以上就是我遇到的問題,我最先改為了等待線程全然返回後再進行下一批數據測試。此時內存溢出是攻克了,可是任務卻一直卡住在哪裏了,永遠也不走。
我就在想要解決這一的問題須要解決根本上的問題才行。經過研究我發如今程序創建了子進程後JVM將會創建一個子進程管理線程:“ProcessManager”:
正常情況下該線程狀態為Native。可是假設創建大量子進程後有可能會出現此線程為Monitor狀態,過一段時間後全部創建子進程的線程狀態也將會變為Monitor狀態,然後將一直死鎖,後面創建線程也是繼續死鎖,無法繼續。
通過查看ProcessManager源代碼發現,當中啟動了一個線程用於監聽子進程狀態,同一時候管理子進程,比方輸出消息以及關閉子進程等操作,詳細例如以下:
/** * Copyright (C) 2007 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package java.lang; import java.io.File; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.HashMap; import java.util.Map; import java.util.Arrays; import java.util.logging.Logger; import java.util.logging.Level; /*** * Manages child processes. * * <p>Harmony‘s native implementation (for comparison purposes): * http://tinyurl.com/3ytwuq */ final class ProcessManager { /*** * constant communicated from native code indicating that a * child died, but it was unable to determine the status */ private static final int WAIT_STATUS_UNKNOWN = -1; /*** * constant communicated from native code indicating that there * are currently no children to wait for */ private static final int WAIT_STATUS_NO_CHILDREN = -2; /*** * constant communicated from native code indicating that a wait() * call returned -1 and set an undocumented (and hence unexpected) errno */ private static final int WAIT_STATUS_STRANGE_ERRNO = -3; /*** * Initializes native static state. */ static native void staticInitialize(); static { staticInitialize(); } /*** * Map from pid to Process. We keep weak references to the Process objects * and clean up the entries when no more external references are left. The * process objects themselves don‘t require much memory, but file * descriptors (associated with stdin/out/err in this case) can be * a scarce resource. */ private final Map<Integer, ProcessReference> processReferences = new HashMap<Integer, ProcessReference>(); /*** Keeps track of garbage-collected Processes. */ private final ProcessReferenceQueue referenceQueue = new ProcessReferenceQueue(); private ProcessManager() { // Spawn a thread to listen for signals from child processes. Thread processThread = new Thread(ProcessManager.class.getName()) { @Override public void run() { watchChildren(); } }; processThread.setDaemon(true); processThread.start(); } /*** * Kills the process with the given ID. * * @parm pid ID of process to kill */ private static native void kill(int pid) throws IOException; /*** * Cleans up after garbage collected processes. Requires the lock on the * map. */ void cleanUp() { ProcessReference reference; while ((reference = referenceQueue.poll()) != null) { synchronized (processReferences) { processReferences.remove(reference.processId); } } } /*** * Listens for signals from processes and calls back to * [email protected] #onExit(int,int)}. */ native void watchChildren(); /*** * Called by [email protected] #watchChildren()} when a child process exits. * * @param pid ID of process that exited * @param exitValue value the process returned upon exit */ void onExit(int pid, int exitValue) { ProcessReference processReference = null; synchronized (processReferences) { cleanUp(); if (pid >= 0) { processReference = processReferences.remove(pid); } else if (exitValue == WAIT_STATUS_NO_CHILDREN) { if (processReferences.isEmpty()) { /** * There are no eligible children; wait for one to be * added. The wait() will return due to the * notifyAll() call below. */ try { processReferences.wait(); } catch (InterruptedException ex) { // This should never happen. throw new AssertionError("unexpected interrupt"); } } else { /** * A new child was spawned just before we entered * the synchronized block. We can just fall through * without doing anything special and land back in * the native wait(). */ } } else { // Something weird is happening; abort! throw new AssertionError("unexpected wait() behavior"); } } if (processReference != null) { ProcessImpl process = processReference.get(); if (process != null) { process.setExitValue(exitValue); } } } /*** * Executes a native process. Fills in in, out, and err and returns the * new process ID upon success. */ static native int exec(String[] command, String[] environment, String workingDirectory, FileDescriptor in, FileDescriptor out, FileDescriptor err, boolean redirectErrorStream) throws IOException; /*** * Executes a process and returns an object representing it. */ Process exec(String[] taintedCommand, String[] taintedEnvironment, File workingDirectory, boolean redirectErrorStream) throws IOException { // Make sure we throw the same exceptions as the RI. if (taintedCommand == null) { throw new NullPointerException(); } if (taintedCommand.length == 0) { throw new IndexOutOfBoundsException(); } // Handle security and safety by copying mutable inputs and checking them. String[] command = taintedCommand.clone(); String[] environment = taintedEnvironment != null ? taintedEnvironment.clone() : null; SecurityManager securityManager = System.getSecurityManager(); if (securityManager != null) { securityManager.checkExec(command[0]); } // Check we‘re not passing null Strings to the native exec. for (String arg : command) { if (arg == null) { throw new NullPointerException(); } } // The environment is allowed to be null or empty, but no element may be null. if (environment != null) { for (String env : environment) { if (env == null) { throw new NullPointerException(); } } } FileDescriptor in = new FileDescriptor(); FileDescriptor out = new FileDescriptor(); FileDescriptor err = new FileDescriptor(); String workingPath = (workingDirectory == null) ? null : workingDirectory.getPath(); // Ensure onExit() doesn‘t access the process map before we add our // entry. synchronized (processReferences) { int pid; try { pid = exec(command, environment, workingPath, in, out, err, redirectErrorStream); } catch (IOException e) { IOException wrapper = new IOException("Error running exec()." + " Command: " + Arrays.toString(command) + " Working Directory: " + workingDirectory + " Environment: " + Arrays.toString(environment)); wrapper.initCause(e); throw wrapper; } ProcessImpl process = new ProcessImpl(pid, in, out, err); ProcessReference processReference = new ProcessReference(process, referenceQueue); processReferences.put(pid, processReference); /** * This will wake up the child monitor thread in case there * weren‘t previously any children to wait on. */ processReferences.notifyAll(); return process; } } static class ProcessImpl extends Process { /*** Process ID. */ final int id; final InputStream errorStream; /*** Reads output from process. */ final InputStream inputStream; /*** Sends output to process. */ final OutputStream outputStream; /*** The process‘s exit value. */ Integer exitValue = null; final Object exitValueMutex = new Object(); ProcessImpl(int id, FileDescriptor in, FileDescriptor out, FileDescriptor err) { this.id = id; this.errorStream = new ProcessInputStream(err); this.inputStream = new ProcessInputStream(in); this.outputStream = new ProcessOutputStream(out); } public void destroy() { try { kill(this.id); } catch (IOException e) { Logger.getLogger(Runtime.class.getName()).log(Level.FINE, "Failed to destroy process " + id + ".", e); } } public int exitValue() { synchronized (exitValueMutex) { if (exitValue == null) { throw new IllegalThreadStateException( "Process has not yet terminated."); } return exitValue; } } public InputStream getErrorStream() { return this.errorStream; } public InputStream getInputStream() { return this.inputStream; } public OutputStream getOutputStream() { return this.outputStream; } public int waitFor() throws InterruptedException { synchronized (exitValueMutex) { while (exitValue == null) { exitValueMutex.wait(); } return exitValue; } } void setExitValue(int exitValue) { synchronized (exitValueMutex) { this.exitValue = exitValue; exitValueMutex.notifyAll(); } } @Override public String toString() { return "Process[id=" + id + "]"; } } static class ProcessReference extends WeakReference<ProcessImpl> { final int processId; public ProcessReference(ProcessImpl referent, ProcessReferenceQueue referenceQueue) { super(referent, referenceQueue); this.processId = referent.id; } } static class ProcessReferenceQueue extends ReferenceQueue<ProcessImpl> { @Override public ProcessReference poll() { // Why couldn‘t they get the generics right on ReferenceQueue?:( Object reference = super.poll(); return (ProcessReference) reference; } } static final ProcessManager instance = new ProcessManager(); /*** Gets the process manager. */ static ProcessManager getInstance() { return instance; } /*** Automatically closes fd when collected. */ private static class ProcessInputStream extends FileInputStream { private FileDescriptor fd; private ProcessInputStream(FileDescriptor fd) { super(fd); this.fd = fd; } @Override public void close() throws IOException { try { super.close(); } finally { synchronized (this) { if (fd != null && fd.valid()) { try { ProcessManager.close(fd); } finally { fd = null; } } } } } } /*** Automatically closes fd when collected. */ private static class ProcessOutputStream extends FileOutputStream { private FileDescriptor fd; private ProcessOutputStream(FileDescriptor fd) { super(fd); this.fd = fd; } @Override public void close() throws IOException { try { super.close(); } finally { synchronized (this) { if (fd != null && fd.valid()) { try { ProcessManager.close(fd); } finally { fd = null; } } } } } } /*** Closes the given file descriptor. */ private static native void close(FileDescriptor fd) throws IOException; }
在當中有一個“native void watchChildren();”方法,此方法為線程主方法。詳細實現能夠看看JNI,在當中回調了方法:“void onExit(int pid, int exitValue);” 在方法中:
void onExit(int pid, int exitValue) { ProcessReference processReference = null; synchronized (processReferences) { cleanUp(); if (pid >= 0) { processReference = processReferences.remove(pid); } else if (exitValue == WAIT_STATUS_NO_CHILDREN) { if (processReferences.isEmpty()) { /** * There are no eligible children; wait for one to be * added. The wait() will return due to the * notifyAll() call below. */ try { processReferences.wait(); } catch (InterruptedException ex) { // This should never happen. throw new AssertionError("unexpected interrupt"); } } else { /** * A new child was spawned just before we entered * the synchronized block. We can just fall through * without doing anything special and land back in * the native wait(). */ } } else { // Something weird is happening; abort! throw new AssertionError("unexpected wait() behavior"); } } if (processReference != null) { ProcessImpl process = processReference.get(); if (process != null) { process.setExitValue(exitValue); } } }
此方法作用是刪除子進程隊列中子進程同一時候通知子進程ProcessImpl已完畢。
可是在方法:“watchChildren()”中假設出現System.in緩沖期滿的情況那麽進程將無法正常結束,它將一直等待緩沖區有空間存在。而緩沖區又是公共區間,假設一個出現等待那麽興許子進程也將所有等待。假設緩沖區無法清空。那麽所有子進程將會所有死鎖掉。
這就是導致子進程卡死的兇手。
知道問題關鍵點那麽就會有人想辦法解決,比如:
//...讀取數據... process.waitFor(); //....再次讀取這種方式看似非常好,可是你有沒有想過有些數據無法及時返回,所以在waitfor()之前讀取非常有可能沒有數據導致進行waitfor()等待,這時我們能夠看看源代碼:
public int waitFor() throws InterruptedException { synchronized (exitValueMutex) { while (exitValue == null) { exitValueMutex.wait(); } return exitValue; } }
void setExitValue(int exitValue) { synchronized (exitValueMutex) { this.exitValue = exitValue; exitValueMutex.notifyAll(); } }這裏能夠看見假如沒有退出值將會進行等待,直到通知發生,可是通知想要發生必需要靠“ProcessManager”線程來告訴你。
可是假如在等待過程中出現了大量的數據。導致System.IN滿了,此時“ProcessManager”線程非常傻非常傻的進入了等待狀態中,也將無法進行通知。而這邊也就無法往下走。無法到達第二次讀取,所以第二次讀取就非常隨機了,在大量數據下第二次讀取基本上就是擺設。也就是說無法正常的運行,終於也將導致死鎖。
解決的方法也非常easy。創建線程後我們能夠創建一個線程來專門讀取信息。直到“ProcessManager”線程通知結束的時候,才退出線程。
首先我們看看Process提供的“exitValue()”方法:
public int exitValue() { synchronized (exitValueMutex) { if (exitValue == null) { throw new IllegalThreadStateException( "Process has not yet terminated."); } return exitValue; } }
可見在”exitValue“沒有值時將會拋出異常而不會堵塞,所以能夠得出:”exitValue()“與”waitfor()“都能夠用於推斷線程是否完畢。可是一個是堵塞的一個是不堵塞的方法。在線程中當然使用不堵塞的來完畢我們的工作:
/** * 實例化一個ProcessModel * * @param process Process */ private ProcessModel(Process process) { //init this.process = process; //get out = process.getOutputStream(); in = process.getInputStream(); err = process.getErrorStream(); //in if (in != null) { isInReader = new InputStreamReader(in); bInReader = new BufferedReader(isInReader, BUFFER_LENGTH); } sbReader = new StringBuilder(); //start read thread readThread(); } .................... //讀取結果 private void read() { String str; //read In try { while ((str = bInReader.readLine()) != null) { sbReader.append(str); sbReader.append(BREAK_LINE); } } catch (Exception e) { e.printStackTrace(); Logs.e(TAG, e.getMessage()); } } /** * 啟動線程進行異步讀取結果 */ private void readThread() { Thread thread = new Thread(new Runnable() { @Override public void run() { // while (true) { try { process.exitValue(); //read last read(); break; } catch (IllegalThreadStateException e) { read(); } StaticFunction.sleepIgnoreInterrupt(300); } //read end int len; if (in != null) { try { while ((len = in.read(BUFFER)) > 0) { Logs.d(TAG, String.valueOf(len)); } } catch (IOException e) { e.printStackTrace(); Logs.e(TAG, e.getMessage()); } } //close close(); //done isDone = true; } }); thread.setName("DroidTestAgent.Test.TestModel.ProcessModel:ReadThread"); thread.setDaemon(true); thread.start(); }
當創建進程後把進程丟進我建立的類中實例化為一個進程管理類。隨後啟動線程,線程運行中調用進程的”exitValue()“,假設異常就進入讀取數據,直到不異常時再次讀取一次最後數據,隨後退出循環,退出後還讀取了一次底層的數據(這個事實上能夠不用要,純屬心理作用!)。最後寫入完畢標記。
當中”StaticFunction.sleepIgnoreInterrupt(300);“是我寫的靜態方法用於休眠等待而已。也就是Sleep。僅僅只是增加了try catch。
當然光是讀取IN流是不行的,還有Error流,這個時候就須要兩個線程來完畢。一個也行。只是我為了簡單採用了:ProcessBuilder類創建進程並重定向了錯誤流到IN流中。這樣簡化了操作。
而使用ProcessBuilder類須要註意的是同一個ProcessBuilder實例創建子進程的時候是須要進行線程同步操作的,由於假設並發操作將會導致進程參數錯誤等現象發生。所以建議加上線程相互排斥來實現。可是不建議反復創建ProcessBuilder實例。創建那麽多實例,何不把全部子進程放在一個ProcessBuilder實例裏邊。降低內存消耗啊,手機傷不起啊。
有必要提出的是,當線程推斷結束的時候,也就是退出值(exitvalue)有值得時候此時事實上在”ProcessManager“線程中已經殺掉了進程了。此時在進程中事實上沒有此進程了。有的也就是運行後的數據流而已。所以正常結束情況下無需自己調用”destroy()“方法。調用後將會觸發異常,說沒有找到此進程。
public void destroy() { try { kill(this.id); } catch (IOException e) { Logger.getLogger(Runtime.class.getName()).log(Level.FINE, "Failed to destroy process " + id + ".", e); } }
最終講完了。累啊;
最後給大家分享我自己弄得一個類(ProcessModel)。大家喜歡就直接拿去,假設有好的建議希望大家提出來:
import com.droidtestagent.journal.Logs; import com.droidtestagent.util.StaticFunction; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Create By Qiujuer * 2014-08-05 * <p/> * 運行命令行語句進程管理封裝 */ public class ProcessModel { private static final String TAG = "ProcessModel"; //換行符 private static final String BREAK_LINE; //錯誤緩沖 private static final byte[] BUFFER; //緩沖區大小 private static final int BUFFER_LENGTH; //創建進程時須要相互排斥進行 private static final Lock lock = new ReentrantLock(); //ProcessBuilder private static final ProcessBuilder prc; final private Process process; final private InputStream in; final private InputStream err; final private OutputStream out; final private StringBuilder sbReader; private BufferedReader bInReader = null; private InputStreamReader isInReader = null; private boolean isDone; /** * 靜態變量初始化 */ static { BREAK_LINE = "\n"; BUFFER_LENGTH = 128; BUFFER = new byte[BUFFER_LENGTH]; prc = new ProcessBuilder(); } /** * 實例化一個ProcessModel * * @param process Process */ private ProcessModel(Process process) { //init this.process = process; //get out = process.getOutputStream(); in = process.getInputStream(); err = process.getErrorStream(); //in if (in != null) { isInReader = new InputStreamReader(in); bInReader = new BufferedReader(isInReader, BUFFER_LENGTH); } sbReader = new StringBuilder(); //start read thread readThread(); } /** * 運行命令 * * @param params 命令參數 eg: "/system/bin/ping", "-c", "4", "-s", "100","www.qiujuer.net" */ public static ProcessModel create(String... params) { Process process = null; try { lock.lock(); process = prc.command(params) .redirectErrorStream(true) .start(); } catch (IOException e) { e.printStackTrace(); } finally { //sleep 100 StaticFunction.sleepIgnoreInterrupt(100); lock.unlock(); } if (process == null) return null; return new ProcessModel(process); } /** * 通過Android底層實現進程關閉 * * @param process 進程 */ public static void kill(Process process) { int pid = getProcessId(process); if (pid != 0) { try { android.os.Process.killProcess(pid); } catch (Exception e) { try { process.destroy(); } catch (Exception ex) { //ex.printStackTrace(); } } } } /** * 獲取進程的ID * * @param process 進程 * @return id */ public static int getProcessId(Process process) { String str = process.toString(); try { int i = str.indexOf("=") + 1; int j = str.indexOf("]"); str = str.substring(i, j); return Integer.parseInt(str); } catch (Exception e) { return 0; } } //讀取結果 private void read() { String str; //read In try { while ((str = bInReader.readLine()) != null) { sbReader.append(str); sbReader.append(BREAK_LINE); } } catch (Exception e) { e.printStackTrace(); Logs.e(TAG, e.getMessage()); } } /** * 啟動線程進行異步讀取結果 */ private void readThread() { Thread thread = new Thread(new Runnable() { @Override public void run() { //while to end while (true) { try { process.exitValue(); //read last read(); break; } catch (IllegalThreadStateException e) { read(); } StaticFunction.sleepIgnoreInterrupt(300); } //read end int len; if (in != null) { try { while ((len = in.read(BUFFER)) > 0) { Logs.d(TAG, String.valueOf(len)); } } catch (IOException e) { e.printStackTrace(); Logs.e(TAG, e.getMessage()); } } //close close(); //done isDone = true; } }); thread.setName("DroidTestAgent.Test.TestModel.ProcessModel:ReadThread"); thread.setDaemon(true); thread.start(); } /** * 獲取運行結果 * * @return 結果 */ public String getResult() { //waite process setValue try { process.waitFor(); } catch (Exception e) { e.printStackTrace(); Logs.e(TAG, e.getMessage()); } //until startRead en while (true) { if (isDone) break; StaticFunction.sleepIgnoreInterrupt(100); } //return if (sbReader.length() == 0) return null; else return sbReader.toString(); } /** * 關閉全部流 */ private void close() { //close out if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } //err if (err != null) { try { err.close(); } catch (IOException e) { e.printStackTrace(); } } //in if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (isInReader != null) { try { isInReader.close(); } catch (IOException e) { e.printStackTrace(); } } if (bInReader != null) { try { bInReader.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 銷毀 */ public void destroy() { //process try { process.destroy(); } catch (Exception ex) { kill(process); } } }
想了想還是把代碼托管到了GitHub上,方便以後分享其它的代碼。
地址:Android Utils
很歡迎大家找出不足發表問題。
[Java][Android][Process] Process 創建+控制+分析 經驗淺談