1. 程式人生 > 實用技巧 >阻塞佇列概念及其簡單使用

阻塞佇列概念及其簡單使用

什麼是阻塞佇列

  概念

    當佇列滿的時候,插入元素的執行緒被阻塞,直到佇列不滿

    佇列為空的時候,獲取元素的執行緒被阻塞,直到佇列不為空

    生產者消費者模式也是阻塞佇列的一種體現

  常用阻塞佇列

    ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列

    LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列

    PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列

    DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列

    SynchronousQueue:一個不儲存元素的阻塞佇列

    LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列

    LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列

  常用方法

方法 丟擲異常 返回值 一直阻塞 超時退出
插入方法 add offer put offer(timeout)
移除方法 remove poll take poll(timeout)
檢查方法 element peek N/A N/A

  介紹:

    ArrayBlockingQueue:按照先進先出的原則,初始化需要設定大小

    LinkedBlockingQueue:按照先進先出的原則,可以不設定初始大小,不設定,預設就會Integer.MAX_VALUE

    區別:

      鎖:ArrayBlockingQueue,只使用了一把鎖,而LinkedBlockingQueue使用了2把

      實現:ArrayBlockingQueue直接插入元素,LinkedBlockingQueue需要轉換

      初始化:ArrayBlockingQueue必須指定初始化大小,LinkedBlockingQueue可以不指定

    PriorityBlockingQueue:預設採用自然順序排序,就1<2,A>B... ,如果想自定義順序有要麼實現CompateTo方法,要麼指定構造引數Comparator,如果一致,PriorityBlockingQueue不保證優先順序順序

    DelayQueue:支援延時獲取元素的阻塞佇列,內部使用PriorityBlockingQueue,元素必須實現Delayed接口才允許放入

    SynchronousQueue:每一個put操作,都要等待一個take操作,類似於等待喚醒機制

    LinkedTransferQueue:相比於LinkedBlockingQueue多了兩個方法,transfer和tryTransfer,transfer在往佇列裡面插入元素之前,先看一下有沒有消費者在等著,如果有,直接把元素交給消費者,省去了插入和,取出的步驟,tryTransfer嘗試把元素給消費者,無論消費者是否接收,都會立即返回,transfer必須要消費者消費之後,才會返回

    LinkedBlockingDeque:可以從佇列的頭部和尾部都可以插入和移除元素,可以在有競爭的時候從兩側獲取元素,減少一半的時間,在ForkJoin中的工作密取機制就是採用的LinkedBlockingDeque實現的,凡是方法名帶了First的都是從頭去拿,帶了Last都是從尾部拿,不加的話,預設add等於addLast,remove等於removeFirst,take方法等於takeFirst

    建議:儘量採用有界阻塞佇列,因為在流量高峰的時候,無界阻塞佇列會不斷的增加佔用資源,可能導致伺服器宕機

  案例:

    使用DelayQueue實現延時訂單功能

定義元素容器類

package org.dance.day5.bq;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * 存放到佇列的元素
 * @author ZYGisComputer
 */
public class ItemVo<T> implements Delayed {

    /**
     * 延時時間 單位為毫秒
     */
    private long activeTime;

    private T data;

    public ItemVo(long activeTime, T data) {
        // 將時間轉換為納秒 並 + 當前納秒 = 將超時時長轉換為超時時刻
        this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,TimeUnit.MILLISECONDS) + System.nanoTime();
        this.data = data;
    }

    /**
     * 返回元素的剩餘時間
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * 按照剩餘時間排序
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return (d==0)?0:(d>0)?1:-1;
    }

    public long getActiveTime() {
        return activeTime;
    }

    public T getData() {
        return data;
    }
}

定義訂單實體類

package org.dance.day5.bq;

/**
 * 訂單實體類
 * @author ZYGisComputer
 */
public class Order {

    private final String orderNo;

    private final double orderMoney;

    public Order(String orderNo, double orderMoney) {
        this.orderNo = orderNo;
        this.orderMoney = orderMoney;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public double getOrderMoney() {
        return orderMoney;
    }
}

定義訂單放入執行緒

package org.dance.day5.bq;

import java.util.concurrent.DelayQueue;

/**
 * 將訂單放入佇列
 * @author ZYGisComputer
 */
public class PutOrder implements Runnable{

    private DelayQueue<ItemVo<Order>> queue;

    public PutOrder(DelayQueue<ItemVo<Order>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // 5秒到期
        Order order = new Order("1",16.00d);
        // 包裝成ItemVo
        ItemVo<Order> itemVo = new ItemVo<>(5000,order);
        queue.offer(itemVo);
        System.out.println("訂單5秒後到期:"+order.getOrderNo());

        // 5秒到期
        order = new Order("2",18.00d);
        // 包裝成ItemVo
        itemVo = new ItemVo<>(8000,order);
        queue.offer(itemVo);
        System.out.println("訂單8秒後到期:"+order.getOrderNo());
    }
}

定義訂單消費執行緒

package org.dance.day5.bq;

import java.util.concurrent.DelayQueue;

/**
 * 獲取訂單
 *
 * @author ZYGisComputer
 */
public class FetchOrder implements Runnable {

    private DelayQueue<ItemVo<Order>> queue;

    public FetchOrder(DelayQueue<ItemVo<Order>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            ItemVo<Order> poll = null;
            try {
                poll = queue.take();
                Order data = poll.getData();
                String orderNo = data.getOrderNo();
                System.out.println(orderNo + "已經消費");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}

測試類

package org.dance.day5.bq;

import java.util.Queue;
import java.util.concurrent.DelayQueue;

/**
 * @author ZYGisComputer
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {

        DelayQueue<ItemVo<Order>> queue = new DelayQueue<>();
        new Thread(new PutOrder(queue)).start();
        new Thread(new FetchOrder(queue)).start();
        for (int i = 0; i < 15; i++) {
            Thread.sleep(500);
            System.out.println(i*500);
        }

    }

}

執行結果

訂單5秒後到期:1
訂單8秒後到期:2
0
500
1000
1500
2000
2500
3000
3500
4000
4500
1已經消費
5000
5500
6000
6500
7000
2已經消費

通過執行結果分析,可以得知,兩個放入阻塞佇列的元素已經成功被消費掉了,當然這種阻塞佇列也可以用於別的場景,比如實現本地延時快取,限時繳費....

作者:彼岸舞

時間:2021\01\11

內容關於:併發程式設計

本文來源於網路,只做技術分享,一概不負任何責任