使用多線程和並發編程(一)
一.先初步了解一下基本的概念
進程:在一個操作系統中,每個獨立執行的程序都可以是一個進程。
線程:一個程序至少有一個進程,一個進程至少有一個線程,java裏有一個主線程和垃圾回收線程。
線程的3中創建方式:
1.繼承Thread類
2.實現Runnable接口
3.實現Callable接口,和Future、線程池一起使用
線程的優先級:
優先級的返回是1-10,默認是5,數越大優先級越高。
join的作用是:
等待該線程終止,指的是主線程等待子線程的終止。子線程調用了join方法,只有子線程執行完之後才會調用主線程。(主線程需要用到子線程的處理結果是使用join)
二.多線程之間的狀態轉換
狀態之間裝換的demo:
這個例子裏面涉及到了線程的創建,運行,堵塞,等待,鎖池等狀態,主要是為了加深自己對狀態的理解。
package cn; public class ThreadDemo extends Thread { //標識 private boolean runningFlag=false; public ThreadDemo(){ runningFlag = false; } public synchronized void setRunningFlag(boolean runningFlag) { this.runningFlag = runningFlag; if(runningFlag){ this.notify(); }else{ try { System.out.println("線程"+Thread.currentThread().getName()+"開始等待"); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void run() { sayHello(); } //鎖池狀態 public synchronized void sayHello(){ while(true){ if(!runningFlag){ try { System.out.println("線程"+Thread.currentThread().getName()+"開始等待"); this.wait();//等待狀態 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else{ try { sleep(3000);//堵塞狀態 System.out.println("線程"+Thread.currentThread().getName()+"任務完成\n"); setRunningFlag(false); //讓當前線程處於等待任務狀態 } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { int i=10; while (i-->0) { //新建創建 ThreadDemo demo=new ThreadDemo(); demo.setName("demo"+i); demo.setRunningFlag(true); //可運行狀態,start之後等待cpu獲取時間片 demo.start(); } } }
三.使用4個線程分別計算1-25,26-50,51-75,76-100的值,最後求和。
實現思路:定義4個線程分別計算每份數據的和,然後把求的和加入到集合中,最後對集合中的值進行相加。
使用技術:使用ExecutorService、Callable、Future實現有返回結果的多線程。
shutdown:調用後,不可以再submit新的task,已經submit的將繼續執行。
shutdownNow:試圖停止當前正執行的task,並返回尚未執行的task的list
package cn; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ThreadAddFuture { //創建多個有返回值的任務 public static List<Future> futureList=new ArrayList<Future>(); public static void main(String[] args) throws InterruptedException, ExecutionException { int sum=0; int taskSize=4; ThreadAddFuture add=new ThreadAddFuture(); //創建一個線程池有4個 ExecutorService pool=Executors.newFixedThreadPool(taskSize); for (int i =1; i <=76;) { ThreadTest thread=add.new ThreadTest(i,i+24); //執行任務並獲取Future對象 Future<Integer> future=pool.submit(thread); //添加到futureList futureList.add(future); //正好實現4個線程分別計算1-25|26-50|51-75|76-100的和 i+=25; } //獲取所有並發任務的返回結果 if(futureList!=null && futureList.size()>0){ for(Future<Integer> future:futureList){ sum+=(Integer)future.get(); } } System.out.println("total result: "+sum); //關閉線程池 pool.shutdown(); } //實現Callable接口 class ThreadTest implements Callable<Integer>{ private int begin; private int end; private int sum=0; public ThreadTest(int begin,int end){ this.begin=begin; this.end=end; } @Override public Integer call() throws Exception { for (int i =begin; i <=end; i++) { sum+=i; } System.out.println("from "+Thread.currentThread().getName()+" sum="+sum); return sum; } } }
四.使用runnable、CountDownLatch、線程池的demo
CountDownLatch:CountDownLatch是一個同步的輔助器是一個計數器,只要計數器為0主線程就可以結束堵塞進行執行,和join很像但是比join更加靈活。
await:await() 方法會一直阻塞直到計數器為0,主線程才會繼續往下執行。
countDown():countDown() 方法將計數器減1。
CountDownLatch和join的區別:
調用thread.join() 方法必須等thread 執行完畢,當前線程才能繼續往下執行,而CountDownLatch通過計數器提供了更靈活的控制,只要檢測到計數器為0當前線程就可以往下執行而不用管相應的thread是否執行完畢。
package cn; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class WatchThread { private String name=UUID.randomUUID().toString(); public void testThread() throws InterruptedException{ int threadNum=10; //創建計數器 CountDownLatch threadSignal=new CountDownLatch(threadNum);
//創建線程池 ExecutorService executor=Executors.newFixedThreadPool(threadNum); for (int i = 0; i <threadNum; i++) { TestThread task=new TestThread(threadSignal); //執行任務 executor.execute(task); }
//堵塞線程 threadSignal.await();
//關閉線程池 executor.shutdown();
System.out.println(Thread.currentThread().getName() + "+++++++結束."); } public static void main(String[] args) throws InterruptedException{ WatchThread test=new WatchThread(); test.testThread(); } private class TestThread implements Runnable{ private CountDownLatch threadsSignal; public TestThread(CountDownLatch threadsSignal){ this.threadsSignal=threadsSignal; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "開始..." + name); System.out.println("開始了線程::::" + threadsSignal.getCount()); threadsSignal.countDown();//必須等核心處理邏輯處理完成後才可以減1 System.out.println(Thread.currentThread().getName() + "結束. 還有" + threadsSignal.getCount() + " 個線程"); } } }
五.模擬實現消息的發送,一個服務端,一個客戶端。
實現思路:
發送的話,要保證信息100%的發送給客戶端,那麽發給客戶端之後,客戶端返回一個消息告訴服務器,已經收到。當服務器一直沒有收到客戶端返回的消息,那麽服務器會一直發 送這個信息,直到客戶端發送回確認信息,這時候再刪除重復發送的這個信息。
在服務端創建一個ConcurrentHashMap,當客戶端正確接收服務端發送的數據並返回成功標識,從ConcurrentHashMap中刪除成功的消息。
1.創建PushThread類向客戶端發送數據
package cn1; import java.util.Map.Entry; public class PushThread extends Thread{ //發送代碼,是不斷遍歷內存對象councurrenthashmap,從中取出信息,不斷的重發 @Override public void run() { try { //重發消息 for(Entry<Integer,String> hashMap:MainThread.pushmessage.entrySet()){ System.out.println("消息id:"+hashMap.getKey()+"未發送成功,在此重發:"+hashMap.getValue()); } sleep(1000); } catch (Exception e) { e.printStackTrace(); } } }
2.創建RemoveThread類接收客戶端返回的數據,成功從ConcurrentHashMap中刪除消息
package cn1; import java.util.Map.Entry; public class RemoveThread extends Thread { //循環的接收客戶端返回的消息,返回成功從concurrentHashMap中刪除 @Override public void run() { try { for (int i = 0; i < 10000; i++) { sleep(2000); for(Entry<Integer, String> map:MainThread.pushmessage.entrySet()){ if (map.getKey()==i) { System.out.println("成功收到id為:"+map.getKey()+"返回的信息,刪除該元素"); MainThread.pushmessage.remove(map.getKey()); } } System.out.println("內存對象中的元素數量為:"+MainThread.pushmessage.size()); } } catch (InterruptedException e) { e.printStackTrace(); } } }
3.創建MainThread類,向concurrentHashMap中添加測試數據
package cn1; import java.util.concurrent.ConcurrentHashMap; public class MainThread { /** * 消息的發送,一個是服務器,一個是客戶端。發送的話,要保證信息100%的發送給客戶端,那麽發給客戶端之後,客戶端返回一個消息告訴服務器,已經收到。當服務器一直沒有收到客戶端返回的消息,那麽服務器會一直發送這個信息,直到客戶端發送回確認信息,這時候再刪除重復發送的這個信息。 */ public static ConcurrentHashMap<Integer, String> pushmessage=new ConcurrentHashMap<Integer,String>(); public static void main(String[] args) { for (int i = 0; i < 10; i++) { pushmessage.put(i, "該消息是id為"+i+"的消息"); } Thread pushThread=new PushThread(); Thread remove=new RemoveThread(); pushThread.start(); remove.start(); for (int i = 10; i < 20; i++) { pushmessage.put(i, "又一波到來,消息是id為"+i+"的消息"); } } }
六.synchronized的實現原理
synchronized實現同步的基礎:java中的每一個對象都可以是鎖,主要有3種形式:
. 對於普通同步方法,鎖的是實例對象。
.對於靜態同步方法,鎖的是當前類的Class對象。
. 對於同步代碼塊,鎖的是括號裏配置的對象。
JVM是基於進入和退出Moniter對象來實現同步和代碼塊同步的,同步代碼塊是基於monitorenter和monitorexit實現的,同步方法ACC_synchronized實現的,monitorenter放在同步代碼塊的開始位置,monitorexit放在同步代碼塊的結束位置,必須是成對出現的。當前一個moniter被持有後,它將處於鎖定狀態,線程執行到moniteorenter時,獲得monitor的所有權,所得對象的鎖。
synchronized獲得的鎖是存放在對象頭裏的,堆中的對象由對象頭,實例變量和填充數據組成。對象頭的markword存儲的是HashCode、分代年齡和鎖標記位,鎖標記位有無鎖狀態、偏向鎖、輕量級鎖、重量級鎖。對象頭還會存儲是否是偏向鎖的標識。
使用多線程和並發編程(一)