1. 程式人生 > >並發(3) 容器

並發(3) 容器

tab lse ole 鏈表 單向鏈表 nts exception 邊界 peek

  容器類中提供的ArrayList、HashMap、HashSet不是線程安全的,並發包下提供了類似功能的線程安全的集合。

說明 原理
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet

  隊列是一種數據結構,它以一種先進先出的方式管理數據。如果你試圖向一個 已經滿了的阻塞隊列中添加一個元素或者是從一個空的阻塞隊列中移除一個元索,將導致線程阻塞。

技術分享圖片

隊列操作:

方法 說明
boolean add(E e) 添加一個元素到隊列中,如果隊列已滿,則拋出異常
E remove() 移除並返回隊列頭部的元素,如果隊列為空,則拋出異常
E element() 返回隊列頭部的元素,如果隊列為空,則拋出異常
boolean offer(E e) 添加一個元素到隊列中,如果隊列已滿,返回false
E poll() 移除並返回隊列頭部的元素,如果隊列為空,返回null
E peek() 返回隊列頭部的元素,如果隊列為空,返回null
void put(E e) 返回隊列頭部的元素,如果隊列已滿,阻塞
E take() 移除並返回隊列頭部的元素,如果隊列為空,阻塞

數組實現

ArrayBlockingQueue

  Qeueu的數組實現,底層使用一個數組實現,數組大小不可變,使用一個count表示當前元素個數,使用putIndex表示當前尾的index,使用takeIndex表示當前頭的index,putindex不一定比takeindex大,是在數組連續的循環。使用一個ReentrantLock控制讀寫並發。使用兩個Condition來阻塞數組為空時消費或者數組滿時生產的線程,當數組中有數據或者有空間時喚醒。叠代器中使用一個index指向下一個元素位置;

技術分享圖片
  1 public class ArrayBlockingQueue<E> extends
AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 //存儲數據數組 5 final Object[] items; 6 //頭index 7 int takeIndex; 8 //尾index 9 int putIndex; 10 //隊列長度 11 int count; 12 // 13 final ReentrantLock lock; 14 //非空條件 15 private final Condition notEmpty; 16 //非滿條件 17 private final Condition notFull; 18 //初始化 19 public ArrayBlockingQueue(int capacity) { 20 this(capacity, false); 21 } 22 public ArrayBlockingQueue(int capacity, boolean fair) { 23 if (capacity <= 0) 24 throw new IllegalArgumentException(); 25 this.items = new Object[capacity]; 26 lock = new ReentrantLock(fair); 27 notEmpty = lock.newCondition(); 28 notFull = lock.newCondition(); 29 } 30 public ArrayBlockingQueue(int capacity, boolean fair, 31 Collection<? extends E> c) { 32 this(capacity, fair); 33 34 final ReentrantLock lock = this.lock; 35 lock.lock(); // Lock only for visibility, not mutual exclusion 36 try { 37 int i = 0; 38 try { 39 for (E e : c) { 40 checkNotNull(e); 41 items[i++] = e; 42 } 43 } catch (ArrayIndexOutOfBoundsException ex) { 44 throw new IllegalArgumentException(); 45 } 46 count = i; 47 putIndex = (i == capacity) ? 0 : i; 48 } finally { 49 lock.unlock(); 50 } 51 } 52 //新增一個元素 53 public boolean add(E e) { 54 return super.add(e); 55 } 56 57 //新增一個元素 58 public boolean offer(E e) { 59 checkNotNull(e); 60 final ReentrantLock lock = this.lock; 61 lock.lock(); 62 try { 63 //如果已滿,返回false,否則添加到隊列中 64 if (count == items.length) 65 return false; 66 else { 67 enqueue(e); 68 return true; 69 } 70 } finally { 71 lock.unlock(); 72 } 73 } 74 //新增一個元素 75 public void put(E e) throws InterruptedException { 76 checkNotNull(e); 77 final ReentrantLock lock = this.lock; 78 lock.lockInterruptibly(); 79 try { 80 //如果已滿,等待,否則添加到隊列中 81 while (count == items.length) 82 notFull.await(); 83 enqueue(e); 84 } finally { 85 lock.unlock(); 86 } 87 } 88 //添加一個元素 89 private void enqueue(E x) { 90 final Object[] items = this.items; 91 items[putIndex] = x; 92 if (++putIndex == items.length) 93 putIndex = 0; 94 count++; 95 notEmpty.signal(); 96 } 97 //獲取頭部元素 98 public E poll() { 99 final ReentrantLock lock = this.lock; 100 lock.lock(); 101 try { 102 //如果為空返回null,否則返回頭部元素 103 return (count == 0) ? null : dequeue(); 104 } finally { 105 lock.unlock(); 106 } 107 } 108 //獲取頭部元素 109 public E take() throws InterruptedException { 110 final ReentrantLock lock = this.lock; 111 lock.lockInterruptibly(); 112 try { 113 //如果為空阻塞,否則返回頭部元素 114 while (count == 0) 115 notEmpty.await(); 116 return dequeue(); 117 } finally { 118 lock.unlock(); 119 } 120 } 121 //獲取頭部元素 122 private E dequeue() { 123 // assert lock.getHoldCount() == 1; 124 // assert items[takeIndex] != null; 125 final Object[] items = this.items; 126 @SuppressWarnings("unchecked") 127 E x = (E) items[takeIndex]; 128 items[takeIndex] = null; 129 if (++takeIndex == items.length) 130 takeIndex = 0; 131 count--; 132 if (itrs != null) 133 itrs.elementDequeued(); 134 notFull.signal(); 135 return x; 136 } 137 }
View Code

