執行緒與執行緒池
1.Callable和Runnable
I Callable定義的方法是call,而Runnable定義的方法是run。
II Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
III Callable的call方法可丟擲異常,而Runnable的run方法不能丟擲異常。
Runnable 不做具體介紹
通過實現Callable介面來建立Thread執行緒:其中,Callable介面(也只有一個方法)定義如下:
public interface Callable<V> { V call() throws Exception; } 步驟1:建立實現Callable介面的類SomeCallable<Integer>(略);
步驟2:建立一個類物件:
Callable<Integer> oneCallable = new SomeCallable<Integer>();
步驟3:由Callable<Integer>建立一個FutureTask<Integer>物件:
FutureTask<Integer> oneTask = new FutureTask<Integer>(oneCallable);
註釋:FutureTask<Integer>是一個包裝器,它通過接受Callable<Integer>來建立,它同時實現了Future和Runnable介面。 步驟4:由FutureTask<Integer>建立一個Thread物件:
Thread oneThread = new Thread(oneTask);
步驟5:啟動執行緒:
oneThread.start();
至此,一個執行緒就建立完成了。
例:
- package reed.meituan.com;
- import java.util.Random;
- import java.util.concurrent.Callable;
- import java.util.concurrent.FutureTask;
- public class CallableAndFuture1 {
- public static void main(String[] args)
throws Exception{ - Callable<Integer> callable = new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return new Random().nextInt(100);
- }
- };
- FutureTask<Integer> futureTask = new FutureTask<Integer>(callable);
- new Thread(futureTask).start();
- System.out.println(futureTask.get());
- }
- }
callable和future,一個產生結果,一個拿到結果FutureTask實現了兩個介面,Runnable和Future,所以它既可以作為Runnable被執行緒執行,又可以作為Future得到Callable的返回值..
下面來看另一種方式使用Callable和Future,通過ExecutorService的submit方法執行Callable,並返回Future,程式碼如下:
- import java.util.Random;
- import java.util.concurrent.*;
- public class CallableAndFuture {
- public static void main(String[] args) throws Exception{
- ExecutorService threadPool = Executors.newSingleThreadExecutor();
- Future<Integer> future = threadPool.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- return new Random().nextInt(100);
- }
- });
- Thread.sleep(5000);
- System.out.println(future.get());
- }
- }
2.鎖的相關概念介紹
1)可重入鎖
如果鎖具備可重入性,則稱作為可重入鎖。像synchronized和ReentrantLock都是可重入鎖,可重入性在我看來實際上表明瞭鎖的分配機制:基於執行緒的分配,而不是基於方法呼叫的分配。
舉個簡單的例子,當一個執行緒執行到某個synchronized方法時,比如說method1,而在method1中會呼叫另外一個synchronized方法method2,此時執行緒不必重新去申請鎖,而是可以直接執行方法method2。
看下面這段程式碼就明白了:
class MyClass {
public synchronized void method1() {
method2();
}
public synchronized void method2() {
}
}
上述程式碼中的兩個方法method1和method2都用synchronized修飾了,假如某一時刻,執行緒A執行到了method1,此時執行緒A獲取了這個物件的鎖,而由於method2也是synchronized方法,
假如synchronized不具備可重入性,此時執行緒A需要重新申請鎖。但是這就會造成一個問題,因為執行緒A已經持有了該物件的鎖,而又在申請獲取該物件的鎖,這樣就會執行緒A一直等待永遠不會獲取到的鎖。
而由於synchronized和Lock都具備可重入性,所以不會發生上述現象。
2)可中斷鎖
可中斷鎖:顧名思義,就是可以相應中斷的鎖。
在Java中,synchronized就不是可中斷鎖,而Lock是可中斷鎖。
如果某一執行緒A正在執行鎖中的程式碼,另一執行緒B正在等待獲取該鎖,可能由於等待時間過長,執行緒B不想等待了,想先處理其他事情,
我們可以讓它中斷自己或者在別的執行緒中中斷它,這種就是可中斷鎖。
lockInterruptibly()可體現了Lock的可中斷性。
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- /**
- * Created by fanqunsong on 2017/8/5.
- */
- public class LockTest {
- public static void main(String[] args) {
- Thread t1 = new Thread(new RunIt());
- Thread t2 = new Thread(new RunIt());
- t1.start();
- t2.start();
- t2.interrupt();
- }
- }
- class RunIt implements Runnable{
- private static Lock lock = new ReentrantLock();
- @Override
- public void run() {
- try {
- //----------------------a
- //lock.lock();
- lock.lockInterruptibly();
- System.out.println(Thread.currentThread().getName() + " running");
- TimeUnit.SECONDS.sleep(5);
- lock.unlock();
- System.out.println(Thread.currentThread().getName()+" finishend");
- } catch (InterruptedException e) {
- System.out.println(Thread.currentThread().getName()+ " interrupted");
- }
- }
- }
如果a處 是lock.lock(); 輸出 Thread-0 running (這裡休眠了5s) Thread-0 finished Thread-1 running Thread-1 interrupted
============================ 如果a處是lock.lockInterruptibly() Thread-0 running Thread-1 interrupted (這裡休眠了5s) Thread-0 finished
3)公平鎖
公平鎖即儘量以請求鎖的順序來獲取鎖。比如同是有多個執行緒在等待一個鎖,當這個鎖被釋放時,等待時間最久的執行緒(最先請求的執行緒)會獲得該所,這種就是公平鎖。
非公平鎖即無法保證鎖的獲取是按照請求鎖的順序進行的。這樣就可能導致某個或者一些執行緒永遠獲取不到鎖。
在Java中,synchronized就是非公平鎖,它無法保證等待的執行緒獲取鎖的順序。
而對於ReentrantLock和ReentrantReadWriteLock,它預設情況下是非公平鎖,但是可以設定為公平鎖。
在ReentrantLock中定義了2個靜態內部類,一個是NotFairSync,一個是FairSync,分別用來實現非公平鎖和公平鎖。
我們可以在建立ReentrantLock物件時,通過以下方式來設定鎖的公平性:
ReentrantLock lock = new ReentrantLock(true);
如果引數為true表示為公平鎖,為fasle為非公平鎖。預設情況下,如果使用無參構造器,則是非公平鎖。
4)讀寫鎖
讀寫鎖將對一個資源(比如檔案)的訪問分成了2個鎖,一個讀鎖和一個寫鎖。
正因為有了讀寫鎖,才使得多個執行緒之間的讀操作不會發生衝突。
ReadWriteLock就是讀寫鎖,它是一個介面,ReentrantReadWriteLock實現了這個介面。
可以通過readLock()獲取讀鎖,通過writeLock()獲取寫鎖。
在多執行緒開發中,經常會出現一種情況,我們希望讀寫分離。就是對於讀取這個動作來說,可以同時有多個執行緒同
時去讀取這個資源,但是對於寫這個動作來說,只能同時有一個執行緒來操作,而且同時,當有一個寫執行緒在操作這個資
源的時候,其他的讀執行緒是不能來操作這個資源的,這樣就極大的發揮了多執行緒的特點,能很好的將多執行緒的能力發揮。
- package reed.thread;
- import java.util.Random;
- import java.util.concurrent.locks.ReadWriteLock;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
- public class ReadWriteLockTest {
- public static void main(String[] args) {
- final Data data = new Data();
- for (int i = 0; i <3 ; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- for(int j = 0; j <5;j++) {
- data.set(new Random().nextInt(100));
- }
- }
- }
- ).start();
- }
- for (int i = 0; i <3; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- for(int j = 0; j <5;j++){
- data.get();
- }
- }
- }).start();
- }
- }
- }
- class Data{
- private int data;//共享資料1
- private ReadWriteLock rwl = new ReentrantReadWriteLock();
- public void set(int data){
- rwl.writeLock().lock();// 取到寫鎖
- try{
- System.out.println(Thread.currentThread().getName()+"準備寫入資料");
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- this.data = data;
- System.out.println(Thread.currentThread().getName() + "寫入" + this.data);
- }finally {
- rwl.writeLock().unlock();
- }
- }
- public void get() {
- rwl.readLock().lock();// 取到讀鎖
- try {
- System.out.println(Thread.currentThread().getName() + "準備讀取資料");
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "讀取" + this.data);
- } finally {
- rwl.readLock().unlock();// 釋放讀鎖
- }
- }
- }
執行結果
Thread-1準備寫入資料
Thread-1寫入57
Thread-4準備讀取資料
Thread-3準備讀取資料
Thread-5準備讀取資料
Thread-4讀取57
Thread-5讀取57
Thread-3讀取57
Thread-2準備寫入資料
Thread-2寫入64
Thread-2準備寫入資料
Thread-2寫入82
Thread-2準備寫入資料
Thread-2寫入26
Thread-4準備讀取資料
Thread-5準備讀取資料
Thread-3準備讀取資料
Thread-5讀取26
Thread-4讀取26
Thread-3讀取26
Thread-4準備讀取資料
Thread-5準備讀取資料
Thread-3準備讀取資料
Thread-5讀取26
Thread-3讀取26
Thread-4讀取26
Thread-3準備讀取資料
Thread-5準備讀取資料
Thread-4準備讀取資料
Thread-5讀取26
Thread-3讀取26
Thread-4讀取26
Thread-3準備讀取資料
Thread-5準備讀取資料
Thread-4準備讀取資料
Thread-4讀取26
Thread-5讀取26
Thread-3讀取26
- import java.util.concurrent.locks.ReentrantReadWriteLock;
- /**
- * 使用讀寫鎖,可以實現讀寫分離鎖定,讀操作併發進行,寫操作鎖定單個執行緒
- *
- * 如果有一個執行緒已經佔用了讀鎖,則此時其他執行緒如果要申請寫鎖,則申請寫鎖的執行緒會一直等待釋放讀鎖。
- * 如果有一個執行緒已經佔用了寫鎖,則此時其他執行緒如果申請寫鎖或者讀鎖,則申請的執行緒會一直等待釋放寫鎖。
- * @author
- *
- */
- public class MyReentrantReadWriteLock {
- private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- public static void main(String[] args) {
- final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();
- new Thread(){
- public void run() {
- test.get(Thread.currentThread());
- test.write(Thread.currentThread());
- };
- }.start();
- new Thread(){
- public void run() {
- test.get(Thread.currentThread());
- test.write(Thread.currentThread());
- };
- }.start();
- }
- /**
- * 讀操作,用讀鎖來鎖定
- * @param thread
- */
- public void get(Thread thread) {
- rwl.readLock().lock();
- try {
- long start = System.currentTimeMillis();
- while(System.currentTimeMillis() - start <= 1) {
- System.out.println(thread.getName()+"正在進行讀操作");
- }
- System.out.println(thread.getName()+"讀操作完畢");
- } finally {
- rwl.readLock().unlock();
- }
- }
- /**
- * 寫操作,用寫鎖來鎖定
- * @param thread
- */
- public void write(Thread thread) {
- rwl.writeLock().lock();;
- try {
- long start = System.currentTimeMillis();
- while(System.currentTimeMillis() - start <= 1) {
- System.out.println(thread.getName()+"正在進行寫操作");
- }
- System.out.println(thread.getName()+"寫操作完畢");
- } finally {
- rwl.writeLock().unlock();
- }
- }
- }
一個執行緒讀的時候另外一個執行緒也可以讀
5)條件變數condition
條件變數很大一個程度上是為了解決Object.wait/notify/notifyAll難以使用的問題。
await*對應於Object.wait,signal對應於Object.notify,signalAll對應於Object.notifyAll。特別說明的是Condition的介面改變名稱就是為了避免與
Object中的wait/notify/notifyAll的語義和使用上混淆,因為Condition同樣有wait/notify/notifyAll方法。
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- public class Test{
- static class NumberWrapper {
- public int value = 1;
- }
- public static void main(String[] args) {
- //初始化可重入鎖
- final Lock lock = new ReentrantLock();
- //第一個條件當螢幕上輸出到3
- final Condition reachThreeCondition = lock.newCondition();
- //第二個條件當螢幕上輸出到6
- final Condition reachSixCondition = lock.newCondition();
- //NumberWrapper只是為了封裝一個數字,一邊可以將數字物件共享,並可以設定為final
- //注意這裡不要用Integer, Integer 是不可變物件
- final NumberWrapper num = new NumberWrapper();
- //初始化A執行緒
- Thread threadA = new Thread(new Runnable() {
- @Override
- public void run() {
- //需要先獲得鎖
- lock.lock();
- try {
- System.out.println("threadA start write");
- //A執行緒先輸出前3個數
- while (num.value <= 3) {
- System.out.println(num.value);
- num.value++;
- }
- //輸出到3時要signal,告訴B執行緒可以開始了
- reachThreeCondition.signal();
- } finally {
- lock.unlock();
- }
- lock.lock();
- try {
- //等待輸出6的條件
- reachSixCondition.await();
- System.out.println("threadA start write");
- //輸出剩餘數字
- while (num.value <= 9) {
- System.out.println(num.value);
- num.value++;
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- });
- Thread threadB = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock.lock();
- while (num.value <= 3) {
- //等待3輸出完畢的訊號
- reachThreeCondition.await();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- try {
- lock.lock();
- //已經收到訊號,開始輸出4,5,6
- System.out.println("threadB start write");
- while (num.value <= 6) {
- System.out.println(num.value);
- num.value++;
- }
- //4,5,6輸出完畢,告訴A執行緒6輸出完了
- reachSixCondition.signal();
- } finally {
- lock.unlock();
- }
- }
- });
- //啟動兩個執行緒
- threadB.start();
- threadA.start();
- }
- }
輸出結果
threadA start write 1 2 3 threadB start write 4 5 6 ThreadA start write 7 8 9
3.ThreadLocal
用處:儲存執行緒的獨立變數。對一個執行緒類(繼承自Thread)
當使用ThreadLocal維護變數時,ThreadLocal為每個使用該變數的執行緒提供獨立的變數副本,所以每一個執行緒都可以獨立地改變自己的副本,
而不會影響其它執行緒所對應的副本。ThreadLocal類中有一個map,用於儲存每一個執行緒的變數的副本,Map中元素的鍵為執行緒物件,
而值對應執行緒的變數副本,由於Key值不可重複,每一個“執行緒物件”對應執行緒的“變數副本”,而到達了執行緒安全。
同步與ThreadLocal是兩種思路,前者是資料共享的思路,後者是資料隔離的思路。同步是以時間換空間,ThreadLocal是以空間換時間。
synchronized利用鎖的機制,讓變數或者程式碼塊在同一時間內只能被某一個執行緒訪問,ThreadLocal為每一個執行緒儲存自己獨立的副本,
使得每個執行緒在同一時刻訪問的並不是同一個物件。
JDK 5 以後提供了泛型支援,ThreadLocal 被定義為支援泛型: public class ThreadLocal<T> extends Object T 為執行緒區域性變數的型別。該類定義了 4 個方法: 1) protected T initialValue():返回此執行緒區域性變數的當前執行緒的“初始值”。執行緒第一次使用 get() 方法訪問變數時將呼叫此方法,
但如果執行緒之前呼叫了 set(T) 方法,則不會對該執行緒再呼叫 initialValue 方法。通常,此方法對每個執行緒最多呼叫一次,
但如果在呼叫 get() 後又呼叫了 remove(),則可能再次呼叫此方法。 該實現返回 null;如果程式設計師希望執行緒區域性變數具有 null 以外的值,
則必須為 ThreadLocal 建立子類,並重寫此方法。通常將使用匿名內部類完成此操作。 2)public T get():返回此執行緒區域性變數的當前執行緒副本中的值。如果變數沒有用於當前執行緒的值,則先將其初始化為呼叫 initialValue() 方法返回的值。 3)public void set(T value):將此執行緒區域性變數的當前執行緒副本中的值設定為指定值。大部分子類不需要重寫此方法,
它們只依靠 initialValue() 方法來設定執行緒區域性變數的值。
4)public void remove():移除此執行緒區域性變數當前執行緒的值。如果此執行緒區域性變數隨後被當前執行緒讀取,且這期間當前執行緒沒有設定其值,
則將呼叫其 initialValue() 方法重新初始化其值。這將導致在當前執行緒多次呼叫 initialValue 方法。
下面是一個使用 ThreadLocal 的例子,每個執行緒產生自己獨立的序列號。就是使用ThreadLocal儲存每個執行緒獨立的序列號複本,執行緒之間互不干擾。
- package sync;
- /**
- * Created by fanqunsong on 2017/7/24.
- */
- public class SequenceNumber {
- // 定義匿名子類建立ThreadLocal的變數
- private static ThreadLocal<Integer>seqNum = new ThreadLocal<Integer>(){
- // 覆蓋初始化方法
- public Integer initialValue() {
- return 0;
- }
- };
- // 下一個序列號
- public int getNextNum() {
- seqNum.set(seqNum.get() + 1);
- return seqNum.get();
- }
- private static class TestClient extends Thread {
- private SequenceNumber sn;
- public TestClient(SequenceNumber sn) {
- this.sn = sn;
- }
- public void run() {
- for (int i = 0; i < 3; i++) {
- System.out.println("thread[" + Thread.currentThread().getName() + "] sn[" + sn.getNextNum() + "]");
- }
- }
- }
- public static void main(String[] args) {
- SequenceNumber sn = new SequenceNumber();
- TestClient t1 = new TestClient(sn);
- TestClient t2 = new TestClient(sn);
- TestClient t3 = new TestClient(sn);
- t1.start();
- t2.start();
- t3.start();
- }
- }
ThreadLocal 是如何實現為每個執行緒儲存獨立的變數的副本的呢?通過檢視它的原始碼,我們會發現,是通過把當前“執行緒物件”當作鍵,變數作為值儲存在一個 Map 中。
- private T setInitialValue() {
- T value = initialValue();
- Thread t = Thread.currentThread();
- ThreadLocalMap map = getMap(t);
- if (map != null)
- map.set(this, value);
- else
- createMap(t, value);
- return value;
- }
4.執行緒池
1)初識執行緒池:
根據系統自身的環境情況,有效的限制執行執行緒的數量,使得執行效果達到最佳。執行緒主要是通過控制執行的執行緒的數量,超出數量的執行緒排隊等候,等待有任務執行完畢,再從佇列最前面取出任務執行。2)執行緒池作用:
減少建立和銷燬執行緒的次數,每個工作執行緒可以多次使用 可根據系統情況調整執行的執行緒數量,防止消耗過多記憶體3)使用
ExecutorService:執行緒池介面 ExecutorService pool = Executors.常見執行緒 eg:ExecutorService pool = Executors.newSingleThreadExecutor();4)常見執行緒池
①newSingleThreadExecutor 單個執行緒的執行緒池,即執行緒池中每次只有一個執行緒工作,單執行緒序列執行任務 ②newFixedThreadExecutor(n) 固定數量的執行緒池,沒提交一個任務就是一個執行緒,直到達到執行緒池的最大數量,然後後面進入等待佇列直到前面的任務完成才繼續執行 ③newCacheThreadExecutor(推薦使用) 可快取執行緒池,當執行緒池大小超過了處理任務所需的執行緒,那麼就會回收部分空閒(一般是60秒無執行)的執行緒,當有任務來時,又智慧的新增新執行緒來執行。 ④newScheduleThreadExecutor 大小無限制的執行緒池,支援定時和週期性的執行執行緒5)例項
- public class MyThread extends Thread {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+"執行中。。。");
- }
- }
- public class TestFixedThreadPool {
- public static void main(String[] args) {
- ExecutorService pool = Executors.newFixedThreadPool(5);
- Thread t1 = new MyThread();
- Thread t2 = new MyThread();
- Thread t3 = new MyThread();
- Thread t4 = new MyThread();
- Thread t5 = new MyThread();
- pool.execute(t1);
- pool.execute(t2);
- pool.execute(t3);
- pool.execute(t4);
- pool.execute(t5);
- pool.shutdown();
- }
- }
執行結果
pool-1-thread-1執行中。。。
pool-1-thread-3執行中。。。