頂級架構師學習——第二階段:實戰Java高併發程式設計
1、什麼是並行?
並行處理(ParallelProcessing)是計算機系統中能同時執行兩個或更多個處理機的一種計算方法。處理機可同時工作於同一程式的不同方面。並行處理的主要目的是節省大型和複雜問題的解決時間。
2、為什麼需要並行?
平行計算只有在 影象處理 和 服務端程式設計 兩個領域可以使用,並且它在這2個領域確實有著大量廣泛的使用。但是在其它任何地方,平行計算毫無建樹!
摩爾定律的失效。10年過去了,我們還停留在4GHZ。
平行計算還出於業務模型的需要。並不是為了提高系統性能,而是確實在業務上需要多個執行單元。比如HTTP伺服器,為每一個Socket連線新建一個處理執行緒,讓不同執行緒承擔不同的業務工作,簡化任務排程。
3、有幾個重要的概念
同步(synchronous)和非同步(asynchronous):方法呼叫時間的區別
併發(Concurrency)和並行(Parallelism)
臨界區 :臨界區用來表示一種公共資源或者說是共享資料,可以被多個執行緒使用。但是每一次,只能有一個執行緒
使用它,一旦臨界區資源被佔用,其他執行緒要想使用這個資源,就必須等待。
阻塞(Blocking)和非阻塞(Non-Blocking):阻塞和非阻塞通常用來形容多執行緒間的相互影響。比如一個執行緒佔用了臨界區資源,那麼其它所有需要這個資源的執行緒就必須在這個臨界區中進行等待,等待會導致執行緒掛起。這種情況就是阻塞。此時,如
果佔用資源的執行緒一直不願意釋放資源,那麼其它所有阻塞在這個臨界區上的執行緒都不能工作。非阻塞允許多個執行緒同時進入臨界區。
死鎖(Deadlock)、飢餓(Starvation)和活鎖(Livelock):死鎖是由於多個程序相互請求導致無法滿足資源需求;飢餓指執行緒長時間無法得到需要的資源,無法繼續執行;活鎖指在一定時間之後能夠滿足資源需要從而執行緒能夠繼續向下執行。
併發級別 :阻塞、無障礙、無鎖、無等待(後三個為非阻塞)
4、兩個定律
Amdahl定律:定義了序列系統並行化後的加速比的計算公式和理論上限;加速比定義:加速比=優化前系統耗時/優化後系統耗時。公式:Tn=T1(F+1/n*(1-F)),其中Tn為優化後耗時,T1為單核時耗時,F為序列比例,n為處理器個數。
Gustafson定律:說明處理器個數,序列比例和加速比之間的關係,只要有足夠的並行化,那麼加速比和CPU個數成正比。公式:S=n-F(n-1),其中S為加速比,n為處理器個數,F為序列比例。
5、執行緒相關
執行緒是程序內的執行單元。
Thread t1 = new Thread();// 新建執行緒
t1.start();// 啟動執行緒
// Thread.stop();// 不建議使用,它會釋放所有的monitor。
t1.interrupt(); // 中斷執行緒
// t1.isInterrupted(); // 判斷執行緒是否被中斷
t1.interrupted(); // 判斷執行緒是否被中斷,並清除中斷狀態
// 掛起suspend和繼續執行resume執行緒,如果加鎖發生在resume()之前則產生死鎖
// 等待執行緒結束join
// 讓出當前佔有資源,但還會繼續競爭yield
// 守護程序:在後臺默默地完成一些系統性的服務,比如垃圾回收執行緒、JIT執行緒就可以理解為守護執行緒,當一個Java應用內,只有守護執行緒時,Java虛擬機器就會自然退出,必須在啟動執行緒前設定為守護程序,否則報錯
Thread t = new DaemonT();
t.setDaemon(true);
t.start();
// 執行緒優先順序,高優先順序執行緒更容易獲得資源,但並不是絕對的
Thread high=new HightPriority();
LowPriority low=new LowPriority();
high.setPriority(Thread.MAX_PRIORITY);
low.setPriority(Thread.MIN_PRIORITY);
low.start();
high.start();
/* 基礎的執行緒同步,使用synchronized。
指定加鎖物件:對給定物件加鎖,進入同步程式碼前要獲得給定物件的鎖。
public void run() {
for(int j=0;j<10000000;j++){
synchronized(instance){
i++;
}
}
}
直接作用於例項方法:相當於對當前例項加鎖,進入同步程式碼前要獲得當前例項的鎖。
public synchronized void increase(){
i++;
}
直接作用於靜態方法:相當於對當前類加鎖,進入同步程式碼前要獲得當前類的鎖。
public static synchronized void increase(){
i++;
}
此外還有Object.wait(),Object.notify()等操作實現執行緒同步
public static class T2 extends Thread{
public void run()
{
synchronized (object) {
System.out.println(System.currentTimeMillis()
+":T2 start! notify one thread");
object.notify();
System.out.println(System.currentTimeMillis()+":T2 end!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
}
}
public static class T1 extends Thread{
public void run()
{
synchronized (object) {
System.out.println(System.currentTimeMillis()+":T1 start! ");
try {
System.out.println(System.currentTimeMillis()
+":T1 wait for object ");
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()+":T1 end!");
}
}
}*/
6、記憶體模型和執行緒安全
原子性是指一個操作是不可中斷的。即使是在多個執行緒一起執行的時候,一個操作一旦開始,就
不會被其它執行緒干擾。
有序性是指一條指令的執行分為很多步驟,按照一定的順序向下執行。
可見性是指當一個執行緒修改了某一個共享變數的值,其他執行緒是否能夠立即知道這個修改。
public class VisibilityTest extends Thread {
private boolean stop;
public void run() {
int i = 0;
while(!stop) {
i++;
}
System.out.println("finish loop,i=" + i);
}
public void stopIt() {
stop = true;
}
public boolean getStop(){
return stop;
}
public static void main(String[] args) throws Exception {
VisibilityTest v = new VisibilityTest();
v.start();
Thread.sleep(1000);
v.stopIt();
Thread.sleep(2000);
System.out.println("finish main");
System.out.println(v.getStop());
}
}
Happen-Before原則
執行緒安全 指某個函式、函式庫在多執行緒環境中被呼叫時,能夠正確地處理各個執行緒的區域性變數,使程式功能正確完成。
public class AccountingSync implements Runnable{
static AccountingSync instance=new AccountingSync();
static int i=0;
@Override
public void run() {
for(int j=0;j<10000000;j++){
synchronized(instance){
i++;
}
}
}
}
7、無鎖
CAS:Compare and Swap
CAS演算法的過程是這樣:它包含3個引數CAS(V,E,N)。V表示要更新的變數,E表示預期值,N表示新值。僅當V值等於E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他執行緒做了更新,則當前執行緒什麼都不做。最後,CAS返回當前V的真實值。CAS操作是抱著樂觀的態度進行的,它總是認為自己可以成功完成操作。當多個執行緒同時使用CAS操作一個變數時,只有一個會勝出,併成功更新,其餘均會失敗。失敗的執行緒不會被掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許失敗的執行緒放棄操作。基於這樣的原理,CAS操作即時沒有鎖,也可以發現其他執行緒對當前執行緒的干擾,並進行恰當的處理。
無鎖類的使用
AtomicInteger:Number
主要介面:
public final int get() //取得當前值
public final void set(int newValue) //設定當前值
public final int getAndSet(int newValue) //設定新值,並返回舊值值
public final boolean compareAndSet(int expect, int u)//如果當前值為expect,則設定為u
public final int getAndIncrement() //當前值加1,返回舊值
public final int getAndDecrement() //當前值減1,返回舊值
public final int getAndAdd(int delta) //當前值增加delta,返回舊值
public final int incrementAndGet() //當前值加1,返回新值
public final int decrementAndGet() //當前值減1,返回新值
public final int addAndGet(int delta) //當前值增加delta,返回新值
Unsafe:非安全的操作,比如:根據偏移量設定值;park();底層的CAS操作;非公開API,在不同版本的JDK中, 可能有較大差異
主要介面:
//獲得給定物件偏移量上的int值
public native int getInt(Object o, long offset);
//設定給定物件偏移量上的int值
public native void putInt(Object o, long offset, int x);
//獲得欄位在物件中的偏移量
public native long objectFieldOffset(Field f);
//設定給定物件的int值,使用volatile語義
public native void putIntVolatile(Object o, long offset, int x);
//獲得給定物件物件的int值,使用volatile語義
public native int getIntVolatile(Object o, long offset);
//和putIntVolatile()一樣,但是它要求被操作欄位就是volatile型別的
public native void putOrderedInt(Object o, long offset, int x);
AtomicReference:對引用進行修改,是一個模板類,抽象化了資料型別
主要介面:
get()
set(V)
compareAndSet()
getAndSet(V)
AtomicStampedReference:針對ABA問題
主要介面:
//比較設定 引數依次為:期望值 寫入新值 期望時間戳 新時間戳
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
//獲得當前物件引用
public V getReference()
//獲得當前時間戳
public int getStamp()
//設定當前物件引用和時間戳
public void set(V newReference, int newStamp)
AtomicIntegerArray:支援無鎖的陣列
主要介面:
//獲得陣列第i個下標的元素
public final int get(int i)
//獲得陣列的長度
public final int length()
//將陣列第i個下標設定為newValue,並返回舊的值
public final int getAndSet(int i, int newValue)
//進行CAS操作,如果第i個下標的元素等於expect,則設定為update,設定成功返回true
public final boolean compareAndSet(int i, int expect, int update)
//將第i個下標的元素加1
public final int getAndIncrement(int i)
//將第i個下標的元素減1
public final int getAndDecrement(int i)
//將第i個下標的元素增加delta(delta可以是負數)
public final int getAndAdd(int i, int delta)
AtomicIntegerFieldUpdater:讓普通變數也享受原子操作。ps:1.Updater只能修改它可見範圍內的變數。因為Updater使用反射得到這個變數。如果變數不可見,就會出錯。比如如果score申明為private,就是不可行的。2.為了確保變數被正確的讀取,它必須是volatile型別的。如果我們原有程式碼中未申明這個型別,那麼簡單的申明一下就行,這不會引起什麼問題。3.由於CAS操作會通過物件例項中的偏移量直接進行賦值,因此,它不支援static欄位(Unsafe.objectFieldOffset()不支援靜態變數)。
主要介面:
AtomicIntegerFieldUpdater.newUpdater()
incrementAndGet()
8、各種同步工具的使用
ReentrantLock:可重入鎖
可重入(單執行緒可以重複進入,但要重複退出)、可中斷(需要設定lockInterruptibly())、可限時(超時不能獲得鎖,就返回false,不會永久等待構成死鎖)、公平鎖(先來先得)
Condition:類似於 Object.wait()和Object.notify();與ReentrantLock結合使用
主要介面:
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
API詳解:
1.await()方法會使當前執行緒等待,同時釋放當前鎖,當其他執行緒中使用signal()時或者signalAll()方法時,線
程會重新獲得鎖並繼續執行。或者當執行緒被中斷時,也能跳出等待。這和Object.wait()方法很相似。
2.awaitUninterruptibly()方法與await()方法基本相同,但是它並不會再等待過程中響應中斷。
3.singal()方法用於喚醒一個在等待中的執行緒。相對的singalAll()方法會喚醒所有在等待中的執行緒。這和Obej
ct.notify()方法很類似。
Semaphore:共享鎖;執行多個執行緒同時臨界區
主要介面:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
ReadWriteLock:JDK5中提供的讀寫分離鎖
訪問情況:
讀-讀不互斥:讀讀之間不阻塞。
讀-寫互斥:讀阻塞寫,寫也會阻塞讀。
寫-寫互斥:寫寫阻塞。
主要介面:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
CountDownLatch:倒數計時器。一種典型的場景就是火箭發射。在火箭發射前,為了保證萬無一失,往往還要進行各項裝置、儀器的檢查。只有等所有檢查完畢後,引擎才能點火。這種場景就非常適合使用CountDownLatch。它可以使得點火執行緒
,等待所有檢查執行緒全部完工後,再執行。
主要介面:
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
CyclicBarrier:迴圈柵欄。Cyclic意為迴圈,也就是說這個計數器可以反覆使用。比如,假設我們將計數器設定為10。那麼湊齊第一批10個執行緒後,計數器就會歸零,然後接著湊齊下一批10個執行緒。
主要介面:
public CyclicBarrier(int parties, Runnable barrierAction)barrierAction就是當計數器一次計數完成後,系統會執行的動作
await()
LockSupport:提供執行緒阻塞原語。與suspend()相比不容易引起執行緒凍結。能夠響應中斷,但不丟擲異常。中斷響應的結果是,park()函式的返回,可以從Thread.interrupted()得到中斷標誌。
主要介面:
LockSupport.park();
LockSupport.unpark(t1);
9、併發容器
集合包裝
// Hash map
Collections.synchronizedMap
public static Map m=Collections.synchronizedMap(new HashMap());
// List
synchronizedList
// Set
synchronizedSet
ConcurrentHashMap:高效能HashMap
BlockingQueue:阻塞佇列
10、執行緒池基礎
JDK的內建執行緒池
執行緒池種類
newFixedThreadPool:混合執行緒池
newSingleThreadExecutor:單例執行緒池
newCachedThreadPool:快取執行緒池
newScheduledThreadPool:類似於事務
11、併發設計模式
在軟體工程中,設計模式(design pattern)是對軟體設計中普遍存在(反覆出現)的各種問題,所提出的解決方案。這個術語是由埃裡希·伽瑪(Erich Gamma)等人在1990年代從建築設計領域引入到電腦科學的。
單例模式:單例物件的類必須保證只有一個例項存在。許多時候整個系統只需要擁有一個的全域性物件,這樣有利於我們協調系統整體的行為 比如:全域性資訊配置。
public class Singleton {// 在類初始化時獲取單例
private Singleton(){
System.out.println("Singleton is create");
}
private static Singleton instance = new Singleton();
public static Singleton getInstance() {
return instance;
}
}
public class LazySingleton {// 以懶惰模式建立單例,當呼叫getInstance()方法時建立單例
private LazySingleton() {
System.out.println("LazySingleton is create");
}
private static LazySingleton instance = null;
public static synchronized LazySingleton getInstance() {
if (instance == null)
instance = new LazySingleton();
return instance;
}
}
public class StaticSingleton {
private StaticSingleton(){
System.out.println("StaticSingleton is create");
}
private static class SingletonHolder {
private static StaticSingleton instance = new StaticSingleton();
}
public static StaticSingleton getInstance() {
return SingletonHolder.instance;
}
}
不變模式:一個類的內部狀態建立後,在整個生命期間都不會發生變化時,就是不變類。不變模式不需要同步。
public final class Product {
//確保無子類
private final String no;
//私有屬性,不會被其他物件獲取
private final String name;
//final保證屬性不會被2次賦值
private final double price;
public Product(String no, String name, double price) { //在建立物件時,必須指定資料
super();
//因為建立之後,無法進行修改
this.no = no;
this.name = name;
this.price = price;
}
public String getNo() {
return no;
}
public String getName() {
return name;
}
public double getPrice() {
return price;
}
}
一些常用的比如String、Boolean、Byte、Character、Double、Float、Integer、Long、Short都是如此。
Future模式:核心思想是非同步呼叫。在呼叫方法時只產生一個包裝盒,具體耗時的實現在空閒時完成。
public interface Data {
public String getResult ();
}
public class FutureData implements Data {
protected RealData realdata = null; //FutureData是RealData的包裝
protected boolean isReady = false;
public synchronized void setRealData(RealData realdata) {
if (isReady) {
return;
}
this.realdata = realdata;
isReady = true;
notifyAll(); //RealData已經被注入,通知getResult()
}
public synchronized String getResult() { //會等待RealData構造完成
while (!isReady) {
try {
wait(); //一直等待,知道RealData被注入
} catch (InterruptedException e) {
}
}
return realdata.result; //由RealData實現
}
}
public class RealData implements Data {
protected final String result;
public RealData(String para) {
//RealData的構造可能很慢,需要使用者等待很久,這裡使用sleep模擬
StringBuffer sb=new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(para);
try {
//這裡使用sleep,代替一個很慢的操作過程
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
result =sb.toString();
}
public String getResult() {
return result;
}
}
public class Client {
public Data request(final String queryStr) {
final FutureData future = new FutureData();
new Thread() {
public void run() {// RealData的構建很慢,
//所以在單獨的執行緒中進行
RealData realdata = new RealData(queryStr);
future.setRealData(realdata);
}
}.start();
return future; // FutureData會被立即返回
}
public static void main(String[] args) {
Client client = new Client();
//這裡會立即返回,因為得到的是FutureData而不是RealData
Data data = client.request("name");
System.out.println("請求完畢");
try {
//這裡可以用一個sleep代替了對其他業務邏輯的處理
//在處理這些業務邏輯的過程中,RealData被建立,從而充分利用了等待時間
Thread.sleep(2000);
} catch (InterruptedException e) {
}
//使用真實的資料
System.out.println("資料 = " + data.getResult());
}
}
生產者消費者模式:生產者-消費者模式是一個經典的多執行緒設計模式。它為多執行緒間的協作提供了良好的解決方案。在生產者-消費者模式中,通常由兩類執行緒,即若干個生產者執行緒和若干個消費者執行緒。生產者執行緒負責提交使用者請求,消費者執行緒則負責具體處理生產者提交的任務。生產者和消費者之間則通過共享記憶體緩衝區進行通訊。
12、悄悄插一嘴巴NIO和AIO
NIO是New I/O的簡稱,與舊式的基於流的I/O方法相對,從名字看,它表示新的一套Java I/O標準。它是在Java 1.4中被納入到JDK中的,並具有以下特性:
– NIO是基於塊(Block)的,它以塊為基本單位處理資料
– 為所有的原始型別提供(Buffer)快取支援,ByteBuffer最為常用,Buffer中有3個重要的引數:位置(position)、容量(capactiy)和上限(limit)
– 增加通道(Channel)物件,作為新的原始 I/O 抽象
– 支援鎖和記憶體對映檔案的檔案訪問介面
– 提供了基於Selector的非同步網路I/O
// 使用NIO複製檔案
public static void nioCopyFile(String resource, String destination) throws IOException {
FileInputStream fis = new FileInputStream(resource);
FileOutputStream fos = new FileOutputStream(destination);
FileChannel readChannel = fis.getChannel(); //讀檔案通道
FileChannel writeChannel = fos.getChannel(); //寫檔案通道
ByteBuffer buffer = ByteBuffer.allocate(1024); //讀入資料快取
while (true) {
buffer.clear();
int len = readChannel.read(buffer); //讀入資料
if (len == -1) {
break;
//讀取完畢
}
buffer.flip();// 讀寫過程轉換
writeChannel.write(buffer);
//寫入檔案
}
readChannel.close();
writeChannel.close();
}
// 將檔案對映到記憶體
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");
FileChannel fc = raf.getChannel();
//將檔案對映到記憶體中
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
while(mbb.hasRemaining()){
System.out.print((char)mbb.get());
}
mbb.put(0,(byte)98); //修改檔案
raf.close();
網路程式設計NIO
// 簡單案例EchoServer
public static void main(String args[]) {
ServerSocket echoServer = null;
Socket clientSocket = null;
try {
echoServer = new ServerSocket(8000);
} catch (IOException e) {
System. out.println(e);
}
while (true) {
try {
clientSocket = echoServer.accept();
System. out.println(clientSocket.getRemoteSocketAddress() + " connect!");
tp.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
System. out.println(e);
}
}
}
static class HandleMsg implements Runnable{
// 省略部分資訊
public void run(){
try {
is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintWriter(clientSocket.getOutputStream(), true);
// 從InputStream當中讀取客戶端所傳送的資料
String inputLine = null;
long b=System. currentTimeMillis();
while ((inputLine = is.readLine()) != null) {
os.println(inputLine);
}
long e=System. currentTimeMillis();
System. out.println("spend:"+(e-b)+"ms");
} catch (IOException e) {
e.printStackTrace();
}finally{
// 關閉資源
}
}
}
// EchoServer客戶端
public static void main(String[] args) throws IOException {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream(), true);
writer.println("Hello!");
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println("from server: " + reader.readLine());
} catch {
} finally {
//省略資源關閉
}
}
問題:
– 為每一個客戶端使用一個執行緒,如果客戶端出現延時等異常,執行緒可能會被佔用很長時間。因為資料的
準備和讀取都在這個執行緒中。
– 此時,如果客戶端數量眾多,可能會消耗大量的系統資源
解決:
– 非阻塞的NIO
– 資料準備好了在工作
總結:
– NIO會將資料準備好後,再交由應用進行處理,資料的讀取過程依然在應用執行緒中完成
– 節省資料準備時間(因為Selector可以複用)
網路程式設計AIO
- 讀完了再通知我
- 不會加快IO,只是在讀完後進行通知
- 使用回撥函式,進行業務處理
// AsynchronousSocketChannel使用舉例
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println(Thread.currentThread().getName());
Future<Integer> writeResult=null;
try {
buffer.clear();
result.read(buffer).get(100, TimeUnit.SECONDS);
buffer.flip();
writeResult=result.write(buffer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
server.accept(null, this);
writeResult.get();
result.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("failed: " + exc);
}
});
13、鎖優化
減少鎖持有時間:將無關程式碼塊單獨寫,只將必要的程式碼包含在鎖內
減小鎖粒度:將大物件,拆成小物件,大大增加並行度,降低鎖競爭;使用偏向鎖、輕量級鎖,取得鎖的成功率提高;
HashMap的同步實現:
– Collections.synchronizedMap(Map<K,V> m)
– 返回SynchronizedMap物件
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
ConcurrentHashMap:
– 若干個Segment :Segment<K,V>[] segments
– Segment中維護HashEntry<K,V>
– put操作時先定位到Segment,鎖定一個Segment,執行put
– 在減小鎖粒度後, ConcurrentHashMap允許若干個執行緒同時進入
鎖分離:根據功能進行鎖分離,使用ReadWriteLock,讀多寫的情況可以提高效能。
延伸:操作互不影響,鎖就可以分離,因而提出LinkedBlockingQueue,可在同時在頭尾執行不同的操作
鎖粗化:通常情況下,為了保證多執行緒間的有效併發,會要求每個執行緒持有鎖的時間儘量短,即在使用完公共資源後,應該立即釋放鎖。只有這樣,等待在這個鎖上的其他執行緒才能儘早的獲得資源執行任務。但是,凡事都有一個度,如果對同一個鎖不停的進行請求、同步和釋放,其本身也會消耗系統寶貴的資源,反而不利於效能的優化。
// 修改前
public void demoMethod(){
synchronized(lock){
//do sth.
}
//做其他不需要的同步的工作,但能很快執行完畢
synchronized(lock){
//do sth.
}
}
// 修改後
public void demoMethod(){
//整合成一次鎖請求
synchronized(lock){
//do sth.
//做其他不需要的同步的工作,但能很快執行完畢
}
}
鎖消除:在即時編譯器時,如果發現不可能被共享的物件,則可以消除這些物件的鎖操作
public static void main(String args[]) throws InterruptedException {
long start = System.currentTimeMillis();
for (int i = 0; i < CIRCLE; i++) {
craeteStringBuffer("JVM", "Diagnosis");
}
long bufferCost = System.currentTimeMillis() - start;
System.out.println("craeteStringBuffer: " + bufferCost + " ms");
}
public static String craeteStringBuffer(String s1, String s2) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
return sb.toString();
}
啟動JVM時使用該命令:-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks(啟用鎖消除)
虛擬機器內部的鎖優化機制
偏向鎖:鎖偏向於當前已經佔有鎖的程序,只要沒有競爭,獲得偏向鎖的執行緒,在將來進入同步塊,不需要做同步,當其他執行緒請求相同的鎖時,偏向模式結束。但在競爭激烈的場合,偏向鎖會增加系統負擔。-XX:+UseBiasedLocking 預設啟用偏向鎖
// 本例中啟用偏向鎖能提高5%左右的效能
// -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
// -XX:-UseBiasedLocking
// 使用這兩行啟動JVM檢視效果
public static List<Integer> numberList =new Vector<Integer>();
public static void main(String[] args) throws InterruptedException {
long begin=System.currentTimeMillis();
int count=0;
int startnum=0;
while(count<10000000){
numberList.add(startnum);
startnum+=2;
count++;
}
long end=System.currentTimeMillis();
System.out.println(end-begin);
}
輕量級鎖:普通的鎖處理效能不夠理想,輕量級鎖是一種快速的鎖定方法。如果物件沒有被鎖定,將物件頭的Mark指標儲存到鎖物件中,將物件頭設定為指向鎖的指標(線上程棧空間中)。如果輕量級鎖失敗,表示存在競爭,升級為重量級鎖(常規鎖)。在沒有鎖競爭的前提下,減少傳統鎖使用OS互斥量產生的效能損耗。在競爭激烈時,輕量級鎖會多做很多額外操作,導致效能下降。
自旋鎖:當競爭存在時,如果執行緒可以很快獲得鎖,那麼可以不在OS層掛起執行緒,讓執行緒做幾個空操作(自旋),等待下一個任務的到來。如果同步塊很長,自旋失敗,會降低系統性能;如果同步塊很短,自旋成功,節省執行緒掛起切換時間,提升系統性能。在JDK1.7之後內建實現。
小總結:
1.不是Java語言層面的鎖優化方法
2.內置於JVM中的獲取鎖的優化方法和獲取鎖的步驟
– 偏向鎖可用會先嚐試偏向鎖
– 輕量級鎖可用會先嚐試輕量級鎖
– 以上都失敗,嘗試自旋鎖
– 再失敗,嘗試普通鎖,使用OS互斥量在作業系統層掛起
ThreadLocal及其原始碼分析
// SimpleDateFormat被多執行緒訪問,鎖沒有起到原有效果
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static class ParseDate implements Runnable{
int i=0;
public ParseDate(int i){this.i=i;}
public void run() {
try {
Date t=sdf.parse("2015-03-29 19:29:"+i%60);
System.out.println(i+":"+t);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService es=Executors.newFixedThreadPool(10);
for(int i=0;i<1000;i++){
es.execute(new ParseDate(i));
}
}
// 為每一個執行緒分配一個例項
static ThreadLocal<SimpleDateFormat> tl=new ThreadLocal<SimpleDateFormat>();
public static class ParseDate implements Runnable{
int i=0;
public ParseDate(int i){this.i=i;}
public void run() {
try {
if(tl.get()==null){
tl.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
Date t=tl.get().parse("2015-03-29 19:29:"+i%60);
System.out.println(i+":"+t);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService es=Executors.newFixedThreadPool(10);
for(int i=0;i<1000;i++){
es.execute(new ParseDate(i));
}
}
14、併發除錯
使用Eclipse進行除錯。Eclipse中可以設定斷點,在斷點屬性Breakpoint Properties中可以進行各類斷點設定,包括掛起單個執行緒甚至掛起JVM等操作,方便快捷的對併發應用進行除錯。
使用命令列除錯。使用jsp命令得到JVM埠號,再使用jstack portid得到當前JVM的執行狀態。
15、JDK8對併發的新支援
LongAdder
– 和AtomicInteger類似的使用方式
– 在AtomicInteger上進行了熱點分離
– public void add(long x)
– public void increment()
– public void decrement()
– public long sum()
– public long longValue()
– public int intValue()
CompletableFuture
– 實現CompletionStage介面(40餘個方法)
– Java 8中對Future的增強版
– 支援流式呼叫
– 完成後得到通知
public static class AskThread implements Runnable {
CompletableFuture<Integer> re = null;
public AskThread(CompletableFuture<Integer> re) {
this.re = re;
}
@Override
public void run() {
int myRe = 0;
try {
myRe = re.get() * re.get();
} catch (Exception e) {
}
System.out.println(myRe);
}
}
public static void main(String[] args) throws InterruptedException {
final CompletableFuture<Integer> future = new CompletableFuture<>();
new Thread(new AskThread(future)).start();
// 模擬長時間的計算過程
Thread.sleep(1000);
// 告知完成結果
future.complete(60);
}
– 非同步執行
public static Integer calc(Integer para) {
try {
// 模擬一個長時間的執行
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return para*para;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> calc(50));
System.out.println(future.get());
}
– 工廠方法
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
StampedLock
– 讀寫鎖的改進
– 讀不阻塞寫
public class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
實現思想:
– CLH自旋鎖
– 鎖維護一個等待執行緒佇列,所有申請鎖,但是沒有成功的執行緒都記錄在這個佇列中。每一個節點(一個
節點代表一個執行緒),儲存一個標記位(locked),用於判斷當前執行緒是否已經釋放鎖。
– 當一個執行緒試圖獲得鎖時,取得當前等待佇列的尾部節點作為其前序節點。並使用類似如下程式碼判斷前
序節點是否已經成功釋放鎖。
– 不會進行無休止的自旋,會在在若干次自旋後掛起執行緒。
16、jetty分析(偷個懶)
1.new Server()
public Server(@Name("port")int port)
{
this((ThreadPool)null);
ServerConnector connector=new ServerConnector(this);
connector.setPort(port);
setConnectors(new Connector[]{connector});
}
1.1. 初始化執行緒池
public Server(@Name("threadpool") ThreadPool pool)
{
_threadPool=pool!=null?pool:new QueuedThreadPool();
addBean(_threadPool);
setServer(this);
}
1.1.1.QueuedThreadPool
實現了SizedThreadPool
execute() 方法
@Override
public void execute(Runnable job)
{
if (!isRunning() || !_jobs.offer(job))
{
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
else
{
// Make sure there is at least one thread executing the job.
if (getThreads() == 0)
startThreads(1);
}
}
BlockingQueue
將任務推入
BlockingQueue<Runnable> org.eclipse.jetty.util.thread.QueuedThreadPool._jobs
1.2. 初始化ServerConnector
HTTP connector using NIO ByteChannels and Selectors
繼承自 AbstractConnector
1.2.1. 初始化ScheduledExecutorScheduler
based on JDK's {@link ScheduledThreadPoolExecutor}.
1.2.2. 初始化ByteBufferPool
在資料傳輸過程中,不可避免需要byte陣列
buffer池
預設產生 ArrayByteBufferPool
ByteBufferPool 介面有2個方法:
public ByteBuffer acquire(int size, boolean direct);
public void release(ByteBuffer buffer);
這 是一個很好的 對 象池範本
ArrayByteBufferPool
public ArrayByteBufferPool(int minSize, int increment, int maxSize)
public ArrayByteBufferPool()
{
this(0,1024,64*1024);
}
_direct=new Bucket[maxSize/increment];
_indirect=new Bucket[maxSize/increment];
結構
Bucket
_direct Bucket陣列
_indirect Bucket陣列
為每一個大小,新建一個Bucket
但不初始化ByteBuffer
int size=0;
for (int i=0;i<_direct.length;i++)
{
size+=_inc;
_direct[i]=new Bucket(size);
_indirect[i]=new Bucket(size);
}
一個Bucekt存放 大小相同的所有的ByteBuffer
_size
bytebuffer大小
_queue
public final Queue<ByteBuffer> _queue= new ConcurrentLinkedQueue<>();
acquire
public ByteBuffer acquire(int size, boolean direct)
取得合適的Bucket
每個Bucket的大小不同,這裡找到最合適的
Bucket bucket = bucketFor(size,direct);
從Bucket 中取得ByteBuffer
ByteBuffer buffer = bucket==null?null:bucket._queue.poll();
不存在則新建
if (buffer == null)
{
int capacity = bucket==null?size:bucket._size;
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}
release
public void release(ByteBuffer buffer)
{
if (buffer!=null)
{
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null)
{
BufferUtil.clear(buffer);
bucket._queue.offer(buffer);
}
}
}
取得合適的Bucket
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
清空Buffer
BufferUtil.clear(buffer);
歸還Pool
bucket._queue.offer(buffer);
例外處理:
如果申請的ByteBuffer過大或者過小,無法在POOL中滿足,則可以申請成功,但無法歸還給POOL。
1.2.3. 維護ConnectionFactory
HttpConnectionFactory
用於建立連線,
比如Accept後,需要建立一個表示連線的物件
1.2.4. 取得可用CPU 數量
int cores = Runtime.getRuntime().availableProcessors();
1.2.5. 更新acceptor 數量
if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));
1.2.6. 建立acceptor執行緒組
_acceptors = new Thread[acceptors];
1.2.7. 初始化ServerConnectorManager
繼承自 SelectorManager
_manager = new ServerConnectorManager(getExecutor(), getScheduler(),
selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));
儲存selector執行緒數量
Math.min(4,Runtime.getRuntime().availableProcessors()/2))
1.3. 設 置port
connector.setPort(port);
1.4. 關 聯Sever 和Connector
setConnectors(new Connector[]{connector});
2.Server.start()
org.eclipse.jetty.server.Server
啟動web伺服器
WebAppContext context = new WebAppContext();
context.setContextPath("/");
context.setResourceBase("./web/");
context.setClassLoader(Thread.currentThread().getContextClassLoader());
server.setHandler(context);
server.start();
2.1. 設 置啟 動 狀 態
AbstractLifeCycle
private void setStarting()
{
if (LOG.isDebugEnabled())
LOG.debug("starting {}",this);
_state = __STARTING;
for (Listener listener : _listeners)
listener.lifeCycleStarting(this);
}
2.2. 啟 動過 程doStart()
Server
啟動整個server
protected void doStart() throws Exception
{
//If the Server should be stopped when the jvm exits, register
//with the shutdown handler thread.
if (getStopAtShutdown())
ShutdownThread.register(this);
//Register the Server with the handler thread for receiving
//remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands.
ShutdownMonitor.getInstance().start(); // initialize
LOG.info("jetty-" + getVersion());
HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION);
MultiException mex=new MultiException();
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class);
int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;
if (mex.size()==0)
{
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector)
acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector)
selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
int needed=1+selectors+acceptors;
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d +
selectors=%d + request=1)",max,acceptors,selectors));
try
{
super.doStart();
}
catch(Throwable e)
{
mex.add(e);
}
// start connectors last
for (Connector connector : _connectors)
{
try
{
connector.start();
}
catch(Throwable e)
{
mex.add(e);
}
}
if (isDumpAfterStart())
dumpStdErr();
mex.ifExceptionThrow();
LOG.info(String.format("Started @%dms",Uptime.getUptime()));
}
2.2.1. 註冊ShutdownMonitor
遠端控制介面
//Register the Server with the handler thread for receiving
//remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands.
ShutdownMonitor.getInstance().start(); // initialize
2.2.2. 獲取化執行緒池
// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class);
QueuedThreadPool
2.2.3. 設定selector 數量
根據Connector數量進行累計
大部分情況下,只有一個ServerConnector
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector)
acceptors+=((AbstractConnector)connector).getAcceptors();
if (connector instanceof ServerConnector)
selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
累 計 所有Connector 的需求
2.2.4. 計算所需的所有 線 程數量
int needed=1+selectors+acceptors;
如果大於默 認 的200 則 中斷程式
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d
+ selectors=%d + request=1)",max,acceptors,selectors));
2.2.5. 維護Bean
啟 動QueuedThreadPool
參 見: QueuedThreadPool
doStart()
startThreads()
建立需要的執行緒
建立執行緒
Thread thread = newThread(_runnable);
_runnable
_jobs中取任務並執行
設定執行緒的屬性
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
_threads.add(thread);
啟動執行緒
thread.start();
啟 動WebAppContext
如果需要使用,在此處啟動
2.2.6. 啟動Connector
取得ConnectionFactory
_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
創 建selector 線 程並啟 動
for (int i = 0; i < _selectors.length; i++)
{
ManagedSelector selector = newSelector(i);
_selectors[i] = selector;
selector.start();
execute(new NonBlockingThread(selector));
}
newSelector()
protected ManagedSelector newSelector(int id)
{
return new ManagedSelector(id);
}
建立Acceptor 線 程
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a);
getExecutor().execute(a);
}
Acceptor
設定執行緒名字
final Thread thread = Thread.currentThread();
String name=thread.getName();
_name=String.format("%s-acceptor-%[email protected]%x-
%s",name,_acceptor,hashCode(),AbstractConnector.this.toString());
thread.setName(_name);
設定優先順序
將自己放入_acceptors陣列
synchronized (AbstractConnector.this)
{
_acceptors[_acceptor] = thread;
}
監聽埠
try
{
while (isAccepting())
{
try
{
accept(_acceptor);
}
catch (Throwable e)
{
if (isAccepting())
LOG.warn(e);
else
LOG.ignore(e);
}
}
}
finally
{
thread.setName(name);
if (_acceptorPriorityDelta!=0)
thread.setPriority(priority);
synchronized (AbstractConnector.this)
{
_acceptors[_acceptor] = null;
}
CountDownLatch stopping=_stopping;
if (stopping!=null)
stopping.countDown();
}
ServerConnector.accept()
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
accepted(channel);
}
}
在accept的地方等待
沒有Acceptor 的情況
channle預設是blocking的
如果acceptor數量為0,沒有安排執行緒專門進行accept,則設定為非阻塞模式
若是非0,有專門執行緒進行accept,因此,為阻塞模式
protected void doStart() throws Exception
{
super.doStart();
if (getAcceptors()==0)
{
_acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel);
}
}
2.3. 啟動完畢
AbstractLifeCycle
private void setStarted()
{
_state = __STARTED;
if (LOG.isDebugEnabled())
LOG.debug(STARTED+" @{}ms {}",Uptime.getUptime(),this);
for (Listener listener : _listeners)
listener.lifeCycleStarted(this);
}
3.Http請求
3.1.Accept成功
private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
_manager.accept(channel);
}
17
3.1.1. 設 置 為 非阻塞模式
channel.configureBlocking(false);
3.1.2. 配置Socket
Socket socket = channel.socket();
configure(socket);
3.1.3. 正式處理
SelectorManager _manager;
_manager.accept(channel);
選擇可用的ManagedSelector執行緒
private ManagedSelector chooseSelector()
{
// The ++ increment here is not atomic, but it does not matter,
// so long as the value changes sometimes, then connections will
// be distributed over the available selectors.
long s = _selectorIndex++;
int index = (int)(s % getSelectorCount());
return _selectors[index];
}
ManagedSelector 處 理
ManagedSelector 是一個執行緒
封裝了Selector 的使用
提交任務
selector.submit(selector.new Accept(channel, attachment));
提交這個處理任務到ManagedSelector:
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
_changes.offer(change);
ConcurrentArrayQueue
與ConcurrentLinkedQueue相似的效能,但直接儲存元素
而不是node,因此需要更少的物件,更少的GC
3.2. 請 求 處 理
3.2.1.ManagedSelector.run()
while (isRunning())
select();
select()
發現有任務就執行
runChanges();
runChanges()
參見: 提交任務
private void runChanges()
{
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}
runChange()
change.run();
Accept.run
SelectionKey key = channel.register(_selector, 0, attachment);
EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
select()
int selected = _selector.select();
處理SelectionKey
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
for (SelectionKey key : selectedKeys)
{
if (key.isValid())
{
processKey(key);
}
else
{
if (debug)
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
((EndPoint)attachment).close();
}
}
selectedKeys.clear();
processKey()
private void processKey(SelectionKey key)
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
((SelectableEndPoint)attachment).onSelected();
}
else if (key.isConnectable())
{
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException();
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
onSelected()
@Override
public void onSelected()
{
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps;
setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable())
getFillInterest().fillable();
if (_key.isWritable())
getWriteFlusher().completeWrite();
}
會使用新的執行緒進行HTTP業務處理 (提交到執行緒池)
如果看起來不方便的話最後一節可以關注我們的公眾號:落餅楓林,傳送 jetty分析 獲取源pdf檔案。
歡迎加入我們一起討論,後期會有更多的資料發上來。
最後,感謝 煉數成金 的學習資料,讓我整理這份部落格。