jdk提供的阻塞佇列BlockingQueue下的五個實現簡單操作以及介紹
package cn.yarne.com.base.test;
import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.Vector; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock;
import cn.yarne.com.base.test.BlockingQueueTest.Comp; import cn.yarne.com.base.test.BlockingQueueTest.DelayTest;
public class BlockingQueueTest {
/** * ArrayBlockingQueue是一個有界佇列 需要了解的是put,add,offer的區別: * put 如果加不進去會阻塞,直到加進去為止 add 如果加不進去會丟擲錯誤 offer 如果加不進去會返回false * @throws InterruptedException */ private static void ArrayBlockingQueueTest() throws InterruptedException { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(5); arrayBlockingQueue.add("a"); arrayBlockingQueue.add("b"); arrayBlockingQueue.add("c"); arrayBlockingQueue.add("d"); arrayBlockingQueue.add("e"); new Thread(() -> { try { Thread.sleep(3000); String take = arrayBlockingQueue.take(); System.out.println(take); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }).start(); arrayBlockingQueue.put("f");
for (Iterator iterator = arrayBlockingQueue.iterator(); iterator.hasNext();) { String string = (String) iterator.next(); System.out.println(string); } }
/** * LinkedBlockingQueueTest是一個無界的阻塞佇列,但是也可以給界限,其他的和ArrayBlockingQueue相似 */ public static void LinkedBlockingQueueTest() { LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(); linkedBlockingQueue.add("a"); linkedBlockingQueue.add("b"); linkedBlockingQueue.add("c"); boolean offer = linkedBlockingQueue.offer("d"); System.out.println(offer); linkedBlockingQueue.offer("e"); }
/** * SynchronousQueue是一個沒有緩衝的佇列,這個佇列生產者生產的資料會直接被消費者進行消費 * 也就是說,如果沒有消費者(take)方法,那麼這個生產者是不能生產(put)資料的,會一直阻塞 */ public static void SynchronousQueueTest() throws InterruptedException { SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); System.out.println(synchronousQueue.size()); new Thread(() -> { try { Thread.sleep(3000); String take = synchronousQueue.take(); System.out.println(take + "取出"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}).start(); new Thread(() -> { try { synchronousQueue.put("新增"); System.out.println(synchronousQueue.size()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }
}).start(); }
class Comp implements Comparable<Comp> { private String name; private int id;
public Comp(String name, int id) { super(); this.name = name; this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getId() { return id; }
public void setId(int id) { this.id = id; }
@Override public int compareTo(Comp o) { return this.id - o.getId(); }
@Override public String toString() { return "Comp [name=" + name + ", id=" + id + "]"; }
}
/** * PriorityBlockingQueue 是一個基於優先順序的阻塞佇列 * 說到優先順序肯定是需要排序,所以說想要實現排序,傳入的物件必須實現Comparable介面 * 這個佇列的排序是在add或者take的時候,每次把最高優先順序的一個放在第一位,準備被下一次take出去 * * * @throws InterruptedException */ public static void PriorityBlockingQueueTest() throws InterruptedException { BlockingQueueTest blockingQueueTest = new BlockingQueueTest(); PriorityBlockingQueue<Comp> priorityBlockingQueue = new PriorityBlockingQueue<>();
Comp comp2 = blockingQueueTest.new Comp("李四", 8); Comp comp1 = blockingQueueTest.new Comp("張三", 3); Comp comp3 = blockingQueueTest.new Comp("王五", 6); Comp comp4 = blockingQueueTest.new Comp("王五", 7);
priorityBlockingQueue.add(comp1); priorityBlockingQueue.add(comp2); priorityBlockingQueue.add(comp3); priorityBlockingQueue.add(comp4);
for (Iterator iterator = priorityBlockingQueue.iterator(); iterator.hasNext();) { Comp comp = (Comp) iterator.next(); System.out.println(comp.toString()); }
Comp take = priorityBlockingQueue.take(); Comp take2 = priorityBlockingQueue.take(); System.out.println(take.toString()); System.out.println(take2.toString());
}
class DelayTest implements Delayed { private int id; private long time;
public DelayTest(int id, long time) { super(); this.id = id; this.time = time; }
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public long getTime() { return time; }
public void setTime(long time) { this.time = time; }
@Override public int compareTo(Delayed o) { DelayTest delayTest = (DelayTest) o; return this.id - delayTest.id; }
@Override public long getDelay(TimeUnit unit) { long ti=this.time-System.currentTimeMillis(); System.out.println(ti); return ti; }
@Override public String toString() { return "DelayTest [id=" + id + ", time=" + time + "]"; } }
/** * DelayQueue 是一個帶有延時時間的佇列,元素只有時間到了,take的時候才能取到元素 * 所以DelayQueue中存入的元素必須實現Delayed介面,主要判斷時間是否到期的方法是 * getDelay() 所以這個方法中的返回值必須是一個不斷減少的值,例如lthis.time-System.currentTimeMillis(); * 否則的話會導致take方法等不到過期的元素,所以會阻塞 * 這是一個沒有大小限制的佇列,使用場景很多,比如快取超時處理,任務超時的處理,空閒連線的關閉 * @throws InterruptedException */ public static void DelayQueueTest() throws InterruptedException { DelayQueue<DelayTest> delayQueue = new DelayQueue<>(); BlockingQueueTest blockingQueueTest = new BlockingQueueTest(); DelayTest delayTest1 = blockingQueueTest.new DelayTest(3, System.currentTimeMillis()+3000); DelayTest delayTest2 = blockingQueueTest.new DelayTest(5, System.currentTimeMillis()+5000); DelayTest delayTest3 = blockingQueueTest.new DelayTest(10, System.currentTimeMillis()+10000); delayQueue.add(delayTest2); delayQueue.add(delayTest3); delayQueue.add(delayTest1);
for (Iterator iterator = delayQueue.iterator(); iterator.hasNext();) { DelayTest delayTest = (DelayTest) iterator.next(); System.out.println(delayTest.toString()); } DelayTest take = delayQueue.take(); System.out.println(take.toString()); DelayTest take1 = delayQueue.take(); System.out.println(take1.toString()); DelayTest take2 = delayQueue.take(); System.out.println(take2.toString()); } public static void main(String[] args) throws InterruptedException { }
}