react-diagram 序列化Json解讀案例分析
一、併發程式設計知識準備
(1)併發:多種執行緒操作相同的資源,保證執行緒安全,合理使用資源
(2)高併發:服務能同時處理很多請求,提高程式效能
(3)知識技能
- 總體架構:Spring Boot、Maven、JDK8、MySql
- 基礎元件:Mybatis、Guava、Lombok、Redis、Kafka
- 高階組建:Joda-Time、Atomic、JUC、AQS、ThreadLocal、RateLimiter、Hystrix、ThreadPool、ShardBatis、curator、elastic-job...
二、併發基礎
1、CPU多級快取——快取一致性
2、CPU多級快取——重排序
3、Java記憶體模型
4、併發的優勢與風險
三、併發模擬
1、Postman:HTTP請求工具
2、AB(Apache Bench):測試網站效能
Concurrency Level: 50 Time taken for tests: 0.173 seconds Complete requests: 1000 Failed requests: 0 Total transferred: 136000 bytes HTML transferred: 4000 bytes Requests per second: 5764.98 [#/sec] (mean) Time per request: 8.673 [ms] (mean) Time per request: 0.173 [ms] (mean, across all concurrent requests) Transfer rate: 765.66 [Kbytes/sec] received
3、Jmeter:壓測工具
4、程式碼:Semaphore、CountDownLatch等
四、執行緒安全性
1、執行緒安全性
定義:當多個執行緒訪問某個類時,不管執行時環境採用**何種排程方式**或者程序如何交替執行,並且在 **主調程式碼中不需要任何額外的同步或協同**,這個類都能表現出**正確的行為**,那麼稱這個類是執行緒安全的。
2、執行緒安全性的表現方式
(1)原子性:提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作
(2)可見性:一個執行緒對主記憶體的修改可以即使被其他執行緒觀察到
(3)有序性:一個執行緒觀察其他執行緒中的指令執行順序,由於指令重排序的存在,該觀察結果一般雜亂無序
3、原子性:Atomic包
(1)AtomicXXX:CAS、Unsafe.compareAndSwapInt
// UnSafe類中的方法
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset); // 獲取物件o中的底層值
// offset == v 修改成功,反之迴圈繼續判斷
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetIntRelease(Object o, long offset,int expected,int x) {
return compareAndSetInt(o, offset, expected, x); //CAS
}
(2)AtomicLong、LongAdder
- Long不是原子的,會分成兩部分更新。
- LongAdder:熱點資料分離,即AtmoicLong中的資料分離為陣列,執行緒訪問使用hash演算法命中某個數字進行計數,最終結果為陣列求和。
- 應用:
- 競爭壓力低、序列號:AtmoicLong
- 競爭壓力大:LongAdder
(3)AtomicBoolean:VarHandle實現
- 原子操作型別:
- Unsafe:JVM內建函式API,損害安全性和可移植性
- 原子性的FieldUpdaters,運用了反射
- 使用原有原子類AtomicInteger,記憶體開銷大
- VarHandle:
- 安全、高可用、高效能
- 更細粒度的控制記憶體排序
- 可與任何欄位、陣列元素或靜態變數關聯
- 建立VarHandle
public class VarhandleFoo {
volatile int x;
private Point[] points;
private static final VarHandle QA;//for arrays
private static final VarHandle X;//for Variables
static {
try {
QA = MethodHandles.arrayElementVarHandle(Point[].class);
X = MethodHandles.lookup(). // Lockup類
findVarHandle(Point.class, "x", int.class);
//X = MethodHandles.lookup().in(Point.class).findVarHandle(Point.class, "x", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
class Point {
// ...
}
}
- 使用VarHandle
//plain read
int x = (int) X.get(this);
Point p = (Point) QA.get(points,10);
//plain write
X.set(this,1);
QA.set(points,10,new Point());
//CAS
X.compareAndSet(this,0,1);
QA.compareAndSet(points,10,p,new Point());
//Numeric Atomic Update
X.getAndAdd(this,10);
- 記憶體排序影響
- 對於引用型別和32位以內的原始型別,read和write(get、set)都可以保證原子性,並且對於執行執行緒以外的執行緒不施加可觀察的排序約束。
- 不透明屬性:訪問相同變數時,不透明操作按原子順序排列。
- Acquire模式的讀取總是在與它對應的Release模式的寫入之後。
- 所有Volatile變數的操作相互之間都是有序的。
- 應用:控制只有一個執行緒執行
(4)AtomicReference、AtomicReferenceFieldUpdater
(5)AtomicStampReference:CAS的ABA問題
(6)AtomicLongArray:原子索引中的值
(7)AtomicIntegerFieldUpdater:讓普通變數(int)支援原子操作
- 只支援可見的變數
- 變數需要宣告為volatile
- 不支援靜態變數
(8)StampLock:讀寫鎖改進,樂觀讀
- 適用於讀操作很多,寫操作很少的場景(優先滿足寫操作)
- 用於防止寫操作飢餓
- 也可退化為讀寫鎖
4、UnSafe類
(1)概述
- 根據偏移量設定值
- park()
- 底層的CAS操作
- 記憶體屏障
(2)主要介面
//獲得給定物件偏移量上的int值
public native int getInt(Object o,long offset);
//設定給定物件偏移量上的int值
public native void putInt(Object o,long offset, int x);
//獲得欄位在物件中的偏移量
public native long objectFieldOffset(Field f);
public native long staticFieldOffset(Field f);
//設定給定物件的int值,使用volatile語義
public native void putIntVolatile(Object o,long offset,int x);
//獲得給定物件物件的int值,使用volatile語義
public native int getIntVolatile(Object o,long offset);
//和putIntVolatile()一樣,但是它要求被操作欄位就是volatile型別的
public native void putOrderedInt(Object o,long offset,int x);
// cas設值
public final boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
// 記憶體屏障
public void fullFence();
public void loadFence();
public void storeFence();
public void loadLoadFence();
public void storeStoreFence();
//------------------陣列操作---------------------------------
//獲取給定陣列的第一個元素的偏移地址
public native int arrayBaseOffset(Class<?> arrayClass);
//獲取給定陣列的元素增量地址,也就是說每個元素的佔位數
public native int arrayIndexScale(Class<?> arrayClass);
//--------------------鎖指令(synchronized)-------------------------------
//物件加鎖
public native void monitorEnter(Object o);
//物件解鎖
public native void monitorExit(Object o);
public native boolean tryMonitorEnter(Object o);
//解除給定執行緒的阻塞
public native void unpark(Object thread);
//阻塞當前執行緒
public native void park(boolean isAbsolute, long time);
//------------------記憶體操作----------------------
// 在本地記憶體分配一塊指定大小的新記憶體,記憶體的內容未初始化;它們通常被當做垃圾回收。
public native long allocateMemory(long bytes);
//重新分配給定記憶體地址的本地記憶體
public native long reallocateMemory(long address, long bytes);
//將給定記憶體塊中的所有位元組設定為固定值(通常是0)
public native void setMemory(Object o, long offset, long bytes, byte value);
//複製一塊記憶體
public native void copyMemory(Object srcBase, long srcOffset,
Object destBase, long destOffset,
long bytes);
//釋放給定地址的記憶體
public native void freeMemory(long address);
5、原子性:鎖
(1)概述
- synchronized:依賴JVM
- Lock:依賴特殊的CPU指令,程式碼實現,ReentrantLock
(2)synchronized
- 修飾程式碼塊:作用於呼叫的物件
- 修飾方法:作用於呼叫的物件
- 修飾靜態變數:作用於所有呼叫物件
- 修飾類:作用於所有呼叫物件
(3)原子性對比
- synchronized:不可中斷,適合競爭不激烈,可讀性好
- Lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態
- Atomic:競爭激烈時維持常態,效能比Lock好;但只能同步一個值
6、可見性
(1)共享變數不可見線上程間不可見原因
- 執行緒交叉執行
- 重排序結合線程交叉執行
- 共享變數更新後的值沒有從工作記憶體重新整理到主記憶體中(CPU快取和記憶體)
(2)synchronized的可見性
- 執行緒解鎖時,把共享變數的最新值重新整理到主記憶體
- 執行緒解鎖時,清空工作記憶體中共享變數的值,即共享變數需要從主記憶體中重新讀取最新值
(3)volatile的可見性
- 通過加入記憶體屏障和禁止重排序來實現
- volatile變數的寫後面加入store屏障,將本地記憶體的共享變數值重新整理到主存
- volatile變數的讀前面加入load屏障,從主存中讀取共享變數
(4)volatile應用
- 作為狀態標識量
volatile boolean inited = false;
// 執行緒1
context = loadContext();
inited = true;
// 執行緒2
while(!inited)
sleep();
doSomethingWithConfig(context);
- double checked:保證單例等
(5)可見性的原因
- CPU快取的存在
- 編譯器的優化
- 重排序導致看到的值不一致
6、有序性
(1)java記憶體模型中,允許編譯器和處理器對指令進行重排序,但重排序不影響單執行緒程式的執行,卻可能影響到多執行緒的正確性
(2)volatile、synchronized、Lock
(3)happens-before原則
- 程式次序原則:程式次序前先於後
- 鎖定規則:unlock先於lock操作
- volatile規則:寫先於讀操作
- 傳遞規則:A先於B,B先於C,則A先於C
- 執行緒啟動規則:start 先於 run
- 執行緒中斷規則:interrupt先於執行緒中斷髮生
- 執行緒終結規則:執行緒中所有操作先於執行緒終止,Thread.join方法結束、Thread.isAlive返回值檢測執行緒終止與否
- 物件終結規則:物件的初始化完成先於完成它的finalize方法的開始
(4)一條指令的執行可以分為多個步驟:
- 取址 IF
- 譯碼和取暫存器運算元 ID
- 執行或者有效地址計算 EX
- 儲存器訪問 MEM
- 寫回暫存器 WB
(5)重排序的原因
- 重排序不影響單執行緒的執行結果,消除“氣泡(停頓時間)”,使得流水線更加順暢
五、安全釋出物件
1、釋出物件
- 釋出物件:使一個物件能被當前範圍之外的程式碼所使用
- 物件逸出:錯誤的釋出,物件未構建完成時,被其他執行緒所見
2、安全釋出物件
- 在static函式中初始化一個物件引用
- 將物件的引用儲存到volatile型別域或者AtomicReference物件中
- 將物件的引用儲存到某個正確建構函式的final型別域中
- 將物件的引用儲存到由鎖保護的域中
六、不可變物件
1、不可變物件的條件:參考String類
- 物件建立後狀態不能修改
- 物件所有域都是final型別
- 物件是正確建立的(沒有發生逸出)
2、 final關鍵字:類、方法、變數
- 類:不能被繼承,方法隱式變為final
- 方法:private方法隱式變為final方法
- 鎖定方法不能被繼承類修改
- 效率
- 變數:基本資料型別變數(無法修改值)、引用型別變數(無法指向另外一個物件)
3、不可變方式
- final
- Collections.unmodifiableXXX:Collection List Set Map
- Guava:ImutableXXX Collection List Set Map
4、執行緒封閉
- Ad-hoc執行緒封閉:程式控制實現,不推薦
- ThreadLocal:推薦
- 堆疊封閉:區域性變數,無併發問題
6、常見執行緒不安全
- StringBuilder -> StringBuffer
- DateFormat -> joda-time包中的DateTimeFormatter
- ArrayList HashSet HashMap等Collections
- 先檢查在執行:if(condition(a)){ handler(a); } -> CAS方法或者加鎖
7、同步容器
- ArrayList -> Vector,Stack
- HashMap -> HashTable
- Collections.synchronizedXXX(List Set Map)
注:集合遍歷(foreach、iterator)的時候有對集合進行增刪操作將導致ConcurrentModificationException.需要進行更新,可以先打標記,遍歷完再進行操作。
8、併發容器——JUC
- ArrayList -> CopyOnWriteArrayList
- 拷貝需要消耗記憶體,可能發生GC
- 不能用於實時性的場景,只滿足最終一致性
- 只適合讀多寫少的場景
- 讀寫分離,使用時另外開闢空間,進行併發保護
- HashSet、TreeSet -> CopyOnWriteArraySet、ConcurrentSkipListSet
- CopyOnWriteArraySet底層使用CopyOnWriteArrayList
- ConcurrentSkipListSet基於ConcurrentSkipListMap,只有單次操作時原子的,但批量操作(containAll)不是原子的。add呼叫了putIfAbsent方法
- HashMap、TreeMap -> ConcurrentHashMap、ConcurrentSkipListMap
- ConcurrentSkipListMap的Key值有序,存取時間與併發執行緒數無關
- 併發低時ConcurrentHashMap效率更高,併發高時ConcurrentSkipListMap效率更高。
9、安全共享物件策略——總結
- 執行緒限制物件:由執行緒獨佔,並且只能被佔用它的執行緒修改
- 共享只讀物件:再沒有額外同步的情況下,可被多個執行緒併發訪問,但任何執行緒都不能修改它
- 執行緒安全物件:在內部通過同步機制來保證執行緒安全,所以其他執行緒無需額外的同步就可以通過公共介面任意訪問它
- 被守護物件:只能通過獲取特定的鎖來訪問
七、JUC之AQS
1、AbstractQueuedSynchronizer——AQS
- Sync Queue,底層為雙向佇列;Condition Queue,單項鍊表,等待某個條件的佇列
- 使用Node實現FIFO佇列,用於構建鎖或者其他同步裝置的基礎框架
- 利用int型別表示狀態
- 使用方法是繼承,模板方法模式
- 子類通過繼承並通過實現它的方法管理其狀態{acquire和release}的方法操縱狀態
- 可以同時實現排他鎖和共享鎖模式(獨佔和共享)
2、AQS同步元件
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
3、CountDownLatch
- 計數器不能被重置
- 一個執行緒等待計數器變為0(某個條件)
- 父任務等待子任務執行完成才去做彙總操作
4、Semaphore
- 有限訪問的資源,限制併發訪問的數目
- 共享鎖
- 一次可以獲取多個許可(acquire(3)),一次也可以釋放多個許可(realease(3))
- 嘗試獲取許可(可加等待時間),獲取不到可以丟棄掉
5、CyclicBarrier
- 多個執行緒等待,等待某個條件達成後,執行緒才繼續執行
- 多執行緒計算,最後計算結果執行彙總(fork -> join)
- 可重置,各個執行緒相互等待
- 可用於更復雜的場景
6、ReentrantLock與鎖
-
ReetrantLock和Synchronized區別
- 可重入性
- 鎖的實現
- jvm實現
- jdk實現
- 效能區別
- 偏向鎖、輕量級鎖、自旋鎖、重量級鎖
- 使用者態進行解決,避免進入核心態,採用cas技術
- 功能
- 便利性:自動加鎖和釋放
- 細粒度:ReetrantLock優於synchronized
-
ReetrantLock特有功能
- 可指定公平鎖和非公平鎖,公平為先等待先獲取鎖
- Condition類,分組喚醒需要喚醒得執行緒
- 中斷等待鎖得機制,lock.lockInterruptibly()
- 自旋cas,避免進入核心態
-
synchronized能夠在物件頭標識物件所處狀態,便於除錯
-
ReentrantReadWriteLock
- 悲觀讀取,只有沒有讀操作時,才可以寫
- 適用於寫多讀少的情況
-
StampedLock
- 樂觀讀,樂觀認為讀操作時有寫操作存在,如果讀完後發現有寫操作,再進行處理。
- 適用於讀多寫少的情況
-
Condition
- condition.await():釋放鎖並等待訊號,進入condition佇列;當獲取到訊號並被喚醒後將重新獲取到鎖
- condition.signal():傳送訊號,獲取condition佇列的值放入sync佇列中,此時並未釋放鎖
7、併發相關概念
- PV(page view):網站的總訪問量,頁面瀏覽量或點選量,使用者沒重新整理一次就會被記錄一次
- UV(unique visitor):網站的訪客量,一般時0~24的相同的IP地址只記錄一次
- QPS(query per second):每秒伺服器支援的查詢量
- RT(response time):請求的響應時間
- 峰值QPS = (總PV * 80%)/ (606024*20%)
- 機器數 = 總的峰值QPS / 壓測得出的單機極限qps
八、JUC元件拓展
1、FutureTask
- Callable與Runnable介面比較
- Callable有返回值且能丟擲異常(jdk1.5)
- Runnable沒有返回值
- Future介面
- 非同步程式設計
- 獲取其他執行緒的返回值
- Future.get():當其他執行緒未執行完將阻塞直到執行緒執行完,並獲取返回值
- 可以取消任務
- FutureTask類:繼承Runnable和Future介面
- 可以作為Runnable傳入執行緒類執行
- 也可以作為Future傳入執行緒類並獲取返回值
- 用於某個執行緒執行需要時間且有返回值,但當前執行緒不需要等待,可以先執行其他業務,直到需要該返回值才阻塞去獲取返回值。
2、Fork/Join框架(Map-Reduce思想)
-
任務竊取:執行緒1執行完自己的任務,則去竊取執行緒2的任務,為了防止重複執行任務,則從其他執行緒的尾部進行竊取任務
-
缺點:
- 任務少時,消耗等待時間
- 建立過多的執行緒和雙端任務佇列,浪費空間資源
-
特點:
- 任務只能使用fork和join進行同步
- 任務不能執行IO操作
- 任務不能丟擲異常
-
ForkJoinPool:執行類,執行ForkJoinTask
-
ForkJoinTask:任務類,需要實現compute方法
3、BlockingQueue
- 阻塞情況:
- 入隊時,發現佇列已滿時
- 出隊時,發現佇列已空時
- 消費者生產者模型
- 多種場景:
- | Throw Exception | Special Value | Blocks | Time Out |
---|---|---|---|---|
Insert | add(o) | offer(o) | put(o) | offer(o,timeout,timeunit) |
Remove | remove(o) | poll() | take() | poll(timeout,timeunit) |
Examine | element() | peek() |
- 實現類
- ArrayBlockingQueue:有界,FIFO
- DelayQueue:元素需要實現Delayed介面,繼承了Comparable介面,即元素需要排序獲取過期時間(內部實現:PriorityQueue和ReentrantLock)
- LinkedBlockingQueue:可有界可無界(最大為Integer.MAX_VALUE),FIFO
- PriorityBlockingQueue:無邊界佇列,元素需要實現Comparable介面
- SynchronousQueue:同步佇列,只能存放一個元素,把併發執行變為同步執行
- ArrayDeque(陣列雙端佇列)
- LinkedBlockingDeque(基於連結串列的FIFO雙端阻塞佇列)
4 併發設計模式
(1)Future模式
-
概述
- 非同步
- 類似商品訂單
- 使用者無需一直等待請求的結果,使用者可以繼續瀏覽或者操作其他內容
-
實現圖
- 程式碼
public interface Data {
String getRequest();
}
public class FutureClient {
public Data request(String queryStr) {
FutureData data = new FutureData();
new Thread(()->{
RealData realData = new RealData(queryStr);
data.setRealData(realData);
}).start();
return data;
}
}
public class FutureData implements Data {
private RealData realData;
private volatile boolean isComplete = false;
@Override
public synchronized String getRequest() {
while (!isComplete) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData.getRequest();
}
public synchronized void setRealData(RealData realData) {
if (isComplete) return;
this.realData = realData;
isComplete = true;
notify();
}
}
public class RealData implements Data {
private String result;
public RealData(String queryStr) {
System.out.println("根據" + queryStr + "進行查詢,這是一個耗時間5s的操作");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
result = "查詢結果";
}
@Override
public String getRequest() {
return result;
}
}
public class Main {
public static void main(String[] args) {
FutureClient fc = new FutureClient();
Data data = fc.request("請求引數");
System.out.println("請求傳送成功");
System.out.println("繼續執行其他事情");
// 這個方法會阻塞等待結果執行完成
String result = data.getRequest();
System.out.println(result);
}
}
(2)Master-Slave模式
-
概述
- 常用的平行計算模式
- Master:負責接受和分配任務
- Worker:負責處理子任務
- Worker子程序處理完成後,將結果返回給Master,Master進行歸納和總結。
-
實現圖
- 程式碼
public class Master {
// 1.承載任務的集合
private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();
// 2.承載所有Worker物件
private Map<String, Thread> workers = new HashMap<>();
// 3. 使用容器承載所有Worker執行任務的結果結合
private Map<String, Object> resultMap = new ConcurrentHashMap<>();
public Master(Worker worker, int workerCount) {
worker.setResultMap(resultMap);
worker.setTaskQueue(taskQueue);
for (int i = 0; i < workerCount; i++) {
// key=Worker名字 value=執行緒執行物件
workers.put("worker" + i, new Thread(worker));
}
}
public void submit(Task task) {
this.taskQueue.add(task);
}
public void execute() {
workers.values().forEach(Thread::start);
}
public boolean isComplete() {
Collection<Thread> threads = workers.values();
for (Thread thread : threads) {
if (thread.getState() != Thread.State.TERMINATED) return false;
}
return true;
}
public long getResult() {
return resultMap.values().stream().mapToInt(obj->(Integer)obj).reduce((sum, val) -> sum+val).getAsInt();
}
}
@Data
public class Task {
private int id;
private String name;
private int price;
public Task(int id, String name, int price) {
this.id = id;
this.name = name;
this.price = price;
}
}
@Data
public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> taskQueue;
private Map<String, Object> resultMap;
@Override
public void run() {
while (true) {
Task task = this.taskQueue.poll();
if (task == null) break;
Object obj = handle(task);
// key=id value=結果
resultMap.put(String.valueOf(task.getId()), obj);
}
}
// 可以作為抽象方法提取出去
private Object handle(Task task) {
Object object = null;
// 業務耗時
try {
Thread.sleep(500);
object = task.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return object;
}
}
public class Main {
public static void main(String[] args) {
Master master = new Master(new Worker(), 50);
Random r = new Random();
for (int i = 1; i <= 100; i++) {
master.submit(new Task(i, "任務" + i, r.nextInt(1000)));
}
master.execute();
long start = System.currentTimeMillis();
while (true) {
if (master.isComplete()) {
long result = master.getResult();
System.out.println(result);
System.out.println("執行時間:" + (System.currentTimeMillis() - start));
break;
}
}
}
}
(3)生產者與消費者實現
-
概述
- 經典的多執行緒模式
- 生產者執行緒:負責提交使用者處理
- 消費者執行緒:負責具體處理生產者提交的任務
- 生產者和消費者通過共享緩衝區進行通訊
-
實現圖
- 程式碼
class ProducerThread implements Runnable {
private BlockingQueue<String> blockingQueue;
private AtomicInteger count = new AtomicInteger();
private volatile boolean FLAG = true;
public ProducerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "生產者開始啟動....");
while (FLAG) {
String data = count.incrementAndGet() + "";
try {
boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (offer) {
System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "成功..");
} else {
System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "失敗..");
}
Thread.sleep(1000);
} catch (Exception e) {
}
}
System.out.println(Thread.currentThread().getName() + ",生產者執行緒停止...");
}
public void stop() {
this.FLAG = false;
}
}
class ConsumerThread implements Runnable {
private volatile boolean FLAG = true;
private BlockingQueue<String> blockingQueue;
public ConsumerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "消費者開始啟動....");
while (FLAG) {
try {
String data = blockingQueue.poll(2, TimeUnit.SECONDS);
if (data == null || data == "") {
FLAG = false;
System.out.println("消費者超過2秒時間未獲取到訊息.");
return;
}
System.out.println("消費者獲取到佇列資訊成功,data:" + data);
} catch (Exception e) {
// TODO: handle exception
}
}
}
}
public class Main {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
ProducerThread producerThread = new ProducerThread(blockingQueue);
ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
Thread t1 = new Thread(producerThread);
Thread t2 = new Thread(consumerThread);
t1.start();
t2.start();
// 10秒後 停止執行緒..
// 可以使用CountDownLatch來等待子執行緒結束
try {
Thread.sleep(10*1000);
producerThread.stop();
} catch (Exception e) {
// TODO: handle exception
}
}
}
九、執行緒池
1、new Thread弊端
- 每次new Thread新建物件,效能查
- 執行緒缺乏統一管理,可能無限制新建執行緒,相互競爭,可能導致宕機或者OOM
- 功能少,沒有定期執行、執行緒中斷
2、執行緒池好處
- 降低資源消耗:重用執行緒,減少物件建立和消亡的開銷
- 提高執行緒的可管理性:能有效控制最大併發執行緒數,提高系統資源利用率,
- 提高響應速度:避免了過多資源競爭,避免阻塞
- 提供強大的功能:提供定時執行、定期執行、單執行緒、併發控制等功能
3、ThreadPoolExecutor
-
引數:
- corePoolSize:核心執行緒數量;
- maximumPoolSize:最大執行緒數;當workQueue有界才有效。
- workQueue:阻塞佇列,儲存等待執行的任務
- keepAliveTime:當執行緒數大於corePoolSize小於maximumPoolSize的執行緒,不會直接從執行緒池移除,而是等keepAliveTime還沒被使用才移除。
- unit:keepAliveTime的時間單位
- threadFactory:執行緒工廠,建立執行緒
- rejectHandler:拒絕處理的策略
- 丟擲異常:AbortPolicy
- 用呼叫者所線上程執行任務:CallRunsPolicy
- 丟棄佇列中最靠前的任務,並執行當前任務:DiscardOldestPolicy
- 直接丟棄:DiscardPolicy
- 自定義拒絕策略
-
執行過程:
- 當一個任務到來,發現當前執行緒池的數量小於corePoolSize,則直接建立執行緒執行
- 當一個任務到來,沒有多餘執行緒執行,放入workQueue,發現workQueue滿了,但是當前執行緒池的執行緒數量未達到maximumPoolSize,則建立執行緒執行任務。
-
狀態:
- RUNNING:workQueue能放入任務
- SHUTDOWN:不能放入任務,但可以執行workQueue中的任務
- STOP:不能放入任務,也不執行workQueue任務,且會終止正在執行的任務
- TIDYING:呼叫終結方法
- TERMINATED:已終結
-
方法:
- execute:提交任務,交給執行緒池執行
- submit:提交任務,能夠獲取執行結果,execute+Future
- shutdown:關閉執行緒池,等待任務都執行完成
- shutdownNow:關閉執行緒池,不等待執行緒執行完,強行暫停正在執行的任務
-
監控方法:
- getTaskCount:執行緒池已經執行和未執行的任務總數
- getCompletedTaskCount:已完成的任務數量
- getPoolSize:執行緒池當前執行緒數量
- getActiveCount:當前執行緒池中正在執行任務的執行緒數量
-
執行緒池建立:
- Executors.newCachedThreadPool:可快取執行緒池,可 回收執行緒
- Executors.newFixedThreadPool:執行緒數固定
- Executors.newScheduledThreadPool:定長,定時和週期執行任務
- Executors.newSingleThreadExecutor:單執行緒執行緒池,保證任務順序執行
-
合理配置:
- CPU密集型任務,需要儘量壓榨CPU,參考值設定未N*CPU+1
- IO密集型任務,參考值為2N*CPU
十、多執行緒併發拓展
1、死鎖與活鎖
- 死鎖必要條件
- 互斥條件
- 請求和保持條件
- 不剝奪條件
- 環路等待條件
- 活鎖:當執行緒間互相謙讓資源,導致所有執行緒都無法獲取到足夠資源進行業務處理。
2、併發相關概念
- 阻塞:當一個執行緒進入臨界區後,其他執行緒必須等待
- 無障礙:
- 寬進嚴出,可能導致死迴圈
- 無競爭時,有限步內完成操作
- 有競爭時,回滾資料
- 無鎖:
- 無障礙
- 保證有一個執行緒能夠勝出
- cas就是無鎖的
- 無等待:
- 無鎖
- 要求所有執行緒都必須在有限步數內完成
- 無飢餓:相當於沒有執行緒優先順序
3、併發最佳實踐
-
使用本地變數
-
使用不可變類
-
最小化鎖的作用域範圍:S=1/(1-a+a/n) 阿姆達爾定律
- a平行計算部分所佔的比例
- n平行計算處理的節點個數
- S為加速比
-
使用執行緒池,不直接使用new Thread
-
使用同步也不要使用執行緒的wait和notify
-
使用BlockingQueue實現生產消費模式
-
使用併發集合而不是加鎖的同步集合
-
使用Semaphore建立有界的訪問
-
寧可使用同步程式碼塊也不適用同步方法
-
避免使用靜態變數(除非final只讀)
4、Spring與執行緒安全
- Spring bean(scope):singleton、prototype
- 無狀態物件:如DTO、DAO
- 有狀態需要加鎖,或者ThradLocal
5、HashMap與ConcurrentHashMap解析
(1)HashMap
- 初始容量:16
- 載入因子:0.75
- hash值取mod,陣列長度必須為2的n次方
- HashMap的resize可能出現死迴圈,迭代器fail-fast
(2)ConcurrentHashMap
-
java7:分段鎖
-
java8:紅黑樹
6、併發的兩個重要定律
(1)Amdahl定律(阿姆達爾定律)
-
定義了序列系統並行化後的加速比的計算公式和理論上限
-
加速比定義:加速比=優化前系統耗時/優化後系統耗時
$$
加速比公司:S=1/(1-a+a/n)
$$
- a代表平行計算部分所佔的比例,n為並行處理節點個數。
- 例如,若序列程式碼佔整個程式碼的25%,則並行處理的總體效能不可能超過4。
- 增加CPU數量並不一定能起到有效的作用,提高系統內可並行的模組比重,合理增加並行處理器數量,才能以最小的投入,得到較大的加速比
(2)Gustafson定律(古斯塔夫森定律)
- 說明處理器個數,序列比例和加速比之間的關係
- 執行時間 :a + b a 序列時間 b並行時間
- 總執行時間 a+nb n處理器個數
- 序列比例:F = a/(a+b)
$$
加速比公式:S = n-F(n-1)
$$
- 只要有足夠的並行化,那麼加速比和CPU個數成正比
7、Amino無鎖類
public class LockFreeList<E> implements List<E> {
protected static class Entry<E> {
E element;
AtomicMarkableReference<Entry<E>> next;
}
protected AtomicMarkableReference<Entry<E>> head;
public LockFreeList() {
head = new AtomicMarkableReference<Entry<E>>(null, false);
}
public boolean add(E e) {
if (null == e) throw new NullPointerException();
final Entry<E> newNode = new Entry<E>(e,
new AtomicMarkableReference<Entry<E>>(null, false));
while (true) {
Entry<E> cur = head.getReference();
newNode.next.set(cur, false);
if (head.compareAndSet(cur, newNode, false, false)) {
return true;
}
}
}
// http://amino-cbbs.sourceforge.net/qs_java.html
}
十一、高併發處理思路與手段
1、擴容
- 方式:
- 垂直擴容:提高系統部件能力
- 水平擴容:增加更多系統成員來實現
- 資料庫擴容:
- 讀操作:memcache、redis、CDN等快取
- 寫操作:Cassandra、Hbase等
2、快取
-
快取特徵:
- 命中率:命中數/(命中數+沒有命中數)
- 最大元素(空間)
- 清空策略:FIFO、LFU(最少使用策略)、LRU(最近最少使用策略)、過期時間、隨機等
-
快取命中率影響因素:
- 業務場景和業務需求
- 快取的涉及(粒度和策略)
- 快取容量和基礎設施
-
快取分類和應用場景:
- 本地快取:程式設計實現、Guava Cache
- 分散式快取:Memcache、Redis
3、Guava Cache
- 多個segments的細粒度鎖,類似java7HashMap分段鎖
- LRU演算法移除
- key -> WeakReference value -> Weak/SoftReference
- 統計快取命中率 未命中率 異常率
4、Memcache
- 一致性hash演算法
- slab_class:
- slab:數量有限,1.25倍
- page:1M記憶體,申請記憶體
- chunk:存放資料,page通過chunk進行切分
- chunk總有記憶體浪費
- LRU不是針對全域性的,而是針對slab的
- key值最大為250位元組
- 單個item最大為1M
- 不能遍歷items
- 非阻塞基於事件
5、Redis
(1)type
- string
- hash
- list
- set
- sorted set
(2)編碼方式
- raw
- int
- ht
- zipmap
- linkedlist
- ziplist
- intset
(3)特徵
-
支援持久化
-
資料備份,主從模式
-
讀效能11w/s,寫效能8w/s
-
單操作原子性,支援多操作的原子性
-
subsribe/pushlish通知key過期
(4)使用場景:
- 排行榜,取top n的操作
- 取最新n個數
- 計數器
- 唯一性檢查
- 實時系統
- 訊息佇列系統
6、快取問題
-
快取一致性問題:
- 更新db成功 -> 更新快取失敗 -> 資料不一致
- 更新快取成功 -> 更新db失敗 -> 資料不一致
- 更新db成功 -> 淘汰快取失敗 -> 資料不一致
- 淘汰快取成功 -> 更新db失敗 ->查詢快取miss
-
快取穿透問題
- 快取空物件,空集合,命中不高但頻繁更新的資料
- 單獨過濾資料,命中不高更新不頻繁的資料
-
快取雪崩:
- 快取抖動:快取故障導致,一致性hash演算法解決
- 快取原因導致大量請求到db,導致db宕機
- 多個快取的資料週期性大量集中失效,也可能導致db壓力過大
- 解決方案:
- 多級快取
- 限流
- 降級
十二、訊息佇列
1、訂單和手機簡訊(非同步解耦)
2、訊息佇列特徵
- 業務無關:訊息分發
- FIFO:先投遞先到達
- 容災:節點的動態增刪和訊息的持久化
- 效能:吞吐量提升,系統內部通訊效率提高
3、為什麼需要訊息佇列
生產和消費的速度或穩定性等因素不一致
4、訊息佇列的好處
-
業務解耦
-
最終一致性(兩個系統的狀態一樣,RocketMQ ZeroMQ):交易系統的高可靠
- 跨jvm的一致性問題解決方案:強一致性(分散式事務)和最終一致性實現簡單
- 依靠定時任務和db實現最終一致性
-
廣播
-
錯峰與流控:
- 上下游處理能力不同,web前端lvs負載均衡 nginx伺服器等裝置提升到上千萬請求,資料庫處理能力有限
- 兩個系統之間(滑動視窗也可實現)處理能力不同
5、訊息佇列距離
-
Kafka
- 高效能 跨語言 分散式 釋出訂閱訊息佇列的系統
- 支援快速持久化 O(1)的系統開銷下進行持久化
- 高吞吐 10w/s producer broker counsumer原生支援分散式,自動實現負載均衡
- Hadoop資料並行載入,統一線上和離線資料
-
RabbitMQ
- Exchange:訊息分發(分發策略)
- Queue:佇列
十三、應用拆分思路
1、拆分原則
- 業務優先
- 循序漸進
- 兼顧技術:重構、分層
- 可靠測試
2、思考
- 應用之間通訊:RPC(dubbo等)、訊息佇列
- 應用之間資料庫涉及:每個應用都有獨立的資料庫
- 避免事務操作跨應用
3、元件
(1)Dubbo:服務註冊到zookeeper
(2)Spring Cloud
-
獨立的服務共同組成一個系統
-
單個部署,每個跑在自己的程序中
-
每個服務為獨立的業務開發
-
分散式管理,強調隔離性
-
標準:
- 分散式服務組成的系統
- 按照業務,不是按照技術劃分
- 有生命的產品而不是專案
- 強服務個體,弱通訊
- 自動化運維,devops
- 高度的容錯性
- 可以快速演化和迭代
-
客戶端訪問服務:Api Gateway
- 提供統一的入口
- 微服務對於前臺透明
- 聚合後臺服務
- 整合流量,提升效能
- 安全,過濾,流控
-
服務之間通訊:
- 非同步:訊息佇列(一致性減弱,需要實現冪等性)
- 同步:
- rest(http):SpringBoot Vert.x Dropwizard
- rpc:dubbo
-
服務發現:zookeeper註冊
-
服務可靠性:
- 重試機制
- 熔斷機制
- 限流機制
- 系統降級
十四、限流機制
1、常見限流
- 限制總併發數
- 限制瞬時併發數
- 限制時間視窗內的平均速率
2、演算法
-
計數器法(1分鐘100個)
-
滑動視窗(10秒一個,每格都有獨立的計數器)
-
漏桶演算法
- 出水恆定
- 超出溢位
-
令牌桶演算法
3、演算法對比
- 計數器法 VS 滑動視窗
- 計數器是滑動視窗低精度的實現
- 滑動視窗實現需要更多的空間
- 漏桶演算法 VS 令牌桶演算法
- 令牌桶演算法允許一定條件的突發,因為取走token的不需要耗費時間
十五、服務降級和熔斷思路
1、服務降級
- 自動降級:超時、失敗次數、故障、限流
- 人工降級:秒殺、雙11大促
2、熔斷思路與服務降級對比
- 共性:目的(可用性 可靠性著想)、最終表現(不可達)、粒度(服務、資料持久型)、自治(自動觸發)
- 區別:
- 觸發原因:熔斷是下級服務引起,降級是整體負荷考慮
- 管理層次:熔斷是框架級處理,降級對業務有層級之分(最外圍)
- 實現不同
3、服務降級需要考慮的問題
- 核心和非核心服務
- 是否支援降級,降級策略
- 業務放行場景,策略
4、Hystrix(服務降級實現)
- 再通過第三方client訪問(網路)依賴服務出現高延遲或者失敗時,為系統提供保護和控制
- 再分散式系統中防止級聯失敗
- 快速失敗(Fail fast)同時能快速恢復
- 提供失敗回退和優雅的服務降級機制
十六、資料庫切庫
1、資料庫瓶頸
- 單個數據庫過大:多個庫
- 單個數據庫壓力過大,讀寫瓶頸:多個庫
- 單個表資料量過大:分表
2、資料庫切庫
- 切庫基礎:讀寫分離,1主多從
- 自定義註解實現資料庫切庫
- 程式碼實現多資料來源
3、資料分表
- 什麼時候分表?
- 橫向分表(id取mod)和縱向分表(根據資料活躍度分離資料)
- 資料庫分表:mybatis分表外掛 shardbatis2.0
十七、高可用的手段
- 任務排程系統分散式:elastic-job + zookeeper(無中心化的思想)
- 主備切換:apache curator + zookeeper分散式鎖實現(多個伺服器向zookeeper獲取鎖)
- 監控報警機制
十八、NIO與AIO
1、NIO的特性
- 基於塊(Block),以塊為基本單位處理
- 為所有的原始型別提供(Buffer)快取支援 ,如ByteBuffer
- 增加通道(Channel)物件,作為新的原始 I/O 抽象
- 支援鎖和記憶體對映檔案的檔案訪問介面
- 提供了基於Selector的非同步網路I/O
2、Buffer && Channel
- 檔案複製
public static void nioCopyFile(String resource, String destination) throws IOException {
FileInputStream fis = new FileInputStream(resource);
FileOutputStream fos = new FileOutputStream(destination);
FileChannel readChannel = fis.getChannel(); //讀檔案通道
FileChannel writeChannel = fos.getChannel(); //寫檔案通道
ByteBuffer buffer = ByteBuffer.allocate(1024);//讀入資料快取
while (true) {
buffer.clear();
int len = readChannel.read(buffer); //讀入資料
if (len == -1) break; //讀取完畢
buffer.flip(); // 讀寫切換
writeChannel.write(buffer); //寫入檔案
}
readChannel.close();
writeChannel.close();
}
- 檔案對映到記憶體
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw"); FileChannel fc = raf.getChannel(); //將檔案對映到記憶體中
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
while(mbb.hasRemaining()){
System.out.print((char)mbb.get());
}
mbb.put(0,(byte)98); //修改檔案
raf.close();
- 三個重要的引數
引數 | 寫模式 | 讀模式 |
---|---|---|
position | 從position的下一個位置寫資料 | 從position該位置讀資料 |
capacity | 緩衝區總容量 | 緩衝區總容量 |
limit | 緩衝區實際上限,通常與capacity相等 | 代表可讀容量,與上次寫入的資料量相等 |
- 重要api
- rewind:將position置零,並清除標誌位(mark),limit不變 ,即可重新讀或寫
- clear:將position置零,同時將limit設定為capacity的大小,並清除了標誌mark
- flip:先將limit設定到position所在位置,然後將position置零,並清除標誌位mark (通常用於讀寫切換)
3、網路程式設計
(1)BIO
// 客戶端
public class NioClient {
private static final int sleepTime = 1000 * 1000 * 1000;
public static void main(String[] args) throws IOException {
ExecutorService tp = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
tp.execute(new EchoClient());
}
}
public static class EchoClient implements Runnable {
@Override
public void run() {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream(), true);
writer.print("H");
LockSupport.parkNanos(sleepTime);
writer.print("e");
LockSupport.parkNanos(sleepTime);
writer.print("l");
LockSupport.parkNanos(sleepTime);
writer.print("l");
LockSupport.parkNanos(sleepTime);
writer.print("o");
LockSupport.parkNanos(sleepTime);
writer.print("!");
LockSupport.parkNanos(sleepTime);
writer.println(); // 這行很重要
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println("from server: " + reader.readLine());
} catch (IOException e) {
e.printStackTrace();
} finally {
writer.close();
try {
reader.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
// 服務端
public class BIOServer {
public static void main(String[] args) {
ExecutorService tp = Executors.newCachedThreadPool();
ServerSocket echoServer = null;
Socket clientSocket = null;
try {
echoServer = new ServerSocket(8000);
} catch (IOException e) {
System.out.println(e);
}
while (true) {
try {
clientSocket = echoServer.accept();
System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
tp.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
System.out.println(e);
}
}
}
static class HandleMsg implements Runnable {
private Socket clientSocket;
public HandleMsg(Socket clientSocket) {
this.clientSocket = clientSocket;
}
public void run() {
BufferedReader is = null;
PrintWriter os = null;
try {
is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
// 從InputStream當中讀取客戶端所傳送的資料
os = new PrintWriter(clientSocket.getOutputStream(), true);
String inputLine = null;
long b = System.currentTimeMillis();
while ((inputLine = is.readLine()) != null) {
os.println(inputLine);
}
long e = System.currentTimeMillis();
System.out.println("spend:" + (e - b) + " ms ");
} catch (IOException e) {
e.printStackTrace();
} finally {
//省略資源關閉
}
}
}
}
(2)Nio
public class NioServer {
private ExecutorService tp = Executors.newCachedThreadPool();
private Selector selector;
private Map<Socket, Long> geym_time_stat = new HashMap<>();
class EchoClient {
private LinkedList<ByteBuffer> outQueue;
public EchoClient() {
outQueue = new LinkedList<>();
}
public LinkedList<ByteBuffer> getOutQueue() {
return outQueue;
}
public void enqueue(ByteBuffer byteBuffer) {
outQueue.addFirst(byteBuffer);
}
}
private void startServer() throws IOException {
selector = SelectorProvider.provider().openSelector();
// 配置為非阻塞
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// 繫結埠
InetSocketAddress address = new InetSocketAddress(8000);
serverChannel.socket().bind(address);
// 註冊socket監聽事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
for (; ; ) {
selector.select();
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
long e = 0;
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isAcceptable()) {
doAccept(selectionKey);
} else if (selectionKey.isValid() && selectionKey.isReadable()) {
Socket socket = ((SocketChannel) selectionKey.channel()).socket();
if (!geym_time_stat.containsKey(socket)) {
geym_time_stat.put(socket, System.currentTimeMillis());
}
doRead(selectionKey);
} else if (selectionKey.isValid() && selectionKey.isWritable()) {
doWrite(selectionKey);
Socket socket = ((SocketChannel) selectionKey.channel()).socket();
e = System.currentTimeMillis();
long b = geym_time_stat.remove(socket);
System.out.println("spend:" + (e - b) + "ms");
}
}
}
}
private void doWrite(SelectionKey selectionKey) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
EchoClient echoClient = (EchoClient) selectionKey.attachment();
LinkedList<ByteBuffer> outQueue = echoClient.getOutQueue();
ByteBuffer buffer = outQueue.getLast();
try {
int len = channel.write(buffer);
if (len == -1) {
disconnect(selectionKey);
return;
}
if (buffer.remaining() == 0) {
outQueue.removeLast();
}
} catch (IOException e) {
System.out.println("Failed: write to client");
e.printStackTrace();
disconnect(selectionKey);
}
if (outQueue.size() == 0) {
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
private void disconnect(SelectionKey selectionKey) {
try {
selectionKey.selector().close();
selectionKey.channel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doRead(SelectionKey selectionKey) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
int len;
try {
len = channel.read(buffer);
if (len < 0) {
channel.socket().close();
return;
}
} catch (IOException e) {
e.printStackTrace();
}
buffer.flip();
tp.execute(new HandleMsg(selectionKey, buffer));
}
class HandleMsg implements Runnable {
private SelectionKey selectionKey;
private ByteBuffer buffer;
public HandleMsg(SelectionKey selectionKey, ByteBuffer buffer) {
this.selectionKey = selectionKey;
this.buffer = buffer;
}
@Override
public void run() {
EchoClient echoClient = (EchoClient) selectionKey.attachment();
echoClient.enqueue(buffer);
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.wakeup(); //強迫selector立即返回
}
}
private void doAccept(SelectionKey selectionKey) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel clientChannel;
try {
clientChannel = server.accept();
clientChannel.configureBlocking(false);
SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
EchoClient echoClient = new EchoClient();
clientKey.attach(echoClient);
InetAddress inetAddress = clientChannel.socket().getInetAddress();
System.out.println("accepted from :" + inetAddress.getHostAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
NioServer nioServer = new NioServer();
try {
nioServer.startServer();
} catch (IOException e) {
e.printStackTrace();
}
}
}
(3)AIO
public class AIOEchoServer {
private AsynchronousServerSocketChannel server;
public static void main(String[] args) throws IOException {
AIOEchoServer aioServer = new AIOEchoServer();
aioServer.init("localhost", 8000);
}
private void init(String host, int port) throws IOException {
//ChannelGroup用來管理共享資源
AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);
server = AsynchronousServerSocketChannel.open(group);
//通過setOption配置Socket
server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
server.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
//繫結到指定的主機,埠
server.bind(new InetSocketAddress(host, port));
System.out.println("Listening on " + host + ":" + port);
//等待連線,並註冊CompletionHandler處理核心完成後的操作。
server.accept(null, new CompletionHandler<>() {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println(Thread.currentThread().getName());
buffer.clear();
try {
//把socket中的資料讀取到buffer中
result.read(buffer).get();
buffer.flip();
System.out.println(System.currentTimeMillis()+" Echo " + new String(buffer.array()).trim() +" to"+result.getRemoteAddress());
//把收到的直接返回給客戶端
result.write(buffer);
buffer.flip();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
//關閉處理完的socket,並重新呼叫accept等待新的連線
result.close();
server.accept(null, this);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.print("Server failed...." + exc.getCause());
}
});
//因為AIO不會阻塞呼叫程序,因此必須在主程序阻塞,才能保持程序存活。
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}