java9特性例項1
阿新 • • 發佈:2019-01-10
import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; public class FlowDemo { public static void main(String[] args) throws Exception { //https://www.cnblogs.com/cyl048/p/9073980.html // 1. 定義釋出者, 釋出的資料型別是 Integer // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 介面 SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定義訂閱者 Subscriber<Integer> subscriber = new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 儲存訂閱關係, 需要用它來給釋出者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到資料: " + item); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 處理完呼叫request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理資料的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴釋出者, 後面不接受資料了 this.subscription.cancel(); } @Override public void onComplete() { // 全部資料處理完了(釋出者關閉了) System.out.println("處理完了!"); } }; // 3. 釋出者和訂閱者 建立訂閱關係 publiser.subscribe(subscriber); // 4. 生產資料, 併發布 // 這裡忽略資料生產過程 for (int i = 0; i < 1000; i++) { System.out.println("生成資料:" + i); // submit是個block方法 publiser.submit(i); } // 5. 結束後 關閉釋出者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主執行緒延遲停止, 否則資料沒有消費就退出 Thread.currentThread().join(1000); // debug的時候, 下面這行需要有斷點 // 否則主執行緒結束無法debug System.out.println(); } }