1. 程式人生 > 實用技巧 >java執行緒學習

java執行緒學習

//執行緒池建立,執行緒池提交任務使用Callable

int corePoolSize = 3;
int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); //什麼也不做,直接忽略
ExecutorService threadPoolExecutor = new
ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy); submitTisks(gatherDate, hbaseConfig, corePoolSize, threadPoolExecutor); /************************************使用Callable介面**************************************/ /** * 提交任務 * @param gatherDate * @param
hbaseConfig * @param corePoolSize * @param threadPoolExecutor */ private static void submitTisks(String gatherDate, Configuration hbaseConfig, int corePoolSize, ExecutorService threadPoolExecutor) { List<Future<Boolean>> taskFutureList = new ArrayList<>(corePoolSize);
for (int i = 0; i < corePoolSize; i++) { // 提交任務,任務的執行由執行緒池去呼叫執行並管理。 // 這裡獲取結果任務的Future,並放到list中,供所有任務提交完後,通過每個任務的Future判斷執行狀態和結果。 Future<Boolean> gpsfuture = threadPoolExecutor.submit(new GPSTask(hbaseConfig, gatherDate, xikangTidList, rootPath)); taskFutureList.add(gpsfuture); } int done = 0; //完成任務的數量 while (!taskFutureList.isEmpty()) { Iterator<Future<Boolean>> iter = taskFutureList.iterator(); while (iter.hasNext()) { Future<Boolean> fut = iter.next(); if (fut.isDone()) { try{ Boolean flag = fut.get(); if (flag){ done++;} }catch (Exception e){ e.printStackTrace(); } iter.remove(); } } // 停留一會,避免一直迴圈。 try { Thread.sleep(1000L); }catch (InterruptedException e){ e.printStackTrace(); } } } /** * gps執行緒 */ static class GPSTask implements Callable<Boolean> { Configuration config; String recordDate; List<Long> xkDatanoList; String rootPath; Thread currentThread; public GPSTask(Configuration config, String recordDate, List<Long> xkDatanoList,String rootPath) { this.config = config; this.recordDate = recordDate; this.xkDatanoList = xkDatanoList; this.rootPath = rootPath; } @Override public Boolean call() throws Exception { this.currentThread = Thread.currentThread(); return FindDataUtil.getData4HbaseGPSSQ(config, recordDate, xkDatanoList,rootPath,currentThread); } }

//執行緒池建立,執行緒池提交任務使用Thread

int corePoolSize = 3;
int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); //什麼也不做,直接忽略
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy);
GPSRunTask gpsRunTask = new GPSRunTask(hbaseConfig, gatherDate, xikangTidList, rootPath);
        threadPoolExecutor.execute(gpsRunTask);
try {
            gpsRunTask.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 // 等待已提交的任務全部結束 不再接受新的任務
        threadPoolExecutor.shutdown();
 /**
    * gps執行緒
    */
  static class GPSRunTask extends Thread  {
    Configuration config;
    String recordDate;
    List<Long> xkDatanoList;
    String rootPath;
    Thread currentThread;
    public GPSRunTask(Configuration config, String recordDate, List<Long> xkDatanoList,String rootPath) {
        this.config = config;
        this.recordDate = recordDate;
        this.xkDatanoList = xkDatanoList;
        this.rootPath = rootPath;
    }
    @Override
    public void run(){
        this.currentThread = Thread.currentThread();
        //logger.info("gps執行緒:"+currentThread.getName());
        FindDataUtil.getData4HbaseGPSSQ(config, recordDate, xkDatanoList,rootPath,currentThread);
    }
  }