1. 程式人生 > >Reactor 2.0 響應式時效性流式資料框架學習

Reactor 2.0 響應式時效性流式資料框架學習

Reactor 介紹

        Reactor 是一個基礎庫,可用它構建時效性流式資料應用,或者有低延遲容錯性要求的微/納/皮級服務。

        簡單說,Reactor 是一個輕量級 JVM 基礎庫,幫助你的服務或應用高效,非同步地傳遞訊息。

"高效"是指什麼?

  • 訊息從A傳遞到B時,產生很少的記憶體垃圾,甚至不產生。
  • 解決消費者處理訊息的效率低於生產者時帶來的溢位問題。
  • 儘可能提供非阻塞非同步流

        非同步程式設計很難,而像 JVM 這類提供眾多可選引數的平臺則尤其困難。 Reactor 旨在幫助大多數用例真正非阻塞地執行。我們提供的 API 比 JDK 的 java.util.concurrent

庫低階原語更高效。Reactor 提供了下列功能的替代函式 (並建議不使用 JDK 原生語句):

  • 阻塞等待:如 Future.get()

  • 不安全的資料訪問:如 ReentrantLock.lock()

  • 異常冒泡:如 try…​catch…​finally

  • 同步阻塞:如 synchronized{ }

  • Wrapper分配(GC 壓力):如 new Wrapper<T>(event)

    當訊息傳遞效率成為系統性能瓶頸的時候(10k msg/s,100k msg/s,1M...),非阻塞機制就顯得尤為重要。 雖然這個有理論支援 (參見 Amdahl’s Law

    ),但讀起來太無聊了。我們舉例說明,比如你用了個 Executor 方法:

    private ExecutorService  threadPool = Executors.newFixedThreadPool(8);
    
    final List<T> batches = new ArrayList<T>();
    
    Callable<T> t = new Callable<T>() { // *1
    
            public T run() {
                    synchronized(batches) { // *2
                            T result = callDatabase(msg); // *3
                            batches.add(result);
                            return result;
                    }
            }
    };
    
    Future<T> f = threadPool.submit(t); // *4
    T result = f.get() // *5
    
  • 1.Callable 分配 -- 可能導致 GC 壓力。
  • 2.同步過程強制每個執行緒執行停-檢查操作。
  • 3.訊息的消費可能比生產慢。
  • 4.使用執行緒池(ThreadPool)將任務傳遞給目標執行緒 -- 通過 FutureTask 方式肯定會產生 GC 壓力。
  • 5.阻塞直至 callDatabase() 回撥。

在這個簡單的例子中,很容易指出為什麼擴容是很有限的:

  • 分配物件可能產生GC壓力,特別是當任務執行時間過長。

    • 每次 GC 暫停都會影響全域性效能。
  • 預設,佇列是無界的,任務會因為資料庫呼叫而堆積。

    • 積壓雖然不會直接導致記憶體洩漏,但會帶來嚴重副作用:GC 暫停時要掃描更多的物件;有丟失重要資料位的風險;等等 …

    • 典型鏈式佇列節點分配時會產生大量記憶體壓力。

  • 阻塞回調容易產生惡性迴圈。

    • 阻塞回調會降低訊息生產者的效率。在實踐中,任務提交後需要等待結果返回,此時流式過程幾乎演變為同步的了。

    • 會話過程丟擲的任何帶資料儲存的異常都會以不受控的方式被傳遞給生產者,否定了任何通常線上程邊界附近可用的容錯性。

  • 要實現完全非阻塞是很難辦到的,尤其是在有著類似微服務架構這樣時髦綽號的分散式系統的世界裡。因此 Reactor 做了部分妥協,嘗試利用最優的可用模式,使開發者覺得他們是在寫非同步納米服務,而不是什麼數學論文。

    沒有什麼傳播得比光快(除了緋聞和網紅貓的視訊),正如到了某個階段,延遲是每一個系統到都要面對的實實在在的問題。為此:

    Reactor 提供的框架可以幫助減輕應用中由延遲產生的副作用,只需要增加一點點開銷:

    • 使用了一些聰明的結構,通過啟動預分配策略解決執行時分配問題

    • 通過確定資訊傳遞主結構的邊界,避免任務的無限堆疊;

    • 採用主流的響應與事件驅動構架模式,提供包含反饋在內的非阻塞端對端流

    • 引入新的 Reactive Streams標準,拒絕超過當前容量請求,從而保證限制結構的有效性;

    • IPC上也使用了類似理念,提供對流控制友好的非阻塞 IO 驅動

    • 開放了幫助開發者們以零副作用方式組織他們程式碼的函式介面,藉助這些函式來處理容錯性和執行緒安全。

Reactor版本與依賴

        <!-- reactor -->
        <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>2.0.8.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-stream -->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-stream</artifactId>
            <version>2.0.8.RELEASE</version>
        </dependency>

個人程式碼

package com.brillilab.test;

