1. 程式人生 > >java阻塞佇列

java阻塞佇列

對訊息的處理有些麻煩,要保證各種確認。為了確保訊息的100%傳送成功,筆者在之前的基礎上做了一些改進。其中要用到多執行緒,用於重複傳送資訊。

所以查了很多關於執行緒安全的東西,也看到了阻塞佇列,發現這個模式很不錯,可惜我目前用不到。

關於這個的講解已經很多了,阻塞這個,就是當佇列中沒有資料的時候,執行緒讀取的話會等待。當佇列中的資料滿的時候,執行緒新增資料的時候,也會等待。

有個例子很生動形象,往盤子裡面放雞蛋,只能放固定數目的。盤子裡面沒有雞蛋,無法從中拿出來。當盤子裡滿了,也放不進去。直到被拿出去才能在放。

程式碼如下,這裡設定的是一個盤子最多放10個雞蛋:

package com.thread.two;

import java.util.ArrayList; import java.util.List; public class Plate { List<Object> eggs=new ArrayList<Object>();public synchronized Object getEgg(){while(eggs.size()==0){ try { wait(); } catch (InterruptedException e) {
// TODO Auto-generated catch block e.printStackTrace(); } } Object egg=null; for (int i = 0; i < 10; i++) { egg=eggs.get(i); System.out.println("拿到雞蛋........."); } //Object egg=eggs.get(0); eggs.clear(); notify();
//System.out.println("拿到雞蛋........."); return egg; } public synchronized void putEgg(Object egg){ while(eggs.size()>9){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } eggs.add(egg); notify(); System.out.println("放入雞蛋........."); } static class AddThread extends Thread{ private Plate plate; private Object egg=new Object(); public AddThread(Plate plate){ this.plate=plate; } public void run(){ for (int i = 0; i < 1000; i++) { plate.putEgg(egg); } } } static class GetThread extends Thread{ private Plate plate; public GetThread(Plate plate){ this.plate=plate; } public void run(){ for (int i = 0; i < 1000; i++) { plate.getEgg(); } } } public static void main(String[] args) throws InterruptedException { Plate plate=new Plate(); Thread add=new Thread(new AddThread(plate)); Thread get=new Thread(new GetThread(plate)); add.start(); get.start(); add.join(); get.join(); System.out.println("測試結束"); } }

這個例子很形象,用執行緒實現了上面所說的。

java現在有concurrent包,裡面有很多現成的可以用的類,很多是執行緒安全的,這樣,像上面寫的put或者get,都不需要自己寫同步方法了,這些類已經包裝好了。

這裡有一個ArrayBlockingQueue的例子,和上面實現的差不多。

首先是兩個執行緒,分別是put和get。

ThreadPut:

package com.thread.three;

import java.util.concurrent.ArrayBlockingQueue;

public class ThreadPut implements Runnable{
    private ArrayBlockingQueue<String> abq=null;
    public ThreadPut(ArrayBlockingQueue<String> abq){
        this.abq=abq;
    }
    public void run() {
        // TODO Auto-generated method stub
        while(true){
            System.out.println("要向佇列中存資料了");
            try {
                Thread.sleep(1000);
                abq.put("hi");
                System.out.println("存入後,資料一共為:"+abq.size());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

ThreadGet:

package com.thread.three;

import java.util.concurrent.ArrayBlockingQueue;

public class ThreadGet extends Thread {
    ArrayBlockingQueue<String> abq=null;
    public ThreadGet(ArrayBlockingQueue<String> abq){
        this.abq=abq;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        while(true){
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("我要從佇列中取資料了");
            String msg=null;
            if (abq.size()>0) {
                msg=abq.remove();
            }
            System.out.println("佇列中取得的資料為:"+msg+",佇列中還有一共:"+abq.size());
        }
    }
}

測試類:

public class ArrayBlockQueueApp {
    
    public static void main(String[] args) {
        ExecutorService  es=Executors.newCachedThreadPool();
        ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(10);
        ThreadGet tGet=new ThreadGet(abq);
        Thread tPut=new Thread(new ThreadPut(abq));
        es.execute(tGet);
        es.execute(tPut);
    }
}

這些佇列放訊息的話挺不錯的。