1. 程式人生 > >java併發包訊息佇列

java併發包訊息佇列

訊息佇列常用於有生產者和消費者兩類角色的多執行緒同步場景

BlockingQueue也是java.util.concurrent下的主要用來控制執行緒同步的工具。

主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。

         插入:

                   1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則丟擲異常

        2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.

        3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻塞直到BlockingQueue裡面有空間再繼續.

讀取:

 4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null

        5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止

其他

int remainingCapacity();返回佇列剩餘的容量,在佇列插入和獲取的時候,不要瞎搞,數 據可能不準

boolean remove(Object o); 從佇列移除元素,如果存在,即移除一個或者更多,佇列改    變了返回true

public boolean contains(Object o); 檢視佇列是否存在這個元素,存在返回true

int drainTo(Collection<? super E> c); 傳入的集合中的元素,如果在佇列中存在,那麼將     佇列中的元素移動到集合中

int drainTo(Collection<? super E> c, int maxElements); 和上面方法的區別在於,制定了移   動的數量

案例:

package blockingqueue;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {

      this.queue = queue;

   }

   @Override

   public void run() {

      try {

         String consumer = Thread.currentThread().getName();

         System.out.println(consumer);

         //如果佇列為空,會阻塞當前執行緒

         String temp = queue.take();

         System.out.println(consumer + "消費者  get a product:" + temp);

      } catch (Exception e) {

         e.printStackTrace();

      }

   }

}

package blockingqueue;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

   BlockingQueue<String> queue;   

    public Producer(BlockingQueue<String> queue) { 

        this.queue = queue

    }   

    @Override 

    public void run() { 

        try

            String temp = "A Product, 生產執行緒:" 

                    + Thread.currentThread().getName(); 

            queue.put(temp);//如果佇列是滿的話,會阻塞當前執行緒

            System.out.println("生產者 I have made a product: " 

                 + Thread.currentThread().getName());

        } catch (InterruptedException e) { 

            e.printStackTrace(); 

        } 

    }

}

package blockingqueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class Test {

   public static void main(String[] args) throws Exception {

      BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

      // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

      // 不設定的話,LinkedBlockingQueue預設大小為Integer.MAX_VALUE

      // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

      Consumer consumer = new Consumer(queue);

      Producer producer = new Producer(queue);

      for (int i = 0; i < 3; i++) {

         new Thread(producer, "Producer" + (i + 1)).start();

      }

      for (int i = 0; i < 5; i++) {

         new Thread(consumer, "Consumer" + (i + 1)).start();

      }

      Thread.sleep(5000);

      new Thread(producer, "Producer" + (5)).start();

   }

}

BlockingQueue有四個具體的實現類,常用的兩種實現類為:

1、ArrayBlockingQueue:一個由陣列支援的有界阻塞佇列,規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的。

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的。

         LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,預設最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在佇列滿的時候會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來。

LinkedBlockingQueue和ArrayBlockingQueue區別:

LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.

生產者消費者的示例程式碼:

見程式碼  TestBlockingQueue  TestBlockingQueueConsumer   TestBlockingQueueProducer

package blockingqueue;

import java.util.Random;

import java.util.concurrent.BlockingQueue;

public class TestBlockingQueueProducer implements Runnable {

   BlockingQueue<String> queue;

   Random random = new Random();

   public TestBlockingQueueProducer(BlockingQueue<String> queue) {

      this.queue = queue;

   }

   @Override

   public void run() {

      for (int i = 0; i < 10; i++) {

         try {

            Thread.sleep(random.nextInt(10));

            String task = Thread.currentThread().getName() + " made a product " + i;

            System.out.println(task);

            queue.put(task);  //阻塞方法

         } catch (InterruptedException e) {

            e.printStackTrace();

         }

      }

   }

}

package blockingqueue;

import java.util.Random;

import java.util.concurrent.BlockingQueue;

public class TestBlockingQueueConsumer implements Runnable {

   BlockingQueue<String> queue;

    Random random = new Random();

    public TestBlockingQueueConsumer(BlockingQueue<String> queue){ 

        this.queue = queue; 

    }       

    @Override 

    public void run() { 

        try { 

          Thread.sleep(random.nextInt(10));

          System.out.println(Thread.currentThread().getName()+ "trying...");

            String temp = queue.take();//如果佇列為空,會阻塞當前執行緒 

            int remainingCapacity = queue.remainingCapacity();

            System.out.println(Thread.currentThread().getName() + " get a job " +temp);

            // System.out.println("佇列中的元素個數: "+ remainingCapacity);

        } catch (InterruptedException e) { 

            e.printStackTrace();

        } 

    }

}

package blockingqueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class TestBlockingQueue {

   public static void main(String[] args) {

      BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

      // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

      // 不設定的話,LinkedBlockingQueue預設大小為Integer.MAX_VALUE

      // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

      TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);

      TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);

      for (int i = 0; i < 3; i++) {

         new Thread(producer, "Producer" + (i + 1)).start();

      }

      for (int i = 0; i < 5; i++) {

         new Thread(consumer, "Consumer" + (i + 1)).start();

      }

   }

}