java 並發 (四) ---- 並發容器
Hashmap 和 Concurrenthashmap
Hashmap 不適合並發,應該使用ConcurrentHashMap .
這是很多人都知道的,但是為什麽呢? 可以先看一下這兩篇文章. JDK7與JDK8中HashMap的實現 和 談談HashMap線程不安全的體現.
由這兩篇文章我們可以知道 :
- Hashmap 不適合並發的原因是當Hashmap擴容的時候,遷移會產生回環.
- Hashmap 在JDK1.7 解決沖突的方法是生成鏈表,而1.8是生成紅黑樹.
明白了Hashmap之後,我們來看一下 ConcurrentHashMap 的實現是怎麽樣的? 漫畫:什麽是ConcurrentHashMap的? 我們可以總結一下 ConcurrentHashMap的幾個要點 : (ccHp縮寫 ConcurrentHashMap)
- ccHp 的實現是 分段鎖,而不是整個對象鎖住,增強了並發性. 每一段是一個 segment
- ccHp的size() 方法 ,即是容器中的元素個數.統計數量的邏輯如下 :
可以看到ccHp 統計size 時判斷是否有沒被修改和 CAS 相似.
ccHp的運用可以適合並發,在web上例如session的管理,下面是shiro session 管理類.(shiro開源可以好好學習)
public class MemorySessionDAO extends AbstractSessionDAO { private static final Logger log = LoggerFactory.getLogger(MemorySessionDAO.class); private ConcurrentMap<Serializable, Session> sessions;public MemorySessionDAO() { this.sessions = new ConcurrentHashMap<Serializable, Session>(); } protected Serializable doCreate(Session session) { Serializable sessionId = generateSessionId(session); assignSessionId(session, sessionId); storeSession(sessionId, session); return sessionId; } protected Session storeSession(Serializable id, Session session) { if (id == null) { throw new NullPointerException("id argument cannot be null."); } return sessions.putIfAbsent(id, session); } protected Session doReadSession(Serializable sessionId) { return sessions.get(sessionId); } public void update(Session session) throws UnknownSessionException { storeSession(session.getId(), session); } public void delete(Session session) { if (session == null) { throw new NullPointerException("session argument cannot be null."); } Serializable id = session.getId(); if (id != null) { sessions.remove(id); } } public Collection<Session> getActiveSessions() { Collection<Session> values = sessions.values(); if (CollectionUtils.isEmpty(values)) { return Collections.emptySet(); } else { return Collections.unmodifiableCollection(values); } } }
CopyOnWriteArrayList 和 CopyOnWriteArraySet
下文縮寫CopyOnWriteArrayList 為 cowaList. cowaList 是為了替代同步 List, cowaSet 同理為了替代同步Set的. 閱讀下面進行了解原理先. CopyOnWriteArrayList實現原理及源碼分析 .
由此我們可以總結一下 ccowaList 的幾個重要點 :
- ccowaList 適合 多讀少寫 因為讀是沒加鎖的,增加元素時先復制一份,即寫是在副本上,而讀是原始容器中實現了讀寫分離.
- 缺點 --- 要是寫多的話,每次的復制會是性能問題 ; 無法實時數據,這是因為讀寫分離了.CopyOnWrite容器只能保證數據的最終一致性,不能保證數據的實時一致性。
運用場景和缺點分析, 詳細的看這裏 Java並發編程:並發容器之CopyOnWriteArrayList(轉載)
下文簡稱cdl. 首先cdl定義一個count, 這個count表示一個有多少個執行任務,需要等待幾個執行,然後cdl開啟await()並且阻塞 ,然後每個線程執行完任務,調用countDown()方法,個count-1 ,直到全部任務完成,cdl繼續執行. 詳見 什麽時候使用CountDownLatch
使用例子如下 :
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); Thread thread1 = new Thread(() -> { try { System.out.println("線程1 開始執行" + new Date()); Thread.sleep(1000 * 3); System.out.println("線程1 執行完畢"+ new Date()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread thread2 = new Thread(() -> { try { System.out.println("線程2 開始執行 " + new Date()); Thread.sleep(2 * 1000); System.out.println("線程2 執行結束 " + new Date()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }); thread1.start(); thread2.start(); latch.await(); System.out.println("任務全部完成"); }
下文簡稱ft. 那麽ft的作用到底是幹什麽的呢?具體來說就是可以返回線程執行的結果,可以獲取線程執行的狀態,可以中斷線執行的類. 具體使用見 : Java並發編程:Callable、Future和FutureTask
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主線程在執行任務"); try { System.out.println("task運行結果"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任務執行完畢"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子線程在進行計算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
Semaphore 這個類就像一個停車場的保安,停車場的車位是固定的,獲取信號量就是進入停車場停車,而釋放信號量就是離開停車場.
Semaphore 分為兩種模式,假如假如車位滿了,當有車出來時,那麽公平的方式就是在場外的車先到先進,不公平的方式就是無論先來晚來
的一起競爭. 詳見這兩篇文章 : Semaphore的工作原理及實例 和 深入理解Semaphore
示例代碼來自 Semaphore的工作原理及實例
public class SemaphoreDemo { private static final Semaphore semaphore=new Semaphore(3); private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); private static class InformationThread extends Thread{ private final String name; private final int age; public InformationThread(String name,int age) {; this.age=age; } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"歲當前時間為:"+System.currentTimeMillis()); Thread.sleep(1000); System.out.println(name+"要準備釋放許可證了,當前時間為:"+System.currentTimeMillis()); System.out.println("當前可使用的許可數為:"+semaphore.availablePermits()); semaphore.release(); } catch(InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { String[] name= {"李明","王五","張傑","王強","趙二","李四","張三"}; int[] age= {26,27,33,45,19,23,41}; for(int i=0;i<7;i++) { Thread t1=new InformationThread(name[i],age[i]); threadPool.execute(t1); } } }
可以看到要是沒有許可的話,調用acquire 方法就會一直阻塞.
柵欄能阻塞一組線程直到某個事件發生,柵欄與閉鎖的區別的關鍵區別在於,所有線程必須同時到達柵欄位置,才能繼續執行.而閉鎖是等待一組線程完成後,某個操作才可以進行.可以這樣比喻吧,例如有場賽馬表演,當所有的馬到達終點後會放飛和平鴿.那麽當所有馬都準備好,即是條件滿足了,所有的馬奔騰而出,就是CyclicBarrier ,而當所有的馬都到達終點了,這麽條件滿足了,和平鴿被放飛了,這個相當於是CountDownLatch.
static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(() -> { try { System.out.println("線程1 進入cyclicBarrier.await() " + new Date()); c.await(); } catch (Exception e) { } System.out.println("線程1 柵欄打開 " + new Date()); System.out.println(1); }).start(); try { System.out.println("主線程 進入cyclicBarrier.await() " + new Date()); c.await(); } catch (Exception e) { } System.out.println("主線程 柵欄打開 " + new Date()); System.out.println(2); }
1 public class Test { 2 public static void main(String[] args) { 3 int N = 4; 4 CyclicBarrier barrier = new CyclicBarrier(N); 5 6 for(int i=0;i<N;i++) { 7 new Writer(barrier).start(); 8 } 9 10 try { 11 Thread.sleep(25000); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 16 System.out.println("CyclicBarrier重用"); 17 18 for(int i=0;i<N;i++) { 19 new Writer(barrier).start(); 20 } 21 } 22 static class Writer extends Thread{ 23 private CyclicBarrier cyclicBarrier; 24 public Writer(CyclicBarrier cyclicBarrier) { 25 this.cyclicBarrier = cyclicBarrier; 26 } 27 28 @Override 29 public void run() { 30 System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據..."); 31 try { 32 Thread.sleep(5000); //以睡眠來模擬寫入數據操作 33 System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其他線程寫入完畢"); 34 35 cyclicBarrier.await(); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 }catch(BrokenBarrierException e){ 39 e.printStackTrace(); 40 } 41 System.out.println(Thread.currentThread().getName()+"所有線程寫入完畢,繼續處理其他任務..."); 42 } 43 } 44 }
第二個代碼展示了CyclicBarrier 執行的東西可以復用.
下面總結一下 CyclicBarrier 和 CountDownLatch 的區別
- CountDownLatch 的信號量不能重新設置,CyclicBarrier 可以重新設置.
- CyclicBarrier 可以復用, 而CountDownLatch不能復用.
Exchanger 像是交換東西的一個平臺.
1 public static void main(String[] args) { 2 3 ExecutorService executor = Executors.newCachedThreadPool(); 4 final Exchanger exchanger = new Exchanger(); 5 6 executor.execute(new Runnable() { 7 String data1 = "Ling"; 8 @Override 9 public void run() { 10 doExchangeWork(data1, exchanger); 11 } 12 }); 13 14 executor.execute(new Runnable() { 15 String data1 = "huhx"; 16 @Override 17 public void run() { 18 doExchangeWork(data1, exchanger); 19 } 20 }); 21 22 executor.shutdown(); 23 } 24 25 26 private static void doExchangeWork(String data1, Exchanger exchanger) { 27 try { 28 System.out.println(Thread.currentThread().getName() + "正在把數據 " + data1 + " 交換出去"); 29 Thread.sleep((long) (Math.random() * 1000)); 30 //放進交換的位置. 31 String data2 = (String); 32 System.out.println(Thread.currentThread().getName() + "交換數據 到 " + data2); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 }
