1. 程式人生 > >執行緒與執行緒池

執行緒與執行緒池

1.Callable和Runnable

I    Callable定義的方法是call,而Runnable定義的方法是run。

II   Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。

III  Callable的call方法可丟擲異常,而Runnable的run方法不能丟擲異常。

    Runnable 不做具體介紹

通過實現Callable介面來建立Thread執行緒:其中,Callable介面(也只有一個方法)定義如下:

public interface Callable<V>   {       V call() throws Exception;   } 步驟1:建立實現Callable介面的類SomeCallable<Integer>(略);

步驟2:建立一個類物件:

      Callable<Integer> oneCallable = new SomeCallable<Integer>();

步驟3:由Callable<Integer>建立一個FutureTask<Integer>物件:

      FutureTask<Integer> oneTask = new FutureTask<Integer>(oneCallable);

      註釋:FutureTask<Integer>是一個包裝器,它通過接受Callable<Integer>來建立,它同時實現了Future和Runnable介面。 步驟4:由FutureTask<Integer>建立一個Thread物件:

       Thread oneThread = new Thread(oneTask);

步驟5:啟動執行緒:

       oneThread.start();

至此,一個執行緒就建立完成了。

例:

  1. package reed.meituan.com;
  2. import java.util.Random;
  3. import java.util.concurrent.Callable;
  4. import java.util.concurrent.FutureTask;
  5. public class CallableAndFuture1 {
  6. public static void main(String[] args)
    throws Exception
    {
  7. Callable<Integer> callable = new Callable<Integer>() {
  8. @Override
  9. public Integer call() throws Exception {
  10. return new Random().nextInt(100);
  11. }
  12. };
  13. FutureTask<Integer> futureTask = new FutureTask<Integer>(callable);
  14. new Thread(futureTask).start();
  15. System.out.println(futureTask.get());
  16. }
  17. }
callable和future,一個產生結果,一個拿到結果

FutureTask實現了兩個介面,Runnable和Future,所以它既可以作為Runnable被執行緒執行,又可以作為Future得到Callable的返回值..

 下面來看另一種方式使用Callable和Future,通過ExecutorService的submit方法執行Callable,並返回Future,程式碼如下:

  1. import java.util.Random;
  2. import java.util.concurrent.*;
  3. public class CallableAndFuture {
  4. public static void main(String[] args) throws Exception{
  5. ExecutorService threadPool = Executors.newSingleThreadExecutor();
  6. Future<Integer> future = threadPool.submit(new Callable<Integer>() {
  7. @Override
  8. public Integer call() throws Exception {
  9. return new Random().nextInt(100);
  10. }
  11. });
  12. Thread.sleep(5000);
  13. System.out.println(future.get());
  14. }
  15. }

2.鎖的相關概念介紹

1)可重入鎖

  如果鎖具備可重入性,則稱作為可重入鎖。像synchronized和ReentrantLock都是可重入鎖,可重入性在我看來實際上表明瞭鎖的分配機制:基於執行緒的分配,而不是基於方法呼叫的分配。

舉個簡單的例子,當一個執行緒執行到某個synchronized方法時,比如說method1,而在method1中會呼叫另外一個synchronized方法method2,此時執行緒不必重新去申請鎖,而是可以直接執行方法method2。

  看下面這段程式碼就明白了:

class MyClass {

    public synchronized void method1() {

        method2();

    }

    public synchronized void method2() {

    }

}

上述程式碼中的兩個方法method1和method2都用synchronized修飾了,假如某一時刻,執行緒A執行到了method1,此時執行緒A獲取了這個物件的鎖,而由於method2也是synchronized方法,

假如synchronized不具備可重入性,此時執行緒A需要重新申請鎖。但是這就會造成一個問題,因為執行緒A已經持有了該物件的鎖,而又在申請獲取該物件的鎖,這樣就會執行緒A一直等待永遠不會獲取到的鎖。

而由於synchronized和Lock都具備可重入性,所以不會發生上述現象。

