1. 程式人生 > 程式設計 >react-diagram 序列化Json解讀案例分析

react-diagram 序列化Json解讀案例分析

一、併發程式設計知識準備

(1)併發:多種執行緒操作相同的資源,保證執行緒安全,合理使用資源

(2)高併發:服務能同時處理很多請求,提高程式效能

(3)知識技能

  • 總體架構:Spring Boot、Maven、JDK8、MySql
  • 基礎元件:Mybatis、Guava、Lombok、Redis、Kafka
  • 高階組建:Joda-Time、Atomic、JUC、AQS、ThreadLocal、RateLimiter、Hystrix、ThreadPool、ShardBatis、curator、elastic-job...

二、併發基礎

1、CPU多級快取——快取一致性

2、CPU多級快取——重排序

3、Java記憶體模型

4、併發的優勢與風險

三、併發模擬

1、Postman:HTTP請求工具

2、AB(Apache Bench):測試網站效能

Concurrency Level:      50
Time taken for tests:   0.173 seconds
Complete requests:      1000
Failed requests:        0
Total transferred:      136000 bytes
HTML transferred:       4000 bytes
Requests per second:    5764.98 [#/sec] (mean)
Time per request:       8.673 [ms] (mean)
Time per request:       0.173 [ms] (mean, across all concurrent requests)
Transfer rate:          765.66 [Kbytes/sec] received

3、Jmeter:壓測工具

4、程式碼:Semaphore、CountDownLatch等

四、執行緒安全性

1、執行緒安全性

定義:當多個執行緒訪問某個類時,不管執行時環境採用**何種排程方式**或者程序如何交替執行,並且在 **主調程式碼中不需要任何額外的同步或協同**,這個類都能表現出**正確的行為**,那麼稱這個類是執行緒安全的。

2、執行緒安全性的表現方式

(1)原子性:提供了互斥訪問,同一時刻只能有一個執行緒來對它進行操作

(2)可見性:一個執行緒對主記憶體的修改可以即使被其他執行緒觀察到

(3)有序性:一個執行緒觀察其他執行緒中的指令執行順序,由於指令重排序的存在,該觀察結果一般雜亂無序

3、原子性:Atomic包

(1)AtomicXXX:CAS、Unsafe.compareAndSwapInt

// UnSafe類中的方法
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset); // 獲取物件o中的底層值
        // offset == v 修改成功,反之迴圈繼續判斷
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetIntRelease(Object o, long offset,int expected,int x) {
    return compareAndSetInt(o, offset, expected, x); //CAS
}

(2)AtomicLong、LongAdder

  • Long不是原子的,會分成兩部分更新。
  • LongAdder:熱點資料分離,即AtmoicLong中的資料分離為陣列,執行緒訪問使用hash演算法命中某個數字進行計數,最終結果為陣列求和。
  • 應用:
    • 競爭壓力低、序列號:AtmoicLong
    • 競爭壓力大:LongAdder

(3)AtomicBoolean:VarHandle實現

  • 原子操作型別:
    • Unsafe:JVM內建函式API,損害安全性和可移植性
    • 原子性的FieldUpdaters,運用了反射
    • 使用原有原子類AtomicInteger,記憶體開銷大
  • VarHandle
    • 安全、高可用、高效能
    • 更細粒度的控制記憶體排序
    • 可與任何欄位、陣列元素或靜態變數關聯
  • 建立VarHandle
public class VarhandleFoo {
    volatile int x;
    private Point[] points;

