1. 程式人生 > >Dart非同步程式設計之Stream

Dart非同步程式設計之Stream

Dart非同步程式設計包含兩部分:Future和Stream
上篇文章已介紹了Future,此篇文章為大家介紹下另一塊–Stream

Dart 非同步事件流 Stream

基本概念

顧名思義,Stream 就是流的意思,表示發出的一系列的非同步資料。可以簡單地認為 Stream 是一個非同步資料來源。它是 Dart 中處理非同步事件流的統一 API

集合與Stream

Dart 中,集合(Iterable或Collection)表示一系列的物件。而 Stream (也就是“流”)也表示一系列的物件,但區別在於 Stream 是非同步的事件流。比如檔案、套接字這種 IO 資料的非阻塞輸入流(input data),或者使用者介面上使用者觸發的動作(UI事件)。

集合可以理解為“拉”模式,比如你有一個 List ,你可以主動地通過迭代獲得其中的每個元素,想要就能拿出來。而 Stream 可以理解為“推”模式,這些非同步產生的事件或資料會推送給你(並不是你想要就能立刻拿到)。這種模式下,你要做的是用一個 listener (即callback)做好資料接收的準備,資料可用時就通知你。

推和拉就是別人給你還是你自己去拿的區別。但是不管如何獲取資料,二者的本質都可以認為是資料的集合(資料可能無限多)。所以,二者有很多相同的方法,稍後介紹

怎麼理解 Stream 中的資料

資料(data)是個非常抽象的概念,可以認為一切皆資料。在程式的世界裡,其實只有兩種東西:資料和對資料的操作。對資料的操作就是對輸入的資料經過一些計算,之後輸出一些新資料。事件(event,如UI上的事件)、計算結果(value,如函式/方法的返回值)以及從檔案或網路獲得的純資料都可以認為是資料(data)。另外,Dart 中的所有事物都是物件,所以資料也一定是某種物件(object)。在本文中,可以認為事件、結果、資料、物件都是一樣的,不用特意區分。

Stream 與 Future

Stream 和 Future 是 Dart 非同步處理的核心 API。Future 表示稍後獲得的一個數據,所有非同步的操作的返回值都用 Future 來表示。但是 Future 只能表示一次非同步獲得的資料。而 Stream 表示多次非同步獲得的資料。比如介面上的按鈕可能會被使用者點選多次,所以按鈕上的點選事件(onClick)就是一個 Stream 。簡單地說,Future將返回一個值,而Stream將返回多次值。

另外一點, Stream 是流式處理,比如 IO 處理的時候,一般情況是每次只會讀取一部分資料(具體取決於實現)。和一次性讀取整個檔案的內容相比,Stream 的好處是處理過程中記憶體佔用較小。而 File 的 readAsString(非同步讀,返回 Future)或 readAsStringSync(同步讀,返回 String)等方法都是一次性讀取整個檔案的內容進來,雖然獲得完整內容處理起來比較方便,但是如果檔案很大的話就會導致記憶體佔用過大的問題。


基本使用

獲得 Stream

Dart 中統一使用 Stream 處理非同步事件流,所以可以獲得 Stream 的地方很多。為了方便演示,這裡先介紹2種獲取 Stream 的方式。

將集合(Iterable)包裝為 Stream
Stream 有3個工廠建構函式:fromFuture、fromIterable 和 periodic,分別可以通過一個 Future、Iterable或定時觸發動作作為 Stream 的事件源構造 Stream。下面的程式碼就是通過一個 List 構造的 Stream。

var data = [1, 2, 3, 4]; 
var stream = new Stream.fromIterable(data); 

對集合的包裝只是簡單地模擬非同步,定時觸發、IO輸入、UI事件等現實情況才是真正的非同步事件。

使用 Stream 讀檔案
讀檔案的方式有多種,其中一種是使用 Stream 獲得檔案內容。File 的方法 openRead() 返回一個 Stream>,List 可以理解為一個 byte array,因為 Dart 中沒有 byte 型別。下面的程式碼將開啟當前程式的原始碼的 Stream 輸入流。

var stream = new File(new Options().script).openRead();

訂閱 Stream

當你有了一個 Stream 時,最常用的功能就是通過 listen() 方法訂閱 Stream 上發出的資料(即事件)。有事件發出時就會通知訂閱者。如果在發出事件的同時新增訂閱者,那麼要在訂閱者在該事件發出後才會生效。如果訂閱者取消了訂閱,那麼它會立即停止接收事件。

