尚矽谷java學習筆記——JUC(java.util.concurrent)
在 Java 5.0 提供了 java.util.concurrent (簡稱JUC )包,在此包中增加了在併發程式設計中很常用的實用工具類,用於定義類似於執行緒的自定義子系統,包括執行緒池、非同步 IO 和輕量級任務框架。提供可調的、靈活的執行緒池。還提供了設計用於多執行緒上下文中的 Collection 實現等。
一、volatile關鍵字、記憶體可見性
記憶體可見性
記憶體可見性(Memory Visibility)是指當某個執行緒正在使用物件狀態而另一個執行緒在同時修改該狀態,需要確保當一個執行緒修改了物件狀態後,其他執行緒能夠看到發生的狀態變化。
可見性錯誤是指當讀操作與寫操作在不同的執行緒中執行時,我們無法確保執行讀操作的執行緒能適時地看到其他執行緒寫入的值,有時甚至是根本不可能的事情。
我們可以通過同步來保證物件被安全地釋出。除此之外我們也可以使用一種更加輕量級的 volatile 變數。
volatile 關鍵字
Java 提供了一種稍弱的同步機制,即 volatile 變數,用來確保將變數的更新操作通知到其他執行緒,可以保證記憶體中的資料可見。可以將 volatile 看做一個輕量級的鎖,但是又與鎖有些不同:
- 對於多執行緒,不是一種互斥關係
- 不能保證變數狀態的“原子性操作”
public class TestVolatile {
public static void main(String[] args){
ThreadDemo td=new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("-----------");
break;
}
}
}
}
class ThreadDemo implements Runnable{
private volatile boolean flag=false;
public void run () {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag=true;
System.out.println("flag="+isFlag());
}
public boolean isFlag(){
return flag;
}
public void setFlag(boolean flag){
this.flag=flag;
}
}
執行結果:
-----------
flag=true
如果不加 volatile 關鍵字,只會輸出它,且程式死迴圈 :
flag=true
volatile 關鍵字保證了flag變數對所有執行緒記憶體課件,所以當flag變數 值變化後,主執行緒 while 迴圈中檢測到,列印後 程式執行完成,退出;如果 flag 不加 volatile 關鍵字,主執行緒將一直while 死迴圈 ,不退出。
二、原子變數 、CAS
原子變數:jdk1.5 後 java.util.concurrent.atomic 類的小工具包,支援在單個變數上解除鎖的執行緒安全程式設計,包下提供了常用的原子變數:
- AtomicBoolean 、AtomicInteger 、AtomicLong 、 AtomicReference
- AtomicIntegerArray 、AtomicLongArray
- AtomicMarkableReference
- AtomicReferenceArray
- AtomicStampedReference
1.類中的變數都是volatile型別:保證記憶體可見性
2.使用CAS演算法:保證資料的原子性
CAS (Compare-And-Swap) 是一種硬體對併發的支援,針對多處理器操作而設計的處理器中的一種特殊指令,用於管理對共享資料的併發訪問。
CAS 是一種無鎖的非阻塞演算法的實現。
CAS包含三個運算元:
記憶體值 V
預估值 A
更新值 B
當且僅當V==A時,B的值才更新給A,否則將不做任何操作。
public class TestAtomicDemo {
public static void main(String[] args) {
AtomicDemo ad = new AtomicDemo();
for (int i = 0; i < 10; i++) {
new Thread(ad).start();
}
}
}
class AtomicDemo implements Runnable{
// private volatile int serialNumber = 0;
private AtomicInteger serialNumber = new AtomicInteger(0);
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(getSerialNumber());
}
public int getSerialNumber(){
return serialNumber.getAndIncrement();//i++ 實際是int temp=i;i=i+1;i=temp; 需要原子性操作
}
}
CAS 演算法實際是由硬體機制完成的,我們使用synchronized方法模擬CAS 演算法,用10個執行緒代表對記憶體中資料的10次修改請求。只有上個執行緒修改完,這個執行緒從記憶體中獲取的記憶體值當成期望值,才等於記憶體值,才能對記憶體值進行修改。
public class TestCompareAndSwap {
public static void main(String[] args) {
final CompareAndSwap cas=new CompareAndSwap();
for(int i=0;i<10;i++){
new Thread(new Runnable(){
@Override
public void run() {
int expectedValue=cas.get();
boolean b=cas.compareAndSwap(expectedValue, (int)(Math.random()*101));
System.out.println(b);
}
}).start();
}
}
}
class CompareAndSwap{
private int value;//記憶體值
//獲取記憶體值
public synchronized int get(){
return value;
}
//比較
public synchronized boolean compareAndSwap(int expectedValue,int newValue){
int oldValue=value;//執行緒讀取記憶體值,與預估值比較
if(oldValue==expectedValue){
this.value=newValue;
return true;
}
return false;
}
}
三、ConcurrentHashMap、鎖分段
HashMap 執行緒不安全
Hashtable 內部採用獨佔鎖,執行緒安全,但效率低
ConcurrentHashMap同步容器類是java5 新增的一個執行緒安全的雜湊表,效率介於HashMap和Hashtable之間。內部採用“鎖分段”機制。
java.util.concurrent 包還提供了設計用於多執行緒上下文中的Collection實現:
當期望許多執行緒訪問一個給定 collection 時,
ConcurrentHashMap 通常優於同步的 HashMap,
ConcurrentSkipListMap 通常優於同步的 TreeMap
ConcurrentSkipListSet通常優於同步的 TreeSet.
當期望的讀數和遍歷遠遠大於列表的更新數時,
CopyOnWriteArrayList 優於同步的 ArrayList。因為每次新增時都會進行復制,開銷非常的大,併發迭代操作多時 ,選擇。
四、CountDownLatch 閉鎖
CountDownLatch 一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。
閉鎖可以延遲執行緒的進度直到其到達終止狀態,閉鎖可以用來確保某些活動直到其他活動都完成才繼續執行:
確保某個計算在其需要的所有資源都被初始化之後才繼續執行;
確保某個服務在其依賴的所有其他服務都已經啟動之後才啟動;
等待直到某個操作所有參與者都準備就緒再繼續執行。
/*
* CountDownLatch:閉鎖,在完成某些運算時,只有其他所有執行緒的運算全部完成,當前運算才繼續執行
*/
public class TestCountDownLatch {
public static void main(String[] args) {
final CountDownLatch latch=new CountDownLatch(50);
LatchDemo ld=new LatchDemo(latch);
long start=System.currentTimeMillis();
for(int i=0;i<50;i++){
new Thread(ld).start();
}
try {
latch.await(); //直到50個人子執行緒都執行完,latch的值減到0時,才往下執行
} catch (InterruptedException e) {
e.printStackTrace();
}
long end=System.currentTimeMillis();
System.out.println("耗費時間為:"+(end-start));
}
}
class LatchDemo implements Runnable{
private CountDownLatch latch;
public LatchDemo(CountDownLatch latch){
this.latch=latch;
}
@Override
public void run() {
try{
for(int i=0;i<50000;i++){
if(i%2==0){
System.out.println(i);
}
}
}finally{
latch.countDown();//latch的值減一
}
}
}
五、實現Callable介面
Java 5.0 在 java.util.concurrent 提供了一個新的建立執行執行緒的方式:Callable 介面
實現Callable 介面,相較於實現 Runnable介面的方式,方法可以有返回值,並且可以丟擲異常。
Callable 需要依賴FutureTask ,用於接收返回值,FutureTask 也可以用作閉鎖。
public interface RunnableFuture<V> extends Runnable, Future<V>
public class FutureTask<V> implements RunnableFuture<V>
Future的核心思想是:一個方法f,計算過程可能非常耗時,等待f返回,顯然不明智。可以在呼叫f的時候,立馬返回一個Future,可以通過Future這個資料結構去控制方法f的計算過程。
這裡的控制包括:
get方法:獲取計算結果(如果還沒計算完,也是必須等待的)
cancel方法:還沒計算完,可以取消計算過程
isDone方法:判斷是否計算完
isCancelled方法:判斷計算是否被取消
這個方法在 FutureTask 中實現
/*
* 一、建立執行執行緒的方式三:實現Callable介面。相較於實現Runnable介面的方式,方法可以有返回值,並且可以丟擲異常。
* 二、執行Callable方式,需要FutureTask實現類的支援,用於接收運算結果。FutureTask是Future介面的實現類
*/
public class TestCallable {
public static void main(String[] args) {
ThreadDemo2 td=new ThreadDemo2();
//1.執行Callable方式,需要FutureTask實現類的支援,用於接收執行結果。
FutureTask<Integer> result=new FutureTask<>(td);
new Thread(result).start();
//2.接收執行緒運算後的結果
try {
Integer sum = result.get();//FutureTask 可用於 閉鎖 當子執行緒執行完畢,才會執行此後語句
System.out.println(sum);
System.out.println("----------------------");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class ThreadDemo2 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
int sum=0;
for(int i=0;i<=100000;i++){
sum+=i;
}
return sum;
}
}
六、Lock 同步鎖
在 Java 5.0 之前,協調共享物件的訪問時可以使用的機制只有 synchronized 和 volatile 。Java 5.0 後增加了一些新的機制,但並不是一種替代內建鎖的方法,而是當內建鎖不適用時,作為一種可選擇的高階功能。
ReentrantLock 實現了 Lock 介面,並提供了與synchronized 相同的互斥性和記憶體可見性。但相較於synchronized 提供了更高的處理鎖的靈活性。
/*
* 一、用於解決多執行緒安全問題的方式:
* synchronized:隱式鎖
* 1、同步程式碼塊
* 2、同步方法
* jdk 1.5後
* 3、同步鎖 Lock
* 注意:是一個顯式鎖,通過lock()方式上鎖,必須通過unlock()方法釋放鎖
*/
public class TestLock {
public static void main(String[] args) {
Ticket ticket=new Ticket();
new Thread(ticket,"1號視窗").start();
new Thread(ticket,"2號視窗").start();
new Thread(ticket,"3號視窗").start();
}
}
class Ticket implements Runnable{
private int tick=100;
private Lock lock=new ReentrantLock();
@Override
public void run() {
while(true){
lock.lock();
try{
if(tick>0){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"完成售票為:"+--tick);
}
else{
break;
}
}finally{
lock.unlock();//釋放鎖一定要放在finally裡,保證一定執行
}
}
}
}
/*
* 生產者和消費者案例,優化,防止出現虛假喚醒,執行緒無法停止
*/
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk=new Clerk();
Productor pro=new Productor(clerk);
Consumer cus=new Consumer(clerk);
new Thread(pro,"生產者 A").start();
new Thread(cus,"消費者 B").start();
new Thread(pro,"生產者 C").start();
new Thread(cus,"消費者 D").start();
}
}
//店員 假如只有一個商品位置
class Clerk{
private int product=0;
//進貨
public synchronized void get(){
while(product>=1){//為了避免虛假喚醒問題,應該總是使用在迴圈中
System.out.println("產品已滿!");
try{
this.wait();
}catch(InterruptedException e){
}
}
System.out.println(Thread.currentThread().getName()+" : "+ ++product);
this.notifyAll();
}
//賣貨
public synchronized void sale(){
while(product<=0){
System.out.println("缺貨!");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" : "+ --product);
this.notifyAll();
}
}
//生產者
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
for(int i=0;i<20;i++){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
clerk.get();
}
}
}
//消費者
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
for(int i=0;i<20;i++){
clerk.sale();
}
}
}
七、Condition 控制執行緒通訊
Condition 介面描述了可能會與鎖有關聯的條件變數。這些變數在用法上與使用 Object.wait 訪問的隱式監視器類似,但提供了更強大的功能。需要特別指出的是,單個 Lock 可能new多個 Condition 物件關聯, 進而可以控制喚醒哪個執行緒。為了避免相容性問題,Condition 方法的名稱與對應的 Object 版本中的不同。
在 Condition 物件中,與 wait、notify 和 notifyAll 方法對應的分別是await、signal 和 signalAll。
Condition 例項實質上被繫結到一個鎖上。要為特定 Lock 例項獲得Condition 例項,請使用其 newCondition() 方法。
public class TestProductorAndConsumerForLock {
public static void main(String[] args) {
Clerk clerk=new Clerk();
Productor pro=new Productor(clerk);
Consumer cus=new Consumer(clerk);
new Thread(pro,"生產者 A").start();
new Thread(cus,"消費者 B").start();
new Thread(pro,"生產者 C").start();
new Thread(cus,"消費者 D").start();
}
}
//店員 假如只有一個商品位置
class Clerk{
private int product=0;
private Lock lock=new ReentrantLock();
private Condition condition=lock.newCondition();
//進貨
public void get(){
lock.lock();
try{
while(product>=1){//為了避免虛假喚醒問題,應該總是使用在迴圈中
System.out.println("產品已滿!");
try{
condition.await();//this.wait();
}catch(InterruptedException e){
}
}
System.out.println(Thread.currentThread().getName()+" : "+ ++product);
condition.signalAll();//this.notifyAll();
}finally{
lock.unlock();
}
}
//賣貨
public void sale(){
lock.lock();
try{
while(product<=0){
System.out.println("缺貨!");
try {
condition.await();//this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" : "+ --product);
condition.signalAll();//this.notifyAll();
}finally{
lock.unlock();
}
}
}
//生產者
class Productor implements Runnable{
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
for(int i=0;i<20;i++){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
clerk.get();
}
}
}
//消費者
class Consumer implements Runnable{
private Clerk clerk;
public Consumer(Clerk clerk){
this.clerk=clerk;
}
@Override
public void run() {
for(int i=0;i<20;i++){
clerk.sale();
}
}
}
八、執行緒按序交替
編寫一個程式,開啟 3 個執行緒,這三個執行緒的 ID 分別為A、B、C,每個執行緒將自己的 ID 在螢幕上列印 10 遍,要求輸出的結果必須按順序顯示。如:ABCABCABC…… 依次遞迴
public class TestABCAlternate {
public static void main(String[] args) {
AlternateDemo ad=new AlternateDemo();
new Thread(new Runnable(){
@Override
public void run() {
for(int i=1;i<=20;i++){
ad.loopA(i);
}
}
},"A").start();
new Thread(new Runnable(){
@Override
public void run() {
for(int i=1;i<=20;i++){
ad.loopB(i);
}
}
},"B").start();
new Thread(new Runnable(){
@Override
public void run() {
for(int i=1;i<=20;i++){
ad.loopC(i);
System.out.println("-----------------------------------");
}
}
},"C").start();
}
}
class AlternateDemo{
private int number=1;//當前正在執行執行緒的標記
private Lock lock=new ReentrantLock();
private Condition condition1=lock.newCondition();
private Condition condition2=lock.newCondition();
private Condition condition3=lock.newCondition();
/*
* @param totalLoop:迴圈第幾輪
*/
public void loopA(int totalLoop){
lock.lock();
try{
//1.判斷
if(number!=1){
condition1.await();
}
//2.列印
System.out.println(Thread.currentThread().getName()+"\t"+totalLoop);
//3.喚醒
number=2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
public void loopB(int totalLoop){
lock.lock();
try{
//1.判斷
if(number!=2){
condition2.await();
}
//2.列印
System.out.println(Thread.currentThread().getName()+"\t"+totalLoop);
//3.喚醒
number=3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
public void loopC(int totalLoop){
lock.lock();
try{
//1.判斷
if(number!=3){
condition3.await();
}
//2.列印
System.out.println(Thread.currentThread().getName()+"\t"+totalLoop);
//3.喚醒
number=1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
}
九、ReadWriteLock 讀寫鎖
ReadWriteLock 維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 執行緒同時保持。寫入鎖是獨佔的。
ReadWriteLock 讀取操作通常不會改變共享資源,但執行寫入操作時,必須獨佔方式來獲取鎖。對於讀取操作佔多數的資料結構。 ReadWriteLock 能提供比獨佔鎖更高的併發性。而對於只讀的資料結構,其中包含的不變性可以完全不需要考慮加鎖操作。
/*
* 1.ReadWriteLock:讀寫鎖
* 寫寫|讀寫 需要"互斥"
* 讀讀 不需要"互斥"
*/
public class TestReadWriteLock {
public static void main(String[] args) {
ReadWriteLockDemo rw=new ReadWriteLockDemo();
new Thread(new Runnable() {
@Override
public void run() {
rw.set((int)(Math.random()*101));
}
},"Write").start();
for(int i=0;i<100;i++){
new Thread(new Runnable() {
@Override
public void run() {
rw.get();
}
}).start();
}
}
}
class ReadWriteLockDemo{
private int number=0;
private ReadWriteLock lock=new ReentrantReadWriteLock();
//讀
public void get(){
lock.readLock().lock();//上鎖
try{
System.out.println(Thread.currentThread().getName()+" : "+number);
}finally{
lock.readLock().unlock();//釋放鎖
}
}
//寫
public void set(int number){
lock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName());
this.number=number;
}finally{
lock.writeLock().unlock();
}
}
}
十、執行緒八鎖
- 一個物件裡面如果有多個synchronized方法,某一個時刻內,只要一個執行緒去呼叫其中的一個synchronized方法了,其它的執行緒都只能等待,換句話說,某一個時刻內,只能有唯一一個執行緒去訪問這些synchronized方法。鎖的是當前物件this,被鎖定後,其它的執行緒都不能進入到當前物件的其它的synchronized方法。
- 加個普通方法後發現和同步鎖無關
- 換成兩個物件後,不是同一把鎖了,情況立刻變化。
- 都換成靜態同步方法後,情況又變化
- 所有的非靜態同步方法用的都是同一把鎖——例項物件本身,也就是說如果一個例項物件的非靜態同步方法獲取鎖後,該例項物件的其他非靜態同步方法必須等待獲取鎖的方法釋放鎖後才能獲取鎖,可是別的例項物件的非靜態同步方法因為跟該例項物件的非靜態同步方法用的是不同的鎖,所以毋須等待該例項物件已獲取鎖的非靜態同步方法釋放鎖就可以獲取他們自己的鎖。
- 所有的靜態同步方法用的也是同一把鎖——類物件本身,這兩把鎖是兩個不同的物件,所以靜態同步方法與非靜態同步方法之間是不會有競態條件的。但是一旦一個靜態同步方法獲取鎖後,其他的靜態同步方法都必須等待該方法釋放鎖後才能獲取鎖,而不管是同一個例項物件的靜態同步方法之間,還是不同的例項物件的靜態同步方法之間,只是它們同一個類的例項物件!
/*
* 實驗:觀察列印的"one" or "two" ?
*
* 1.兩個普通同步方法,兩個執行緒,標準列印,列印? //one two 因為同步鎖是this(呼叫物件本身),被鎖定後,其它的執行緒都不能進入到當前物件的其它的synchronized方法
* 2.新增Thread.sleep()給 getOne(),列印? //等3秒後 one two
* 3.新增普通方法(非同步) getThree(),列印?//three 等3秒 one two 因為同步鎖不影響普通方法的執行
* 4.兩個普通同步方法,兩個Number物件,列印?//two 等3秒 one 因為用的不是同一把鎖,互不影響
* 5.修改 getOne() 為靜態同步方法,使用一個Number物件列印? //two 等3秒 one 因為靜態同步方法用的鎖是類物件本身,Number.class; 和物件用的是不同的鎖
* 6.修改兩個方法均為靜態同步方法,一個Number物件?//等3秒 one two 用的鎖都是Number類物件本身
* 7.一個靜態同步方法,一個非靜態同步方法,兩個Number物件?//two 等3秒one
* 8.兩個靜態同步方法,兩個Number物件?//等3秒後 one two 用的鎖都是Number類物件本身
*
* 執行緒八鎖的關鍵:
* ①非靜態方法的鎖預設為 this, 靜態方法的鎖為 對應的 Class 例項
* ②某一個時刻內,只能有一個執行緒持有同一把鎖,無論幾個方法。
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
Number number2 = new Number();
new Thread(new Runnable() {
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
public void run() {
// number.getTwo();
number2.getTwo();
}
}).start();
// new Thread(new Runnable() {
// public void run() {
// number.getThree();
// }
// }).start();
}
}
class Number {
public static synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public static synchronized void getTwo() {
System.out.println("two");
}
// public void getThree(){
// System.out.println("three");
// }
}
十一、執行緒池
獲取執行緒第四種方法。
執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
為了便於跨大量上下文使用,此類提供了很多可調整的引數和擴充套件鉤子 (hook)。但是,強烈建議程式設計師使用較為方便的 Executors 工廠方法 :
- Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)
- Executors.newFixedThreadPool(int)(固定大小執行緒池)
- Executors.newSingleThreadExecutor()(單個後臺執行緒)
它們均為大多數使用場景預定義了設定。
/*
* 一、執行緒池:提供了一個執行緒佇列,佇列中儲存著所有等待狀態的執行緒。避免了建立與銷燬額外開銷,提高了響應的速度。
* 二、執行緒池的體系結構:
* java.util.concurrent.Executor:負責執行緒的使用與排程的根介面
* |--**ExecutorService 子介面:執行緒池的主要介面
* |--ThreadPoolExecutor 執行緒池的實現類
* |--ScheduledExecutorService 子介面:負責執行緒的排程
* |--ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor,實現ScheduledExecutorService介面
* 三、工具類:Executors
* 方法有:
* ExecutorService newFixedThreadPool(): 建立固定大小的執行緒池
* ExecutorService newCachedThreadPool():快取執行緒池,執行緒池的數量不固定,可以根據需要自動的更改數量。
* ExecutorService newSingleThreadExecutor():建立單個執行緒池。執行緒池中只有一個執行緒
*
* ScheduledExecutorService newScheduledThreadPool():建立固定大小的執行緒,可以延遲或定時的執行任務。
*
*/
public class TestThreadPool {
public static void main(String[] args) {
//1.建立執行緒池
ExecutorService pool=Executors.newFixedThreadPool(5);
List<Future<Integer>> list=new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<Integer> future=pool.submit(new Callable<Integer>(){
@Override
public Integer call() throws Exception {
int sum=0;
for(int i=0;i<=100;i++){
sum+=i;
}
return sum;
}
});
list.add(future);
}
pool.shutdown();
for(Future<Integer> future:list){
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
十二、執行緒排程
public class TestScheduledThreadPool {
public static void main(String[] args) throws Exception {
ScheduledExecutorService pool=Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) {
Future<Integer> result=pool.schedule(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num=new Random().nextInt(100);//生成隨機數
System.out.println(Thread.currentThread().getName()+" : "+num);
return num;
}
}, 2, TimeUnit.SECONDS);//每次延遲兩秒後執行
System.out.println(result.get());
}
}
}
十三、ForkJoinPool 分支/ 合併框架 工作竊取
Fork/Join 框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行 join 彙總。
Fork/Join 框架與執行緒池的區別:
採用 “工作竊取”模式(work-stealing):相對於一般的執行緒池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的執行緒池中,如果一個執行緒正在執行的任務由於某些原因無法繼續執行,那麼該執行緒會處於等待狀態。而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題 的完成而無法繼續執行。那麼處理該子問題的執行緒會主動尋找其他尚未執行的子問題來執行.這種方式減少了執行緒的等待時間,提高了效能。
public class TestForkJoinPool {
public static void main(String[] args) {
Instant start=Instant.now();
ForkJoinPool pool=new ForkJoinPool();
ForkJoinTask<Long> task=new ForkJoinSumCalculate(0L, 50000000000L);
Long sum=pool.invoke(task);
System.out.println(sum);
Instant end=Instant.now();
System.out.println("耗費時間為:"+Duration.between(start, end).toMillis());//耗費時間為:21020
}
//一般的方法
@Test
public void test1(){
Instant start=Instant.now();
long sum=0L;
for(long i=0L;i<=50000000000L;i++){
sum+=i;
}
System.out.println(sum);
Instant end=Instant.now();
System.out.println("耗費時間為:"+Duration.between(start, end).toMillis());//耗費時間為:27040
}
//java8 新特性
@Test
public void test2(){
Instant start=Instant.now();
Long sum=LongStream.rangeClosed(0L,50000000000L).parallel().reduce(0L, Long::sum);
System.out.println(sum);
Instant end=Instant.now();
System.out.println("耗費時間為:"+Duration.between(start, end).toMillis());//耗費時間為:14281
}
}
class ForkJoinSumCalculate extends RecursiveTask<Long>{
private static final long serialVersionUID=-54565646543212315L;
private long start;
private long end;
private static final long THURSHOLD=10000L;//臨界值,小於這個值就不拆了,直接運算
public ForkJoinSumCalculate(long start,long end){
this.start=start;
this.end=end;
}
@Override
protected Long compute() {
long length=end-start;
if(length<=THURSHOLD){
long sum=0L;
for(long i=start;i<=end;i++){
sum+=i;
}
return sum;
}else{
//進行拆分,同時壓入執行緒佇列
long middle=(start+end)/2;
ForkJoinSumCalculate left=new ForkJoinSumCalculate(start, middle);
left.fork();
ForkJoinSumCalculate right=new ForkJoinSumCalculate(middle+1, end);
right.fork();
return left.join()+right.join();
}
}
}