同步(二)
阿新 • • 發佈:2018-11-21
一、原子變數:
1、AtomicInteger:原子Integer型別
構造:
public AtomicInteger(int initialValue)
public AtomicInteger()
方法:
//以原子方式獲取舊值並設定新值 public final int getAndSet(int newValue) //以原子方式獲取舊值並給當前值加1 public final int getAndIncrement() //以原子方式獲取舊值並給當前值減1 public final int getAndDecrement() //以原子方式獲取舊值並給當前值加delta public final int getAndAdd(int delta) //以原子方式給當前值加1並獲取新值 public final int incrementAndGet() //以原子方式給當前值減1並獲取新值 public final int decrementAndGet() //以原子方式給當前值加delta並獲取新值 public final int addAndGet(int delta)
例如:incrementAndGet方法以原子方式將變數自增,並返回自增後的值。也就是說,獲得值、增1並設定然後生成新值的操作不會中斷。
相關原理:
private volatile int value;
通過volatile進行修飾,保證記憶體可見性
其他型別:
- AtomicBoolean:原子Boolean型別
- AtomicLong:原子Long型別
- AtomicReference:原子引用型別
…
2、AtomicIntegerArray:原子陣列型別
構造:
public AtomicIntegerArray(int length) { array = new int[length]; } public AtomicIntegerArray(int[] array) { this.array = array.clone(); }
常用方法:
public final boolean compareAndSet(int i, int expect, int update)
public final int getAndIncrement(int i)
public final int getAndAdd(int i, int delta)
二、併發容器
1、CopyOnWriteArrayList
特點:
- 它是執行緒安全的,可以被多個執行緒併發訪問
- 它的迭代器不支援修改操作
- 它以原子方式支援一些複合操作
private transient volatile Object[] elements;
通過volatile進行修飾保證記憶體可見性
存入元素
/**
* 不存在就新增,並返回true
*/
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
private boolean addIfAbsent(E e, Object[] snapshot) {
//通過synchronized實現執行緒同步
synchronized (lock) {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i]
&& Objects.equals(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
//建立新的陣列,並將元素拷貝進去
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
//修改內部陣列引用
setArray(newElements);
return true;
}
}
獲取元素:
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}
總結:CopyOnWriteArrayList陣列通過volatile修飾保證記憶體可見,寫操作通過synchronized實現同步,效能會低一些,讀操作不需要同步。適用於多執行緒操作寫操作比較少,讀操作比較多的情況。
2、ConcurrentHashMap
特點:
- 執行緒安全
- 分段鎖
- 讀不需要加鎖
原理:
- ConcurrentHashMap採用分段鎖技術。根據雜湊值將將資料分為多段,而每個段有一個獨立的鎖,每一個段相當於一個獨立的雜湊表。
無論是儲存鍵值對還是根據鍵查詢,都先根據鍵的雜湊值對映到段,再在段對應的雜湊表上進行操作。 - 對於寫操作,需要獲取鎖,不能並行,但是讀操作可以,多個讀可以並行,寫的同時也可以讀。
三、阻塞佇列
1、阻塞佇列使用場景:
- 當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞,直到有資料放入佇列。
- 當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞,直到佇列中有空的位置,執行緒被自動喚醒。
2、常見阻塞佇列
- ArrayBlockingQueue:由陣列結構組成的有界阻塞佇列。
- LinkedBlockingQueue:由連結串列結構組成的有界阻塞佇列。
- PriorityBlockingQueue:支援優先順序排序的無界阻塞佇列。
- DelayQueue:使用優先順序佇列實現的無界阻塞佇列。
- SynchronousQueue:不儲存元素的阻塞佇列。
- LinkedTransferQueue:由連結串列結構組成的無界阻塞佇列。
- LinkedBlockingDeque:由連結串列結構組成的雙向阻塞佇列。
3、使用:
public class BlockTest{
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
BlockTest test = new BlockTest();
Product product = test.new Product();
product.start();
Consume consume = test.new Consume();
consume.start();
}
class Product extends Thread {
@Override
public void run() {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consume extends Thread {
@Override
public void run() {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4、原理:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
......
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
//獲取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//元素被空進行等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//喚醒其他執行緒
notFull.signal();
return x;
}
//儲存元素
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
//元素個數等於陣列長度進行等待,當被其他執行緒喚醒時插入元素
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
//喚醒其他執行緒
notEmpty.signal();
}
......
}
總結:主要通過ReentrantLock加鎖,避免多執行緒引起的執行緒安全問題。通過await和signal進行消費者執行緒和生產者執行緒的相互切換。
四、同步器:
1、訊號量Semaphore
//傳入引數為許可數
private static Semaphore semaphore = new Semaphore(2);
public static void main(String[] args) {
for(int i = 0; i < 5; i ++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//獲取許可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " " + new Date());
Thread.sleep(5000l);
//釋放許可
semaphore.release();
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " interrupted");
}
}
}).start();
}
}
限制對資源的併發訪問數
2、倒計時門栓CountDownLatch
它相當於是一個門栓,一開始是關閉的,所有希望通過該門的執行緒都需要等待,然後開始倒計時,倒計時變為0後,門栓開啟,等待的所有執行緒都可以通過。
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i = 0; i < 5; i ++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + new Date() + " run");
try {
Thread.sleep(5000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
}
}).start();
}
try {
//await()檢查計數是否為0,如果大於0,就等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3、障柵CyclicBarrier
CyclicBarrier阻塞呼叫的執行緒,直到條件滿足時,阻塞的執行緒同時被開啟。
public static void main(String[] args) {
Random random = new Random();
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for(int i = 0; i < 5; i ++) {
new Thread(new Runnable() {
@Override
public void run() {
int secs = random.nextInt(5);
System.out.println(Thread.currentThread().getName() + " " + new Date() + " run, sleep " + secs + " secs");
try {
Thread.sleep(secs * 1000);
//呼叫await後,表示自己已經到達,如果自己是最後一個到達的,就執行可選的命令,執行後,喚醒所有等待的執行緒
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + new Date() + " runs over");
}
}).start();
}
}