1. 程式人生 > >RxJava2 Flowable解決非同步回撥

RxJava2 Flowable解決非同步回撥

需求:

1專案中有個需求,假設有N個數據,格式為8位純數字字串("00015756"),

2需要把N個數據按照尾號0-9分組排好。尾號為“0”的一組,尾號為“1”的一組...最多10組
3在每一組之前新增新喚醒資料幀”AAAAAAAA“,在結尾新增新睡眠資料幀“BBBBBBBB”
4按照0-9分好組的順序,藍芽把這些資料傳送給硬體
5每組中喚醒幀”AAAAAAAA“發完等待11s,(11s內硬體不回覆任何內容),後傳送資料,
 傳送完資料後藍芽會立刻回覆,收到資料後立即傳送下一條資料(不等待3s超時),
 如果超過3秒未回覆,就要傳送下一個資料。
 最後發本組睡眠幀“BBBBBBBBB”,等待3s(3s內硬體不回覆任何內容).迴圈剩下的組


想了很久,這個邏輯是無法用rxjava一行程式碼寫下來的,因為處理的是兩個資料流。
(藍芽傳送資料,藍芽接收資料)

各位寶寶先看看本寶寶最後完美實現的結果


傳送喚醒幀後間隔11s發資料,傳送睡眠幀後間隔3s發資料,傳送資料幀資料間隔3s,

如果收到回覆資料直接發下一資料

實現程式碼貼上:

public class MainActivity extends AppCompatActivity implements View.OnClickListener {

    private Button button;
    Context context;
    private  ArrayList<String> dataList;
    static Subscription subscription;
    static Disposable disposable;
    static String currentData;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        context = this;
        setContentView(R.layout.activity_main);
        initView();
        generateData();
        sortArray();
    }

    /**
     *    生成原始資料
     */
    public  void generateData() {
        dataList = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            dataList.add("000000" + i);
        }
    }

    /**
     * 資料排序  此方法寫的太醜,可以跳過不看
     * 1需要把N個數據按照尾號0-9分組排好。尾號為“0”的一組,尾號為“1”的一組...最多10組
     * 2在每一組之前新增新資料”AAAAAAAA“,在結尾新增新資料“BBBBBBBB”
     * 例如: AAAAAAAA, 00000020, 00000010, 0000000, BBBBBBBB,
     */
    public  void sortArray() {
        ArrayList<String> list0 = new ArrayList<>();
        list0.add("AAAAAAAA");
        list0.add("BBBBBBBB");
        ArrayList<String> list1 = (ArrayList<String>) list0.clone();
        ArrayList<String> list2 = (ArrayList<String>) list0.clone();
        ArrayList<String> list3 = (ArrayList<String>) list0.clone();
        ArrayList<String> list4 = (ArrayList<String>) list0.clone();
        ArrayList<String> list5 = (ArrayList<String>) list0.clone();
        ArrayList<String> list6 = (ArrayList<String>) list0.clone();
        ArrayList<String> list7 = (ArrayList<String>) list0.clone();
        ArrayList<String> list8 = (ArrayList<String>) list0.clone();
        ArrayList<String> list9 = (ArrayList<String>) list0.clone();
        for (String ss : dataList) {
            if (ss.endsWith("0")) {
                list0.add(1, ss);
            } else if (ss.endsWith("1")) {
                list1.add(1, ss);
            } else if (ss.endsWith("2")) {
                list2.add(1, ss);
            } else if (ss.endsWith("3")) {
                list3.add(1, ss);
            } else if (ss.endsWith("4")) {
                list4.add(1, ss);
            } else if (ss.endsWith("5")) {
                list5.add(1, ss);
            } else if (ss.endsWith("6")) {
                list6.add(1, ss);
            } else if (ss.endsWith("7")) {
                list7.add(1, ss);
            } else if (ss.endsWith("8")) {
                list8.add(1, ss);
            } else if (ss.endsWith("9")) {
                list9.add(1, ss);
            }
        }
        dataList.clear();
        dataList.addAll(list0);
        dataList.addAll(list1);
        dataList.addAll(list2);
        dataList.addAll(list3);
        dataList.addAll(list4);
        dataList.addAll(list5);
        dataList.addAll(list6);
        dataList.addAll(list7);
        dataList.addAll(list8);
        dataList.addAll(list9);
        System.out.println(dataList);
    }


    private void initView() {
        button = (Button) findViewById(R.id.button);
        button.setOnClickListener(this);
    }

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.button:
                test(dataList);
                break;
        }
    }

    //處理業務邏輯的主要程式碼 
    private void test(List<String> list) {
        Flowable.fromIterable(list)
                .onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new MySubscriber());
    }


    /**
     * 倒計時3秒,用於藍芽超時未反回時請求下一條資料
     * @param time 延時時間 單位 :秒
     */
    public static void countDown(final int time) {
        //取消當前延時
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
        //開始下個延時
        Observable.timer(time, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onNext(Long value) {
                        requestNext();
                    }

                    @Override
                    public void onError(Throwable e) {}

                    @Override
                    public void onComplete() {}
                });
    }

    /**
     * 請求傳送下一個資料
     */
    public static void requestNext() {
        if (subscription != null) {
            subscription.request(1);
        }
    }


    /**
     * 模擬藍芽收到資料後並解析成功後的回撥
     * 收到資料後取消當前3s等待超時,並請求下一條資料
     */
    public static void onBluetoothReceive() {
        Observable.intervalRange(0, 30, 1300, 1800, TimeUnit.MILLISECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        //假設是偶數的藍芽資料都收到了硬體的回覆
                        if (aLong % 2 == 0 && currentData.matches("\\d+")) {
                            Log.e("藍芽收到資料", "時間:" + String.format("%tT%n", new Date()));
                            requestNext();
                            countDown(3);
                        }
                    }
                });
    }


    static class MySubscriber implements Subscriber<String> {
        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1);
            onBluetoothReceive();
        }

        @Override
        public void onNext(String o) {
            currentData = o;
            Log.d("藍芽傳送資料", o+"時間:" + String.format("%tT%n", new Date()));
            //如果傳送資料延遲3s
            if (o.matches("\\d+")) {
                countDown(3);
            } else {
                //如果傳送喚醒延遲11s (包含A)
                if (o.contains("A"))
                    countDown(11);
                else
                    //如果傳送睡眠延遲3s
                    countDown(3);
            }
        }

        @Override
        public void onError(Throwable t) {}

        @Override
        public void onComplete() {}
    }




}

總結:

整個程式碼邏輯思路:
1:把dataList發射出來,在MySubscriber中的onSubscribe中請求資料,並假裝開啟藍芽資料接收回調
2:在onNext收到資料執行後,呼叫countDown開始超時的延時,如果收到資料,取消當前延時,否則在countDown內又請求資料
3:核心方法 subscription.requset(long n) 要求上游給下游發射多少個數據,這樣可以主動控制
資料下發的速度。
4:Flowable 主要是解決背壓問題的,但我用subscription.requset(long n)來處理非同步回撥,也不矛盾
當我把dataList新增300個元素時也沒有出現什麼異常情況,這其實和資料數量沒啥關係,和處理速度有
關。我是參考:https://www.jianshu.com/p/ff8167c1d191