我們在接收一個輸入流的時候要面臨幾種不同的情況和狀態,最基本的是處理收到資料,此外上游還可能出現錯誤,以及出現錯誤時是否繼續後續資料的處理,最後在輸入完成的時候還有一個結束狀態。所以 listen 方法的幾個引數分別對應這些情況和狀態:
onData,處理收到的資料的 callback
onError,處理遇到錯誤時的 callback
onDone,結束時的通知 callback
unsubscribeOnError,遇到第一個錯誤時是否停止(也就是取消訂閱),預設為false
onData 是唯一必填引數,也是用的最多的,後面3個是可選的命名引數。

下面我們訂閱一個 Stream 的資料,收到資料時只是簡單地打印出來:

var data = [1, 2, 3, 4];
var stream = new Stream.fromIterable(data);

stream.listen((e)=>print(e), onDone: () => print('Done')); 
// => 1, 2, 3, 4
// => Done

上面的程式碼會先打印出從 Stream 收到的每個數字,最後列印一個‘Done’。

當 Stream 中的所有資料傳送完時,就會觸發 onDone 的呼叫,但提前取消訂閱不會觸發 onDone 。在結束的同時(收到 onDone 事件之前),所有的訂閱者都被取消了訂閱,此時 Stream 上便沒有訂閱者了。允許對一個已經結束了的 Stream 再新增訂閱者(儘管沒什麼意義),此時只會立刻收到一個 onDone 事件。

stream.listen(print, onDone: () {
print('first done');
//listen again
stream.listen(print, onDone:() => print('second done'));
});
// => data: 1,2,3,4, 
// => first done
// => no data, because stream is done
// => second done

上面的程式碼中,首先我們在 onDone 的回撥中列印了 ‘first done’ 表示第一次結束。此時 stream 上已經沒有訂閱者了,但接著我們又再次訂閱了這個 stream。這一次沒有再收到資料,而是馬上打印出了 ‘second done’ 表示第二次訂閱的結束。

高階訂閱管理

前面的示例程式碼會處理 Stream 發出的所有資料,直到 Stream 結束。如果想提前取消處理怎麼辦?listen() 方法會返回一個 StreamSubscription 物件,用於提供對訂閱的管理控制。onData、onError和onDone 這3個方法分別用於設定(如果listen方法中的引數為null)或覆蓋對應的 callback。cancel、pause和resume分別用於取消訂閱、暫停和繼續。比如,可以在 listen 方法中引數置為 null,接著通過 subscription 物件設定 callback 。此外,cancel 方法也重要,要麼一直處理資料直到 stream 結束,要麼提前取消訂閱結束處理。比如使用 Stream 讀檔案,為了使資源得到釋放,要麼讀完整個檔案,要麼使用 subscription 的 cancel 方法取消訂閱(即終止後續資料的讀取)。可以看出,這裡的 cancel 相當於傳統意義上的 close 方法。最後,pause和resume方法是嘗試向資料來源發出暫停和繼續的請求,其意義取決於實際情況,並且不保證一定能生效。比如資料來源能夠支援,或者是帶緩衝實現的 stream 才能做到暫停。

var sub = stream.listen(null);
sub.onData(print);
sub.onError((e)=>print('error $e'));
sub.onDone(()=>print('done'));
// => 1, 2, 3, 4, done

上面的程式碼與前面的 listen 示例程式碼作用相同。

var sub = stream.listen(null);
sub.onData((e){
if(e > 2)
sub.cancel();
else 
print(e);
});
sub.onDone(()=>print('done'));
// => 1, 2
// no 'done', because stream is cancel.

上面的程式碼最後會打印出1和2,但不會打印出‘done’ 。首先,listen 中的引數為 null,也就是沒有訂閱者。然後,通過 listen 的返回者 subscription 物件設定了 onData 和 onDone 的處理,這時才有了訂閱者。在 onData 中,如果收到的數字大於2就取消後續處理,因此到數字 3 的時候就沒有列印 3,而是立即結束了處理,這樣後面的 4 也不會出現了。既然是提前退出,所以 onDone 也是不會觸發的。

Stream 兩種訂閱模式

Stream有兩種訂閱模式:單訂閱(single)和多訂閱(broadcast)。單訂閱就是隻能有一個訂閱者,而廣播是可以有多個訂閱者。這就有點類似於訊息服務(Message Service)的處理模式。單訂閱類似於點對點,在訂閱者出現之前會持有資料,在訂閱者出現之後就才轉交給它。而廣播類似於釋出訂閱模式,可以同時有多個訂閱者,當有資料時就會傳遞給所有的訂閱者,而不管當前是否已有訂閱者存在。

