Java多執行緒應用例項
阿新 • • 發佈:2019-01-10
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //最大執行緒數 private static final int MaxWorkerNumber=10; //預設執行緒數 private static final int DefaultWorkerNumber=5; //最小執行緒數 private static final int MinWorkerNumber=1; //這是一個任務佇列,將會向裡面插入任務 private final LinkedList<Job> jobs=new LinkedList<Job>(); //工作者列表 private final List<Worker> workers= Collections.synchronizedList(new ArrayList<Worker>()); //工作者執行緒的數量 private int workerNum=DefaultWorkerNumber; //執行緒編號生成 private AtomicLong threadNum=new AtomicLong(); public DefaultThreadPool(){ initializeWorker(DefaultWorkerNumber); } public DefaultThreadPool(int num){ workerNum=num>MaxWorkerNumber?MaxWorkerNumber: num<MinWorkerNumber?MinWorkerNumber:num; initializeWorker(workerNum); } //初始化工作者 private void initializeWorker(int num){ for(int i=0;i<num;i++){ Worker worker=new Worker(); workers.add(worker); Thread thread=new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet()); System.out.println("新增工作者執行緒:ThreadPool-Worker-"+threadNum.get()); thread.start(); } } @Override public void execute(Job job) { if(job!=null){ //新增一個任務,然後進行通知 synchronized(jobs){ jobs.addLast(job); jobs.notifyAll(); System.out.println("提交任務:"+job.toString()); } } } @Override public void shutDown() { for(Worker worker:workers){ worker.shutdown(); } System.err.println(" 關閉執行緒池! "); //注意這裡需要通知等待佇列中的工作者執行緒 synchronized(jobs){ jobs.notifyAll(); } } @Override public void addWorkers(int num) { //限定新增的Worder數量不能超過最大值 if(num+this.workerNum>MaxWorkerNumber){ num=MaxWorkerNumber-this.workerNum; } synchronized(workers){ initializeWorker(num); this.workerNum+=num; } } @Override public void removeWorker(int num) { if(num>=this.workerNum){ throw new IllegalArgumentException("beyond workNum"); } synchronized(workers){ //按照給定的數量停止Worker int count=num; while(count>0){ Worker worker=workers.get(0); if(workers.remove(worker)){ worker.shutdown(); count--; } } this.workerNum-=num; } } @Override public int getJobSize() { return jobs.size(); } //工作者執行緒消費任務 public class Worker implements Runnable{ //是否工作 private volatile boolean running=true; @Override public void run() { while(true){ Job job=null; synchronized(jobs){ //如果任務佇列是空的,那麼就wait while(jobs.isEmpty()){ if(running==false){ System.out.println(Thread.currentThread().getName()+"結束"); return; } try { jobs.wait(); } catch (InterruptedException e) { //感知外部對WorkerThread的中斷操作,返回 Thread.currentThread().interrupt(); return; } } if(running==false){ System.out.println(Thread.currentThread().getName()+"結束"); return; } //取出一個Job job=jobs.removeFirst(); } if(job!=null){ try{ System.out.println(Thread.currentThread().getName()+" 執行任務 "+job.toString()); job.run(); }catch(Exception e){ //忽略Job執行中的Exception } } } } //關閉當前工作者 public void shutdown(){ running=false; System.err.println("關閉一個工作者執行緒"); } } }
public class ThreadLocalTest { //建立一個Integer型的執行緒本地變數 public static final ThreadLocal<Integer> local = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return 0; } }; //計數 static class Counter implements Runnable{ @Override public void run() { //獲取當前執行緒的本地變數,然後累加5次 int num = local.get(); for (int i = 0; i < 100; i++) { num++; } //重新設定累加後的本地變數 local.set(num); System.out.println(Thread.currentThread().getName() + " : "+ local.get()); } } public static void main(String[] args) throws InterruptedException { Thread[] threads = new Thread[5]; for (int i = 0; i < 5; i++) { threads[i] = new Thread(new Counter() ,"CounterThread-[" + i+"]"); threads[i].start(); } } }