1. 程式人生 > 其它 >高效能進階之路——Java非同步呼叫實現原理詳解

高效能進階之路——Java非同步呼叫實現原理詳解

尊重原創版權: https://www.gewuweb.com/hot/18404.html

高效能進階之路——Java非同步呼叫實現原理詳解

接下來進入到大家比較喜歡的 高效能系列 ,主題內容包括, 訊息佇列 , 快取 和 分散式部署架構 等,在上一篇文章- #
秒殺系統架構圖該怎麼畫?手把手教你! ,講解了博主 悽慘 的經歷,因此在學習相關技術的時候,我們要將其運用到我們實際的專案中,在 高效能篇
結束後,將進入 架構圖2.0版本 ~

什麼是非同步

同步呼叫:呼叫方在呼叫過程中,持續等待返回結果。
 
非同步呼叫:呼叫方在呼叫過程中,不直接等待返回結果,而是執行其他任務,結果返回形式通常為回撥函式。

脫離 IO ,單獨討論 同步 和 非同步 ,我們更加容易去理解它的原理,同步和非同步其實屬於一種 通訊機制
,表達的是,我們在通訊過程中,是主動去詢問,還是等對方主動反饋。體現在同步 (主動) 和非同步 (被動) 上。

程序內非同步呼叫

1、Thread

程序和執行緒:程序是資源分配的最小單位,執行緒是CPU排程的最小單位

Java程序 內最簡單的非同步呼叫方式,就是通過 new Thread().start() 的方式,啟動新的執行緒進行任務的執行( CPU排程 )。

public static void main(String[] args) {
    System.out.println("煲水");
    //建立新的執行緒
    Thread thread1= new Thread(()->{
        try {
            Thread.sleep(5000);
            System.out.println("水開了,"+Thread.currentThread().getName());
        }catch (Exception e){
            e.printStackTrace();
        }
    });
    thread1.start();
    System.out.println("運動");
}

1.1、start() 和 run()

在上述例項程式碼中,我們雖然採用了實現 Runnable 介面的方式,進行新執行緒的實現,但是在方法啟動時,並沒有使用 run() 方法,而是使用了
start() 方法。

run():使用當前執行緒執行 run()方法呼叫,可以理解時同步呼叫

start() 方法在呼叫時,在程式碼邏輯中,會呼叫到一個本地方法 start0 ,

下載 JDK原始碼 後,可以看到 Thread 類 有個 registerNatives 本地方法,該方法主要的作用就是註冊一些本地方法供
Thread 類使用,如 start0(),stop0() 等等,可以說,所有操作本地執行緒的本地方法都是由它註冊的。

可以看出 Java 執行緒 呼叫 start->start0 的方法,實際上會呼叫到 JVM_StartThread 方法,通過呼叫 new
JavaThread(&thread_entry,sz) 完成執行緒的建立。

在 jvm.cpp 中,有如下程式碼段:

在建立完執行緒後,通過 thread_entry 完成 run() 方法的呼叫

1.2、Future

Future 的呼叫方式,屬於 同步非阻塞 , 主要原因在於,在獲取非同步執行緒處理結果時,需要主執行緒主動去獲取,非同步執行緒並沒有通過 主動通知
的方式,將資料結構進行 更新 或 回撥 。

public static void main(String[] args) throws Exception {
    System.out.println("煲水");
    FutureTask<String> futureTask = new FutureTask(()->{
        try {
            Thread.sleep(5000);
            System.out.println("水開了,"+Thread.currentThread().getName());
        }catch (Exception e){
            e.printStackTrace();
        }
        return "water";
    });
    //建立新的執行緒
    Thread thread1= new Thread(futureTask);
    thread1.start();
    System.out.println("運動");
    Thread.sleep(3000);
    //阻塞等待資料
    String result= futureTask.get(5, TimeUnit.SECONDS);
    System.out.println("喝水," + result);
}

Future 的實現原理