    private static final VarHandle QA;//for arrays
    private static final VarHandle X;//for Variables
    static {
        try {
            QA =  MethodHandles.arrayElementVarHandle(Point[].class);
            X = MethodHandles.lookup(). // Lockup類
                    findVarHandle(Point.class, "x", int.class); 
            //X = MethodHandles.lookup().in(Point.class).findVarHandle(Point.class, "x", int.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

    class Point {
        // ...
    }
}
  • 使用VarHandle
//plain read
int x = (int) X.get(this);
Point p = (Point) QA.get(points,10);

//plain write
X.set(this,1);
QA.set(points,10,new Point());

//CAS
X.compareAndSet(this,0,1);
QA.compareAndSet(points,10,p,new Point());

//Numeric Atomic Update
X.getAndAdd(this,10);
  • 記憶體排序影響
    • 對於引用型別和32位以內的原始型別,read和write(get、set)都可以保證原子性,並且對於執行執行緒以外的執行緒不施加可觀察的排序約束。
    • 不透明屬性:訪問相同變數時,不透明操作按原子順序排列。
    • Acquire模式的讀取總是在與它對應的Release模式的寫入之後。
    • 所有Volatile變數的操作相互之間都是有序的。
  • 應用:控制只有一個執行緒執行

(4)AtomicReference、AtomicReferenceFieldUpdater

(5)AtomicStampReference:CAS的ABA問題

(6)AtomicLongArray:原子索引中的值

(7)AtomicIntegerFieldUpdater:讓普通變數(int)支援原子操作

  • 只支援可見的變數
  • 變數需要宣告為volatile
  • 不支援靜態變數

(8)StampLock:讀寫鎖改進,樂觀讀

  • 適用於讀操作很多,寫操作很少的場景(優先滿足寫操作)
  • 用於防止寫操作飢餓
  • 也可退化為讀寫鎖

4、UnSafe類

(1)概述

  • 根據偏移量設定值
  • park()
  • 底層的CAS操作
  • 記憶體屏障

(2)主要介面

//獲得給定物件偏移量上的int值 
public native int getInt(Object o,long offset); 
//設定給定物件偏移量上的int值 
public native void putInt(Object o,long offset, int x); 
//獲得欄位在物件中的偏移量 
public native long objectFieldOffset(Field f); 
public native long staticFieldOffset(Field f);
//設定給定物件的int值,使用volatile語義 
public native void putIntVolatile(Object o,long offset,int x); 
//獲得給定物件物件的int值,使用volatile語義 
public native int  getIntVolatile(Object o,long offset); 
//和putIntVolatile()一樣,但是它要求被操作欄位就是volatile型別的 
public native void putOrderedInt(Object o,long offset,int x);

// cas設值
public final boolean compareAndSwapInt(Object o, long offset,
                                           int expected,
                                           int x);
// 記憶體屏障                                           
public  void fullFence();
public  void loadFence();
public  void storeFence();
public  void loadLoadFence();
public  void storeStoreFence();

//------------------陣列操作---------------------------------
//獲取給定陣列的第一個元素的偏移地址
public native int arrayBaseOffset(Class<?> arrayClass);
//獲取給定陣列的元素增量地址,也就是說每個元素的佔位數
public native int arrayIndexScale(Class<?> arrayClass);

//--------------------鎖指令(synchronized)-------------------------------
//物件加鎖
public native void monitorEnter(Object o);
//物件解鎖
public native void monitorExit(Object o);
public native boolean tryMonitorEnter(Object o);
//解除給定執行緒的阻塞
public native void unpark(Object thread);
//阻塞當前執行緒
public native void park(boolean isAbsolute, long time);

//------------------記憶體操作----------------------
// 在本地記憶體分配一塊指定大小的新記憶體,記憶體的內容未初始化;它們通常被當做垃圾回收。
public native long allocateMemory(long bytes);
//重新分配給定記憶體地址的本地記憶體
public native long reallocateMemory(long address, long bytes);
//將給定記憶體塊中的所有位元組設定為固定值(通常是0)
public native void setMemory(Object o, long offset, long bytes, byte value);
//複製一塊記憶體
public native void copyMemory(Object srcBase, long srcOffset,
                              Object destBase, long destOffset,
                              long bytes);
//釋放給定地址的記憶體
public native void freeMemory(long address);

5、原子性:鎖

(1)概述

  • synchronized:依賴JVM
  • Lock:依賴特殊的CPU指令,程式碼實現,ReentrantLock

(2)synchronized

  • 修飾程式碼塊:作用於呼叫的物件
  • 修飾方法:作用於呼叫的物件
  • 修飾靜態變數:作用於所有呼叫物件
  • 修飾類:作用於所有呼叫物件

(3)原子性對比

  • synchronized:不可中斷,適合競爭不激烈,可讀性好
  • Lock:可中斷鎖,多樣化同步,競爭激烈時能維持常態
  • Atomic:競爭激烈時維持常態,效能比Lock好;但只能同步一個值

6、可見性

(1)共享變數不可見線上程間不可見原因

  • 執行緒交叉執行
  • 重排序結合線程交叉執行
  • 共享變數更新後的值沒有從工作記憶體重新整理到主記憶體中(CPU快取和記憶體)

(2)synchronized的可見性

  • 執行緒解鎖時,把共享變數的最新值重新整理到主記憶體
  • 執行緒解鎖時,清空工作記憶體中共享變數的值,即共享變數需要從主記憶體中重新讀取最新值

(3)volatile的可見性

  • 通過加入記憶體屏障禁止重排序來實現
  • volatile變數的寫後面加入store屏障,將本地記憶體的共享變數值重新整理到主存
  • volatile變數的讀前面加入load屏障,從主存中讀取共享變數

(4)volatile應用

  • 作為狀態標識量
volatile boolean inited = false;

// 執行緒1
context = loadContext();
inited = true;

// 執行緒2
while(!inited)
    sleep();
doSomethingWithConfig(context);
  • double checked:保證單例等

(5)可見性的原因

  • CPU快取的存在
  • 編譯器的優化
  • 重排序導致看到的值不一致

6、有序性

(1)java記憶體模型中,允許編譯器和處理器對指令進行重排序,但重排序不影響單執行緒程式的執行,卻可能影響到多執行緒的正確性
(2)volatile、synchronized、Lock
(3)happens-before原則
  • 程式次序原則:程式次序前先於後
  • 鎖定規則:unlock先於lock操作
  • volatile規則:寫先於讀操作
  • 傳遞規則:A先於B,B先於C,則A先於C
  • 執行緒啟動規則:start 先於 run
  • 執行緒中斷規則:interrupt先於執行緒中斷髮生
  • 執行緒終結規則:執行緒中所有操作先於執行緒終止,Thread.join方法結束、Thread.isAlive返回值檢測執行緒終止與否
  • 物件終結規則:物件的初始化完成先於完成它的finalize方法的開始

(4)一條指令的執行可以分為多個步驟:

  • 取址 IF
  • 譯碼和取暫存器運算元 ID
  • 執行或者有效地址計算 EX
  • 儲存器訪問 MEM
  • 寫回暫存器 WB

(5)重排序的原因

  • 重排序不影響單執行緒的執行結果,消除“氣泡(停頓時間)”,使得流水線更加順暢

五、安全釋出物件

1、釋出物件

  • 釋出物件:使一個物件能被當前範圍之外的程式碼所使用
  • 物件逸出:錯誤的釋出,物件未構建完成時,被其他執行緒所見

2、安全釋出物件

  • 在static函式中初始化一個物件引用
  • 將物件的引用儲存到volatile型別域或者AtomicReference物件中
  • 將物件的引用儲存到某個正確建構函式的final型別域中
  • 將物件的引用儲存到由鎖保護的域中

六、不可變物件

1、不可變物件的條件:參考String類

  • 物件建立後狀態不能修改
  • 物件所有域都是final型別
  • 物件是正確建立的(沒有發生逸出)

2、 final關鍵字:類、方法、變數

  • 類:不能被繼承,方法隱式變為final
  • 方法:private方法隱式變為final方法
    • 鎖定方法不能被繼承類修改
    • 效率
  • 變數:基本資料型別變數(無法修改值)、引用型別變數(無法指向另外一個物件)

3、不可變方式

  • final
  • Collections.unmodifiableXXX:Collection List Set Map
  • Guava:ImutableXXX Collection List Set Map

4、執行緒封閉

  • Ad-hoc執行緒封閉:程式控制實現,不推薦
  • ThreadLocal:推薦
  • 堆疊封閉:區域性變數,無併發問題

6、常見執行緒不安全

  • StringBuilder -> StringBuffer
  • DateFormat -> joda-time包中的DateTimeFormatter
  • ArrayList HashSet HashMap等Collections
  • 先檢查在執行:if(condition(a)){ handler(a); } -> CAS方法或者加鎖

7、同步容器

  • ArrayList -> Vector,Stack
  • HashMap -> HashTable
  • Collections.synchronizedXXX(List Set Map)

注:集合遍歷(foreach、iterator)的時候有對集合進行增刪操作將導致ConcurrentModificationException.需要進行更新,可以先打標記,遍歷完再進行操作。

8、併發容器——JUC

  • ArrayList -> CopyOnWriteArrayList
    • 拷貝需要消耗記憶體,可能發生GC
    • 不能用於實時性的場景,只滿足最終一致性
    • 只適合讀多寫少的場景
    • 讀寫分離,使用時另外開闢空間,進行併發保護
  • HashSet、TreeSet -> CopyOnWriteArraySet、ConcurrentSkipListSet
    • CopyOnWriteArraySet底層使用CopyOnWriteArrayList
    • ConcurrentSkipListSet基於ConcurrentSkipListMap,只有單次操作時原子的,但批量操作(containAll)不是原子的。add呼叫了putIfAbsent方法
  • HashMap、TreeMap -> ConcurrentHashMap、ConcurrentSkipListMap
    • ConcurrentSkipListMap的Key值有序,存取時間與併發執行緒數無關
    • 併發低時ConcurrentHashMap效率更高,併發高時ConcurrentSkipListMap效率更高。

9、安全共享物件策略——總結

  • 執行緒限制物件:由執行緒獨佔,並且只能被佔用它的執行緒修改
  • 共享只讀物件:再沒有額外同步的情況下,可被多個執行緒併發訪問,但任何執行緒都不能修改它
  • 執行緒安全物件:在內部通過同步機制來保證執行緒安全,所以其他執行緒無需額外的同步就可以通過公共介面任意訪問它
  • 被守護物件:只能通過獲取特定的鎖來訪問

七、JUC之AQS

1、AbstractQueuedSynchronizer——AQS

  • Sync Queue,底層為雙向佇列;Condition Queue,單項鍊表,等待某個條件的佇列
  • 使用Node實現FIFO佇列,用於構建鎖或者其他同步裝置的基礎框架
  • 利用int型別表示狀態
  • 使用方法是繼承,模板方法模式
  • 子類通過繼承並通過實現它的方法管理其狀態{acquire和release}的方法操縱狀態
  • 可以同時實現排他鎖和共享鎖模式(獨佔和共享)

2、AQS同步元件

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

3、CountDownLatch

  • 計數器不能被重置
  • 一個執行緒等待計數器變為0(某個條件)
  • 父任務等待子任務執行完成才去做彙總操作

4、Semaphore

  • 有限訪問的資源,限制併發訪問的數目
  • 共享鎖
  • 一次可以獲取多個許可(acquire(3)),一次也可以釋放多個許可(realease(3))
  • 嘗試獲取許可(可加等待時間),獲取不到可以丟棄掉

5、CyclicBarrier

  • 多個執行緒等待,等待某個條件達成後,執行緒才繼續執行
  • 多執行緒計算,最後計算結果執行彙總(fork -> join)
  • 可重置,各個執行緒相互等待
  • 可用於更復雜的場景

6、ReentrantLock與鎖

  • ReetrantLock和Synchronized區別

    • 可重入性
    • 鎖的實現
      • jvm實現
      • jdk實現
    • 效能區別
      • 偏向鎖、輕量級鎖、自旋鎖、重量級鎖
      • 使用者態進行解決,避免進入核心態,採用cas技術
    • 功能
      • 便利性:自動加鎖和釋放
      • 細粒度:ReetrantLock優於synchronized
  • ReetrantLock特有功能

    • 可指定公平鎖和非公平鎖,公平為先等待先獲取鎖
    • Condition類,分組喚醒需要喚醒得執行緒
    • 中斷等待鎖得機制,lock.lockInterruptibly()
    • 自旋cas,避免進入核心態
  • synchronized能夠在物件頭標識物件所處狀態,便於除錯

  • ReentrantReadWriteLock

    • 悲觀讀取,只有沒有讀操作時,才可以寫
    • 適用於寫多讀少的情況
  • StampedLock

    • 樂觀讀,樂觀認為讀操作時有寫操作存在,如果讀完後發現有寫操作,再進行處理。
    • 適用於讀多寫少的情況
  • Condition

    • condition.await():釋放鎖並等待訊號,進入condition佇列;當獲取到訊號並被喚醒後將重新獲取到鎖
    • condition.signal():傳送訊號,獲取condition佇列的值放入sync佇列中,此時並未釋放鎖

7、併發相關概念

  • PV(page view):網站的總訪問量,頁面瀏覽量或點選量,使用者沒重新整理一次就會被記錄一次
  • UV(unique visitor):網站的訪客量,一般時0~24的相同的IP地址只記錄一次
  • QPS(query per second):每秒伺服器支援的查詢量
  • RT(response time):請求的響應時間
  • 峰值QPS = (總PV * 80%)/ (606024*20%)
  • 機器數 = 總的峰值QPS / 壓測得出的單機極限qps

八、JUC元件拓展

1、FutureTask

  • Callable與Runnable介面比較
    • Callable有返回值且能丟擲異常(jdk1.5)
    • Runnable沒有返回值
  • Future介面
    • 非同步程式設計
    • 獲取其他執行緒的返回值
    • Future.get():當其他執行緒未執行完將阻塞直到執行緒執行完,並獲取返回值
    • 可以取消任務
  • FutureTask類:繼承Runnable和Future介面
    • 可以作為Runnable傳入執行緒類執行
    • 也可以作為Future傳入執行緒類並獲取返回值
    • 用於某個執行緒執行需要時間且有返回值,但當前執行緒不需要等待,可以先執行其他業務,直到需要該返回值才阻塞去獲取返回值。

2、Fork/Join框架(Map-Reduce思想)

  • 任務竊取:執行緒1執行完自己的任務,則去竊取執行緒2的任務,為了防止重複執行任務,則從其他執行緒的尾部進行竊取任務

  • 缺點:

    • 任務少時,消耗等待時間
    • 建立過多的執行緒和雙端任務佇列,浪費空間資源
  • 特點:

    • 任務只能使用fork和join進行同步
    • 任務不能執行IO操作
    • 任務不能丟擲異常
  • ForkJoinPool:執行類,執行ForkJoinTask

  • ForkJoinTask:任務類,需要實現compute方法

3、BlockingQueue

  • 阻塞情況:
    • 入隊時,發現佇列已滿時
    • 出隊時,發現佇列已空時
  • 消費者生產者模型
  • 多種場景:
- Throw Exception Special Value Blocks Time Out
Insert add(o) offer(o) put(o) offer(o,timeout,timeunit)
Remove remove(o) poll() take() poll(timeout,timeunit)
Examine element() peek()
  • 實現類
    • ArrayBlockingQueue:有界,FIFO
    • DelayQueue:元素需要實現Delayed介面,繼承了Comparable介面,即元素需要排序獲取過期時間(內部實現:PriorityQueueReentrantLock
    • LinkedBlockingQueue:可有界可無界(最大為Integer.MAX_VALUE),FIFO
    • PriorityBlockingQueue:無邊界佇列,元素需要實現Comparable介面
    • SynchronousQueue:同步佇列,只能存放一個元素,把併發執行變為同步執行
    • ArrayDeque(陣列雙端佇列)
    • LinkedBlockingDeque(基於連結串列的FIFO雙端阻塞佇列)

4 併發設計模式

(1)Future模式

  • 概述

    • 非同步
    • 類似商品訂單
    • 使用者無需一直等待請求的結果,使用者可以繼續瀏覽或者操作其他內容
  • 實現圖

  • 程式碼
public interface Data {
    String getRequest();
}

public class FutureClient {
    public Data request(String queryStr) {
        FutureData data = new FutureData();
        new Thread(()->{
            RealData realData = new RealData(queryStr);
            data.setRealData(realData);
        }).start();
        return data;
    }
}

public class FutureData implements Data {
    private RealData realData;
    private volatile boolean isComplete = false;

    @Override
    public synchronized String getRequest() {
        while (!isComplete) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.getRequest();
    }

    public synchronized void setRealData(RealData realData) {
        if (isComplete) return;
        this.realData = realData;
        isComplete = true;
        notify();
    }
}

public class RealData implements Data {
    private String result;

    public RealData(String queryStr) {
        System.out.println("根據" + queryStr + "進行查詢,這是一個耗時間5s的操作");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        result = "查詢結果";
    }

    @Override
    public String getRequest() {
        return result;
    }
}

public class Main {
    public static void main(String[] args) {
        FutureClient fc = new FutureClient();
        Data data = fc.request("請求引數");
        System.out.println("請求傳送成功");
        System.out.println("繼續執行其他事情");
        // 這個方法會阻塞等待結果執行完成
        String result = data.getRequest();
        System.out.println(result);
    }
}

(2)Master-Slave模式

  • 概述

    • 常用的平行計算模式
    • Master:負責接受和分配任務
    • Worker:負責處理子任務
    • Worker子程序處理完成後,將結果返回給Master,Master進行歸納和總結。
  • 實現圖

  • 程式碼
public class Master {
    // 1.承載任務的集合
    private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();

    // 2.承載所有Worker物件
    private Map<String, Thread> workers = new HashMap<>();

    // 3. 使用容器承載所有Worker執行任務的結果結合
    private Map<String, Object> resultMap = new ConcurrentHashMap<>();

    public Master(Worker worker, int workerCount) {
        worker.setResultMap(resultMap);
        worker.setTaskQueue(taskQueue);
        for (int i = 0; i < workerCount; i++) {
            // key=Worker名字 value=執行緒執行物件
            workers.put("worker" + i, new Thread(worker));
        }
    }

    public void submit(Task task) {
        this.taskQueue.add(task);
    }

    public void execute() {
        workers.values().forEach(Thread::start);
    }

    public boolean isComplete() {
        Collection<Thread> threads = workers.values();
        for (Thread thread : threads) {
            if (thread.getState() != Thread.State.TERMINATED) return false;
        }
        return true;
    }

    public long getResult() {
        return resultMap.values().stream().mapToInt(obj->(Integer)obj).reduce((sum, val) -> sum+val).getAsInt();
    }
}

@Data
public class Task {
    private int id;
    private String name;
    private int price;

    public Task(int id, String name, int price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }
}

@Data
public class Worker implements Runnable {
    private ConcurrentLinkedQueue<Task> taskQueue;
    private Map<String, Object> resultMap;

    @Override
    public void run() {
        while (true) {
            Task task = this.taskQueue.poll();
            if (task == null) break;
            Object obj = handle(task);
            // key=id  value=結果
            resultMap.put(String.valueOf(task.getId()), obj);
        }
    }

    // 可以作為抽象方法提取出去
    private Object handle(Task task) {
        Object object = null;
        // 業務耗時
        try {
            Thread.sleep(500);
            object = task.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return object;
    }
}

public class Main {
    public static void main(String[] args) {
        Master master = new Master(new Worker(), 50);
        Random r = new Random();
        for (int i = 1; i <= 100; i++) {
            master.submit(new Task(i, "任務" + i, r.nextInt(1000)));
        }
        master.execute();
        long start = System.currentTimeMillis();
        while (true) {
            if (master.isComplete()) {
                long result = master.getResult();
                System.out.println(result);
                System.out.println("執行時間:" + (System.currentTimeMillis() - start));
                break;
            }
        }
    }
}

(3)生產者與消費者實現

  • 概述

    • 經典的多執行緒模式
    • 生產者執行緒:負責提交使用者處理
    • 消費者執行緒:負責具體處理生產者提交的任務
    • 生產者和消費者通過共享緩衝區進行通訊
  • 實現圖

  • 程式碼
class ProducerThread implements Runnable {
	private BlockingQueue<String> blockingQueue;
	private AtomicInteger count = new AtomicInteger();
	private volatile boolean FLAG = true;

	public ProducerThread(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "生產者開始啟動....");
		while (FLAG) {
			String data = count.incrementAndGet() + "";
			try {
				boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
				if (offer) {
					System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "成功..");
				} else {
					System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "失敗..");
				}
				Thread.sleep(1000);
			} catch (Exception e) {

			}
		}
		System.out.println(Thread.currentThread().getName() + ",生產者執行緒停止...");
	}

	public void stop() {
		this.FLAG = false;
	}

}

class ConsumerThread implements Runnable {
	private volatile boolean FLAG = true;
	private BlockingQueue<String> blockingQueue;