Stream 預設處於單訂閱模式,所以同一個 stream 上的 listen 和其它大多數方法只能呼叫一次,呼叫第二次就會報錯。但 Stream 可以通過 transform() 方法(返回另一個 Stream)進行連續呼叫。通過 Stream.asBroadcastStream() 可以將一個單訂閱模式的 Stream 轉換成一個多訂閱模式的 Stream,isBroadcast 屬性可以判斷當前 Stream 所處的模式。

assert(stream.isBroadcast == false);
stream.first.then(print);
stream.last.then(print);// Bad state: Stream already has subscriber.

上的程式碼需要分別打印出 stream 的第一個資料和最後一個數據,但是單模式 Stream 只能訂閱一次,所以直接出錯了。當然,Stream 是非同步的,所以 first 也沒有打印出來。

var bs = stream.asBroadcastStream();
assert(bs.isBroadcast == true);
bs.first.then(print);
bs.last.then(print);
// OK => 1, 4

上面的程式碼,我們把單模式 Stream 轉成了多訂閱的 Stream,所以可以 first 和 last 都打印出來了。

按前面說的,單訂閱模式會持有資料,多訂閱模式如果沒有及時新增訂閱者則可能丟資料。不過具體取決於 stream 的實現。

new Timer(new Duration(seconds:5), ()=>stream.listen(print));
// after 5 second, it output 1,2,3,4

上面的程式碼利用 Timer 延遲了5秒才訂閱 stream,但仍然輸出了資料。因為我們這裡的這個 stream 是單訂閱模式,它在有訂閱者後才會發出事件。那麼多訂閱模式就一定會漏掉資料嗎?

var bs = stream.asBroadcastStream();
new Timer(new Duration(seconds:5), ()=>bs.listen(print));
// after 5 second, it also output 1,2,3,4
// because asBroadcastStream() is a simple wrap, 
// it don't change the source stream's feature

上面我們把原始的單訂閱模式轉成了多訂閱模式的 Stream,此時可以新增多個訂閱者。我們5秒後才在 broadcast stream 上添加了訂閱者,但它依然輸出了 1,2,3,4 ,並沒有漏掉資料。這其實是因為 asBroadcastStream() 只是對原始 stream 的封裝,並不改變原始 stream 的實現特性。所以這個 broadcast stream 同樣在等待有訂閱者之後才發出資料。但是如果一旦有了第一個訂閱者,然後再延遲新增第二個訂閱者就會漏資料了。

var bs = stream.asBroadcastStream();
// add first listener
new Timer(new Duration(seconds:5), ()=>bs.listen(print));
// after 5 second, it output 1,2,3,4

// add second listener
new Timer(new Duration(seconds:10), ()=>bs.listen(print));
// after 10 second, nothing output, because stream is done

再來看另外一個例子,我們自己來建立一個 Stream。StreamController 用於建立 Stream,它有兩個建構函式,分別用於建立單訂閱模式 Stream 和 多訂閱模式 Stream。然後可以利用 add()、addError() 和 close() 方法傳送事件、傳送錯誤和結束,這三個方法來自 EventSink,是各種 Sink 上的通用方法。

// build single stream
//var controller = new StreamController();

// build broadcast stream
var controller = new StreamController.broadcast();
//send event
controller..add(1)
..add(2)
..add(3)
..add(4);
//send done
controller.close();

var myStream = controller.stream;
new Timer(new Duration(seconds:5), ()=>myStream.listen(print));
//if myStream is single stream, it output 1,2,3,4
//if myStream is broadcast stream, it output nothing, because stream is done.

Stream 的集合特性

前面說過,Stream 和一般的集合類似,都是一組資料,只不過一個是非同步推送,一個是同步拉取。所以他們都很多共同的方法。例如:

stream.any((e) => e > 2).then(print);// stream.any()
print([1,2,3,4].any((e) => e > 2));// iterable.any()
// => true, true

比如 Stream 和 集合 都有 any() 方法,集合是同步的(但是惰性執行,這裡因為有 print 呼叫,所以立刻執行了)並直接返回結果, Stream 上的 any() 方法是非同步的,返回的是 Future 。方法本身的含義都是一樣的。上面的程式碼雖然 stream 的 any 方法在前,但因為是非同步的,所以的輸出在後。

在列舉其它 Stream 和 Iterable 通用的方法:

//常見集合方法
stream.first.then(print);
stream.firstWhere((e)=>e>3, defaultValue:()=>0).then(print);
stream.last.then(print);
stream.lastWhere((e)=>e>3, defaultValue:()=>0).then(print);
stream.length.then(print);
stream.isEmpty.then(print);