LinkedBlockingQueue

  Qeueu的列表實現,底層使用一個單向鏈表實現。大小可變也可以設定大小。使用一個節點作為頭節點,不存儲數據;使用一個節點作為尾節點,存儲數據,使用兩個ReentrantLock分別控制讀寫鎖,頭節點不存儲數據也是避免讀寫並發沖突,count使用了原子變量也是為了避免讀寫沖突。

  因為使用了讀寫鎖,所以吞吐量要比ArrayBlockingQueue好。對內存和GC的影響會大於ArrayBlockingQueue。

技術分享圖片
  1 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  2         implements BlockingQueue<E>, java.io.Serializable {
  3     private static final long serialVersionUID = -6903933977591709194L;
  4     //鏈表節點
  5     static class Node<E> {
  6         E item;
  7 
  8         /**
  9          * One of:
 10          * - the real successor Node
 11          * - this Node, meaning the successor is head.next
 12          * - null, meaning there is no successor (this is the last node)
 13          */
 14         Node<E> next;
 15 
 16         Node(E x) { item = x; }
 17     }
 18     //隊列容量
 19     private final int capacity;
 20     //隊列元素個數
 21     private final AtomicInteger count = new AtomicInteger();
 22     //頭節點
 23     transient Node<E> head;
 24     //尾節點
 25     private transient Node<E> last;
 26     //取數據鎖
 27     private final ReentrantLock takeLock = new ReentrantLock();
 28     
 29     private final Condition notEmpty = takeLock.newCondition();
 30     //存數據鎖
 31     private final ReentrantLock putLock = new ReentrantLock();
 32 
 33     private final Condition notFull = putLock.newCondition();
 34     
 35     public LinkedBlockingQueue() {
 36         this(Integer.MAX_VALUE);
 37     }
 38 
 39     public LinkedBlockingQueue(int capacity) {
 40         if (capacity <= 0) throw new IllegalArgumentException();
 41         this.capacity = capacity;
 42         last = head = new Node<E>(null);
 43     }
 44     //存放元素
 45     public void put(E e) throws InterruptedException {
 46         if (e == null) throw new NullPointerException();
 47         int c = -1;
 48         Node<E> node = new Node<E>(e);
 49         final ReentrantLock putLock = this.putLock;
 50         final AtomicInteger count = this.count;
 51         putLock.lockInterruptibly();
 52         try {
 53             //如果到達容量,則等待
 54             while (count.get() == capacity) {
 55                 notFull.await();
 56             }
 57             enqueue(node);
 58             c = count.getAndIncrement();
 59             if (c + 1 < capacity)
 60                 notFull.signal();
 61         } finally {
 62             putLock.unlock();
 63         }
 64         if (c == 0)
 65             signalNotEmpty();
 66     }
 67     
 68     //存放元素
 69     public boolean offer(E e) {
 70         if (e == null) throw new NullPointerException();
 71         final AtomicInteger count = this.count;
 72         if (count.get() == capacity)
 73             return false;
 74         int c = -1;
 75         Node<E> node = new Node<E>(e);
 76         final ReentrantLock putLock = this.putLock;
 77         putLock.lock();
 78         try {
 79             //如果到達容量,返回false
 80             if (count.get() < capacity) {
 81                 enqueue(node);
 82                 c = count.getAndIncrement();
 83                 if (c + 1 < capacity)
 84                     notFull.signal();
 85             }
 86         } finally {
 87             putLock.unlock();
 88         }
 89         if (c == 0)
 90             signalNotEmpty();
 91         return c >= 0;
 92     }
 93     private void enqueue(Node<E> node) {
 94         // assert putLock.isHeldByCurrentThread();
 95         // assert last.next == null;
 96         last = last.next = node;
 97     }
 98     //獲取元素
 99     public E take() throws InterruptedException {
100         E x;
101         int c = -1;
102         final AtomicInteger count = this.count;
103         final ReentrantLock takeLock = this.takeLock;
104         takeLock.lockInterruptibly();
105         try {
106             //如果隊列為空,則等待
107             while (count.get() == 0) {
108                 notEmpty.await();
109             }
110             x = dequeue();
111             c = count.getAndDecrement();
112             if (c > 1)
113                 notEmpty.signal();
114         } finally {
115             takeLock.unlock();
116         }
117         if (c == capacity)
118             signalNotFull();
119         return x;
120     }
121     //獲取元素
122     public E poll() {
123         final AtomicInteger count = this.count;
124         if (count.get() == 0)
125             return null;
126         E x = null;
127         int c = -1;
128         final ReentrantLock takeLock = this.takeLock;
129         takeLock.lock();
130         try {
131             //如果隊列為空,返回null
132             if (count.get() > 0) {
133                 x = dequeue();
134                 c = count.getAndDecrement();
135                 if (c > 1)
136                     notEmpty.signal();
137             }
138         } finally {
139             takeLock.unlock();
140         }
141         if (c == capacity)
142             signalNotFull();
143         return x;
144     }
145     private E dequeue() {
146         // assert takeLock.isHeldByCurrentThread();
147         // assert head.item == null;
148         Node<E> h = head;
149         Node<E> first = h.next;
150         h.next = h; // help GC
151         head = first;
152         E x = first.item;
153         first.item = null;
154         return x;
155     }
156     //獲取元素
157     public E peek() {
158         if (count.get() == 0)
159             return null;
160         final ReentrantLock takeLock = this.takeLock;
161         takeLock.lock();
162         try {
163             Node<E> first = head.next;
164             if (first == null)
165                 return null;
166             else
167                 return first.item;
168         } finally {
169             takeLock.unlock();
170         }
171     }
172 }
View Code