	public ConsumerThread(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "消費者開始啟動....");
		while (FLAG) {
			try {
				String data = blockingQueue.poll(2, TimeUnit.SECONDS);
				if (data == null || data == "") {
					FLAG = false;
					System.out.println("消費者超過2秒時間未獲取到訊息.");
					return;
				}
				System.out.println("消費者獲取到佇列資訊成功,data:" + data);

			} catch (Exception e) {
				// TODO: handle exception
			}
		}
	}

}

public class Main {
	public static void main(String[] args) {
		BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
		ProducerThread producerThread = new ProducerThread(blockingQueue);
		ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
		Thread t1 = new Thread(producerThread);
		Thread t2 = new Thread(consumerThread);
		t1.start();
		t2.start();
		// 10秒後 停止執行緒..
        // 可以使用CountDownLatch來等待子執行緒結束
		try {
			Thread.sleep(10*1000);
			producerThread.stop();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}

}

九、執行緒池

1、new Thread弊端

  • 每次new Thread新建物件,效能查
  • 執行緒缺乏統一管理,可能無限制新建執行緒,相互競爭,可能導致宕機或者OOM
  • 功能少,沒有定期執行、執行緒中斷

2、執行緒池好處

  • 降低資源消耗:重用執行緒,減少物件建立和消亡的開銷
  • 提高執行緒的可管理性:能有效控制最大併發執行緒數,提高系統資源利用率,
  • 提高響應速度:避免了過多資源競爭,避免阻塞
  • 提供強大的功能:提供定時執行、定期執行、單執行緒、併發控制等功能

3、ThreadPoolExecutor