2)可中斷鎖

  可中斷鎖:顧名思義,就是可以相應中斷的鎖。

  在Java中,synchronized就不是可中斷鎖,而Lock是可中斷鎖。

  如果某一執行緒A正在執行鎖中的程式碼,另一執行緒B正在等待獲取該鎖,可能由於等待時間過長,執行緒B不想等待了,想先處理其他事情,

       我們可以讓它中斷自己或者在別的執行緒中中斷它,這種就是可中斷鎖。

  lockInterruptibly()可體現了Lock的可中斷性。

  1. import java.util.concurrent.TimeUnit;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. /**
  5. * Created by fanqunsong on 2017/8/5.
  6. */
  7. public class LockTest {
  8. public static void main(String[] args) {
  9. Thread t1 = new Thread(new RunIt());
  10. Thread t2 = new Thread(new RunIt());
  11. t1.start();
  12. t2.start();
  13. t2.interrupt();
  14. }
  15. }
  16. class RunIt implements Runnable{
  17. private static Lock lock = new ReentrantLock();
  18. @Override
  19. public void run() {
  20. try {
  21. //----------------------a
  22.  //lock.lock();
  23. lock.lockInterruptibly();
  24. System.out.println(Thread.currentThread().getName() + " running");
  25. TimeUnit.SECONDS.sleep(5);
  26. lock.unlock();
  27. System.out.println(Thread.currentThread().getName()+" finishend");
  28. } catch (InterruptedException e) {
  29. System.out.println(Thread.currentThread().getName()+ " interrupted");
  30. }
  31. }
  32. }
如果a處 是lock.lock(); 輸出 Thread-0 running (這裡休眠了5s) Thread-0 finished Thread-1 running Thread-1 interrupted

============================ 如果a處是lock.lockInterruptibly() Thread-0 running Thread-1 interrupted (這裡休眠了5s) Thread-0 finished

3)公平鎖

  公平鎖即儘量以請求鎖的順序來獲取鎖。比如同是有多個執行緒在等待一個鎖,當這個鎖被釋放時,等待時間最久的執行緒(最先請求的執行緒)會獲得該所,這種就是公平鎖。

  非公平鎖即無法保證鎖的獲取是按照請求鎖的順序進行的。這樣就可能導致某個或者一些執行緒永遠獲取不到鎖。

  在Java中,synchronized就是非公平鎖,它無法保證等待的執行緒獲取鎖的順序。

  而對於ReentrantLock和ReentrantReadWriteLock,它預設情況下是非公平鎖,但是可以設定為公平鎖。

       在ReentrantLock中定義了2個靜態內部類,一個是NotFairSync,一個是FairSync,分別用來實現非公平鎖和公平鎖。

  我們可以在建立ReentrantLock物件時,通過以下方式來設定鎖的公平性:

        ReentrantLock lock = new ReentrantLock(true);

   如果引數為true表示為公平鎖,為fasle為非公平鎖。預設情況下,如果使用無參構造器,則是非公平鎖。

4)讀寫鎖

  讀寫鎖將對一個資源(比如檔案)的訪問分成了2個鎖,一個讀鎖和一個寫鎖。

  正因為有了讀寫鎖,才使得多個執行緒之間的讀操作不會發生衝突。

  ReadWriteLock就是讀寫鎖,它是一個介面,ReentrantReadWriteLock實現了這個介面。

  可以通過readLock()獲取讀鎖,通過writeLock()獲取寫鎖。

       在多執行緒開發中,經常會出現一種情況,我們希望讀寫分離。就是對於讀取這個動作來說,可以同時有多個執行緒同

時去讀取這個資源,但是對於寫這個動作來說,只能同時有一個執行緒來操作,而且同時,當有一個寫執行緒在操作這個資

