?集合工具類使用線程
1. hashmap源碼解析與並發可能遇見的問題
1.HashMap中的幾個重要變量 static final int DEFAULT_INITIAL_CAPACITY = 16; //默認初始容量,必須是2的n次方 static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量,當通過構造方法傳入的容量比它還大時,就用這個最大容量,必須是2的n次方 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默認負載因子 transient Entry<K,V>[] table; //用來存儲鍵值對,可以看到鍵值對都是存儲在Entry中的 transient int size; //存放元素的個數 int threshold; //臨界值 當實際大小超過臨界值時,會進行擴容threshold = 加載因子*容量 final float loadFactor; //加載因子 transient int modCount; //被修改的次數 存儲結構
2.Entry是一個鏈表結構,不僅包含key和value,還有可以指向下一個的next static class Entry<K,V> implements Map.Entry<K,V> { final K key; V value; Entry<K,V> next; int hash; /** * Creates new entry. */ Entry(int h, K k, V v, Entry<K,V> n) { value = v; next = n; key = k; hash = h; } ... //3.put方法 public V put(K key, V value) { if (key == null) return putForNullKey(value);//儲存空鍵 int hash = hash(key);//計算hash值 int i = indexFor(hash, table.length);//計算存儲位置 for (Entry<K,V> e = table[i]; e != null; e = e.next) {//遍歷hashmap的內部數據 Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } //這個for循環,當發生並發,兩個線程沖突的時候,這個鏈表的結構會發生變化:可能兩個key互為對方的next元素。此時通過next遍歷,會形成死循環。在jdb8中已經不存在了。最好的解決辦法是使用concurrenthashmap modCount++; addEntry(hash, key, value, i); return null; } //首先通過hash方法對hashcode進行處理: final int hash(Object k) { int h = 0; h ^= k.hashCode(); h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); } //可以看到只是在key的hashcode值上做了一些處理,通過hash計算出來的值將會使用indexFor方法找到它應該所在的table下標: static int indexFor(int h, int length) { return h & (length-1); } //這個方法其實相當於對table.length取模。 //當需要插入的key為null時,調用putForNullKey方法處理: private V putForNullKey(V value) { for (Entry<K,V> e = table[0]; e != null; e = e.next) { if (e.key == null) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; addEntry(0, null, value, 0); return null; } //putForNullKey方法只從table[0]這個位置開始遍歷,因為key為null只放在table中的第一個位置,下標為0,在遍歷中如果發現已經有key為null了,則替換新value,返回舊value,結束;如果還沒有key為null,調用addEntry方法增加一個Entry: void addEntry(int hash, K key, V value, int bucketIndex) { if ((size >= threshold) && (null != table[bucketIndex])) { resize(2 * table.length); hash = (null != key) ? hash(key) : 0; bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } //可以看到jdk7中resize的條件已經發生改變了,只有當 size>=threshold並且 table中的那個槽中已經有Entry時,才會發生resize。即有可能雖然size>=threshold,但是必須等到每個槽都至少有一個Entry時,才會擴容。還有註意每次resize都會擴大一倍容量 void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e); size++; } //最後看createEntry,它先保存這個桶中的第一個Entry,創建新的Entry放入第一個位置,將原來的Entry接在後面。這裏采用的是頭插法插入元素。 4.get方法 //其實get方法和put方法如出一轍,怎麽放的怎麽拿 public V get(Object key) { if (key == null) return getForNullKey(); Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); } //key為null時,還是去table[0]去取: private V getForNullKey() { for (Entry<K,V> e = table[0]; e != null; e = e.next) { if (e.key == null) return e.value; } return null; } //否則調用getEntry方法: final Entry<K,V> getEntry(Object key) { int hash = (key == null) ? 0 : hash(key); for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; } //這個方法也是通過key的hashcode計算出它應該所在的下標,再遍歷這個下標的Entry鏈,如果key的內存地址相等(即同一個引用)或者equals相等,則說明找到了 hash的原則 A、等冪性。不管執行多少次獲取Hash值的操作,只要對象不變,那麽Hash值是固定的。如果第一次取跟第N次取不一樣,那就用起來很麻煩. B、對等性。若兩個對象equal方法返回為true,則其hash值也應該是一樣的。舉例說明:若你將objA作為key存入HashMap中,然後new了一個objB。在你看來objB和objA是一個東西(因為他們equal),但是使用objB到hashMap中卻取不出來東西。 C、互異性。若兩個對象equal方法返回為false,hash值有可能相同,但最好是不同的,這個不是必須的,只是這樣做會提高hash類操作的性能(碰撞幾率低)。 解決hash碰撞的方法: hashmap采用的就是鏈地址法,這種方法好處是無堆積現象,但是next指針會占用額外空間 和jdk8中的HashMap區別 在jdk8中,仍然會根據key.hashCode()計算出hash值,再通過這個hash值去定位這個key,但是不同的是,當發生沖突時,會采用鏈表和紅黑樹兩種方法去處理,當結點個數較少時用鏈表(用Node存儲),個數較多時用紅黑樹(用TreeNode存儲),同時結點也不叫Entry了,而是分成了Node和TreeNode。再最壞的情況下,鏈表查找的時間復雜度為O(n),而紅黑樹一直是O(logn),這樣會提高HashMap的效率。 Put方法也變了,為了防止並發問題。 擴展:為何數組的長度是 2 的 n 次方呢? 1.這個方法非常巧妙,它通過 h & (table.length -1) 來得到該對象的保存位,而HashMap 底層數組的長度總是 2 的 n 次方,2n-1 得到的二進制數的每個位上的值都為 1,那麽與全部為 1 的一個數進行與操作,速度會大大提升。 2.當 length 總是 2 的 n 次方時,h& (length-1)運算等價於對 length 取模,也就是h%length,但是&比%具有更高的效率。 3.當數組長度為 2 的 n 次冪的時候,不同的 key 算得的 index 相同的幾率較小,那麽數據在數組上分布就比較均勻,也就是說碰撞的幾率小,相對的,查詢的時候就不用遍歷某個位置上的鏈表,這樣查詢效率也就較高了。 HashMap 的擴容機制: 當 HashMap 中的結點個數超過數組大小*loadFactor(加載因子)時,就會進行數組擴容,loadFactor 的默認值為 0.75。也就是說,默認情況下,數組大小為 16,那麽當 HashMap中結點個數超過 16*0.75=12 的時候,就把數組的大小擴展為 2*16=32,即擴大一倍,然後重新計算每個元素在數組中的位置,並放進去,而這是一個非常消耗性能的操作。 |
Hashmap和HashTable的異同:
01.兩者的默認容量與負載因子有變化
02.hashtable的 容量可以是任意值,而hashmap必須是2的次冪
03.hashtable中在put方法裏面不允許值與鍵為空
04.計算索引的方式不同(indexof函數不同)
05.hashtable大部分方法都加上了sychronied關鍵字
06.hashtable每次擴容,容量為原來的兩倍加2.
2. concurrenthashmap源碼解析與並發編程
使用與獲取全局信息的方法並不頻繁的時候
01.在 ConcurrentHashMap 中,不允許用 null 作為鍵和值。
02.ConcurrentHashMap 使用分段鎖(減少鎖粒度)技術,將數據分成一段一段的存儲,然後給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問,能夠實現真正的並發訪問。
默認情況下分為16個段。
03.當增加一個新的表項,不是全部加鎖,會先計算在哪個段,對指定的段加鎖。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; //上面兩行用於獲取段號 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);//得到段,將數據插入到段中 return s.put(key, hash, value, false); }
04.當系統需要取得全局鎖,消耗資源就會比較多。比如size()方法:事實上會先使用無鎖的方式求和,如果失敗,會先獲得所有段的鎖再去求和。
3. BlockingQueue
A線程可以知道b線程的存在
是一個接口並非一個具體實現:
ArrayBlockingQueue
ArrayBlockingQueue的內部元素都放置在一個對象數組中:final Object[] items;
Offer():當隊列已經滿了,會立即返回false
Put():如果隊列滿了會一直等待
Pool():彈出元素,如果為空返回null
Take():彈出元素,如果為空等待到有元素即可。
Take方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
Put方法:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
/** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
LinkedBlockingQueue(鎖分離)
兩把不同的鎖 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
Take函數 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//不能有兩個線程同時取數據 try { while (count.get() == 0) {//如果沒有數據,一直等待(因為是lockInterruptibly,可中斷) notEmpty.await(); } x = dequeue();//取得第一個數據 c = count.getAndDecrement();//數量-1,原子操作,因為會和put同時訪問count。 if (c > 1) notEmpty.signal();//通知其他take操作 } finally { takeLock.unlock();//釋放鎖 } if (c == capacity) signalNotFull();//通知put操作,已有空余空間 return x; }
Put函數 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly();//上鎖不能有兩個線程同時進行put函數 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) {//當隊列已經滿了以後,等待 notFull.await(); } enqueue(node);//插入數據 c = count.getAndIncrement();//更新總數 if (c + 1 < capacity) notFull.signal();//有足夠的空間,通知其他線程 } finally { putLock.unlock();//釋放鎖 } if (c == 0) signalNotEmpty();//釋放成功後,通知take函數取數據 }
4. 並發下的ArrayList
當ArrayList在擴容的時候,內部一致性被破壞,由於沒有鎖的保護,另外一個線程訪問不到不一致的內部狀態,導致出現越界問題。
還會出現多個線程同時對同一位置進行賦值。
5. concurrentlinkedqueue:
高並發環境中可以說是最好的隊列,也可以看做是一個線程安全的linkedList。
6. CopyOnWriteArrayList
性能很好的讀寫list,在讀寫的時候任何時候都不加鎖;只有在寫寫的時候需要同步等待。
當寫操作的時候,進行一次自我復制。對原有的數據進行一次復制,將修改的內容寫入副本修改完之後,將副本替換原來的數據。
?集合工具類使用線程