  • 引數

    • corePoolSize:核心執行緒數量;
    • maximumPoolSize:最大執行緒數;當workQueue有界才有效。
    • workQueue:阻塞佇列,儲存等待執行的任務
    • keepAliveTime:當執行緒數大於corePoolSize小於maximumPoolSize的執行緒,不會直接從執行緒池移除,而是等keepAliveTime還沒被使用才移除。
    • unit:keepAliveTime的時間單位
    • threadFactory:執行緒工廠,建立執行緒
    • rejectHandler:拒絕處理的策略
      • 丟擲異常:AbortPolicy
      • 用呼叫者所線上程執行任務:CallRunsPolicy
      • 丟棄佇列中最靠前的任務,並執行當前任務:DiscardOldestPolicy
      • 直接丟棄:DiscardPolicy
      • 自定義拒絕策略
  • 執行過程

    • 當一個任務到來,發現當前執行緒池的數量小於corePoolSize,則直接建立執行緒執行
    • 當一個任務到來,沒有多餘執行緒執行,放入workQueue,發現workQueue滿了,但是當前執行緒池的執行緒數量未達到maximumPoolSize,則建立執行緒執行任務。
  • 狀態

    • RUNNING:workQueue能放入任務
    • SHUTDOWN:不能放入任務,但可以執行workQueue中的任務
    • STOP:不能放入任務,也不執行workQueue任務,且會終止正在執行的任務
    • TIDYING:呼叫終結方法
    • TERMINATED:已終結
  • 方法