import org.junit.Test;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.DispatcherSupplier;
import reactor.core.processor.RingBufferProcessor;
import reactor.core.support.Assert;
import reactor.rx.Stream;
import reactor.rx.Streams;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class ReactorTest {

    /**
     * Function 包 的使用
     */
    @Test
    public void functionTest(){

        Consumer<String> consumer = value ->{
            System.out.println(value);
        };

        Supplier<Integer> supplier = () -> 123;

        BiConsumer<Consumer<String> , String> biConsumer = (callback ,value) -> {
            for (int i=0; i<10; i++){
                callback.accept(value);
            }
        };

        Function<Integer,String> transformation = integer -> ""+integer;

        biConsumer.accept(
                consumer,
                transformation.apply(
                        supplier.get()
                )
        );
    }

    /**
     *  Reactor 庫的Environment 和 Dispatcher
     */
    @Test
    public void EnvironmentAndDispatcher (){

        Environment.initialize();

        DispatcherSupplier supplier=Environment.newCachedDispatchers(2);

        Dispatcher d1=supplier.get();
        Dispatcher d2=supplier.get();
        Dispatcher d3=supplier.get();
        Dispatcher d4=supplier.get();

        Assert.isTrue(d1 == d3 && d2 == d4);
        supplier.shutdown();

        // 建立並註冊帶3個排程者的新池
        DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3,"myPool");
        DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");

        Assert.isTrue( supplier1 == supplier2 );
        supplier1.shutdown();
    }

    /**
     * Reactor 庫 Steams 使用 進行流式程式設計
     */
    @Test
    public void ReactorStreamTest(){

        Environment.initialize();

        Stream<String> st=Streams.just("Hello","World","!");

        st.dispatchOn(Environment.cachedDispatcher())
                .map(String::toUpperCase)
                .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s));

    }

    /**
     * Reactor 流式程式設計之流合併 wrap concat 方法的使用
     */
    @Test
    public void ReactorStreamTest2(){

        RingBufferProcessor<String> processor=RingBufferProcessor.create();

        Stream<String> st1=Streams.just("hello ");
        Stream<String> st2=Streams.just("world ");

        //一個針對傳入 Publisher.subscribe(Subscriber<T>) 引數的下發 Stream。
        // 只支援正確使用 Reactive Stream 協議的格式正確的 Publisher:
        //onSubscribe > onNext* > (onError | onComplete)
        Stream<String> st3=Streams.wrap(processor);

        //如果一個 Publisher<T> 已經發送了,在處理下一個等待處理的 Publisher<T> 之前要等待這個 onComplete()。
        //其名稱就暗示它對於串聯不同的資料來源並保持順序正確,這些方面的作用。
        Streams.concat(st1,st2,st3)
                .reduce((prev,next) -> prev + next)
                .consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s));

        processor.onNext("!");
        processor.onNext("!!!");
        processor.onComplete();
    }
}

================================================================================================

相關推薦

Reactor 2.0 響應時效性資料框架學習

Reactor 介紹         Reactor 是一個基礎庫,可用它構建時效性流式資料應用,或者有低延遲和容錯性要求的微/納/皮級服務。         簡單說,Reactor 是一個輕量級 JVM 基礎庫,幫助你的服務或應用高效,非同步地傳遞訊息。 "高效"是

Vczh Library++3.0開發紀事之xml和json讀寫

    每次完成一個任務的時候,都要看看有沒有什麼潛在的可以把功能是現成庫的地方。這十分有利於提高自己的水平。但至於你寫出來的庫會不會有人用,那是另一回事情了。     這次為了完成一個多程式語言+多自然語言的文件編寫工具,不得不做一個可以一次生成一大批文字檔案的模板結構出來。有了模板必然有元資

自動排班系統2.0(基於網路實現的排班系統,附詳細註解)

更新說明:     將輸入優化了下,不必再輸入幹部數和總班數,比較排序部分採用了氣泡排序。 程式碼: #include <iostream> #include <cstdio> #include <queue> #include &l

PaddlePaddle 1.2.0 釋出,百度開源的深度學習框架

   PaddlePaddle 1.2.0 釋出了,PaddlePaddle (PArallel Distributed Deep LEarning,並行分散式深度學習)是百度研發的深度學習平臺,具有易用、高效、靈活和可伸縮等特點,為百度內部多項產品提供深度學習演算法支援。Pa

Spring Boot 1.5.19 和 2.0.8 釋出,Spring 應用開發框架

   Spring 應用開發框架 Spring Boot 1.5.19 和 2.0.8 版本釋出。 Spring Boot 2.0.8 此次更新內容主要是: 新特性:優化了 字串順序(StringSequence ); Bug 修復:使用 ssl and http/2 時

Sqoop1.4.5+hadoop2.2.0進行Mysql到HDFS的資料轉換

正如上一篇記錄的那樣,採用sqoop1.99.4 + hadoop2.2.0來將mysql的表資料匯入到HDFS的時候,死活沒有找到如何制定欄位分隔符號,這才有了試用sqoop1.4.5這番折騰。從架構上來將,Sqoop2確實在安全性等方面有很好的提升,但是Sqoop2目前

