java執行緒學習
阿新 • • 發佈:2020-07-14
//執行緒池建立,執行緒池提交任務使用Callable
// ---- Pool creation; tasks are submitted as Callables ----
int corePoolSize = 3;
// The maximum pool size must never be smaller than the core size, otherwise the
// ThreadPoolExecutor constructor throws IllegalArgumentException (e.g. on a single-CPU host,
// where availableProcessors() * 2 == 2 < 3).
int maximumPoolSize = Math.max(corePoolSize, Runtime.getRuntime().availableProcessors() * 2);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
// Run a rejected task on the submitting thread. DiscardPolicy would silently drop the task and
// leave its Future incomplete forever, so anything waiting on that Future would hang.
RejectedExecutionHandler policy = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService threadPoolExecutor =
        new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy);
submitTisks(gatherDate, hbaseConfig, corePoolSize, threadPoolExecutor);
// Nothing else will be submitted: let the workers exit once the queue has drained. Without
// this the core threads stay alive indefinitely.
threadPoolExecutor.shutdown();

/************************************ Using the Callable interface **************************************/

/**
 * Submits {@code corePoolSize} {@link GPSTask}s to the executor and blocks until every one of
 * them has finished, failed or been cancelled.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored and the
 * method returns without waiting for the remaining tasks.
 *
 * @param gatherDate record date handed to each task
 * @param hbaseConfig configuration handed to each task
 * @param corePoolSize number of tasks to submit
 * @param threadPoolExecutor executor that runs the tasks
 */
private static void submitTisks(String gatherDate, Configuration hbaseConfig, int corePoolSize,
        ExecutorService threadPoolExecutor) {
    List<Future<Boolean>> taskFutureList = new ArrayList<>(corePoolSize);
    for (int i = 0; i < corePoolSize; i++) {
        // Keep each task's Future so completion can be awaited after everything is submitted.
        // xikangTidList and rootPath are not parameters; they must be visible from the
        // enclosing scope.
        Future<Boolean> gpsfuture =
                threadPoolExecutor.submit(new GPSTask(hbaseConfig, gatherDate, xikangTidList, rootPath));
        taskFutureList.add(gpsfuture);
    }

    // Future.get() blocks until the task is done, so no isDone()/sleep polling is needed.
    for (Future<Boolean> future : taskFutureList) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop waiting.
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            // The task threw or was cancelled; report it and keep waiting for the others.
            e.printStackTrace();
        }
    }
}

/**
 * GPS task: calls {@code FindDataUtil.getData4HbaseGPSSQ} on whichever pool thread runs it and
 * returns that call's result.
 */
static class GPSTask implements Callable<Boolean> {
    Configuration config;
    String recordDate;
    List<Long> xkDatanoList;
    String rootPath;
    /** Thread that executes {@link #call()}; unset until the task starts running. */
    Thread currentThread;

    public GPSTask(Configuration config, String recordDate, List<Long> xkDatanoList, String rootPath) {
        this.config = config;
        this.recordDate = recordDate;
        this.xkDatanoList = xkDatanoList;
        this.rootPath = rootPath;
    }

    @Override
    public Boolean call() throws Exception {
        // Record the executing worker thread and pass it on to the query helper.
        this.currentThread = Thread.currentThread();
        return FindDataUtil.getData4HbaseGPSSQ(config, recordDate, xkDatanoList, rootPath, currentThread);
    }
}
//執行緒池建立,執行緒池提交任務使用Thread
int corePoolSize = 3; int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 2; BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512); RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); //什麼也不做,直接忽略 ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy); GPSRunTask gpsRunTask = new GPSRunTask(hbaseConfig, gatherDate, xikangTidList, rootPath); threadPoolExecutor.execute(gpsRunTask); try { gpsRunTask.join(); } catch (InterruptedException e) { e.printStackTrace(); } // 等待已提交的任務全部結束 不再接受新的任務 threadPoolExecutor.shutdown(); /** * gps執行緒 */ static class GPSRunTask extends Thread { Configuration config; String recordDate; List<Long> xkDatanoList; String rootPath; Thread currentThread; public GPSRunTask(Configuration config, String recordDate, List<Long> xkDatanoList,String rootPath) { this.config = config; this.recordDate = recordDate; this.xkDatanoList = xkDatanoList; this.rootPath = rootPath; } @Override public void run(){ this.currentThread = Thread.currentThread(); //logger.info("gps執行緒:"+currentThread.getName()); FindDataUtil.getData4HbaseGPSSQ(config, recordDate, xkDatanoList,rootPath,currentThread); } }