    • execute:提交任務,交給執行緒池執行
    • submit:提交任務,能夠獲取執行結果,execute+Future
    • shutdown:關閉執行緒池,等待任務都執行完成
    • shutdownNow:關閉執行緒池,不等待執行緒執行完,強行暫停正在執行的任務
  • 監控方法

    • getTaskCount:執行緒池已經執行和未執行的任務總數
    • getCompletedTaskCount:已完成的任務數量
    • getPoolSize:執行緒池當前執行緒數量
    • getActiveCount:當前執行緒池中正在執行任務的執行緒數量
  • 執行緒池建立

    • Executors.newCachedThreadPool:可快取執行緒池,可 回收執行緒
    • Executors.newFixedThreadPool:執行緒數固定
    • Executors.newScheduledThreadPool:定長,定時和週期執行任務
    • Executors.newSingleThreadExecutor:單執行緒執行緒池,保證任務順序執行
  • 合理配置

    • CPU密集型任務,需要儘量壓榨CPU,參考值設定未N*CPU+1
    • IO密集型任務,參考值為2N*CPU

十、多執行緒併發拓展

1、死鎖與活鎖

  • 死鎖必要條件
    • 互斥條件
    • 請求和保持條件
    • 不剝奪條件
    • 環路等待條件
  • 活鎖:當執行緒間互相謙讓資源,導致所有執行緒都無法獲取到足夠資源進行業務處理。

2、併發相關概念

  • 阻塞:當一個執行緒進入臨界區後,其他執行緒必須等待
  • 無障礙:
    • 寬進嚴出,可能導致死迴圈
    • 無競爭時,有限步內完成操作
    • 有競爭時,回滾資料
  • 無鎖:
    • 無障礙
    • 保證有一個執行緒能夠勝出
    • cas就是無鎖的
  • 無等待:
    • 無鎖
    • 要求所有執行緒都必須在有限步數內完成
    • 無飢餓:相當於沒有執行緒優先順序

3、併發最佳實踐

  • 使用本地變數

  • 使用不可變類

  • 最小化鎖的作用域範圍:S=1/(1-a+a/n) 阿姆達爾定律

    • a平行計算部分所佔的比例
    • n平行計算處理的節點個數
    • S為加速比
  • 使用執行緒池,不直接使用new Thread

  • 使用同步也不要使用執行緒的wait和notify

  • 使用BlockingQueue實現生產消費模式

  • 使用併發集合而不是加鎖的同步集合

  • 使用Semaphore建立有界的訪問

  • 寧可使用同步程式碼塊也不適用同步方法

  • 避免使用靜態變數(除非final只讀)

4、Spring與執行緒安全

  • Spring bean(scope):singleton、prototype
  • 無狀態物件:如DTO、DAO
  • 有狀態需要加鎖,或者ThradLocal

5、HashMap與ConcurrentHashMap解析

(1)HashMap

  • 初始容量:16
  • 載入因子:0.75
  • hash值取mod,陣列長度必須為2的n次方
  • HashMap的resize可能出現死迴圈,迭代器fail-fast

(2)ConcurrentHashMap

  • java7:分段鎖

  • java8:紅黑樹

6、併發的兩個重要定律

(1)Amdahl定律(阿姆達爾定律)

  • 定義了序列系統並行化後的加速比的計算公式和理論上限

  • 加速比定義:加速比=優化前系統耗時/優化後系統耗時

$$
加速比公司:S=1/(1-a+a/n)
$$

  • a代表平行計算部分所佔的比例,n為並行處理節點個數。
  • 例如,若序列程式碼佔整個程式碼的25%,則並行處理的總體效能不可能超過4。
  • 增加CPU數量並不一定能起到有效的作用,提高系統內可並行的模組比重,合理增加並行處理器數量,才能以最小的投入,得到較大的加速比

(2)Gustafson定律(古斯塔夫森定律)