SychronousQueue

  

PriorityQueue

  優先級隊列,按照自定義的優先級順序進行讀取。底層使用一個數組實現。大小可變且無邊界

DelayQueue

  延時隊列,底層使用一個PriorityQueue實現,使用一個ReentrantLock控制並發。其實就是在每次往優先級隊列中添加元素,然後以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列裏取出來都是最先要過期的元素

技術分享圖片
  1 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  2         implements BlockingQueue<E> {
  3 
  4     private final transient ReentrantLock lock = new ReentrantLock();
  5     private final PriorityQueue<E> q = new PriorityQueue<E>();
  6     private Thread leader = null;
  7     private final Condition available = lock.newCondition();
  8 
  9     public DelayQueue() {}
 10     //向隊列中添加元素,因為是無邊界隊列,所以不會拋異常
 11     public boolean add(E e) {
 12         return offer(e);
 13     }
 14     //向隊列中添加元素
 15     public boolean offer(E e) {
 16         final ReentrantLock lock = this.lock;
 17         lock.lock();
 18         try {
 19             q.offer(e);
 20             if (q.peek() == e) {
 21                 leader = null;
 22                 available.signal();
 23             }
 24             return true;
 25         } finally {
 26             lock.unlock();
 27         }
 28     }
 29     //向隊列中添加元素,因為是無邊界隊列,所以不會阻塞
 30     public void put(E e) {
 31         offer(e);
 32     }
 33     //從隊列總獲取數據
 34     public E poll() {
 35         final ReentrantLock lock = this.lock;
 36         lock.lock();
 37         try {
 38             E first = q.peek();
 39             if (first == null || first.getDelay(NANOSECONDS) > 0)
 40                 return null;
 41             else
 42                 return q.poll();
 43         } finally {
 44             lock.unlock();
 45         }
 46     }
 47     //從隊列總獲取數據,如果隊列為空,則阻塞
 48     public E take() throws InterruptedException {
 49         final ReentrantLock lock = this.lock;
 50         lock.lockInterruptibly();
 51         try {
 52             for (;;) {
 53                 E first = q.peek();
 54                 if (first == null)
 55                     available.await();
 56                 else {
 57                     long delay = first.getDelay(NANOSECONDS);
 58                     if (delay <= 0)
 59                         return q.poll();
 60                     first = null; // don‘t retain ref while waiting
 61                     if (leader != null)
 62                         available.await();
 63                     else {
 64                         Thread thisThread = Thread.currentThread();
 65                         leader = thisThread;
 66                         try {
 67                             available.awaitNanos(delay);
 68                         } finally {
 69                             if (leader == thisThread)
 70                                 leader = null;
 71                         }
 72                     }
 73                 }
 74             }
 75         } finally {
 76             if (leader == null && q.peek() != null)
 77                 available.signal();
 78             lock.unlock();
 79         }
 80     }
 81 
 82     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 83         long nanos = unit.toNanos(timeout);
 84         final ReentrantLock lock = this.lock;
 85         lock.lockInterruptibly();
 86         try {
 87             for (;;) {
 88                 E first = q.peek();
 89                 if (first == null) {
 90                     if (nanos <= 0)
 91                         return null;
 92                     else
 93                         nanos = available.awaitNanos(nanos);
 94                 } else {
 95                     long delay = first.getDelay(NANOSECONDS);
 96                     if (delay <= 0)
 97                         return q.poll();
 98                     if (nanos <= 0)
 99                         return null;
100                     first = null; // don‘t retain ref while waiting
101                     if (nanos < delay || leader != null)
102                         nanos = available.awaitNanos(nanos);
103                     else {
104                         Thread thisThread = Thread.currentThread();
105                         leader = thisThread;
106                         try {
107                             long timeLeft = available.awaitNanos(delay);
108                             nanos -= delay - timeLeft;
109                         } finally {
110                             if (leader == thisThread)
111                                 leader = null;
112                         }
113                     }
114                 }
115             }
116         } finally {
117             if (leader == null && q.peek() != null)
118                 available.signal();
119             lock.unlock();
120         }
121     }
122     //
123     public E peek() {
124         final ReentrantLock lock = this.lock;
125         lock.lock();
126         try {
127             return q.peek();
128         } finally {
129             lock.unlock();
130         }
131     }
132 
133 
134 }
View Code

並發(3) 容器