執行緒間通訊的幾種方式
執行緒間的通訊方式
①同步
這裡講的同步是指多個執行緒通過synchronized關鍵字這種方式來實現執行緒間的通訊。
參考示例:
public class MyObject { synchronized public void methodA() { //do something.... } synchronized public void methodB() { //do some other thing } } public class ThreadA extends Thread { private MyObject object;//省略構造方法 @Override public void run() { super.run(); object.methodA(); } } public class ThreadB extends Thread { private MyObject object; //省略構造方法 @Override public void run() { super.run(); object.methodB(); } } public class Run { public staticvoid main(String[] args) { MyObject object = new MyObject(); //執行緒A與執行緒B 持有的是同一個物件:object ThreadA a = new ThreadA(object); ThreadB b = new ThreadB(object); a.start(); b.start(); } }
由於執行緒A和執行緒B持有同一個MyObject類的物件object,儘管這兩個執行緒需要呼叫不同的方法,但是它們是同步執行的,比如:執行緒B需要等待執行緒A執行完了methodA()方法之後,它才能執行methodB()方法。這樣,執行緒A和執行緒B就實現了 通訊。
這種方式,本質上就是“共享記憶體”式的通訊。多個執行緒需要訪問同一個共享變數,誰拿到了鎖(獲得了訪問許可權),誰就可以執行。
②while輪詢的方式
程式碼如下:
1 import java.util.ArrayList; 2 import java.util.List; 3 4 public class MyList { 5 6 private List<String> list = new ArrayList<String>(); 7 public void add() { 8 list.add("elements"); 9 } 10 public int size() { 11 return list.size(); 12 } 13 } 14 15 import mylist.MyList; 16 17 public class ThreadA extends Thread { 18 19 private MyList list; 20 21 public ThreadA(MyList list) { 22 super(); 23 this.list = list; 24 } 25 26 @Override 27 public void run() { 28 try { 29 for (int i = 0; i < 10; i++) { 30 list.add(); 31 System.out.println("添加了" + (i + 1) + "個元素"); 32 Thread.sleep(1000); 33 } 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 } 39 40 import mylist.MyList; 41 42 public class ThreadB extends Thread { 43 44 private MyList list; 45 46 public ThreadB(MyList list) { 47 super(); 48 this.list = list; 49 } 50 51 @Override 52 public void run() { 53 try { 54 while (true) { 55 if (list.size() == 5) { 56 System.out.println("==5, 執行緒b準備退出了"); 57 throw new InterruptedException(); 58 } 59 } 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 66 import mylist.MyList; 67 import extthread.ThreadA; 68 import extthread.ThreadB; 69 70 public class Test { 71 72 public static void main(String[] args) { 73 MyList service = new MyList(); 74 75 ThreadA a = new ThreadA(service); 76 a.setName("A"); 77 a.start(); 78 79 ThreadB b = new ThreadB(service); 80 b.setName("B"); 81 b.start(); 82 } 83 }
在這種方式下,執行緒A不斷地改變條件,執行緒ThreadB不停地通過while語句檢測這個條件(list.size()==5)是否成立 ,從而實現了執行緒間的通訊。但是這種方式會浪費CPU資源。之所以說它浪費資源,是因為JVM排程器將CPU交給執行緒B執行時,它沒做啥“有用”的工作,只是在不斷地測試 某個條件是否成立。就類似於現實生活中,某個人一直看著手機螢幕是否有電話來了,而不是: 在幹別的事情,當有電話來時,響鈴通知TA電話來了。關於執行緒的輪詢的影響,可參考:JAVA多執行緒之當一個執行緒在執行死迴圈時會影響另外一個執行緒嗎?
這種方式還存在另外一個問題:
輪詢的條件的可見性問題,關於記憶體可見性問題,可參考:JAVA多執行緒之volatile 與 synchronized 的比較中的第一點“一,volatile關鍵字的可見性”
執行緒都是先把變數讀取到本地執行緒棧空間,然後再去再去修改的本地變數。因此,如果執行緒B每次都在取本地的 條件變數,那麼儘管另外一個執行緒已經改變了輪詢的條件,它也察覺不到,這樣也會造成死迴圈。
③wait/notify機制
程式碼如下:
1 import java.util.ArrayList; 2 import java.util.List; 3 4 public class MyList { 5 6 private static List<String> list = new ArrayList<String>(); 7 8 public static void add() { 9 list.add("anyString"); 10 } 11 12 public static int size() { 13 return list.size(); 14 } 15 } 16 17 18 public class ThreadA extends Thread { 19 20 private Object lock; 21 22 public ThreadA(Object lock) { 23 super(); 24 this.lock = lock; 25 } 26 27 @Override 28 public void run() { 29 try { 30 synchronized (lock) { 31 if (MyList.size() != 5) { 32 System.out.println("wait begin " 33 + System.currentTimeMillis()); 34 lock.wait(); 35 System.out.println("wait end " 36 + System.currentTimeMillis()); 37 } 38 } 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 } 42 } 43 } 44 45 46 public class ThreadB extends Thread { 47 private Object lock; 48 49 public ThreadB(Object lock) { 50 super(); 51 this.lock = lock; 52 } 53 54 @Override 55 public void run() { 56 try { 57 synchronized (lock) { 58 for (int i = 0; i < 10; i++) { 59 MyList.add(); 60 if (MyList.size() == 5) { 61 lock.notify(); 62 System.out.println("已經發出了通知"); 63 } 64 System.out.println("添加了" + (i + 1) + "個元素!"); 65 Thread.sleep(1000); 66 } 67 } 68 } catch (InterruptedException e) { 69 e.printStackTrace(); 70 } 71 } 72 } 73 74 public class Run { 75 76 public static void main(String[] args) { 77 78 try { 79 Object lock = new Object(); 80 81 ThreadA a = new ThreadA(lock); 82 a.start(); 83 84 Thread.sleep(50); 85 86 ThreadB b = new ThreadB(lock); 87 b.start(); 88 } catch (InterruptedException e) { 89 e.printStackTrace(); 90 } 91 } 92 }
執行緒A要等待某個條件滿足時(list.size()==5),才執行操作。執行緒B則向list中新增元素,改變list 的size。
A,B之間如何通訊的呢?也就是說,執行緒A如何知道 list.size() 已經為5了呢?
這裡用到了Object類的 wait() 和 notify() 方法。
當條件未滿足時(list.size() !=5),執行緒A呼叫wait() 放棄CPU,並進入阻塞狀態。---不像②while輪詢那樣佔用CPU
當條件滿足時,執行緒B呼叫 notify()通知 執行緒A,所謂通知執行緒A,就是喚醒執行緒A,並讓它進入可執行狀態。
這種方式的一個好處就是CPU的利用率提高了。
但是也有一些缺點:比如,執行緒B先執行,一下子添加了5個元素並呼叫了notify()傳送了通知,而此時執行緒A還執行;當執行緒A執行並呼叫wait()時,那它永遠就不可能被喚醒了。因為,執行緒B已經發了通知了,以後不再發通知了。這說明:通知過早,會打亂程式的執行邏輯。
④管道通訊就是使用java.io.PipedInputStream 和 java.io.PipedOutputStream進行通訊
具體就不介紹了。分散式系統中說的兩種通訊機制:共享記憶體機制和訊息通訊機制。感覺前面的①中的synchronized關鍵字和②中的while輪詢 “屬於” 共享記憶體機制,由於是輪詢的條件使用了volatile關鍵字修飾時,這就表示它們通過判斷這個“共享的條件變數“是否改變了,來實現程序間的交流。
而管道通訊,更像訊息傳遞機制,也就是說:通過管道,將一個執行緒中的訊息傳送給另一個。
java執行緒間通訊:
1:執行緒上下文
2:共享記憶體
3:IPC通訊
4:套接字(Socket),不同的機器之間進行通訊
另外:附註通訊內容:
linux常用的程序間的通訊方式(1)、管道(pipe):管道可用於具有親緣關係的程序間的通訊,是一種半雙工的方式,資料只能單向流動,允許一個程序和另一個與它有共同祖先的程序之間進行通訊。
(2)、命名管道(named pipe):命名管道克服了管道沒有名字的限制,同時除了具有管道的功能外(也是半雙工),它還允許無親緣關係程序間的通訊。命名管道在檔案系統中有對應的檔名。命名管道通過命令mkfifo或系統呼叫mkfifo來建立。
(3)、訊號(signal):訊號是比較複雜的通訊方式,用於通知接收程序有某種事件發生了,除了程序間通訊外,程序還可以傳送訊號給程序本身;linux除了支援Unix早期訊號語義函式sigal外,還支援語義符合Posix.1標準的訊號函式sigaction(實際上,該函式是基於BSD的,BSD為了實現可靠訊號機制,又能夠統一對外介面,用sigaction函式重新實現了signal函式)。
(4)、訊息佇列:訊息佇列是訊息的連結表,包括Posix訊息佇列system V訊息佇列。有足夠許可權的程序可以向佇列中新增訊息,被賦予讀許可權的程序則可以讀走佇列中的訊息。訊息佇列克服了訊號承載資訊量少,管道只能承載無格式位元組流以及緩衝區大小受限等缺
(5)、共享記憶體:使得多個程序可以訪問同一塊記憶體空間,是最快的可用IPC形式。是針對其他通訊機制執行效率較低而設計的。往往與其它通訊機制,如訊號量結合使用,來達到程序間的同步及互斥。
(6)、記憶體對映:記憶體對映允許任何多個程序間通訊,每一個使用該機制的程序通過把一個共享的檔案對映到自己的程序地址空間來實現它。
(7)、訊號量(semaphore):主要作為程序間以及同一程序不同執行緒之間的同步手段。
(8)、套接字(Socket):更為一般的程序間通訊機制,可用於不同機器之間的程序間通訊。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支援套接字。