  • 說明處理器個數,序列比例和加速比之間的關係
  • 執行時間 :a + b a 序列時間 b並行時間
  • 總執行時間 a+nb n處理器個數
  • 序列比例:F = a/(a+b)

$$
加速比公式:S = n-F(n-1)
$$

  • 只要有足夠的並行化,那麼加速比和CPU個數成正比

7、Amino無鎖類

public class LockFreeList<E> implements List<E> {
    protected static class Entry<E> {
        E element;
        AtomicMarkableReference<Entry<E>> next;
    }
    
    protected AtomicMarkableReference<Entry<E>> head;
    
    public LockFreeList() {
		head = new AtomicMarkableReference<Entry<E>>(null, false);
	}
    
    public boolean add(E e) {
		if (null == e) throw new NullPointerException();
		final Entry<E> newNode = new Entry<E>(e,
				new AtomicMarkableReference<Entry<E>>(null, false));
		while (true) {
			Entry<E> cur = head.getReference();
			newNode.next.set(cur, false);
			if (head.compareAndSet(cur, newNode, false, false)) {
				return true;
			}
		}
	}
    // http://amino-cbbs.sourceforge.net/qs_java.html
}

十一、高併發處理思路與手段

1、擴容

  • 方式
    • 垂直擴容:提高系統部件能力
    • 水平擴容:增加更多系統成員來實現
  • 資料庫擴容
    • 讀操作:memcache、redis、CDN等快取
    • 寫操作:Cassandra、Hbase等

2、快取

  • 快取特徵

    • 命中率:命中數/(命中數+沒有命中數)
    • 最大元素(空間)
    • 清空策略:FIFO、LFU(最少使用策略)、LRU(最近最少使用策略)、過期時間、隨機等
  • 快取命中率影響因素

    • 業務場景和業務需求
    • 快取的涉及(粒度和策略)
    • 快取容量和基礎設施
  • 快取分類和應用場景

    • 本地快取:程式設計實現、Guava Cache
    • 分散式快取:Memcache、Redis

3、Guava Cache

  • 多個segments的細粒度鎖,類似java7HashMap分段鎖
  • LRU演算法移除
  • key -> WeakReference value -> Weak/SoftReference
  • 統計快取命中率 未命中率 異常率

4、Memcache

  • 一致性hash演算法
  • slab_class:
    • slab:數量有限,1.25倍
    • page:1M記憶體,申請記憶體
    • chunk:存放資料,page通過chunk進行切分
  • chunk總有記憶體浪費
  • LRU不是針對全域性的,而是針對slab的
  • key值最大為250位元組
  • 單個item最大為1M
  • 不能遍歷items
  • 非阻塞基於事件

5、Redis

(1)type

  • string
  • hash
  • list
  • set
  • sorted set

(2)編碼方式

  • raw
  • int
  • ht
  • zipmap
  • linkedlist
  • ziplist
  • intset

(3)特徵

  • 支援持久化

  • 資料備份,主從模式

  • 讀效能11w/s,寫效能8w/s

  • 單操作原子性,支援多操作的原子性

  • subsribe/pushlish通知key過期

(4)使用場景:

  • 排行榜,取top n的操作
  • 取最新n個數
  • 計數器
  • 唯一性檢查
  • 實時系統
  • 訊息佇列系統

6、快取問題

  • 快取一致性問題:

    • 更新db成功 -> 更新快取失敗 -> 資料不一致
    • 更新快取成功 -> 更新db失敗 -> 資料不一致
    • 更新db成功 -> 淘汰快取失敗 -> 資料不一致
    • 淘汰快取成功 -> 更新db失敗 ->查詢快取miss
  • 快取穿透問題

    • 快取空物件,空集合,命中不高但頻繁更新的資料
    • 單獨過濾資料,命中不高更新不頻繁的資料
  • 快取雪崩:

    • 快取抖動:快取故障導致,一致性hash演算法解決
    • 快取原因導致大量請求到db,導致db宕機
    • 多個快取的資料週期性大量集中失效,也可能導致db壓力過大
    • 解決方案:
      • 多級快取
      • 限流
      • 降級

十二、訊息佇列

1、訂單和手機簡訊(非同步解耦)

2、訊息佇列特徵

  • 業務無關:訊息分發
  • FIFO:先投遞先到達
  • 容災:節點的動態增刪和訊息的持久化
  • 效能:吞吐量提升,系統內部通訊效率提高

3、為什麼需要訊息佇列

生產和消費的速度或穩定性等因素不一致

4、訊息佇列的好處

  • 業務解耦

  • 最終一致性(兩個系統的狀態一樣,RocketMQ ZeroMQ):交易系統的高可靠

    • 跨jvm的一致性問題解決方案:強一致性(分散式事務)和最終一致性實現簡單
    • 依靠定時任務和db實現最終一致性
  • 廣播

  • 錯峰與流控:

    • 上下游處理能力不同,web前端lvs負載均衡 nginx伺服器等裝置提升到上千萬請求,資料庫處理能力有限
    • 兩個系統之間(滑動視窗也可實現)處理能力不同

5、訊息佇列距離

  • Kafka

    • 高效能 跨語言 分散式 釋出訂閱訊息佇列的系統
    • 支援快速持久化 O(1)的系統開銷下進行持久化
    • 高吞吐 10w/s producer broker counsumer原生支援分散式,自動實現負載均衡
    • Hadoop資料並行載入,統一線上和離線資料
  • RabbitMQ

    • Exchange:訊息分發(分發策略)
    • Queue:佇列

十三、應用拆分思路

1、拆分原則

  • 業務優先
  • 循序漸進
  • 兼顧技術:重構、分層
  • 可靠測試

2、思考

  • 應用之間通訊:RPC(dubbo等)、訊息佇列
  • 應用之間資料庫涉及:每個應用都有獨立的資料庫
  • 避免事務操作跨應用

3、元件

(1)Dubbo:服務註冊到zookeeper

(2)Spring Cloud

  • 獨立的服務共同組成一個系統

  • 單個部署,每個跑在自己的程序中

  • 每個服務為獨立的業務開發

  • 分散式管理,強調隔離性

  • 標準:

