並發(3) 容器
容器類中提供的ArrayList、HashMap、HashSet不是線程安全的,並發包下提供了類似功能的線程安全的集合。
類 | 說明 | 原理 |
ConcurrentHashMap | ||
ConcurrentSkipListMap | ||
ConcurrentSkipListSet | ||
CopyOnWriteArrayList | ||
CopyOnWriteArraySet |
隊列是一種數據結構,它以一種先進先出的方式管理數據。如果你試圖向一個 已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將導致線程阻塞。
隊列操作:
方法 | 說明 |
boolean add(E e) | 添加一個元素到隊列中,如果隊列已滿,則拋出異常 |
E remove() | 移除並返回隊列頭部的元素,如果隊列為空,則拋出異常 |
E element() | 返回隊列頭部的元素,如果隊列為空,則拋出異常 |
boolean offer(E e) | 添加一個元素到隊列中,如果隊列已滿,返回false |
E poll() | 移除並返回隊列頭部的元素,如果隊列為空,返回null |
E peek() | 返回隊列頭部的元素,如果隊列為空,返回null |
void put(E e) | 返回隊列頭部的元素,如果隊列已滿,阻塞 |
E take() | 移除並返回隊列頭部的元素,如果隊列為空,阻塞 |
數組實現
ArrayBlockingQueue
Qeueu的數組實現,底層使用一個數組實現,數組大小不可變,使用一個count表示當前元素個數,使用putIndex表示當前尾的index,使用takeIndex表示當前頭的index,putindex不一定比takeindex大,是在數組連續的循環。使用一個ReentrantLock控制讀寫並發。使用兩個Condition來阻塞數組為空時消費或者數組滿時生產的線程,當數組中有數據或者有空間時喚醒。叠代器中使用一個index指向下一個元素位置;
1 public class ArrayBlockingQueue<E> extendsView CodeAbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 //存儲數據數組 5 final Object[] items; 6 //頭index 7 int takeIndex; 8 //尾index 9 int putIndex; 10 //隊列長度 11 int count; 12 //鎖 13 final ReentrantLock lock; 14 //非空條件 15 private final Condition notEmpty; 16 //非滿條件 17 private final Condition notFull; 18 //初始化 19 public ArrayBlockingQueue(int capacity) { 20 this(capacity, false); 21 } 22 public ArrayBlockingQueue(int capacity, boolean fair) { 23 if (capacity <= 0) 24 throw new IllegalArgumentException(); 25 this.items = new Object[capacity]; 26 lock = new ReentrantLock(fair); 27 notEmpty = lock.newCondition(); 28 notFull = lock.newCondition(); 29 } 30 public ArrayBlockingQueue(int capacity, boolean fair, 31 Collection<? extends E> c) { 32 this(capacity, fair); 33 34 final ReentrantLock lock = this.lock; 35 lock.lock(); // Lock only for visibility, not mutual exclusion 36 try { 37 int i = 0; 38 try { 39 for (E e : c) { 40 checkNotNull(e); 41 items[i++] = e; 42 } 43 } catch (ArrayIndexOutOfBoundsException ex) { 44 throw new IllegalArgumentException(); 45 } 46 count = i; 47 putIndex = (i == capacity) ? 0 : i; 48 } finally { 49 lock.unlock(); 50 } 51 } 52 //新增一個元素 53 public boolean add(E e) { 54 return super.add(e); 55 } 56 57 //新增一個元素 58 public boolean offer(E e) { 59 checkNotNull(e); 60 final ReentrantLock lock = this.lock; 61 lock.lock(); 62 try { 63 //如果已滿,返回false,否則添加到隊列中 64 if (count == items.length) 65 return false; 66 else { 67 enqueue(e); 68 return true; 69 } 70 } finally { 71 lock.unlock(); 72 } 73 } 74 //新增一個元素 75 public void put(E e) throws InterruptedException { 76 checkNotNull(e); 77 final ReentrantLock lock = this.lock; 78 lock.lockInterruptibly(); 79 try { 80 //如果已滿,等待,否則添加到隊列中 81 while (count == items.length) 82 notFull.await(); 83 enqueue(e); 84 } finally { 85 lock.unlock(); 86 } 87 } 88 //添加一個元素 89 private void enqueue(E x) { 90 final Object[] items = this.items; 91 items[putIndex] = x; 92 if (++putIndex == items.length) 93 putIndex = 0; 94 count++; 95 notEmpty.signal(); 96 } 97 //獲取頭部元素 98 public E poll() { 99 final ReentrantLock lock = this.lock; 100 lock.lock(); 101 try { 102 //如果為空返回null,否則返回頭部元素 103 return (count == 0) ? null : dequeue(); 104 } finally { 105 lock.unlock(); 106 } 107 } 108 //獲取頭部元素 109 public E take() throws InterruptedException { 110 final ReentrantLock lock = this.lock; 111 lock.lockInterruptibly(); 112 try { 113 //如果為空阻塞,否則返回頭部元素 114 while (count == 0) 115 notEmpty.await(); 116 return dequeue(); 117 } finally { 118 lock.unlock(); 119 } 120 } 121 //獲取頭部元素 122 private E dequeue() { 123 // assert lock.getHoldCount() == 1; 124 // assert items[takeIndex] != null; 125 final Object[] items = this.items; 126 @SuppressWarnings("unchecked") 127 E x = (E) items[takeIndex]; 128 items[takeIndex] = null; 129 if (++takeIndex == items.length) 130 takeIndex = 0; 131 count--; 132 if (itrs != null) 133 itrs.elementDequeued(); 134 notFull.signal(); 135 return x; 136 } 137 }
LinkedBlockingQueue
Qeueu的列表實現,底層使用一個單向鏈表實現。大小可變也可以設定大小。使用一個節點作為頭節點,不存儲數據;使用一個節點作為尾節點,存儲數據,使用兩個ReentrantLock分別控制讀寫鎖,頭節點不存儲數據也是避免讀寫並發沖突,count使用了原子變量也是為了避免讀寫沖突。
因為使用了讀寫鎖,所以吞吐量要比ArrayBlockingQueue好。對內存和GC的影響會大於ArrayBlockingQueue。
1 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 private static final long serialVersionUID = -6903933977591709194L; 4 //鏈表節點 5 static class Node<E> { 6 E item; 7 8 /** 9 * One of: 10 * - the real successor Node 11 * - this Node, meaning the successor is head.next 12 * - null, meaning there is no successor (this is the last node) 13 */ 14 Node<E> next; 15 16 Node(E x) { item = x; } 17 } 18 //隊列容量 19 private final int capacity; 20 //隊列元素個數 21 private final AtomicInteger count = new AtomicInteger(); 22 //頭節點 23 transient Node<E> head; 24 //尾節點 25 private transient Node<E> last; 26 //取數據鎖 27 private final ReentrantLock takeLock = new ReentrantLock(); 28 29 private final Condition notEmpty = takeLock.newCondition(); 30 //存數據鎖 31 private final ReentrantLock putLock = new ReentrantLock(); 32 33 private final Condition notFull = putLock.newCondition(); 34 35 public LinkedBlockingQueue() { 36 this(Integer.MAX_VALUE); 37 } 38 39 public LinkedBlockingQueue(int capacity) { 40 if (capacity <= 0) throw new IllegalArgumentException(); 41 this.capacity = capacity; 42 last = head = new Node<E>(null); 43 } 44 //存放元素 45 public void put(E e) throws InterruptedException { 46 if (e == null) throw new NullPointerException(); 47 int c = -1; 48 Node<E> node = new Node<E>(e); 49 final ReentrantLock putLock = this.putLock; 50 final AtomicInteger count = this.count; 51 putLock.lockInterruptibly(); 52 try { 53 //如果到達容量,則等待 54 while (count.get() == capacity) { 55 notFull.await(); 56 } 57 enqueue(node); 58 c = count.getAndIncrement(); 59 if (c + 1 < capacity) 60 notFull.signal(); 61 } finally { 62 putLock.unlock(); 63 } 64 if (c == 0) 65 signalNotEmpty(); 66 } 67 68 //存放元素 69 public boolean offer(E e) { 70 if (e == null) throw new NullPointerException(); 71 final AtomicInteger count = this.count; 72 if (count.get() == capacity) 73 return false; 74 int c = -1; 75 Node<E> node = new Node<E>(e); 76 final ReentrantLock putLock = this.putLock; 77 putLock.lock(); 78 try { 79 //如果到達容量,返回false 80 if (count.get() < capacity) { 81 enqueue(node); 82 c = count.getAndIncrement(); 83 if (c + 1 < capacity) 84 notFull.signal(); 85 } 86 } finally { 87 putLock.unlock(); 88 } 89 if (c == 0) 90 signalNotEmpty(); 91 return c >= 0; 92 } 93 private void enqueue(Node<E> node) { 94 // assert putLock.isHeldByCurrentThread(); 95 // assert last.next == null; 96 last = last.next = node; 97 } 98 //獲取元素 99 public E take() throws InterruptedException { 100 E x; 101 int c = -1; 102 final AtomicInteger count = this.count; 103 final ReentrantLock takeLock = this.takeLock; 104 takeLock.lockInterruptibly(); 105 try { 106 //如果隊列為空,則等待 107 while (count.get() == 0) { 108 notEmpty.await(); 109 } 110 x = dequeue(); 111 c = count.getAndDecrement(); 112 if (c > 1) 113 notEmpty.signal(); 114 } finally { 115 takeLock.unlock(); 116 } 117 if (c == capacity) 118 signalNotFull(); 119 return x; 120 } 121 //獲取元素 122 public E poll() { 123 final AtomicInteger count = this.count; 124 if (count.get() == 0) 125 return null; 126 E x = null; 127 int c = -1; 128 final ReentrantLock takeLock = this.takeLock; 129 takeLock.lock(); 130 try { 131 //如果隊列為空,返回null 132 if (count.get() > 0) { 133 x = dequeue(); 134 c = count.getAndDecrement(); 135 if (c > 1) 136 notEmpty.signal(); 137 } 138 } finally { 139 takeLock.unlock(); 140 } 141 if (c == capacity) 142 signalNotFull(); 143 return x; 144 } 145 private E dequeue() { 146 // assert takeLock.isHeldByCurrentThread(); 147 // assert head.item == null; 148 Node<E> h = head; 149 Node<E> first = h.next; 150 h.next = h; // help GC 151 head = first; 152 E x = first.item; 153 first.item = null; 154 return x; 155 } 156 //獲取元素 157 public E peek() { 158 if (count.get() == 0) 159 return null; 160 final ReentrantLock takeLock = this.takeLock; 161 takeLock.lock(); 162 try { 163 Node<E> first = head.next; 164 if (first == null) 165 return null; 166 else 167 return first.item; 168 } finally { 169 takeLock.unlock(); 170 } 171 } 172 }View Code
SychronousQueue
PriorityQueue
優先級隊列,按照自定義的優先級順序進行讀取。底層使用一個數組實現。大小可變且無邊界
DelayQueue
延時隊列,底層使用一個PriorityQueue實現,使用一個ReentrantLock控制並發。其實就是在每次往優先級隊列中添加元素,然後以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列裏取出來都是最先要過期的元素
1 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> 2 implements BlockingQueue<E> { 3 4 private final transient ReentrantLock lock = new ReentrantLock(); 5 private final PriorityQueue<E> q = new PriorityQueue<E>(); 6 private Thread leader = null; 7 private final Condition available = lock.newCondition(); 8 9 public DelayQueue() {} 10 //向隊列中添加元素,因為是無邊界隊列,所以不會拋異常 11 public boolean add(E e) { 12 return offer(e); 13 } 14 //向隊列中添加元素 15 public boolean offer(E e) { 16 final ReentrantLock lock = this.lock; 17 lock.lock(); 18 try { 19 q.offer(e); 20 if (q.peek() == e) { 21 leader = null; 22 available.signal(); 23 } 24 return true; 25 } finally { 26 lock.unlock(); 27 } 28 } 29 //向隊列中添加元素,因為是無邊界隊列,所以不會阻塞 30 public void put(E e) { 31 offer(e); 32 } 33 //從隊列總獲取數據 34 public E poll() { 35 final ReentrantLock lock = this.lock; 36 lock.lock(); 37 try { 38 E first = q.peek(); 39 if (first == null || first.getDelay(NANOSECONDS) > 0) 40 return null; 41 else 42 return q.poll(); 43 } finally { 44 lock.unlock(); 45 } 46 } 47 //從隊列總獲取數據,如果隊列為空,則阻塞 48 public E take() throws InterruptedException { 49 final ReentrantLock lock = this.lock; 50 lock.lockInterruptibly(); 51 try { 52 for (;;) { 53 E first = q.peek(); 54 if (first == null) 55 available.await(); 56 else { 57 long delay = first.getDelay(NANOSECONDS); 58 if (delay <= 0) 59 return q.poll(); 60 first = null; // don‘t retain ref while waiting 61 if (leader != null) 62 available.await(); 63 else { 64 Thread thisThread = Thread.currentThread(); 65 leader = thisThread; 66 try { 67 available.awaitNanos(delay); 68 } finally { 69 if (leader == thisThread) 70 leader = null; 71 } 72 } 73 } 74 } 75 } finally { 76 if (leader == null && q.peek() != null) 77 available.signal(); 78 lock.unlock(); 79 } 80 } 81 82 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 83 long nanos = unit.toNanos(timeout); 84 final ReentrantLock lock = this.lock; 85 lock.lockInterruptibly(); 86 try { 87 for (;;) { 88 E first = q.peek(); 89 if (first == null) { 90 if (nanos <= 0) 91 return null; 92 else 93 nanos = available.awaitNanos(nanos); 94 } else { 95 long delay = first.getDelay(NANOSECONDS); 96 if (delay <= 0) 97 return q.poll(); 98 if (nanos <= 0) 99 return null; 100 first = null; // don‘t retain ref while waiting 101 if (nanos < delay || leader != null) 102 nanos = available.awaitNanos(nanos); 103 else { 104 Thread thisThread = Thread.currentThread(); 105 leader = thisThread; 106 try { 107 long timeLeft = available.awaitNanos(delay); 108 nanos -= delay - timeLeft; 109 } finally { 110 if (leader == thisThread) 111 leader = null; 112 } 113 } 114 } 115 } 116 } finally { 117 if (leader == null && q.peek() != null) 118 available.signal(); 119 lock.unlock(); 120 } 121 } 122 // 123 public E peek() { 124 final ReentrantLock lock = this.lock; 125 lock.lock(); 126 try { 127 return q.peek(); 128 } finally { 129 lock.unlock(); 130 } 131 } 132 133 134 }View Code
並發(3) 容器