1. 程式人生 > >同步(二)

同步(二)

一、原子變數:

1、AtomicInteger:原子Integer型別

構造:

public AtomicInteger(int initialValue)
public AtomicInteger()

方法:

//以原子方式獲取舊值並設定新值
public final int getAndSet(int newValue)
//以原子方式獲取舊值並給當前值加1
public final int getAndIncrement()
//以原子方式獲取舊值並給當前值減1
public final int getAndDecrement()
//以原子方式獲取舊值並給當前值加delta
public final int getAndAdd(int delta)
//以原子方式給當前值加1並獲取新值
public final int incrementAndGet()
//以原子方式給當前值減1並獲取新值
public final int decrementAndGet()
//以原子方式給當前值加delta並獲取新值
public final int addAndGet(int delta)

例如:incrementAndGet方法以原子方式將變數自增,並返回自增後的值。也就是說,獲得值、增1並設定然後生成新值的操作不會中斷。

相關原理:

private volatile int value;

通過volatile進行修飾,保證記憶體可見性

其他型別:

  • AtomicBoolean:原子Boolean型別
  • AtomicLong:原子Long型別
  • AtomicReference:原子引用型別

2、AtomicIntegerArray:原子陣列型別

構造:

public AtomicIntegerArray(int length) {
    array = new int[length];
}

public AtomicIntegerArray(int[] array) {
    this.array = array.clone();
}

常用方法:

public final boolean compareAndSet(int i, int expect, int update)
public final int getAndIncrement(int i)
public final int getAndAdd(int i, int delta)

二、併發容器

1、CopyOnWriteArrayList

特點:

  • 它是執行緒安全的,可以被多個執行緒併發訪問
  • 它的迭代器不支援修改操作
  • 它以原子方式支援一些複合操作
private transient volatile Object[] elements;

通過volatile進行修飾保證記憶體可見性

存入元素

/**
 * 不存在就新增,並返回true
 */
public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
        addIfAbsent(e, snapshot);
}


private boolean addIfAbsent(E e, Object[] snapshot) {
    //通過synchronized實現執行緒同步
    synchronized (lock) {
        Object[] current = getArray();
        int len = current.length;
        if (snapshot != current) {
            // Optimize for lost race to another addXXX operation
            int common = Math.min(snapshot.length, len);
            for (int i = 0; i < common; i++)
                if (current[i] != snapshot[i]
                    && Objects.equals(e, current[i]))
                    return false;
            if (indexOf(e, current, common, len) >= 0)
                    return false;
        }
        //建立新的陣列,並將元素拷貝進去
        Object[] newElements = Arrays.copyOf(current, len + 1);
        newElements[len] = e;
        //修改內部陣列引用
        setArray(newElements);
        return true;
    }
}

獲取元素:

public E get(int index) {
    return get(getArray(), index);
}
private E get(Object[] a, int index) {
    return (E) a[index];
}

總結:CopyOnWriteArrayList陣列通過volatile修飾保證記憶體可見,寫操作通過synchronized實現同步,效能會低一些,讀操作不需要同步。適用於多執行緒操作寫操作比較少,讀操作比較多的情況。

2、ConcurrentHashMap

特點:

  • 執行緒安全
  • 分段鎖
  • 讀不需要加鎖

原理:

  • ConcurrentHashMap採用分段鎖技術。根據雜湊值將將資料分為多段,而每個段有一個獨立的鎖,每一個段相當於一個獨立的雜湊表。
    無論是儲存鍵值對還是根據鍵查詢,都先根據鍵的雜湊值對映到段,再在段對應的雜湊表上進行操作。
  • 對於寫操作,需要獲取鎖,不能並行,但是讀操作可以,多個讀可以並行,寫的同時也可以讀。

三、阻塞佇列

1、阻塞佇列使用場景:

  • 當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞,直到有資料放入佇列。
  • 當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞,直到佇列中有空的位置,執行緒被自動喚醒。

2、常見阻塞佇列

  • ArrayBlockingQueue:由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue:由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue:支援優先順序排序的無界阻塞佇列。
  • DelayQueue:使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:不儲存元素的阻塞佇列。
  • LinkedTransferQueue:由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:由連結串列結構組成的雙向阻塞佇列。

3、使用:

public class BlockTest{

  private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

  public static void main(String[] args) {
    BlockTest test = new BlockTest();
    Product product = test.new Product();
    product.start();
    Consume consume = test.new Consume();
    consume.start();
  }

  class Product extends Thread {
    @Override
    public void run() {
      try {
        queue.put(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Consume extends Thread {
    @Override
    public void run() {
      try {
        queue.take();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

4、原理:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    ......
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    //獲取元素
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //元素被空進行等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //喚醒其他執行緒
        notFull.signal();
        return x;
    }
    
    //儲存元素
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        //元素個數等於陣列長度進行等待,當被其他執行緒喚醒時插入元素
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        //喚醒其他執行緒
        notEmpty.signal();
    }
    
    ......
}

總結:主要通過ReentrantLock加鎖,避免多執行緒引起的執行緒安全問題。通過await和signal進行消費者執行緒和生產者執行緒的相互切換。

四、同步器:

在這裡插入圖片描述
1、訊號量Semaphore

//傳入引數為許可數
private static Semaphore semaphore = new Semaphore(2);

public static void main(String[] args) {
    for(int i = 0; i < 5; i ++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //獲取許可
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " " + new Date());
                    Thread.sleep(5000l);
                    //釋放許可
                    semaphore.release();
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                }
            }
        }).start();
    }
}

限制對資源的併發訪問數

2、倒計時門栓CountDownLatch

它相當於是一個門栓,一開始是關閉的,所有希望通過該門的執行緒都需要等待,然後開始倒計時,倒計時變為0後,門栓開啟,等待的所有執行緒都可以通過。

public static void main(String[] args) {
    CountDownLatch countDownLatch = new CountDownLatch(5);
    for(int i = 0; i < 5; i ++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " " + new Date() + " run");
                try {
                    Thread.sleep(5000l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            }
        }).start();
    }
    try {
        //await()檢查計數是否為0,如果大於0,就等待
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

3、障柵CyclicBarrier

CyclicBarrier阻塞呼叫的執行緒,直到條件滿足時,阻塞的執行緒同時被開啟。

public static void main(String[] args) {
    Random random = new Random();
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    for(int i = 0; i < 5; i ++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                int secs = random.nextInt(5);
                System.out.println(Thread.currentThread().getName() + " " + new Date() + " run, sleep " + secs + " secs");
                try {
                    Thread.sleep(secs * 1000);
                    //呼叫await後,表示自己已經到達,如果自己是最後一個到達的,就執行可選的命令,執行後,喚醒所有等待的執行緒
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " " + new Date() + " runs over");
            }
        }).start();
    }
}