    • 分散式服務組成的系統
    • 按照業務,不是按照技術劃分
    • 有生命的產品而不是專案
    • 強服務個體,弱通訊
    • 自動化運維,devops
    • 高度的容錯性
    • 可以快速演化和迭代
  • 客戶端訪問服務:Api Gateway

    • 提供統一的入口
    • 微服務對於前臺透明
    • 聚合後臺服務
    • 整合流量,提升效能
    • 安全,過濾,流控
  • 服務之間通訊

    • 非同步:訊息佇列(一致性減弱,需要實現冪等性)
    • 同步:
      • rest(http):SpringBoot Vert.x Dropwizard
      • rpc:dubbo
  • 服務發現:zookeeper註冊

  • 服務可靠性

    • 重試機制
    • 熔斷機制
    • 限流機制
    • 系統降級

十四、限流機制

1、常見限流

  • 限制總併發數
  • 限制瞬時併發數
  • 限制時間視窗內的平均速率

2、演算法

  • 計數器法(1分鐘100個)

  • 滑動視窗(10秒一個,每格都有獨立的計數器)

  • 漏桶演算法

    • 出水恆定
    • 超出溢位
  • 令牌桶演算法

3、演算法對比

  • 計數器法 VS 滑動視窗
    • 計數器是滑動視窗低精度的實現
    • 滑動視窗實現需要更多的空間
  • 漏桶演算法 VS 令牌桶演算法
    • 令牌桶演算法允許一定條件的突發,因為取走token的不需要耗費時間

十五、服務降級和熔斷思路

1、服務降級

  • 自動降級:超時、失敗次數、故障、限流
  • 人工降級:秒殺、雙11大促

2、熔斷思路與服務降級對比

  • 共性:目的(可用性 可靠性著想)、最終表現(不可達)、粒度(服務、資料持久型)、自治(自動觸發)
  • 區別:
    • 觸發原因:熔斷是下級服務引起,降級是整體負荷考慮
    • 管理層次:熔斷是框架級處理,降級對業務有層級之分(最外圍)
    • 實現不同

3、服務降級需要考慮的問題

  • 核心和非核心服務
  • 是否支援降級,降級策略
  • 業務放行場景,策略

4、Hystrix(服務降級實現)

  • 再通過第三方client訪問(網路)依賴服務出現高延遲或者失敗時,為系統提供保護和控制
  • 再分散式系統中防止級聯失敗
  • 快速失敗(Fail fast)同時能快速恢復
  • 提供失敗回退和優雅的服務降級機制

十六、資料庫切庫

1、資料庫瓶頸

  • 單個數據庫過大:多個庫
  • 單個數據庫壓力過大,讀寫瓶頸:多個庫
  • 單個表資料量過大:分表

2、資料庫切庫

  • 切庫基礎:讀寫分離,1主多從
    • 自定義註解實現資料庫切庫
    • 程式碼實現多資料來源

3、資料分表

  • 什麼時候分表?
  • 橫向分表(id取mod)和縱向分表(根據資料活躍度分離資料)
  • 資料庫分表:mybatis分表外掛 shardbatis2.0

十七、高可用的手段

  • 任務排程系統分散式:elastic-job + zookeeper(無中心化的思想)
  • 主備切換:apache curator + zookeeper分散式鎖實現(多個伺服器向zookeeper獲取鎖)
  • 監控報警機制

十八、NIO與AIO

1、NIO的特性

  • 基於塊(Block),以塊為基本單位處理
  • 為所有的原始型別提供(Buffer)快取支援 ,如ByteBuffer
  • 增加通道(Channel)物件,作為新的原始 I/O 抽象
  • 支援鎖和記憶體對映檔案的檔案訪問介面
  • 提供了基於Selector的非同步網路I/O

2、Buffer && Channel

  • 檔案複製
public static void nioCopyFile(String resource, String destination) throws IOException {
    FileInputStream fis = new FileInputStream(resource);
    FileOutputStream fos = new FileOutputStream(destination);
    FileChannel readChannel = fis.getChannel();   //讀檔案通道
    FileChannel writeChannel = fos.getChannel();  //寫檔案通道
    ByteBuffer buffer = ByteBuffer.allocate(1024);//讀入資料快取
    while (true) {
        buffer.clear();
        int len = readChannel.read(buffer);   //讀入資料
        if (len == -1)  break;         //讀取完畢
        buffer.flip(); // 讀寫切換
        writeChannel.write(buffer);  //寫入檔案
    }
    readChannel.close();
    writeChannel.close();
}
  • 檔案對映到記憶體
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw"); FileChannel fc = raf.getChannel();  //將檔案對映到記憶體中 
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()); 
while(mbb.hasRemaining()){  
    System.out.print((char)mbb.get()); 
} 
mbb.put(0,(byte)98); //修改檔案 
raf.close(); 
  • 三個重要的引數
引數 寫模式 讀模式
position 從position的下一個位置寫資料 從position該位置讀資料
capacity 緩衝區總容量 緩衝區總容量
limit 緩衝區實際上限,通常與capacity相等 代表可讀容量,與上次寫入的資料量相等
  • 重要api
    • rewind:將position置零,並清除標誌位(mark),limit不變 ,即可重新讀或寫
    • clear:將position置零,同時將limit設定為capacity的大小,並清除了標誌mark
    • flip:先將limit設定到position所在位置,然後將position置零,並清除標誌位mark (通常用於讀寫切換)

3、網路程式設計

(1)BIO

// 客戶端
public class NioClient {
    private static final int sleepTime = 1000 * 1000 * 1000;