類的繼承關係圖如下,可以看到 FutureTask ,實現了 Runnable 介面,那麼在重寫的 run() 方法中,可以看到,在呼叫
call() 方法獲取到結果後,通過 CAS 的方式,更新到 成員變數 中。

任務呼叫結果更新:

1.3、ThreadPoolExecutor

public static void main(String[] args) throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(10);
    System.out.println("煲水");
    Future<String> future = executors.submit(() -> {
        try {
            Thread.sleep(5000);
            System.out.println("水開了," + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "water";
    });
    System.out.println("運動");

    String result = future.get(5, TimeUnit.SECONDS);
    System.out.println("喝水," + result);
}

上面講解了 FutureTask 的實現原理後,這裡再對比 submit() 和 execute() ,就比較容易理解了,在 submit()
方法中,將 Callable 實現類,封裝成了 FutureTask , 然後再進行實際的呼叫:

1.4、總結

核心區別在於 start() 和 run() , start()是啟動一條 新的執行緒 的同時,完成 run() 方法,這時候是一個
非同步操作 ;如果直接執行 run() 方法, 則會在 當前執行緒 直接執行,是一個 同步阻塞操作 。

而 Future 的呼叫方式,則是一個 同步非阻塞 處理,在提交了 任務 後,不阻塞主執行緒的繼續執行,在到了一定時間後,主執行緒可以通過
get() 方法,獲取 非同步任務 處理結果。

ThreadPoolExecutor 則是維護了一個可複用的執行緒池,解決了 資源複用 , 效能耗時 的問題, Java執行緒 預設大小為
1MB ,執行緒的 建立 和 銷燬 都會佔用記憶體和GC耗時;而執行緒的無限制建立, 則會帶來 CPU負載過高
,每個執行緒分配的時間都很少,導致處理效率低。

2、EventBus

public class JiulingTest {
    public static void main(String[] args) throws Exception {
        System.out.println("開始");
        //使用非同步事件匯流排
        EventBus eventBus  = new AsyncEventBus(Executors.newFixedThreadPool(10));
        // 向上述EventBus物件中註冊一個監聽物件
        eventBus.register(new EventListener());
        // 使用EventBus釋出一個事件,該事件會給通知到所有註冊的監聽者
        eventBus.post(new Event("煲水"));
        System.out.println("運動");
    }
}
// 事件,監聽者監聽的事件的包裝物件
class Event {
    //事件動作
    public String action;

    Event(String action) {
        this.action = action;
    }
}

// 監聽者
 class EventListener {
    // 監聽的方法,必須使用註解宣告,且只能有一個引數,實際觸發一個事件的時候會根據引數型別觸發方法
    @Subscribe
    public void listen(Event event) {
        try {
            System.out.println("Event listener receive message:  " + event.action + " threadName:" + Thread.currentThread().getName());
            Thread.sleep(5000);
            System.out.println("水開了!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.1、觀察者模式

在 EventBus 中,通過 @Subscribe 定義了抽象觀察者的行為, 通過 入口 區分不同的事件 監聽動作 ,如上述的示例程式碼中,
listen(Event event) 只會觀察這個類的事件。

/**
 * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
 */
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  Class<?> clazz = listener.getClass();
  //遍歷 @Subscribe 的方法
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[0];
    //然後根據 引數型別,也就是事件型別,進行歸類
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}

然後在進行事件釋出的時候,通過呼叫 EventBus.post() 方法,便利找到所有的監聽方法:

public void post(Object event) {
//從上述歸類的Map 中,找到所有的觀察者方法
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
  //事件分發,具體呼叫
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}

2.2、AsyncEventBus

在示例程式碼中,我們使用的是 new AsyncEventBus(Executors.newFixedThreadPool(10)) 構建的非同步事件匯流排。

由下往上倒推,我們先看 Listern ,是如何執行事件處理方法的,這裡比較好 理解 ,通過執行緒池完成任務的呼叫,具體實現是 通過反射的方式呼叫
@Subscribe 註解的方法。

那麼這裡的 executor 是怎麼來的呢?

this.executor = bus.executor(); //從事件匯流排傳遞過來

回到 EventBus 中,我們可以看到建構函式並沒有提供初始化執行緒池的入口,那麼預設執行緒池的建立,可以跟蹤到

這個執行緒池的 execute 方法,並沒有建立新的執行緒執行 Runnable 方法 ,而是使用 當前執行緒 執行(具體邏輯參考 1.1 )。
因此 EventBus 是不支援非同步事件處理的!

在 dispatchEvent 方法中,比較直接可以看到整體設計中,是支援非同步事件的,我們需要做的就是將 Executor 修改成一個合理的執行緒池,
而 AsyncEventBus 恰恰提供了這個能力。

/**
 * Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
 *
 * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
 *     down the executor after the last event has been posted to this event bus.
 */
public AsyncEventBus(Executor executor) {
  super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}

3、Spring Event

Spring Event 與 Event Bus 預設都是同步執行,支援通過設定 Executors 的方式修改成非同步事件。

核心元件:

  • 事件類:定義事件,繼承 ApplicationEvent 的類成為一個事件類。
  • 釋出者:釋出事件,通過 ApplicationEventPublisher 釋出事件。
  • 監聽者:監聽並處理事件,實現 ApplicationListener 介面或者使用 @EventListener 註解。

由於程式碼過多,可以直接github 下載 進行閱讀,這裡只貼部分關鍵程式碼:

在釋出事件方法: AbstractApplicationContext#publishEvent
會走到 下圖中的 SimpleApplicationEventMulticaster#multicastEvent 執行具體任務的排程。 這裡的設計與
上面的 EventBus 如出一轍,在執行時,通過區分執行緒池進行實際的排程,從而決定 同步|非同步 !

3.1、非同步之ApplicationEventMulticaster

修改 ApplicationEventMulticaster 設定初始執行緒池, 和 EventBus 的解決思路一致:

@Order(0)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
    SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
    eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool(10));
    return eventMulticaster;
}

在Spring 上下文初始化的時候,會將這一個bean,載入到上下文中,

** 存在的問題: ** 由於將整個上下文的 ApplicationEventMulticaster
都替換了,那麼在事件處理的流程上,所有的事件都會以非同步的方式進行,那麼風險的把控就很難做好。不建議,但能用( 畢竟經受過考驗 )

3.2、非同步之@Async

通過實現 AsyncConfigurer 介面,自定義執行緒池,對切面方法,執行反射代理

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

核心原理

程序間非同步呼叫

Dubbo 非同步呼叫

在 rpc 框架中,我們普遍使用的都是 同步呼叫 模式,但是在 Dubbo 的底層實現中,反而是以 非同步呼叫 的方式實現的。先來簡單看看
呼叫鏈路 :

核心程式碼在

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke

訊息佇列非同步解耦

在介紹 EventBus 的時候, 我查看了很多文章,都將 EventBus 的 設計模式 描述為 釋出-訂閱
模式。首先這個描述是錯誤的,然後我們來對比一下他們的區別:

從表面上看:

  • 觀察者模式裡,只有兩個角色 —— 觀察者 + 被觀察者
  • 而釋出訂閱模式裡,卻不僅僅只有釋出者和訂閱者兩個角色,還有一個經常被我們忽略的 —— 經紀人Broker

往更深層次講:

  • 觀察者和被觀察者,是鬆耦合的關係
  • 釋出者和訂閱者,則完全不存在耦合

釋出-訂閱 模式:

訊息佇列 能夠幫我們做到 解耦 的效果,通過訊息中介軟體,如 RocketMQ , kafka 和 rabbitMQ 等;
完成訊息的接收和推送,從而達到 非同步處理 的效果。

,從而達到 非同步處理 的效果。

更多內容參考: https://www.gewuweb.com/sitemap.html