1. 程式人生 > 實用技巧 > ReentrantLock實現阻塞佇列

ReentrantLock實現阻塞佇列

package com.test;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demonstrates a hand-written bounded blocking queue ({@link MyBlockQueue}) built on a
 * {@link ReentrantLock} with two {@link Condition}s, exercised by one producer thread and
 * one consumer thread.
 *
 * <p>Created 2020/3/29 15:42.
 *
 * @author movie
 */
public class Concurrent {

    /** Capacity of the queue used by the demo. */
    private static final int QUEUE_CAPACITY = 5;

    /** Number of elements the producer inserts and the consumer removes. */
    private static final int ELEMENT_COUNT = 20;

    /**
     * Prints the entries of a small {@link ConcurrentSkipListMap}, then runs a producer and a
     * consumer against a {@link MyBlockQueue} and prints the queue size once both have finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Map<String, String> map = new ConcurrentSkipListMap<>();
        map.put("a", "1");
        map.put("b", "12");
        map.put("aa", "3");
        map.put("ba", "4");
        map.forEach((k, v) -> System.out.println(String.format("%s:%s", k, v)));

        MyBlockQueue myBlockQueue = new MyBlockQueue(QUEUE_CAPACITY);
        Thread producer = new Thread(() -> produce(myBlockQueue, ELEMENT_COUNT));
        Thread consumer = new Thread(() -> consume(myBlockQueue, ELEMENT_COUNT));
        producer.start();
        consumer.start();

        // Wait for exactly the two worker threads instead of spinning on the
        // thread-group count, which burns CPU and depends on unrelated threads.
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println(myBlockQueue.size());
    }

    /** Inserts the integers {@code 0..total-1} into {@code queue}, pausing 300 ms before each. */
    private static void produce(MyBlockQueue queue, int total) {
        for (int i = 0; i < total; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            queue.put(i);
            // put() reports an interrupted wait only through the interrupt flag.
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            System.out.println("produce:" + i);
        }
    }

    /** Removes {@code total} elements from {@code queue}, pausing one second before each. */
    private static void consume(MyBlockQueue queue, int total) {
        for (int i = 0; i < total; i++) {
            try {
                TimeUnit.SECONDS.sleep(1);
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println("take:" + i);
        }
    }

    /**
     * Fixed-capacity FIFO queue backed by a circular array.
     *
     * <p>All state is guarded by a single {@link ReentrantLock}. {@link #put(Object)} waits on
     * {@code notFull} while the array is full and {@link #take()} waits on {@code notEmpty}
     * while it is empty. {@code null} elements are rejected.
     */
    static class MyBlockQueue {
        /** Circular buffer; a slot is {@code null} when it holds no element. */
        private final Object[] items;
        private final ReentrantLock lock;
        /** Signalled by {@link #dequeue()} after a slot has been freed. */
        private final Condition notFull;
        /** Signalled by {@link #enqueue(Object)} after an element has been stored. */
        private final Condition notEmpty;
        /** Index of the slot the next {@code put} writes to. */
        private int putIndex;
        /** Index of the slot the next {@code take} reads from. */
        private int takeIndex;
        /** Number of elements currently stored. */
        private int count;

        /**
         * Creates an empty queue.
         *
         * @param cap maximum number of elements the queue can hold
         * @throws IllegalArgumentException if {@code cap} is not positive
         */
        public MyBlockQueue(int cap) {
            if (cap <= 0) {
                throw new IllegalArgumentException("cap must be positive: " + cap);
            }
            this.items = new Object[cap];
            this.lock = new ReentrantLock();
            this.notFull = lock.newCondition();
            this.notEmpty = lock.newCondition();
        }

        /**
         * Appends {@code element}, waiting while the queue is full.
         *
         * <p>The signature declares no checked exception. If the calling thread is interrupted
         * while waiting for space, the element is <em>not</em> inserted and the thread's
         * interrupt status is restored so the caller can detect it.
         *
         * @param element the element to add
         * @throws NullPointerException if {@code element} is {@code null}
         */
        public void put(Object element) {
            Objects.requireNonNull(element, "element");
            lock.lock();
            try {
                while (count == items.length) {
                    notFull.await();
                }
                enqueue(element);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

        /** Stores {@code element} at {@code putIndex} and wakes one waiting taker. Lock must be held. */
        private void enqueue(Object element) {
            items[putIndex] = element;
            // Must be 'if', not 'while': re-evaluating the pre-increment after the reset
            // would leave the index at 1 (skipping slot 0) and never terminate for cap == 1.
            if (++putIndex == items.length) {
                putIndex = 0;
            }
            count++;
            notEmpty.signal();
        }

        /**
         * Removes and returns the oldest element, waiting while the queue is empty.
         *
         * @return the element at the head of the queue
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public Object take() throws InterruptedException {
            // Acquire before 'try' so 'finally' never unlocks a lock that was not obtained.
            lock.lock();
            try {
                while (count == 0) {
                    notEmpty.await();
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

        /** Clears and returns the slot at {@code takeIndex} and wakes one waiting putter. Lock must be held. */
        private Object dequeue() {
            Object element = items[takeIndex];
            items[takeIndex] = null;
            // Same wrap-around rule as enqueue(): a single conditional reset to slot 0.
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            notFull.signal();
            return element;
        }

        /**
         * Returns the number of elements currently stored.
         *
         * @return the current element count, read under the lock
         */
        public int size() {
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }
    }
}