RxJava2 Flowable解決非同步回撥
阿新 • • 發佈:2019-01-01
需求:
1專案中有個需求,假設有N個數據,格式為8位純數字字串("00015756"),
2需要把N個數據按照尾號0-9分組排好。尾號為“0”的一組,尾號為“1”的一組...最多10組3在每一組之前新增新喚醒資料幀”AAAAAAAA“,在結尾新增新睡眠資料幀“BBBBBBBB”
4按照0-9分好組的順序,藍芽把這些資料傳送給硬體
5每組中喚醒幀”AAAAAAAA“發完等待11s,(11s內硬體不回覆任何內容),後傳送資料,
傳送完資料後藍芽會立刻回覆,收到資料後立即傳送下一條資料(不等待3s超時),
如果超過3秒未回覆,就要傳送下一個資料。
最後發本組睡眠幀“BBBBBBBBB”,等待3s(3s內硬體不回覆任何內容).迴圈剩下的組
想了很久,這個邏輯是無法用rxjava一行程式碼寫下來的,因為處理的是兩個資料流。
(藍芽傳送資料,藍芽接收資料)
各位寶寶先看看本寶寶最後完美實現的結果
傳送喚醒幀後間隔11s發資料,傳送睡眠幀後間隔3s發資料,傳送資料幀資料間隔3s,
如果收到回覆資料直接發下一資料
實現程式碼貼上:
public class MainActivity extends AppCompatActivity implements View.OnClickListener { private Button button; Context context; private ArrayList<String> dataList; static Subscription subscription; static Disposable disposable; static String currentData; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); context = this; setContentView(R.layout.activity_main); initView(); generateData(); sortArray(); } /** * 生成原始資料 */ public void generateData() { dataList = new ArrayList<>(); for (int i = 0; i < 30; i++) { dataList.add("000000" + i); } } /** * 資料排序 此方法寫的太醜,可以跳過不看 * 1需要把N個數據按照尾號0-9分組排好。尾號為“0”的一組,尾號為“1”的一組...最多10組 * 2在每一組之前新增新資料”AAAAAAAA“,在結尾新增新資料“BBBBBBBB” * 例如: AAAAAAAA, 00000020, 00000010, 0000000, BBBBBBBB, */ public void sortArray() { ArrayList<String> list0 = new ArrayList<>(); list0.add("AAAAAAAA"); list0.add("BBBBBBBB"); ArrayList<String> list1 = (ArrayList<String>) list0.clone(); ArrayList<String> list2 = (ArrayList<String>) list0.clone(); ArrayList<String> list3 = (ArrayList<String>) list0.clone(); ArrayList<String> list4 = (ArrayList<String>) list0.clone(); ArrayList<String> list5 = (ArrayList<String>) list0.clone(); ArrayList<String> list6 = (ArrayList<String>) list0.clone(); ArrayList<String> list7 = (ArrayList<String>) list0.clone(); ArrayList<String> list8 = (ArrayList<String>) list0.clone(); ArrayList<String> list9 = (ArrayList<String>) list0.clone(); for (String ss : dataList) { if (ss.endsWith("0")) { list0.add(1, ss); } else if (ss.endsWith("1")) { list1.add(1, ss); } else if (ss.endsWith("2")) { list2.add(1, ss); } else if (ss.endsWith("3")) { list3.add(1, ss); } else if (ss.endsWith("4")) { list4.add(1, ss); } else if (ss.endsWith("5")) { list5.add(1, ss); } else if (ss.endsWith("6")) { list6.add(1, ss); } else if (ss.endsWith("7")) { list7.add(1, ss); } else if (ss.endsWith("8")) { list8.add(1, ss); } else if (ss.endsWith("9")) { list9.add(1, ss); } } dataList.clear(); dataList.addAll(list0); dataList.addAll(list1); dataList.addAll(list2); dataList.addAll(list3); dataList.addAll(list4); dataList.addAll(list5); dataList.addAll(list6); dataList.addAll(list7); dataList.addAll(list8); dataList.addAll(list9); System.out.println(dataList); } private void initView() { button = (Button) findViewById(R.id.button); button.setOnClickListener(this); } @Override public void onClick(View v) { switch (v.getId()) { case R.id.button: test(dataList); break; } } //處理業務邏輯的主要程式碼 private void test(List<String> list) { Flowable.fromIterable(list) .onBackpressureBuffer() .observeOn(AndroidSchedulers.mainThread()) .subscribe(new MySubscriber()); } /** * 倒計時3秒,用於藍芽超時未反回時請求下一條資料 * @param time 延時時間 單位 :秒 */ public static void countDown(final int time) { //取消當前延時 if (disposable != null && !disposable.isDisposed()) { disposable.dispose(); } //開始下個延時 Observable.timer(time, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { disposable = d; } @Override public void onNext(Long value) { requestNext(); } @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); } /** * 請求傳送下一個資料 */ public static void requestNext() { if (subscription != null) { subscription.request(1); } } /** * 模擬藍芽收到資料後並解析成功後的回撥 * 收到資料後取消當前3s等待超時,並請求下一條資料 */ public static void onBluetoothReceive() { Observable.intervalRange(0, 30, 1300, 1800, TimeUnit.MILLISECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { //假設是偶數的藍芽資料都收到了硬體的回覆 if (aLong % 2 == 0 && currentData.matches("\\d+")) { Log.e("藍芽收到資料", "時間:" + String.format("%tT%n", new Date())); requestNext(); countDown(3); } } }); } static class MySubscriber implements Subscriber<String> { @Override public void onSubscribe(Subscription s) { subscription = s; s.request(1); onBluetoothReceive(); } @Override public void onNext(String o) { currentData = o; Log.d("藍芽傳送資料", o+"時間:" + String.format("%tT%n", new Date())); //如果傳送資料延遲3s if (o.matches("\\d+")) { countDown(3); } else { //如果傳送喚醒延遲11s (包含A) if (o.contains("A")) countDown(11); else //如果傳送睡眠延遲3s countDown(3); } } @Override public void onError(Throwable t) {} @Override public void onComplete() {} } }
總結:
整個程式碼邏輯思路:1:把dataList發射出來,在MySubscriber中的onSubscribe中請求資料,並假裝開啟藍芽資料接收回調
2:在onNext收到資料執行後,呼叫countDown開始超時的延時,如果收到資料,取消當前延時,否則在countDown內又請求資料
3:核心方法 subscription.requset(long n) 要求上游給下游發射多少個數據,這樣可以主動控制
資料下發的速度。
4:Flowable 主要是解決背壓問題的,但我用subscription.requset(long n)來處理非同步回撥,也不矛盾
當我把dataList新增300個元素時也沒有出現什麼異常情況,這其實和資料數量沒啥關係,和處理速度有
關。我是參考:https://www.jianshu.com/p/ff8167c1d191