stream.any((e) => e > 2).then(print);
stream.every((e) => e > 2).then(print);
stream.contains(3).then(print);
stream.elementAt(2).then(print);
stream.where((e) => e >2).listen(print);

stream.skip(2).listen(print);
stream.skipWhile((e) => e < 2).listen(print);
stream.take(2).listen(print);
stream.takeWhile((e)=>e<3).listen(print);

stream.map((e) => e*2).listen(print);
stream.reduce(0, (p, c) => p + c).then(print);
stream.expand((e) => [e, e]).listen(print);

stream.toList().then(print);
stream.toSet().then(print);

注意以上方法同時只能使用一次,因為是單訂閱模式。此外,如果方法只有一個返回值,即資料收斂型別的方法,那麼返回就是一個 Future。如果是隻是資料轉換的方法,如 map ,返回的還是一個 Stream,只是資料資料的型別和數量變了。看到這麼多 Stream 與 Iterable 相同的方法,大家應該更清楚 Stream 其實也是個資料集合。

通用資料收斂方法

集合中有很多方法只返回一個值,多個數據作為輸入、一個數據作為輸出的方法就是資料收斂的方法。Stream 有一個更通用的收斂方法 pipe() 。pipe() 方法的引數要求是一個 StreamConsumer 介面的實現,該介面只有一個方法: Future consume(Stream stream)

class DataConsumer implements StreamConsumer{
Future consume(Stream stream){
return stream.reduce(0, (c,p)=>c+p);
}
}


stream.pipe(new DataConsumer()).then(print);
// => 10


// equivalent below
stream.reduce(0, (p, c) => p + c).then(print);

面我們自己實現了一個 StreamConsumer ,它只是對 Stream 的資料求和,並返回該結果。這個簡單的例子實際意義不大。但這裡只是為了演示這個通用 pipe() 方法和 StreamConsumer 介面的意義。

通用資料轉換方法

除了資料收斂方法,Stream 也有自己通用的資料轉換方法 transform() 。類似於 Future 的連續呼叫,Stream 也可以連續呼叫。 transform 方法就是把一個 Stream 作為輸入,然後經過計算或資料轉換,輸出為另一個 Stream。另一個 Stream 中的資料型別可以不同於原型別,資料多少也可以不同(比如實現一個數據的 buffer )。

transform 的方法簽名是:
Stream transform(StreamTransformer streamTransformer)

下面我們構造一個 StreamTransformer ,然後使用 Stream 的 transform() 進行轉換:

var transformer = new StreamTransformer(
handleData: (e, sink){
sink.add(e*2);
}
);
stream.transform(transformer).listen(print);

// equivalent below
stream.map((e) => e*2).listen(print);

class MyTransformer extends StreamEventTransformer {
handleData(e, sink){
sink.add(e*2);
}
}


stream.transform(new MyTransformer()).listen(print);

使用 StreamTransformer 介面的工廠建構函式 或者 繼承 StreamEventTransformer 都可以構造一個 transformer 。其本質和我們處理一個 Stream 是一樣的,就要要處理 handleData、handleError 和 handleDone 這三件事。上面的 transform 和 map 方法類似,但是 transform 方法比 map 方法更靈活。map 只能做1對1的轉換,而 transform 並沒有這個要求,因為它是利用 sink 來新增資料,而不是返回轉換結果。transform 方法和 StreamTransformer 介面是一種更通用的設計。

舉個更實用點例子,Dart 中的 StringDecoder 和 StringEncoder 就是一個 StreamTransformer,負責實現 byte stream 和 String stream 之間的轉換。LineTransformer 是切分行的 transformer。比如,使用 Stream 讀檔案需要先將位元組轉換為字元,然後還可以按行讀取。

file.openRead()
.transform(new StringDecoder())
.transform(new LineTransformer()) .listen(your_process);

注意,不管是 Stream.map() 還是 Stream.transform() ,他們都是在做轉換,而非訂閱。對於單模式 Stream ,如果沒有新增訂閱者,那麼轉換方法根本不會執行(可能是由於是惰性執行的緣故)。

stream.map((e){
print(e);
return e*2;
});
// nothing output, because lazy evaluate


class MyTransformer extends StreamEventTransformer {
handleData(e, sink){
print(e);
sink.add(e*2);
}
}
stream.transform(new MyTransformer());
// nothing output, because no subscription

上面的示例中,都在轉換過程中做了輸出,但實際不會輸出內容,因為沒有用 listen 新增訂閱者

參考: https://docs.flutter.io/flutter/dart-async/Stream-class.html