    public static void main(String[] args) throws IOException {
        ExecutorService tp = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            tp.execute(new EchoClient());
        }
    }

    public static class EchoClient implements Runnable {
        @Override
        public void run() {
            Socket client = null;
            PrintWriter writer = null;
            BufferedReader reader = null;
            try {
                client = new Socket();
                client.connect(new InetSocketAddress("localhost", 8000));
                writer = new PrintWriter(client.getOutputStream(), true);
                writer.print("H");
                LockSupport.parkNanos(sleepTime);
                writer.print("e");
                LockSupport.parkNanos(sleepTime);
                writer.print("l");
                LockSupport.parkNanos(sleepTime);
                writer.print("l");
                LockSupport.parkNanos(sleepTime);
                writer.print("o");
                LockSupport.parkNanos(sleepTime);
                writer.print("!");
                LockSupport.parkNanos(sleepTime);
                writer.println(); // 這行很重要
                writer.flush();
                reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                System.out.println("from server: " + reader.readLine());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                writer.close();
                try {
                    reader.close();
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


// 服務端
public class BIOServer {
    public static void main(String[] args) {
        ExecutorService tp = Executors.newCachedThreadPool();
        ServerSocket echoServer = null;
        Socket clientSocket = null;
        try {
            echoServer = new ServerSocket(8000);
        } catch (IOException e) {
            System.out.println(e);
        }
        while (true) {
            try {
                clientSocket = echoServer.accept();
                System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
                tp.execute(new HandleMsg(clientSocket));
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }

    static class HandleMsg implements Runnable {
        private Socket clientSocket;

        public HandleMsg(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        public void run() {
            BufferedReader is = null;
            PrintWriter os = null;
            try {
                is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                // 從InputStream當中讀取客戶端所傳送的資料
                os = new PrintWriter(clientSocket.getOutputStream(), true);
                String inputLine = null;
                long b = System.currentTimeMillis();
                while ((inputLine = is.readLine()) != null) {
                    os.println(inputLine);
                }
                long e = System.currentTimeMillis();
                System.out.println("spend:" + (e - b) + " ms ");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
				//省略資源關閉
            }
        }
    }
}

(2)Nio

public class NioServer {
    private ExecutorService tp = Executors.newCachedThreadPool();
    private Selector selector;
    private Map<Socket, Long> geym_time_stat = new HashMap<>();

    class EchoClient {
        private LinkedList<ByteBuffer> outQueue;

        public EchoClient() {
            outQueue = new LinkedList<>();
        }

        public LinkedList<ByteBuffer> getOutQueue() {
            return outQueue;
        }

        public void enqueue(ByteBuffer byteBuffer) {
            outQueue.addFirst(byteBuffer);
        }
    }

    private void startServer() throws IOException {
        selector = SelectorProvider.provider().openSelector();
        // 配置為非阻塞
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // 繫結埠
        InetSocketAddress address = new InetSocketAddress(8000);
        serverChannel.socket().bind(address);

        // 註冊socket監聽事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        for (; ; ) {
            selector.select();
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            long e = 0;
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();

                if (selectionKey.isAcceptable()) {
                    doAccept(selectionKey);
                } else if (selectionKey.isValid() && selectionKey.isReadable()) {
                    Socket socket = ((SocketChannel) selectionKey.channel()).socket();
                    if (!geym_time_stat.containsKey(socket)) {
                        geym_time_stat.put(socket, System.currentTimeMillis());
                    }
                    doRead(selectionKey);
                } else if (selectionKey.isValid() && selectionKey.isWritable()) {
                    doWrite(selectionKey);
                    Socket socket = ((SocketChannel) selectionKey.channel()).socket();
                    e = System.currentTimeMillis();
                    long b = geym_time_stat.remove(socket);
                    System.out.println("spend:" + (e - b) + "ms");
                }
            }
        }
    }

    private void doWrite(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        EchoClient echoClient = (EchoClient) selectionKey.attachment();
        LinkedList<ByteBuffer> outQueue = echoClient.getOutQueue();

        ByteBuffer buffer = outQueue.getLast();
        try {
            int len = channel.write(buffer);
            if (len == -1) {
                disconnect(selectionKey);
                return;
            }

            if (buffer.remaining() == 0) {
                outQueue.removeLast();
            }
        } catch (IOException e) {
            System.out.println("Failed: write to client");
            e.printStackTrace();
            disconnect(selectionKey);
        }
        if (outQueue.size() == 0) {
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    private void disconnect(SelectionKey selectionKey) {
        try {
            selectionKey.selector().close();
            selectionKey.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doRead(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
        int len;

        try {
            len = channel.read(buffer);
            if (len < 0) {
                channel.socket().close();
                return;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        buffer.flip();
        tp.execute(new HandleMsg(selectionKey, buffer));
    }

    class HandleMsg implements Runnable {
        private SelectionKey selectionKey;
        private ByteBuffer buffer;

        public HandleMsg(SelectionKey selectionKey, ByteBuffer buffer) {
            this.selectionKey = selectionKey;
            this.buffer = buffer;
        }

        @Override
        public void run() {
            EchoClient echoClient = (EchoClient) selectionKey.attachment();
            echoClient.enqueue(buffer);

            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            selector.wakeup(); //強迫selector立即返回
        }
    }

    private void doAccept(SelectionKey selectionKey) {
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel clientChannel;
        try {
            clientChannel = server.accept();
            clientChannel.configureBlocking(false);

            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
            EchoClient echoClient = new EchoClient();
            clientKey.attach(echoClient);

            InetAddress inetAddress = clientChannel.socket().getInetAddress();
            System.out.println("accepted from :" + inetAddress.getHostAddress());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NioServer nioServer = new NioServer();
        try {
            nioServer.startServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

(3)AIO

public class AIOEchoServer {
    private AsynchronousServerSocketChannel server;

    public static void main(String[] args) throws IOException {
        AIOEchoServer aioServer = new AIOEchoServer();
        aioServer.init("localhost", 8000);
    }

    private void init(String host, int port) throws IOException {
        //ChannelGroup用來管理共享資源
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);
        server = AsynchronousServerSocketChannel.open(group);
        //通過setOption配置Socket
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        server.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
        //繫結到指定的主機,埠
        server.bind(new InetSocketAddress(host, port));
        System.out.println("Listening on " + host + ":" + port);
        //等待連線,並註冊CompletionHandler處理核心完成後的操作。
        server.accept(null, new CompletionHandler<>() {
            final ByteBuffer buffer = ByteBuffer.allocate(1024);
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                System.out.println(Thread.currentThread().getName());
                buffer.clear();
                try {
                    //把socket中的資料讀取到buffer中
                    result.read(buffer).get();
                    buffer.flip();
                    System.out.println(System.currentTimeMillis()+" Echo " + new String(buffer.array()).trim() +" to"+result.getRemoteAddress());
                    //把收到的直接返回給客戶端
                    result.write(buffer);
                    buffer.flip();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        //關閉處理完的socket,並重新呼叫accept等待新的連線
                        result.close();
                        server.accept(null, this);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.print("Server failed...." + exc.getCause());
            }
        });

        //因為AIO不會阻塞呼叫程序,因此必須在主程序阻塞,才能保持程序存活。
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}