源的時候,其他的讀執行緒是不能來操作這個資源的,這樣就極大的發揮了多執行緒的特點,能很好的將多執行緒的能力發揮。

  1. package reed.thread;
  2. import java.util.Random;
  3. import java.util.concurrent.locks.ReadWriteLock;
  4. import java.util.concurrent.locks.ReentrantReadWriteLock;
  5. public class ReadWriteLockTest {
  6. public static void main(String[] args) {
  7. final Data data = new Data();
  8. for (int i = 0; i <3 ; i++) {
  9. new Thread(new Runnable() {
  10. @Override
  11. public void run() {
  12. for(int j = 0; j <5;j++) {
  13. data.set(new Random().nextInt(100));
  14. }
  15. }
  16. }
  17. ).start();
  18. }
  19. for (int i = 0; i <3; i++) {
  20. new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. for(int j = 0; j <5;j++){
  24. data.get();
  25. }
  26. }
  27. }).start();
  28. }
  29. }
  30. }
  31. class Data{
  32. private int data;//共享資料1
  33. private ReadWriteLock rwl = new ReentrantReadWriteLock();
  34. public void set(int data){
  35. rwl.writeLock().lock();// 取到寫鎖
  36. try{
  37. System.out.println(Thread.currentThread().getName()+"準備寫入資料");
  38. try {
  39. Thread.sleep(20);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. this.data = data;
  44. System.out.println(Thread.currentThread().getName() + "寫入" + this.data);
  45. }finally {
  46. rwl.writeLock().unlock();
  47. }
  48. }
  49. public void get() {
  50. rwl.readLock().lock();// 取到讀鎖
  51. try {
  52. System.out.println(Thread.currentThread().getName() + "準備讀取資料");
  53. try {
  54. Thread.sleep(20);
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. }
  58. System.out.println(Thread.currentThread().getName() + "讀取" + this.data);
  59. } finally {
  60. rwl.readLock().unlock();// 釋放讀鎖
  61. }
  62. }
  63. }
執行結果 Thread-1準備寫入資料 Thread-1寫入57 Thread-4準備讀取資料 Thread-3準備讀取資料 Thread-5準備讀取資料 Thread-4讀取57 Thread-5讀取57 Thread-3讀取57 Thread-2準備寫入資料 Thread-2寫入64 Thread-2準備寫入資料 Thread-2寫入82 Thread-2準備寫入資料 Thread-2寫入26 Thread-4準備讀取資料 Thread-5準備讀取資料 Thread-3準備讀取資料 Thread-5讀取26 Thread-4讀取26 Thread-3讀取26 Thread-4準備讀取資料 Thread-5準備讀取資料 Thread-3準備讀取資料 Thread-5讀取26 Thread-3讀取26 Thread-4讀取26 Thread-3準備讀取資料 Thread-5準備讀取資料 Thread-4準備讀取資料 Thread-5讀取26 Thread-3讀取26 Thread-4讀取26 Thread-3準備讀取資料 Thread-5準備讀取資料 Thread-4準備讀取資料 Thread-4讀取26 Thread-5讀取26 Thread-3讀取26
  1. import java.util.concurrent.locks.ReentrantReadWriteLock;
  2. /**
  3. * 使用讀寫鎖,可以實現讀寫分離鎖定,讀操作併發進行,寫操作鎖定單個執行緒
  4. *
  5. * 如果有一個執行緒已經佔用了讀鎖,則此時其他執行緒如果要申請寫鎖,則申請寫鎖的執行緒會一直等待釋放讀鎖。
  6. * 如果有一個執行緒已經佔用了寫鎖,則此時其他執行緒如果申請寫鎖或者讀鎖,則申請的執行緒會一直等待釋放寫鎖。
  7. * @author
  8. *
  9. */
  10. public class MyReentrantReadWriteLock {
  11. private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  12. public static void main(String[] args) {
  13. final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();
  14. new Thread(){
  15. public void run() {
  16. test.get(Thread.currentThread());
  17. test.write(Thread.currentThread());
  18. };
  19. }.start();
  20. new Thread(){
  21. public void run() {
  22. test.get(Thread.currentThread());
  23. test.write(Thread.currentThread());
  24. };
  25. }.start();
  26. }
  27. /**
  28. * 讀操作,用讀鎖來鎖定
  29. * @param thread
  30. */
  31. public void get(Thread thread) {
  32. rwl.readLock().lock();
  33. try {
  34. long start = System.currentTimeMillis();
  35. while(System.currentTimeMillis() - start <= 1) {
  36. System.out.println(thread.getName()+"正在進行讀操作");
  37. }
  38. System.out.println(thread.getName()+"讀操作完畢");
  39. } finally {
  40. rwl.readLock().unlock();
  41. }
  42. }
  43. /**
  44. * 寫操作,用寫鎖來鎖定
  45. * @param thread
  46. */
  47. public void write(Thread thread) {
  48. rwl.writeLock().lock();;
  49. try {
  50. long start = System.currentTimeMillis();
  51. while(System.currentTimeMillis() - start <= 1) {
  52. System.out.println(thread.getName()+"正在進行寫操作");
  53. }
  54. System.out.println(thread.getName()+"寫操作完畢");
  55. } finally {
  56. rwl.writeLock().unlock();
  57. }
  58. }
  59. }

一個執行緒讀的時候另外一個執行緒也可以讀

5)條件變數condition

條件變數很大一個程度上是為了解決Object.wait/notify/notifyAll難以使用的問題。

await*對應於Object.waitsignal對應於Object.notifysignalAll對應於Object.notifyAll。特別說明的是Condition的介面改變名稱就是為了避免與

