【學習筆記】PostgreSQL進階查詢之連線查詢和子查詢
目錄
1. 鎖用來做什麼?
2. 鎖的實現方式
4. 生產者消費者中的鎖
5.Condition介面
6.
ReadWriteLock7. 關於使用執行緒安全的集合
8. 關於佇列
9. 關於i++的不安全問題與AtomicInteger
1. 鎖用來做什麼?
解決執行緒同步問題,當多執行緒共同訪問同一個物件(臨界資源)的時候, 如果破壞了不可分割的操作(原子操作),就可能發生資料不一致,有可能出現多個執行緒先後更改資料,造成所得到的資料是髒資料。鎖是鎖定臨界資源。
2. 鎖的實現方式
在Java中通常實現鎖有兩種方式,一種是synchronized關鍵字,另一種是Lock。
①使用 synchronized。必須要獲取當前物件的互斥鎖標記,如果得不到就被阻塞,直到得到互斥鎖標記。執行緒執行完同步方法,會自動歸還互斥鎖標記
②使用Lock。 Lock介面的常用實現類 ReentrantLock /riː'entrənt/ :互斥鎖
兩者的區別:
①首先最大的不同:synchronized是基於JVM層面實現的,而Lock是基於JDK層面實現的。
②synchronized是一個關鍵字,Lock是一個介面.
③synchronized程式碼塊執行完成之後會自動釋放鎖物件,Lock必須手動呼叫方法釋放鎖物件。
④synchronized程式碼塊出現了異常也會自動釋放鎖物件,Lock介面中出現異常也需要手動釋放鎖物件。
⑤在併發量比較小的情況下,使用synchronized;但是在併發量比較高的情況下,其效能下降會很嚴重,此時推薦使用ReentrantLock。
例項:
package day20; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) throws Exception { MyList2 list = new MyList2(); Thread t1= new Thread(new Runnable(){ public void run(){ list.add("C"); } }); Thread t2 = new Thread(new Runnable(){ public void run(){ list.add("D"); } }); t1.start(); t2.start(); t1.join(); t2.join(); list.add("E"); list.print(); } } class MyList2{ String[] data = {"A","B","","","",""}; int index = 2; Lock lock = new ReentrantLock();//Lock介面,ReentrantLock為實現類 public void add(String s){ try{ lock.lock();//加鎖 //lock.tryLock();嘗試加鎖,失敗時返回false,此時可進行其他操作,但有可能造成活鎖。 data[index] = s ; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } index++; } finally{ lock.unlock();//釋放鎖,為了避免鎖內的程式碼塊出現異常後直接返回而沒有釋放鎖的問題,將此句程式碼放到Finally中 } } public void print(){ for(int i = 0 ; i < data.length ; i++){ System.out.println(data[i]); } } }
怎麼來避免死鎖?
o.wait():執行緒會釋放鎖標記,進入等待狀態
o.notify()/o.notifyAll():從等待狀態中釋放一個/全部執行緒
注意:以上三個方法必須出現在對o加鎖的同步程式碼塊中
4. 生產者消費者中的鎖
使用準則:
1 永遠在synchronized的方法或物件裡使用wait、notify和notifyAll,不然Java虛擬機器會生成 IllegalMonitorStateException。
2 永遠在while迴圈裡而不是if語句下使用wait。這樣,迴圈會線上程睡眠前後都檢查wait的條件,並在條件實際上並未改變的情況下處理喚醒通知。
3 永遠在多執行緒間共享的物件(在生產者消費者模型裡即緩衝區佇列)上使用wait。
例項:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestProducerConsumer { public static void main(String[] args) { MyStack stack = new MyStack(); Runnable task1 = new Runnable(){ public void run(){ for(char c = 'A' ; c<='Z' ; c++){ stack.push(c+""); } } }; Runnable task2 = new Runnable(){ public void run(){ for(int i = 1 ; i <= 26; i++){ stack.pop(); } } }; new Thread(task1).start(); new Thread(task1).start(); new Thread(task2).start(); new Thread(task2).start(); } } class MyStack{ String[] data = {"","","","","",""}; int index; Lock lock = new ReentrantLock(); Condition full = lock.newCondition();//獲得Condition例項 Condition empty = lock.newCondition(); public void push(String s){ try { lock.lock(); while (data.length == index) { try { full.await();//(不符合條件的等待)滿了即等待 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(s + " pushed "); data[index] = s; index++; print(); empty.signalAll();//通知消費者 } finally{ lock.unlock(); } } public void pop(){ try { lock.lock(); while (index == 0) { try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } index--; String o = data[index]; data[index] = ""; System.out.print(o + " poped "); print(); full.signalAll(); //通知生產者 } finally{ lock.unlock(); } } public void print(){ for(int i = 0 ; i < data.length ; i++){ System.out.print(data[i]+" "); } System.out.println(); } }
5.Condition介面
public interface Condition (介面)
條件(Conditio也稱為條件佇列或條件變數)為一個執行緒的暫停執行(“等待”)提供了一種方法,直到另一個執行緒通知某些狀態現在可能為真。
Condition取代了物件監視器方法的使用。可以使用兩個Condition例項來實現
一個Condition例項本質上繫結到一個鎖。 要獲得特定Condition例項的Condition例項,使用其newCondition()方法。
void | await() 導致當前執行緒等到發訊號或interrupted。 |
---|---|
void | signal() 喚醒一個等待執行緒。 |
void | signalAll() 喚醒所有等待執行緒。 |
例如,假設我們有一個有限的緩衝區,它支援put
和take
方法。 如果在一個空的緩衝區嘗試一個take
,則執行緒將阻塞直到一個專案可用; 如果put
試圖在一個完整的緩衝區,那麼執行緒將阻塞,直到空間變得可用。 我們希望在單獨的等待集中等待put
執行緒和take
執行緒,以便我們可以在緩衝區中的專案或空間可用的時候使用僅通知單個執行緒的優化。 這可以使用兩個Condition
例項來實現。
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await();//滿了,put等待 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal();//喚醒take } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await();//空了,take等待 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal();//喚醒put return x; } finally { lock.unlock(); } } }
(ArrayBlockingQueue
類提供此功能,因此沒有理由實現此示例使用類。)
例項:數字和字母交替列印
public class TestNumberCharPrint { public static void main(String[] args) throws InterruptedException { final Object o = new Object();//全域性物件,用於分別不同時間拿到鎖標記來交替 Runnable task1 = new Runnable(){ public void run(){ synchronized (o) {//加鎖保證此處原子操作 for (int i = 1; i <= 52; i++) { System.out.println(i); if (i % 2 ==0){ o.notifyAll();//釋放字母執行緒 try { if(i!=52) o.wait();//若等於52時進入等待,則此執行緒已完成全部任務單還沒結束 } catch (InterruptedException e) { e.printStackTrace(); } } } } } }; Runnable task2 = new Runnable(){ public void run(){ synchronized (o) { for (char c = 'A'; c <= 'Z'; c++) { System.out.println(c); o.notifyAll();//釋放數字執行緒 try { if (c!='Z') o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; Thread t1 = new Thread(task1); Thread t2 = new Thread(task2); t1.start(); Thread.sleep(1); t2.start(); } }
6.
Lock介面的一個實現類
ReadWriteLock(讀寫鎖)是什麼
ReadWriteLock維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。
讀鎖和寫鎖不能被同時載入,寫鎖載入則不能讀,讀鎖載入則不能寫。
若寫鎖未被載入,讀取鎖可以多個讀執行緒同時保持,
若讀鎖未被載入,寫入鎖也是獨佔的,不能同時寫。
例項:
package day20; import java.util.ArrayList; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class TestMyList { public static void main(String[] args) { CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); list.add("A"); } } //改造ArrayList為執行緒安全的(部分方法,通過加讀或寫鎖來實現) class MyList extends ArrayList{ ReadWriteLock rwl = new ReentrantReadWriteLock();//讀寫鎖 Lock rl = rwl.readLock();//讀鎖 Lock wl = rwl.writeLock();//寫鎖 @Override public int size() { try{ rl.lock(); return super.size(); } finally{ rl.unlock(); } } @Override public Object get(int index) { try{ rl.lock(); return super.get(index); } finally{ rl.unlock(); } } @Override public boolean add(Object e) { try{ wl.lock(); return super.add(e); } finally{ wl.unlock(); } } @Override public Object remove(int index) { try{ wl.lock(); return super.remove(index); } finally{ wl.unlock(); } } @Override public void clear() { try{ wl.lock(); super.clear(); } finally{ wl.unlock(); } } }
7. 關於使用執行緒安全的集合
Collection
List (ArrayList LinkedList Vector CopyOnWriteArrayList)
Set (HashSet LinkedHashSet CopyOnWriteArraySet)
SortedSet (TreeSet)
Queue (LinkedList ConcurrentLinkedQueue)
BlockingQueue (ArrayBlockingQueue LinkedBlockingQueue)
Map (HashMap LinkedHashMap Hashtable Properties ConcurrentHashMap )
SortedMap (TreeMap)
以下集合的效率都比直接加鎖的效率高
CopyOnWriteArrayList 利用複製陣列的方式實現陣列元素的修改, 寫效率低 讀效率高(讀操作遠多於寫操作) 總體效率提高
CopyOnWriteArraySet 執行緒安全的Set集合
ConcurrentHashMap 分段鎖,將HashMap的陣列連結串列分為16段,多個執行緒讀取和寫入同一段時,需依次進行(需等待),讀取或寫入不同段時互不 影響,由於HashCode相等的概率不大,所以效率遠高於HashTable。
ConcurrentLinkedQueue 執行緒安全的佇列(連結串列實現的) 利用一個無鎖演算法(CAS,和預期值比較,不同則重試)實現執行緒安全——效率高
8. 關於佇列
Queue:佇列 FIFO
常用方法:
add() :新增元素
offer():新增元素 優先使用
remove():刪除元素
poll ():刪除元素,優先使用
element():獲取佇列的頭元素
peek():獲取佇列的頭元素 優先使用
實現類:LinkedList ConcurrentLinkedQueue
BlockQueue 阻塞佇列 (是個介面)
put () 新增元素到佇列中 如果佇列滿,則等待
take()刪除佇列頭元素 , 如果佇列空,則等待
實現類:ArrayBlockingQueue 陣列實現 有界佇列 put方法可能會等待
LinkedBlockingQueue 連結串列實現 無界佇列 put方法不等待
package day21; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestBlockingQueue { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<String>(6);//佇列 Runnable task1 = new Runnable(){ public void run(){ for(int i = 1 ; i<= 100; i++){ try { queue.put("A"+i); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable task2 = new Runnable(){ public void run(){ for(int i = 1 ; i<= 100; i++){ try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }; ExecutorService es = Executors.newFixedThreadPool(2); es.submit(task1); es.submit(task2); es.shutdown(); } }
9. 關於i++的不安全問題與AtomicInteger
i++是先把數i讀到另外一個暫存器,加1運算後再寫回到原暫存器,中間過程被另外一個執行緒打斷時就不是原子操作了,會造成結果不一致
package day21; import java.util.concurrent.atomic.AtomicInteger; public class TestAtomicInteger { static int i = 0 ; //error! static AtomicInteger a = new AtomicInteger(0);//利用AtomicInteger解決,還有AtomicBoolean等,利用了不加鎖的比較演算法(不是預期值時,撤回,重新加) static Integer b = Integer.valueOf(0);//error!無臨界資源,因為Integer+1後成為了另外一個物件Integer,可以定義其他型別的物件來加鎖,如下列的obj static MyObject obj = new MyObject(); public static void main(String[] args) throws Exception{ Thread[] ts = new Thread[10]; for(int k = 0 ; k<ts.length ; k++){ ts[k] = new Thread(new Runnable(){ public void run(){ for(int k = 1 ; k <= 10000; k++){ i++; a.incrementAndGet(); synchronized(b){ b = Integer.valueOf(b.intValue()+1); } synchronized(obj){ obj.x++; } } } }); ts[k].start(); } for(int k=0; k <ts.length ; k++){ ts[k].join(); } System.out.println(i); System.out.println(a); System.out.println(b); System.out.println(obj.x); } } class MyObject{ public int x=0; }