1. 程式人生 > >java9特性例項1

java9特性例項1

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowDemo {

	public static void main(String[] args) throws Exception {
		//https://www.cnblogs.com/cyl048/p/9073980.html
		// 1. 定義釋出者, 釋出的資料型別是 Integer
		// 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 介面
		SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

		// 2. 定義訂閱者
		Subscriber<Integer> subscriber = new Subscriber<Integer>() {

			private Subscription subscription;

			@Override
			public void onSubscribe(Subscription subscription) {
				// 儲存訂閱關係, 需要用它來給釋出者響應
				this.subscription = subscription;

				// 請求一個數據
				this.subscription.request(1);
			}

			@Override
			public void onNext(Integer item) {
				// 接受到一個數據, 處理
				System.out.println("接受到資料: " + item);

				try {
					TimeUnit.SECONDS.sleep(3);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
				// 處理完呼叫request再請求一個數據
				this.subscription.request(1);

				// 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
				// this.subscription.cancel();
			}

			@Override
			public void onError(Throwable throwable) {
				// 出現了異常(例如處理資料的時候產生了異常)
				throwable.printStackTrace();

				// 我們可以告訴釋出者, 後面不接受資料了
				this.subscription.cancel();
			}

			@Override
			public void onComplete() {
				// 全部資料處理完了(釋出者關閉了)
				System.out.println("處理完了!");
			}
		};

		// 3. 釋出者和訂閱者 建立訂閱關係
		publiser.subscribe(subscriber);

		// 4. 生產資料, 併發布
		// 這裡忽略資料生產過程
		for (int i = 0; i < 1000; i++) {
			System.out.println("生成資料:" + i);
			// submit是個block方法
			publiser.submit(i);
		}

		// 5. 結束後 關閉釋出者
		// 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
		publiser.close();

		// 主執行緒延遲停止, 否則資料沒有消費就退出
		Thread.currentThread().join(1000);
		// debug的時候, 下面這行需要有斷點
		// 否則主執行緒結束無法debug
		System.out.println();
	}
}