Object中的wait/notify/notifyAll的語義和使用上混淆,因為Condition同樣有wait/notify/notifyAll方法。

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class Test{
  5. static class NumberWrapper {
  6. public int value = 1;
  7. }
  8. public static void main(String[] args) {
  9. //初始化可重入鎖
  10. final Lock lock = new ReentrantLock();
  11. //第一個條件當螢幕上輸出到3
  12. final Condition reachThreeCondition = lock.newCondition();
  13. //第二個條件當螢幕上輸出到6
  14. final Condition reachSixCondition = lock.newCondition();
  15. //NumberWrapper只是為了封裝一個數字,一邊可以將數字物件共享,並可以設定為final
  16. //注意這裡不要用Integer, Integer 是不可變物件
  17. final NumberWrapper num = new NumberWrapper();
  18. //初始化A執行緒
  19. Thread threadA = new Thread(new Runnable() {
  20. @Override
  21. public void run() {
  22. //需要先獲得鎖
  23. lock.lock();
  24. try {
  25. System.out.println("threadA start write");
  26. //A執行緒先輸出前3個數
  27. while (num.value <= 3) {
  28. System.out.println(num.value);
  29. num.value++;
  30. }
  31. //輸出到3時要signal,告訴B執行緒可以開始了
  32. reachThreeCondition.signal();
  33. } finally {
  34. lock.unlock();
  35. }
  36. lock.lock();
  37. try {
  38. //等待輸出6的條件
  39. reachSixCondition.await();
  40. System.out.println("threadA start write");
  41. //輸出剩餘數字
  42. while (num.value <= 9) {
  43. System.out.println(num.value);
  44. num.value++;
  45. }
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. } finally {
  49. lock.unlock();
  50. }
  51. }
  52. });
  53. Thread threadB = new Thread(new Runnable() {
  54. @Override
  55. public void run() {
  56. try {
  57. lock.lock();
  58. while (num.value <= 3) {
  59. //等待3輸出完畢的訊號
  60. reachThreeCondition.await();
  61. }
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. } finally {
  65. lock.unlock();
  66. }
  67. try {
  68. lock.lock();
  69. //已經收到訊號,開始輸出4,5,6
  70. System.out.println("threadB start write");
  71. while (num.value <= 6) {
  72. System.out.println(num.value);
  73. num.value++;
  74. }
  75. //4,5,6輸出完畢,告訴A執行緒6輸出完了
  76. reachSixCondition.signal();
  77. } finally {
  78. lock.unlock();
  79. }
  80. }
  81. });
  82. //啟動兩個執行緒
  83. threadB.start();
  84. threadA.start();
  85. }
  86. }
輸出結果

threadA start write 1 2 3 threadB start write 4 5 6 ThreadA start write 7 8 9

3.ThreadLocal

用處:儲存執行緒的獨立變數。對一個執行緒類(繼承自Thread)

當使用ThreadLocal維護變數時,ThreadLocal為每個使用該變數的執行緒提供獨立的變數副本,所以每一個執行緒都可以獨立地改變自己的副本,

而不會影響其它執行緒所對應的副本。ThreadLocal類中有一個map,用於儲存每一個執行緒的變數的副本,Map中元素的鍵為執行緒物件,

而值對應執行緒的變數副本,由於Key值不可重複,每一個“執行緒物件”對應執行緒的“變數副本”,而到達了執行緒安全。

同步與ThreadLocal是兩種思路,前者是資料共享的思路,後者是資料隔離的思路。同步是以時間換空間,ThreadLocal是以空間換時間。

synchronized利用鎖的機制,讓變數或者程式碼塊在同一時間內只能被某一個執行緒訪問,ThreadLocal為每一個執行緒儲存自己獨立的副本,

使得每個執行緒在同一時刻訪問的並不是同一個物件。

JDK 5 以後提供了泛型支援,ThreadLocal 被定義為支援泛型: public class ThreadLocal<T> extends Object T 為執行緒區域性變數的型別。該類定義了 4 個方法:  1) protected T initialValue():返回此執行緒區域性變數的當前執行緒的“初始值”。執行緒第一次使用 get()  方法訪問變數時將呼叫此方法,

            但如果執行緒之前呼叫了 set(T)  方法,則不會對該執行緒再呼叫 initialValue  方法。通常,此方法對每個執行緒最多呼叫一次,

           但如果在呼叫 get() 後又呼叫了 remove(),則可能再次呼叫此方法。  該實現返回 null;如果程式設計師希望執行緒區域性變數具有 null  以外的值,

           則必須為 ThreadLocal 建立子類,並重寫此方法。通常將使用匿名內部類完成此操作。  2)public T get():返回此執行緒區域性變數的當前執行緒副本中的值。如果變數沒有用於當前執行緒的值,則先將其初始化為呼叫 initialValue() 方法返回的值。  3)public void set(T value):將此執行緒區域性變數的當前執行緒副本中的值設定為指定值。大部分子類不需要重寫此方法,

           它們只依靠 initialValue()  方法來設定執行緒區域性變數的值。 

    4)public void remove():移除此執行緒區域性變數當前執行緒的值。如果此執行緒區域性變數隨後被當前執行緒讀取,且這期間當前執行緒沒有設定其值,

          則將呼叫其 initialValue()  方法重新初始化其值。這將導致在當前執行緒多次呼叫 initialValue  方法。 