響應編程庫Reactor 3 Reference Guide參考文檔中文版(v3.2.0

響應式編程Project Reactor 是 Spring WebFlux 的禦用響應式編程庫,與 Spring 是兄弟項目。 關於如何基於Spring的組件進行響應式應用的開發,歡迎閱讀系列文章《響應式Spring的道法術器》。 官方參考文檔地址:http://projectreactor.io/doc

2響應——響應Spring的道法術器

響應式編程 Spring WebFlux 本系列文章索引:《響應式Spring的道法術器》。前情提要: 什麽是響應式編程 1.2 響應式流 上一節留了一個坑——為啥不用Java Stream來進行數據流的操作? 原因在於,若將其用於響應式編程中,是有局限性的。比如如下兩個需要面對的問題: Web

(12)Reactor 3 自定義數據——響應Spring的道法術器

響應式編程 Spring WebFlux 本系列文章索引《響應式Spring的道法術器》前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範本文源碼 2.2 自定義數據流 這一小節介紹如何通過定義相應的事件(onNext、onError和onComplete) 創建一個 Flux 或

新增 – GPU 支援的 Amazon AppStream 2.0 處理例項

我們在 re:Invent 2016 釋出了 Amazon AppStream 2.0。利用此應用程式流式處理服務可將 Windows 應用程式交付到桌面瀏覽器。 AppStream 2.0 是完全託管的,並通過執行一般用途的應用程式提供一致的可擴充套件效能,提供經過優化的計

講課:Webflux響應程式設計(SpringBoot 2.0新特性)

學習webflux前需要學習三個基礎: 函數語言程式設計和lambda表示式 Stream流程式設計 Reactive stream 響應式流 接下來進入學習 一、函數語言程式設計和lambda表示式 1. 什麼是函數語言程式設計 函數語言程式設計是

靜態布局、自適應布局、布局、響應布局、彈性布局等的概念和區別

自動 min-width isp 過程 簡單 屏幕 前端 默認字體 應該 一、靜態布局(Static Layout)即傳統Web設計,網頁上的所有元素的尺寸一律使用px作為單位。 1、布局特點:不管瀏覽器尺寸具體是多少,網頁布局始終按照最初寫代碼時的布局來顯示。常規的p

spark2.2.0偽分布

執行 .sh work export pan exp source p地址 全分布式 環境:   hadoop2.6.0   jdk1.8   ubuntu 14.04 64位 1 安裝scala環境 版本是scala-2.10.6,官網下載地址http://www.sca

靜態布局、自適應布局、布局、響應布局、彈性布局簡析

彈性 href 窗口 遮擋 正常 阮一峰 布局 變化 發生   近期學習,有很多感想,有時候看似相近的概念,其實意義卻不相同。所以學習要針對不同的名詞有明確的區分意識。   抽空時間,打算學習下display:flex;本以為就是一個小小的知識點,正式去研究的時候,才發現d

靜態、自適應、響應

OS 分享 col 優缺點 log pan 大小 設計 支持 靜態布局:頁面的寬高是固定的,超出頁面的內容用戶手動拖拽。手機端的話,另外設計一套。 自適應布局:用媒體查詢(@media)根據不同屏幕定義不同的寬高,比靜態布局少了用戶手動拖拽的幾率。 (因為媒體查詢範圍不能做

分布 NewSQL 數據庫 TiDB 2.0 RC3:提高兼容性

cme log gpo geo HR mas 3D lan MQ o2qss0蒙輾夜史銜魏http://wenda.cngold.org/question743374.htmnpjfz7研運某顆馗陌http://wenda.cngold.org/question743281

(15)Reactor 3 Operators——響應Spring的道法術器

響應式編程 Spring WebFlux 本系列文章索引《響應式Spring的道法術器》前情提要 Reactor 3快速上手 | 響應式流規範 2.5 Reactor 3 Operators 雖然響應式流規範中對Operator(以下均稱作”操作符“)並未做要求,但是與RxJava等響應式開發庫一樣

(16)Reactor的測試——響應Spring的道法術器

Spring WebFlux 響應式編程 本系列文章索引《響應式Spring的道法術器》前情提要:Reactor 3快速上手 | 響應式流規範本文測試源碼 2.6 測試 在非常重視DevOps的今天,以及一些奉行TDD的團隊中,自動化測試是保證代碼質量的重要手段。要進行Reactor的測試,首先要確

jQuery超酷響應瀑布效果

function else if syntax RR wid lock etx fse get 參考 http://www.sucaihuo.com/js/74.html <script src="scripts/blocksit.min.js">&l

Spring5.0響應編程入門

JD 字符 增加 配置服務 控制 數據類型 pin 開發人員 簡單 引言? 響應式編程是一種面向數據流和變化傳播的編程範式。使用它可以在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。我們可以使用聲明的方式構建應用