Java程式設計思想之二十 併發
20.1 併發得多面性
併發程式設計令人困惑的一個主要原因:使用併發時需要解決的問題有多個,而實現併發的方法也有多種,並且在這兩者之間沒有明顯的對映關係。
20.1.1 更快的執行
速度問題初聽起來很簡單:如果你需要一個程式執行得更快,那麼可以將起斷開為多個片段,在單個處理器上執行每個片段。
併發通常是提高執行在單個處理器上的程式的效能,但在單個處理器上執行的併發程式開銷確實應該比該程式所有部分都順序執行開銷大,因為其中增加了所謂的上下文切換的代價。
如果沒有任務會阻塞,那麼在單處理器上使用併發就沒有任何意義。
在單處理器系統中的效能提高常見示例是事件驅動的程式設計。
Java採取的是在順序語言的基礎上提供對執行緒的支援。與在多工作業系統中分叉程序不同,執行緒機制是在由執行程式表示的單一程序中建立任務。
20.1.2 改進程式碼設計
協作多執行緒:Java的執行緒機制是搶佔式的,這表示排程機制週期性的中斷執行緒,將上下文切換到另一個執行緒,從而為每個執行緒都提供時間片,使得每個執行緒都會分配到數量合理得時間去驅動它得任務。在協作式系統中,每個任務都會自動得放棄控制,這要求程式設計師要有意識得插入某種型別得讓步語句。協作式系統得優勢是雙重得:上下文切換的開銷通常比搶佔式要少得多,並且對可以同時執行的執行緒數量在理論上沒有任何限制。
20.2 基本的執行緒機制
通過使用多執行緒機制,這些獨立任務中的每一個將由執行執行緒來驅動,一個執行緒就是在程序中的一個單一順序控制流,當個程序可以擁有多個併發執行的任務。
20.2.1 定義任務
執行緒可以驅動任務,因此你需要一種描述任務的方式,這可以由Runnable介面來提供。要想定義任務,只需實現Runnable介面並編寫run(0方法,使得該任務可以執行你的命令。
public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } } } ///:~
Thread.yield()的呼叫是對執行緒排程器的以後在哪個建議,它宣告:我已經執行完生命週期中最重要的部分了,此刻正是切換給其他任務執行一段時間的時機了。
public class MainThread {
public static void main(String[] args) throws InterruptedException {
LiftOff launch = new LiftOff();
launch.run();
}
} /* Output:
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
*///:~
當從Runnable匯出一個類時,它必須具有run()方法,但是這個方法並無特殊之處——它不會產生內在的執行緒能力。要實現執行緒行為,你必須顯示的將一個任務附著線上程上。
20.2.2 Thread類
將Runnable物件轉變為工作任務的傳統方式是把它提交給一個Thread構造器:
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
} /* Output: (90% match)
Waiting for LiftOff
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!),
*///:~
可以新增更多的執行緒去驅動更多的任務。
public class MoreBasicThreads {
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new Thread(new LiftOff()).start();
System.out.println("Waiting for LiftOff");
}
} /* Output: (Sample)
Waiting for LiftOff
#0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~
當main()建立Thread物件時,它並沒有捕獲任何對這些物件的引用。每個Thread都註冊了它自己,因此確實有一個對它的引用,而且在它的任務推出其run()並死亡之前,垃圾回收期無法清除它。
20.2.3 使用Executor
Java SE5的jav.util.concurrent包中的執行器(Executor)將為你管理Thread物件,從而簡化了併發程式設計。Executor在客戶端和任務執行之間提供了一個間接層;與客戶端直接執行任務不同,這個中介物件將執行任務。Executor允許你管理非同步任務的執行,而無須顯示的管理執行緒的宣告週期。
我們可以使用Executor來代替Thread物件。
import java.util.concurrent.*;
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
} /* Output: (Sample)
#0(9), #0(8), #1(9), #2(9), #3(9), #4(9), #0(7), #1(8), #2(8), #3(8), #4(8), #0(6), #1(7), #2(7), #3(7), #4(7), #0(5), #1(6), #2(6), #3(6), #4(6), #0(4), #1(5), #2(5), #3(5), #4(5), #0(3), #1(4), #2(4), #3(4), #4(4), #0(2), #1(3), #2(3), #3(3), #4(3), #0(1), #1(2), #2(2), #3(2), #4(2), #0(Liftoff!), #1(1), #2(1), #3(1), #4(1), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~
單個的Executor被用來建立和管理系統中所有任務。
對shutdown()方法的呼叫可以防止新任務被提交給這個Executor,當前執行緒將繼續執行在shutdown()被呼叫之前提交所有任務。
FixedThreadPool使用了有限的執行緒集來執行所提交的任務:
import java.util.concurrent.*;
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec =
Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
} /* Output:
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
*///:~
有了FixedThreadPool,就可以一次性預先執行代價高昂的執行緒分配,因而也就可以限制執行緒的數量了。
CachedThreadPool在程式執行過程中通常會創建於所徐數量相同的執行緒,然後再它回收舊執行緒時停止建立新的執行緒,因此它是合理的Executor首選。只有當這種方式會引發問題時,才需要切換到FixedThreadPool。
SingleThreadExecutor就像是執行緒數量為1的FixedThreadPool。
SingleThreadExecutor會序列化所有提交給它的任務,並會維護它自己的懸掛任務佇列。
import java.util.concurrent.*;
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec =
Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++)
exec.execute(new LiftOff());
exec.shutdown();
}
} /* Output:
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(Liftoff!), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(Liftoff!), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(Liftoff!), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(Liftoff!), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(Liftoff!),
*///:~
20.2.4 從任務中產生返回值
Runnable是執行工作的獨立任務,但是它不返回任何值。如果你希望任務再完成時能夠返回一個值,可以實現Callable介面而不是Runnable介面。
//: concurrency/CallableDemo.java
import java.util.concurrent.*;
import java.util.*;
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
public String call() {
return "result of TaskWithResult " + id;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results =
new ArrayList<Future<String>>();
for(int i = 0; i < 10; i++)
results.add(exec.submit(new TaskWithResult(i)));//將產生Future物件
for(Future<String> fs : results)
try {
// get() blocks until completion:
System.out.println(fs.get());
} catch(InterruptedException e) {
System.out.println(e);
return;
} catch(ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
} /* Output:
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*///:~
還可以使用isDone判斷是否執行完成,如果不呼叫isDone,那個如果沒有完成,get會被阻塞。
20.2.5 休眠
影響任務行為的一種簡單方式是呼叫sleep(),這將使任務中止執行給定的時間。
//: concurrency/SleepingTask.java
// Calling sleep() to pause for a while.
import java.util.concurrent.*;
public class SleepingTask extends LiftOff {
public void run() {
try {
while(countDown-- > 0) {
System.out.print(status());
// Old-style:
Thread.sleep(100);
// Java SE5/6-style:
//TimeUnit.MILLISECONDS.sleep(100);
}
} catch(InterruptedException e) {
System.err.println("Interrupted");
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new SleepingTask());
exec.shutdown();
}
} /* Output:
#0(9), #1(9), #2(9), #3(9), #4(9), #0(8), #1(8), #2(8), #3(8), #4(8), #0(7), #1(7), #2(7), #3(7), #4(7), #0(6), #1(6), #2(6), #3(6), #4(6), #0(5), #1(5), #2(5), #3(5), #4(5), #0(4), #1(4), #2(4), #3(4), #4(4), #0(3), #1(3), #2(3), #3(3), #4(3), #0(2), #1(2), #2(2), #3(2), #4(2), #0(1), #1(1), #2(1), #3(1), #4(1), #0(Liftoff!), #1(Liftoff!), #2(Liftoff!), #3(Liftoff!), #4(Liftoff!),
*///:~
異常不能跨執行緒傳播回main(),所以你必須在本地處理所有在任務內部產生的異常。
20.2.6 優先順序
執行緒的優先順序將該執行緒的重要性傳遞給排程器。
優先順序較低的執行緒僅僅是執行的頻率較低。
在對大多數時間裡,所有執行緒都應該以預設的優先順序執行,試圖操作執行緒的優先順序通常是一種錯誤。
//: concurrency/SimplePriorities.java
// Shows the use of thread priorities.
import java.util.concurrent.*;
public class SimplePriorities implements Runnable {
private int countDown = 5;
private volatile double d; // No optimization
private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + ": " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);//設定當前執行緒優先順序,使用getPriority獲取當前優先順序
while(true) {
// An expensive, interruptable operation:
for(int i = 1; i < 100000; i++) {
d += (Math.PI + Math.E) / (double)i;
if(i % 1000 == 0)
Thread.yield();
}
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(
new SimplePriorities(Thread.MIN_PRIORITY));
exec.execute(
new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
} /* Output: (70% match)
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-6,10,main]: 4
Thread[pool-1-thread-6,10,main]: 3
Thread[pool-1-thread-6,10,main]: 2
Thread[pool-1-thread-6,10,main]: 1
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
...
*///:~
20.2.7 讓步
當呼叫yieId()時,你也是在建議具有相同優先順序的其他執行緒可以執行。
大體上,對於任何重要的控制或在呼叫整個應用時,都不能依賴yieId(),實際上,yieId()經常被誤用。
21.2.8 後臺執行緒
所謂後臺執行緒,是指在程式執行的時候在後臺提供一種通用服務的執行緒,並且這種執行緒並不屬於程式中不可或缺的部分。當所有非後臺執行緒結束時,程式也就終止了,同時會殺死程序中的所有後臺執行緒。
//: concurrency/SimpleDaemons.java
// Daemon threads don't prevent the program from ending.
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
public class SimpleDaemons implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
print("sleep() interrupted");
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < 10; i++) {
Thread daemon = new Thread(new SimpleDaemons());
daemon.setDaemon(true); // 必須線上程被呼叫之前設定setDaemon
daemon.start();
}
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(175);
}
} /* Output: (Sample)
All daemons started
Thread[Thread-0,5,main] SimpleDaemons@530daa
Thread[Thread-1,5,main] SimpleDaemons@a62fc3
Thread[Thread-2,5,main] SimpleDaemons@89ae9e
Thread[Thread-3,5,main] SimpleDaemons@1270b73
Thread[Thread-4,5,main] SimpleDaemons@60aeb0
Thread[Thread-5,5,main] SimpleDaemons@16caf43
Thread[Thread-6,5,main] SimpleDaemons@66848c
Thread[Thread-7,5,main] SimpleDaemons@8813f2
Thread[Thread-8,5,main] SimpleDaemons@1d58aae
Thread[Thread-9,5,main] SimpleDaemons@83cc67
...
*///:~
必須線上程啟動之前呼叫setDaemom()方法,才能把它設定為後臺執行緒。
通過編寫定製的ThreadFactory可以定製由Executor建立的執行緒的屬性:
package net.mindview.util;
import java.util.concurrent.ThreadFactory;
public class DaemonThreadFactory implements ThreadFactory {
public DaemonThreadFactory() {
}
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
//: concurrency/DaemonFromFactory.java
// Using a Thread Factory to create daemons.
import java.util.concurrent.*;
import net.mindview.util.*;
import static net.mindview.util.Print.*;
public class DaemonFromFactory implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
print("Interrupted");
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(
new DaemonThreadFactory());
for(int i = 0; i < 10; i++)
exec.execute(new DaemonFromFactory());
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(500); // Run for a while
}
} /* (Execute to see output) *///:~
可以通過呼叫isDaemon()方法來確定執行緒是否是一個後臺執行緒。如果是一個後臺執行緒,那麼它建立的任何執行緒將被自動設定成後臺執行緒:
// Using a Thread Factory to create daemons.
import java.util.concurrent.*;
import net.mindview.util.*;
import static net.mindview.util.Print.*;
public class DaemonFromFactory implements Runnable {
public void run() {
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread() + " " + this);
}
} catch(InterruptedException e) {
print("Interrupted");
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool(
new DaemonThreadFactory());
for(int i = 0; i < 10; i++)
exec.execute(new DaemonFromFactory());
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(500); // Run for a while
}
} /* (Execute to see output) *///:~
後臺程序在不執行finaiiy子句的情況下就會終止其run()方法:
//: concurrency/DaemonsDontRunFinally.java
// Daemon threads don't run the finally clause
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
class ADaemon implements Runnable {
public void run() {
try {
print("Starting ADaemon");
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e) {
print("Exiting via InterruptedException");
} finally {
print("This should always run?");
}
}
}
public class DaemonsDontRunFinally {
public static void main(String[] args) throws Exception {
Thread t = new Thread(new ADaemon());
t.setDaemon(true);
t.start();
}
} /* Output:
Starting ADaemon
*///:~
如果你註釋調對setDaemon()的呼叫,就會看到finally子句將會執行。
當最後一個非後臺執行緒終止時,後臺執行緒會突然終止。因此一旦main()退出,JVM就會立即關閉所有後臺執行緒。因為不能以優雅的方式來關閉後臺執行緒,所以它們幾乎不是一種好的思想。非後臺的Executor通常是一種更好的方法,它控制的所有任務都可以同時被關閉,關閉將以有序的方式執行。
20.2.9 編碼的變體
使用直接從Thread繼承這種可替代的方式:
//: concurrency/SimpleThread.java
// Inheriting directly from the Thread class.
public class SimpleThread extends Thread {
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
// Store the thread name:
super(Integer.toString(++threadCount));
start();
}
public String toString() {
return "#" + getName() + "(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SimpleThread();
}
} /* Output:
#1(5), #1(4), #1(3), #1(2), #1(1), #2(5), #2(4), #2(3), #2(2), #2(1), #3(5), #3(4), #3(3), #3(2), #3(1), #4(5), #4(4), #4(3), #4(2), #4(1), #5(5), #5(4), #5(3), #5(2), #5(1),
*///:~
通過呼叫適當的Thread構造器為Thread物件賦予具體的名稱,這個名稱可以通過使用GetName()的toString()中獲得。
慣用法是自管理的Runnable:
public class SelfManaged implements Runnable {
private int countDown = 5;
private Thread t = new Thread(this);//傳入當前物件
public SelfManaged() { t.start(); }
public String toString() {
return Thread.currentThread().getName() +
"(" + countDown + "), ";
}
public void run() {
while(true) {
System.out.print(this);
if(--countDown == 0)
return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SelfManaged();
}
} /* Output:
Thread-0(5), Thread-0(4), Thread-0(3), Thread-0(2), Thread-0(1), Thread-1(5), Thread-1(4), Thread-1(3), Thread-1(2), Thread-1(1), Thread-2(5), Thread-2(4), Thread-2(3), Thread-2(2), Thread-2(1), Thread-3(5), Thread-3(4), Thread-3(3), Thread-3(2), Thread-3(1), Thread-4(5), Thread-4(4), Thread-4(3), Thread-4(2), Thread-4(1),
*///:~
這裡實現介面使得你可以繼承另一個不同的類。
通過使用內部類來將執行緒程式碼隱藏在類中:
//: concurrency/ThreadVariations.java
// Creating threads with inner classes.
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
// Using a named inner class:
class InnerThread1 {//建立一個擴充套件自Thread的匿名內部類
private int countDown = 5;
private Inner inner;
private class Inner extends Thread {
Inner(String name) {
super(name);
start();
}
public void run() {
try {
while (true) {
print(this);
if (--countDown == 0) return;
sleep(10);
}
} catch (InterruptedException e) {
print("interrupted");
}
}
public String toString() {
return getName() + ": " + countDown;
}
}
public InnerThread1(String name) {//建立這個內部類的例項
inner = new Inner(name);
}
}
// Using an anonymous inner class:
class InnerThread2 {
private int countDown = 5;
private Thread t;
public InnerThread2(String name) {//可替換方式:在構造器中建立了一個匿名的Thread子類,並且將其向上轉型為Thread引用t。
t = new Thread(name) {
public void run() {
try {
while (true) {
print(this);
if (--countDown == 0) return;
sleep(10);
}
} catch (InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
// Using a named Runnable implementation:
class InnerRunnable1 {
private int countDown = 5;
private Inner inner;
private class Inner implements Runnable {
Thread t;
Inner(String name) {
t = new Thread(this, name);
t.start();
}
public void run() {
try {
while (true) {
print(this);
if (--countDown == 0) return;
TimeUnit.MILLISECONDS.sleep(10);
}
} catch (InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return t.getName() + ": " + countDown;
}
}
public InnerRunnable1(String name) {
inner = new Inner(name);
}
}
// Using an anonymous Runnable implementation:
class InnerRunnable2 {
private int countDown = 5;
private Thread t;
public InnerRunnable2(String name) {
t = new Thread(new Runnable() {
public void run() {
try {
while (true) {
print(this);
if (--countDown == 0) return;
TimeUnit.MILLISECONDS.sleep(10);
}
} catch (InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return Thread.currentThread().getName() +
": " + countDown;
}
}, name);
t.start();
}
}
// A separate method to run some code as a task:
class ThreadMethod {//在方法內部建立執行緒
private int countDown = 5;
private Thread t;
private String name;
public ThreadMethod(String name) {
this.name = name;
}
public void runTask() {
if (t == null) {
t = new Thread(name) {
public void run() {
try {
while (true) {
print(this);
if (--countDown == 0) return;
sleep(10);
}
} catch (InterruptedException e) {
print("sleep() interrupted");
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
}
public class ThreadVariations {
public static void main(String[] args) {
new InnerThread1("InnerThread1");
new InnerThread2("InnerThread2");
new InnerRunnable1("InnerRunnable1");
new InnerRunnable2("InnerRunnable2");
new ThreadMethod("ThreadMethod").runTask();
}
} /* (Execute to see output) *///:~
20.2.10 術語
你對Thread類實際沒有任何控制權。你建立任務,並通過某種方式將一個執行緒附著到任務上,以使得這個執行緒可以驅動任務。
Java的執行緒機制基於來自C的低階的p執行緒方式,這是一種你必須深入研究,並且需要完全理解其所有細節的方式。
20.2.11 加入一個執行緒
一個執行緒可以在其他執行緒上呼叫join()方法,其效果是等待一段時間知道第二個執行緒結束才繼續執行。
如果某個執行緒在另一個執行緒t上呼叫t.join(),此執行緒將被掛起,知道目標執行緒t結束才恢復。
也可以呼叫join()時帶上一個超時引數,這樣如果目標執行緒在這段時間到期時還沒有結束的話,join()方式總能返回。
對join()方法的呼叫可以被中斷,做法時在呼叫執行緒上呼叫interrupt()方法。
//: concurrency/Joining.java
// Understanding join().
import static net.mindview.util.Print.*;
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
System.out.println(name);
}
public void run() {
try {
sleep(duration);
} catch(InterruptedException e) {
print(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
print(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
System.out.println(name);
}
public void run() {
try {
sleeper.join();
} catch(InterruptedException e) {
print("Interrupted");
}
print(getName() + " join completed");
}
}
public class Joining {
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
}
} /* Output:
Grumpy was interrupted. isInterrupted(): false
Doc join completed
Sleepy has awakened
Dopey join completed
*///:~
Joiner執行緒將通過在Sleeper物件上呼叫join()方法來等待Sleeper醒來。在main()裡面,每個Sleeper都有一個Joiner,這個可以在輸出中發現,如果Sleeper被中斷或者是正常結束,Joiner將和Sleeper一同結束。
20.2.12 建立有響應的使用者介面
使用執行緒的動機之一就是建立有響應的使用者介面:
//: concurrency/ResponsiveUI.java
// User interface responsiveness.
// {RunByHand}
class UnresponsiveUI {
private volatile double d = 1;
public UnresponsiveUI() throws Exception {
while(d > 0)
d = d + (Math.PI + Math.E) / d;
System.in.read(); // Never gets here
}
}
public class ResponsiveUI extends Thread {
private static volatile double d = 1;
public ResponsiveUI() {
setDaemon(true);
start();
}
public void run() {
while(true) {
d = d + (Math.PI + Math.E) / d;
}
}
public static void main(String[] args) throws Exception {
//new UnresponsiveUI(); // Must kill this process
new ResponsiveUI();//作為後臺執行的同時,還在等待使用者的輸入
System.in.read();
System.out.println("aaaaaa");
System.out.println(d); // Shows progress
}
} ///:~
20.2.13 執行緒組
執行緒組持有一個執行緒集合
20.2.14 捕獲異常
由於執行緒的本質特性,使得你不能捕獲從執行緒中逃逸的異常。一旦異常逃出任務的run()方法,它就會向外傳播到控制檯,除非你採取特殊的步驟捕獲這種錯誤的異常。
下面的程式總是會丟擲異常:
//: concurrency/ExceptionThread.java
// {ThrowsException}
import java.util.concurrent.*;
public class ExceptionThread implements Runnable {
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
} ///:~
在main中放入try carth並不能抓住異常:
import java.util.concurrent.*;
public class NaiveExceptionHandling {
public static void main(String[] args) {
try {
ExecutorService exec =
Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
} catch(RuntimeException ue) {
// This statement will NOT execute!
System.out.println("Exception has been handled!");
}
}
} ///:~
我們需要修改Executor產生執行緒的方式。Thread.UncaughtExceptionHandler是Java SE5中的新介面,它允許在每個Thread物件上都附著一個異常處理器。Thread.UncaughtExceptionHandler.uncaughtException()會線上程因為捕獲的異常而臨近死亡時被呼叫,為了使用它,我們建立一個新型別ThreadFactory,它將在每個新建立的Thread物件上附著一個Thread.UncaughtExceptionHandler:
//: concurrency/CaptureUncaughtException.java
import java.util.concurrent.*;
class ExceptionThread2 implements Runnable {
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
System.out.println(
"eh = " + t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(
new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
} /* Output: (90% match)
HandlerThreadFactory@de6ced creating new Thread
created Thread[Thread-0,5,main]
eh = MyUncaughtExceptionHandler@1fb8ee3
run() by Thread[Thread-0,5,main]
eh = MyUncaughtExceptionHandler@1fb8ee3
caught java.lang.RuntimeException
*///:~
在Thread類中設定一個靜態域,並將這個處理器設定為預設的為捕獲異常處理器:
import java.util.concurrent.*;
public class SettingDefaultHandler {
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(
new MyUncaughtExceptionHandler());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
} /* Output:
caught java.lang.RuntimeException
*///:~
20.3 共享受限資源
20.3.1 不正確的訪問資源
下面的任務產生一個偶數,而其他任何消費這些數字。消費者任何唯一工作就是檢查偶數的有效性。
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
// Allow this to be canceled:
public void cancel() { canceled = true; }//修改canceled標識
public boolean isCanceled() { return canceled; }//檢視該物件是否被取消
} ///:~
import java.util.concurrent.*;
public class EvenChecker implements Runnable {//消費者任務
private IntGenerator generator;
private final int id;
public EvenChecker(IntGenerator g, int ident) {
generator = g;
id = ident;
}
public void run() {
while(!generator.isCanceled()) {
int val = generator.next();
if(val % 2 != 0) {//程式將檢查是否是偶數,如果是奇數,那麼就是另一個執行緒還沒有執行完next()就呼叫了檢查判斷。
System.out.println(val + " not even!");
generator.cancel(); // Cancels all EvenCheckers
}
}
}
// Test any type of IntGenerator:
public static void test(IntGenerator gp, int count) {
System.out.println("Press Control-C to exit");
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < count; i++)
exec.execute(new EvenChecker(gp, i));
exec.shutdown();
}
// Default value for count:
public static void test(IntGenerator gp) {
test(gp, 10);
}
} ///:~
共享公共資源的任務可以觀察該資源的終止訊號。這可以消除所謂競爭條件,即兩個或更多的任務競爭響應某個條件,因此產生的衝突或以一致結果:
public class EvenGenerator extends IntGenerator {
private volatile int currentEvenValue = 0;
public int next() {//一個任務可能在另一個任務執行第一個對currentEvenValue遞增操作之後,但沒有執行第二個操作之前,呼叫next()方法。
++currentEvenValue; // Danger point here!
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}
} /* Output: (Sample)
Press Control-C to exit
89476993 not even!
89476993 not even!
*///:~
遞增也不是原子操作,所以,必須要保護任務。
20.3.2 解決共享資源競爭
使用執行緒時的一個基本問題:你永遠都不知道一個執行緒何時在執行。
對於併發,你需要某種方式來防止兩個任務訪問相同的資源。
防止這種衝突的方法就是當資源被一個任務使用時,在其上加鎖。
基本所有併發模式在解決執行緒衝突問題的時候,都是採用序列化訪問共享資源的方案。這意味著在給定時刻只允許一個任務訪問共享資源。通常這是通過在程式碼前面加上一條鎖語句來實現的,這就使得在一段時間內只有一個任務可以執行這段程式碼。因為鎖語句產生了一種互相排斥的效果,所有這種機制被稱為互斥量。
Java提供關鍵字synchronized的形式,為防止資源衝突提供了內建支援。當任務要執行被synchronized關鍵字保護的程式碼片段的時候,它將檢查鎖是否可用,然後獲取鎖,執行程式碼,釋放鎖。
要控制對共享資源的訪問,得先把它包裝進一個物件,然後把所有要訪問這個資源的方法標記為synchronized。
synchronized void f(){}
所有物件都自動含有單一的鎖(也稱為監視器)。當物件上呼叫其任意synchronized方法的時候,此物件都被加鎖。
在使用併發時,將域設定為private是非常重要的,否則,synchronized關鍵字就不能防止其他任務直接訪問域,這樣就會產生衝突。
一個任務可以獲取多個鎖。
JVM負責跟蹤物件被加鎖的次數。如果一個物件被解鎖,其計數變為0。在任務第一個給物件加鎖的時候,計數變為1.每當這個相同的任務在這個物件上獲取鎖,計數都會遞增。只有首先獲得鎖的任務才能允許繼續獲取多個鎖。每當任務離開一個synchronized方法,計數遞減,當計數為0的時候,鎖被完全釋放,此時別的任務就可以使用此資源。
針對每個類,也有一個鎖,所有synchronized static方法可以在類的範圍內防止對static資料的併發訪問。
你應該在什麼時候同步,可以運用Brian的同步規則:
如果你在寫一個變數,它接下來將被另一個執行緒讀取,或者正在讀取一個上一次已經被另一個執行緒寫過的變數,那麼你必須使用同步,並且,讀寫執行緒都必須使用相同的監視器同步。
每個訪問臨界共享資源的方法都必須被同步,否則它們就不會正確的工作。
同步控制EvenGenerator
public class
SynchronizedEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
public synchronized int next() {
++currentEvenValue;
Thread.yield(); // Cause failure faster
++currentEvenValue;
return currentEvenValue;
}
public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}
} ///:~
第一個進入next()的任務將獲得鎖,任何試圖獲取鎖的任務都將從其開始嘗試之時被組賽,直到第一個任務釋放鎖。通過這種方式,任何時刻只有一個任務可以通過由互斥量看護的程式碼。
使用顯示Lock物件
Lock物件必須被顯示的建立,鎖定和釋放。
對於解決某些型別的問題來說,它更靈活。
import java.util.concurrent.locks.*;
public class MutexEvenGenerator extends IntGenerator {
private int currentEvenValue = 0;
private Lock lock = new ReentrantLock();
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield(); // Cause failure faster
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}
} ///:~
新增一個被互斥呼叫的鎖,並使用lock和unlock方法在next()內建立臨界資源
當你使用synchronized關鍵字時,需要寫的程式碼量更少,並且使用者錯誤出現的可能性也會降低,因此通常只有在解決特殊問題時,才能顯示使用Lock物件。例如:使用synchronized關鍵字不能嘗試獲取鎖且最終獲取鎖會失敗,或者嘗試著獲取鎖一段時間,然後放棄它,要實現這些,你必須使用concurrent類庫:
//: concurrency/AttemptLocking.java
// Locks in the concurrent library allow you
// to give up on trying to acquire a lock.
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class AttemptLocking {
private ReentrantLock lock = new ReentrantLock();//ReentrantLock可以讓你嘗試獲取鎖,但最終沒有獲取到鎖
public void untimed() {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
if(captured)
lock.unlock();
}
}
public void timed() {
boolean captured = false;
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);//嘗試獲取鎖,在2秒後失敗
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("tryLock(2, TimeUnit.SECONDS): " +
captured);
} finally {
if(captured)
lock.unlock();
}
}
public static void main(String[] args) {
final AttemptLocking al = new AttemptLocking();
al.untimed(); // True -- lock is available
al.timed(); // True -- lock is available
// Now create a separate task to grab the lock:
new Thread() {
{ setDaemon(true); }
public void run() {
al.lock.lock();
System.out.println("acquired");
}
}.start();
Thread.yield(); // Give the 2nd task a chance
al.untimed(); // False -- lock grabbed by task
al.timed(); // False -- lock grabbed by task
}
} /* Output:
tryLock(): true
tryLock(2, TimeUnit.SECONDS): true
acquired
tryLock(): false
tryLock(2, TimeUnit.SECONDS): false
*///:~
20.3.3 原子性與易變性
一個常不正確的知識是“原子操作不需要進行同步控制”
通過Goetz測試你就可以使用原子性:如果你可以編寫用於現代微處理器的高效能JVM,那麼就有資格去考慮是否可以避免同步。
使用volatile關鍵字,就會獲得(簡單的賦值和返回操作)原子性。
在多處理器系統上,相對於單處理系統而言,可視性問題遠比原子性問題多得多。一個任務做出的修改,即使在不中斷的意義上講是原子性的,對其他任務也可能是不可視的,因此不同的任務對應用的狀態有不同的檢視。另一方面,同步機制強制在處理系統中,一個任務做出的修改必須在應用中是可視的。如果沒有同步機制,那麼修改時可視將無法確定。
volatile關鍵字還確保了應用中的可視性。如果你將一個域宣告為volatile的,那麼只要對這個域產生了寫操作,那麼所有的讀操作就都可以看到這個修改。
原子性和易變性是不同的概念。在非volatile域上的原子操作不必重新整理到主存中去,因此其它讀取該域的任務也不必看到這個新值。如果多個任務在同時訪問某個域,那麼這個域就應該是volatile的,否則,這個域應該只能經由同步來訪問。同步也會導致向主存中重新整理,因此如果一個域完全由syschronized方法或語句來防護,那就不必將其設定為是volatile的。
一個任務所作的任務寫入操作對這個任務來說都是可視的,因此如果它只需要在這個任務內部可視,那麼你就不需要將其設定為volatile的。
當一個域的值依賴於它之前的值時,volatile就無法工作了。
使用volatile而不是synchronized的唯一完全情況是類中只有一個可變域。所有,一般,你的第一選擇應該是synchronized。
不要盲目的應用原子性:
import java.util.concurrent.*;
public class AtomicityTest implements Runnable {
private int i = 0;
public int getValue() { return i; }
private synchronized void evenIncrement() { i++; i++; }
public void run() {
while(true)
evenIncrement();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
AtomicityTest at = new AtomicityTest();
exec.execute(at);
while(true) {
int val = at.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
儘管return i確實是原子性操作,但是缺少同步使得其數值可以在處於不穩定的中間狀態被讀取。除此之外,由於i也不是volatile的,因此還存在可視性問題。
一個產生序列數的類每當nextSerialNumber()被呼叫時,它必須呼叫者返回唯一的值:
public class SerialNumberGenerator {
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++; // Not thread-safe
}
}
如果一個域可能會被多個任務同時訪問,或者這些任務中至少有一個是寫入任務,那麼你就應該將這個域設定為volatile。將一個域定義為volatile,那麼它就會告訴編譯器不要執行任務移除讀取和寫入操作的優化,這些操作的目的是用執行緒中的區域性變數維護對這個域的精確同步。
import java.util.concurrent.*;
// Reuses storage so we don't run out of memory:
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
// Initialize to a value not produced
// by the SerialNumberGenerator:
for(int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
// Wrap index and write over old elements:
index = ++index % len;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < len; i++)
if(array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static final int SIZE = 10;
private static CircularSet serials =
new CircularSet(1000);
private static ExecutorService exec =
Executors.newCachedThreadPool();
static class SerialChecker implements Runnable {
public void run() {
while(true) {
int serial =
SerialNumberGenerator.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < SIZE; i++)
exec.execute(new SerialChecker());
// Stop after n seconds if there's an argument:
if(args.length > 0) {
TimeUnit.SECONDS.sleep(new Integer(args[0]));
System.out.println("No duplicates detected");
System.exit(0);
}
}
}
上面這個程式,最終會得到重複的序列數。如果要解決這個問題,需要在nextSerialNumber()前面加上synchronized關鍵字。
20.3.4 原子類
Java SE5引入了諸如AtomicIntger,AtomicLong,AtomicReference等特殊的原子性變數類,它們提供下面形式的原子性條件更新操作:
booleean compareAndSet(expectedValue,updateValue)
我們可以使用AtomicInteger來重寫:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);
public int getValue() { return i.get(); }
private void evenIncrement() { i.addAndGet(2); }
public void run() {
while(true)
evenIncrement();
}
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
public void run() {
System.err.println("Aborting");
System.exit(0);
}
}, 5000); // Terminate after 5 seconds
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntegerTest ait = new AtomicIntegerTest();
exec.execute(ait);
while(true) {
int val = ait.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
20.3.5 臨界區
只是希望防止多個執行緒同時訪問方法內部的部分程式碼而不是防止訪問整個方法。通過這種方式分離出來的程式碼段被稱為臨界區,它也使用synchronized關鍵字建立,synchronized被用來指定某個物件,此物件的鎖被用來對花括號內程式碼進行同步控制:
synchronized(syncObject){}
這也被稱為同步控制塊;在進入此段程式碼前,必須得到syncObject物件的鎖。如果其他執行緒已經得到這個鎖,那麼就得等到鎖被釋放以後,才能進入臨界區。
如果把一個非保護型別的類,在其他類的保護和控制下,應用於多執行緒環境:
//: concurrency/CriticalSection.java
// Synchronizing blocks instead of entire methods. Also
// demonstrates protection of a non-thread-safe class
// with a thread-safe one.
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() { this(0, 0); }
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException
extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Arbitrary invariant -- both variables must be equal:
public void checkState() {
if(x != y)
throw new PairValuesNotEqualException();
}
}
// Protect a Pair inside a thread-safe class:
abstract class PairManager {//持有一個Pair物件,並控制一切對它的訪問
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage =
Collections.synchronizedList(new ArrayList<Pair>());
public synchronized Pair getPair() {
// Make a copy to keep the original safe:
return new Pair(p.getX(), p.getY());
}
// Assume this is a time consuming operation
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch(InterruptedException ignore) {}
}
public abstract void increment();
}
// Synchronize the entire method:
class PairManager1 extends PairManager {
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// Use a critical section:
class PairManager2 extends PairManager {
public void increment() {
Pair temp;
synchronized(this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
public void run() {
while(true)
pm.increment();
}
public String toString() {
return "Pair: " + pm.getPair() +
" checkCounter = " + pm.checkCounter.get();
}
}
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
public void run() {
while(true) {
pm.checkCounter.incrementAndGet();
pm.getPair().checkState();
}
}
}
public class CriticalSection {
// Test the two different approaches:
static void
testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator
pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker
pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch(InterruptedException e) {
System.out.println("Sleep interrupted");
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
public static void main(String[] args) {
PairManager
pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
} /* Output: (Sample)
pm1: Pair: x: 15, y: 15 checkCounter = 272565
pm2: Pair: x: 16, y: 16 checkCounter = 3956974
*///:~
交給你的一個非執行緒安全的Pair類,你需要在一個執行緒環境中使用它。通過建立PairManager類就可以實現,PairManager類持有一個Pair物件並控制對它的一切訪問。
PairManager類結構,它的一些功能在基類中實現,並且一個或多個抽象方法在派生類中定義,這種結構在設計模式中稱為模板方法。
對於PairChecker的檢查頻率,PairManager1.increment()不允許有PairManager2.increment()那樣多。後者採用同步控制塊進行控制的典型原因:使得其他執行緒能更多的訪問。
使用顯示的Lock物件來建立臨界區:
//: concurrency/ExplicitCriticalSection.java
// Using explicit Lock objects to create critical sections.
import java.util.concurrent.locks.*;
// Synchronize the entire method:
class ExplicitPairManager1 extends PairManager {
private Lock lock = new ReentrantLock();
public synchronized void increment() {
lock.lock();
try {
p.incrementX();
p.incrementY();
store(getPair());
} finally {
lock.unlock();
}
}
}
// Use a critical section:
class ExplicitPairManager2 extends PairManager {
private Lock lock = new ReentrantLock();
public void increment() {
Pair temp;
lock.lock();
try {
p.incrementX();
p.incrementY();
temp = getPair();
} finally {
lock.unlock();
}
store(temp);
}
}
public class ExplicitCriticalSection {
public static void main(String[] args) throws Exception {
PairManager
pman1 = new ExplicitPairManager1(),
pman2 = new ExplicitPairManager2();
CriticalSection.testApproaches(pman1, pman2);
}
} /* Output: (Sample)
pm1: Pair: x: 15, y: 15 checkCounter = 174035
pm2: Pair: x: 16, y: 16 checkCounter = 2608588
*///:~
20.3.6 在其他物件上同步
synchronized塊必須給定一個在其上進行同步的物件,並且最合理的方式是,使用其方法正在被呼叫的當前物件:synchronized(this)。
如果獲得了synchronized塊上的鎖,那麼改物件其他的synchronized方法和臨界區就不能被呼叫了,因此,如果在this上同步,臨界區的效果就會直接縮小到同步的範圍內。
兩個任務可以同時進入同一個物件,只要這個物件上的方法是在不同的鎖上同步的即可:
//: concurrency/SyncObject.java
// Synchronizing on another object.
import javax.xml.crypto.Data;
import java.text.SimpleDateFormat;
import java.util.Date;
import static net.mindview.util.Print.*;
class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {//同步整個方法,在this上同步。
for(int i = 0; i < 5; i++) {
print("f():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date()));
Thread.yield();
}
}
public void g() {//在syncObject物件上同步
synchronized(syncObject) {
for(int i = 0; i < 5; i++) {
print("g():"+new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss:SSS").format(new Date()));
Thread.yield();
}
}
}
}
public class SyncObject {
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run() {
ds.f();
}
}.start();
ds.g();
}
} /* Output: (Sample)
g():2018/05/31-10:36:32:635
f():2018/05/31-10:36:32:635
f():2018/05/31-10:36:32:637
g():2018/05/31-10:36:32:637
f():2018/05/31-10:36:32:637
g():2018/05/31-10:36:32:637
f():2018/05/31-10:36:32:637
g():2018/05/31-10:36:32:637
f():2018/05/31-10:36:32:638
g():2018/05/31-10:36:32:638
*///:~
20.3.7 執行緒本地儲存
防止任務在共享資源上產生衝突的第二種方式是根除對變數的共享。執行緒本地儲存是一種自動化機制,可以為使用相同變數的每個不同的執行緒都建立不同的儲存。
//: concurrency/ThreadLocalVariableHolder.java
// Automatically giving each thread its own storage.
import java.util.concurrent.*;
import java.util.*;
class Accessor implements Runnable {
private final int id;
public Accessor(int idn) { id = idn; }
public void run() {
while(!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString() {
return "#" + id + ": " +
ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value =
new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue() {
return rand.nextInt(10000);
}
};
public static void increment() {
value.set(value.get() + 1);
}
public static int get() { return value.get(); }
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new Accessor(i));
TimeUnit.SECONDS.sleep(3); // Run for a while
exec.shutdownNow(); // All Accessors will quit
}
} /* Output: (Sample)
#0: 9259
#1: 556
#2: 6694
#3: 1862
#4: 962
#0: 9260
#1: 557
#2: 6695
#3: 1863
#4: 963
...
*///:~
ThreadLoca;物件通常當作靜態域儲存。
每個單獨的執行緒都被分配了自己的儲存,因為它們每個都需要跟蹤自己的計數值。
20.4 終結任務
下面演示一個終止問題,而且還是一個資源共享的示例
20.4.1 裝飾性花園
獲取每天進入公園的總人數。在公園的任何一個門口都有計數器可以遞增。
//: concurrency/OrnamentalGarden.java
import java.util.concurrent.*;
import java.util.*;
import static net.mindview.util.Print.*;
class Count {
private int count = 0;
private Random rand = new Random(47);
// Remove the synchronized keyword to see counting fail:
public synchronized int increment() {
int temp = count;
if(rand.nextBoolean()) // Yield half the time
Thread.yield();
return (count = ++temp);
}
public synchronized int value() { return count; }
}
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entrances =
new ArrayList<Entrance>();
private int number = 0;
// Doesn't need synchronization to read:
private final int id;
private static volatile boolean canceled = false;
// Atomic operation on a volatile field:
public static void cancel() { canceled = true; }
public Entrance(int id) {
this.id = id;
// Keep this task in a list. Also prevents
// garbage collection of dead tasks:
entrances.add(this);
}
public void run() {
while(!canceled) {
synchronized(this) {
++number;
}
print(this + " Total: " + count.increment());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch(InterruptedException e) {
print("sleep interrupted");
}
}
print("Stopping " + this);
}
public synchronized int getValue() { return number; }
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for(Entrance entrance : entrances)
sum += entrance.getValue();
return sum;
}
}
public class OrnamentalGarden {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++)
exec.execute(new Entrance(i));
// Run for a while, then stop and collect the data:
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS))
print("Some tasks were not terminated!");
print("Total: " + Entrance.getTotalCount());
print("Sum of Entrances: " + Entrance.sumEntrances());
}
} /* Output: (Sample)
Entrance 0: 1 Total: 1
Entrance 2: 1 Total: 3
Entrance 1: 1 Total: 2
Entrance 4: 1 Total: 5
Entrance 3: 1 Total: 4
Entrance 2: 2 Total: 6
Entrance 4: 2 Total: 7
Entrance 0: 2 Total: 8
...
Entrance 3: 29 Total: 143
Entrance 0: 29 Total: 144
Entrance 4: 29 Total: 145
Entrance 2: 30 Total: 147
Entrance 1: 30 Total: 146
Entrance 0: 30 Total: 149
Entrance 3: 30 Total: 148
Entrance 4: 30 Total: 150
Stopping Entrance 2: 30
Stopping Entrance 1: 30
Stopping Entrance 0: 30
Stopping Entrance 3: 30
Stopping Entrance 4: 30
Total: 150
Sum of Entrances: 150
*///:~
20.4.2 在阻塞時終結
sleep()的一種情況,它使任務從執行狀態變為被阻塞狀態,而有時你必須終止被阻塞的任務。
執行緒狀態
一個執行緒可以處於以下四種狀態之一:
- 新建:當執行緒被建立時,它只會短暫的處於這種狀態。此時它已經分配了必須的系統資源,並執行了初始化。
- 就緒:在這種狀態下,只要排程器把時間片分配給執行緒,執行緒就可以執行。
- 阻塞:執行緒能夠執行,但有某些條件阻止它的執行。當執行緒處於阻塞狀態時,排程器講忽略執行緒,不會分配給執行緒任何CPU時間。
- 死亡:處於死亡或終止狀態的執行緒將不再是可排程的。任務死亡的通常方式是從run()方法返回,但是任務的執行緒還可以被中斷。
進入阻塞狀態
一個任務進入阻塞狀態,可能有如下原因:
- 通過呼叫sleep(milliseconds)使任務進入休眠狀態,在這種情況下,任務在指定的時間內不會執行。
- 你通過呼叫wait()使執行緒掛起。直到執行緒得到了notity()或notityAll()訊息,執行緒才會進入就緒狀態。
- 任務在等待某個輸入/輸出完成
- 任務試圖在某個物件上呼叫其同步控制方法,但是物件鎖不可用,因為另一個任務已經獲取了這個鎖。
檢視的問題:希望能夠終止處於阻塞狀態的任務。
20.4.3 中斷
在任務的run()方法中間打斷,更像是丟擲的異常,因此在Java執行緒種的這種型別的異常中斷種用到了異常。
Thread類包含interrupt()方法,因此你可以終止被阻塞的任務,這個方法將設定執行緒的中斷狀態。如果一個執行緒已經被阻塞,或者試圖執行一個阻塞操作,那麼設定這個執行緒的中斷狀態將丟擲InterruptedException。
在Executor上呼叫shutdownNow(),那麼它將傳送一個interrupt()呼叫給它啟動的所有執行緒。
使用Executor,那麼通過呼叫submin()而不是executor()來啟動任務,就可以持有改任務的上下文。sub