下面是一個使用 ThreadLocal 的例子,每個執行緒產生自己獨立的序列號。就是使用ThreadLocal儲存每個執行緒獨立的序列號複本,執行緒之間互不干擾。 

  1. package sync;
  2. /**
  3. * Created by fanqunsong on 2017/7/24.
  4. */
  5. public class SequenceNumber {
  6. // 定義匿名子類建立ThreadLocal的變數
  7. private static ThreadLocal<Integer>seqNum = new ThreadLocal<Integer>(){
  8. // 覆蓋初始化方法
  9. public Integer initialValue() {
  10. return 0;
  11. }
  12. };
  13. // 下一個序列號
  14. public int getNextNum() {
  15. seqNum.set(seqNum.get() + 1);
  16. return seqNum.get();
  17. }
  18. private static class TestClient extends Thread {
  19. private SequenceNumber sn;
  20. public TestClient(SequenceNumber sn) {
  21. this.sn = sn;
  22. }
  23. public void run() {
  24. for (int i = 0; i < 3; i++) {
  25. System.out.println("thread[" + Thread.currentThread().getName() + "] sn[" + sn.getNextNum() + "]");
  26. }
  27. }
  28. }
  29. public static void main(String[] args) {
  30. SequenceNumber sn = new SequenceNumber();
  31. TestClient t1 = new TestClient(sn);
  32. TestClient t2 = new TestClient(sn);
  33. TestClient t3 = new TestClient(sn);
  34. t1.start();
  35. t2.start();
  36. t3.start();
  37. }
  38. }
 

ThreadLocal 是如何實現為每個執行緒儲存獨立的變數的副本的呢?通過檢視它的原始碼,我們會發現,是通過把當前“執行緒物件”當作鍵,變數作為值儲存在一個 Map 中。

  1. private T setInitialValue() {
  2. T value = initialValue();
  3. Thread t = Thread.currentThread();
  4. ThreadLocalMap map = getMap(t);
  5. if (map != null)
  6. map.set(this, value);
  7. else
  8. createMap(t, value);
  9. return value;
  10. }

 4.執行緒池

1)初識執行緒池:

根據系統自身的環境情況,有效的限制執行執行緒的數量,使得執行效果達到最佳。執行緒主要是通過控制執行的執行緒的數量,超出數量的執行緒排隊等候,等待有任務執行完畢,再從佇列最前面取出任務執行。

2)執行緒池作用:

減少建立和銷燬執行緒的次數,每個工作執行緒可以多次使用 可根據系統情況調整執行的執行緒數量,防止消耗過多記憶體

3)使用

ExecutorService:執行緒池介面 ExecutorService pool = Executors.常見執行緒 eg:ExecutorService pool = Executors.newSingleThreadExecutor();

4)常見執行緒池

①newSingleThreadExecutor 單個執行緒的執行緒池,即執行緒池中每次只有一個執行緒工作,單執行緒序列執行任務 ②newFixedThreadExecutor(n) 固定數量的執行緒池,沒提交一個任務就是一個執行緒,直到達到執行緒池的最大數量,然後後面進入等待佇列直到前面的任務完成才繼續執行 ③newCacheThreadExecutor(推薦使用) 可快取執行緒池,當執行緒池大小超過了處理任務所需的執行緒,那麼就會回收部分空閒(一般是60秒無執行)的執行緒,當有任務來時,又智慧的新增新執行緒來執行。 ④newScheduleThreadExecutor 大小無限制的執行緒池,支援定時和週期性的執行執行緒

5)例項

  1. public class MyThread extends Thread {
  2. @Override
  3. public void run() {
  4. System.out.println(Thread.currentThread().getName()+"執行中。。。");
  5. }
  6. }
  7. public class TestFixedThreadPool {
  8. public static void main(String[] args) {
  9. ExecutorService pool = Executors.newFixedThreadPool(5);
  10. Thread t1 = new MyThread();
  11. Thread t2 = new MyThread();
  12. Thread t3 = new MyThread();
  13. Thread t4 = new MyThread();
  14. Thread t5 = new MyThread();
  15. pool.execute(t1);
  16. pool.execute(t2);
  17. pool.execute(t3);
  18. pool.execute(t4);
  19. pool.execute(t5);
  20. pool.shutdown();
  21. }
  22. }

執行結果

pool-1-thread-1執行中。。。

pool-1-thread-3執行中。。。