Java併發程式設計-擼一個數據庫連線池
阿新 • • 發佈:2018-12-22
章節目錄
- 等待超時模式的使用場景
- 可以掌握的技能
- 等待/通知 消費者/生產者模式
- CountDownLatch、AtomicInteger、靜態內部類、LinkedList、動態代理的使用
1.等待超時模式
場景
當我們呼叫方法時,這個方法返回的資源比較重要,比如獲取資料庫連線池中連線控制代碼。但是這個資源的返回隨著業務量的增加,那麼獲取資源(連線池連線)的時間就會增加,那麼呼叫一個方法時就要等待一段時間(一般來說是給定一個時間段),如果該方法能夠在一段時間內獲取到結果,那麼將結果立刻返回,反之,超時返回預設結果。
等待/通知的經典範式,即加鎖、條件迴圈、處理邏輯3個步驟,這種正規化沒辦法做到超時等待,對經典範式做很小的改動,就可以實現超時等待。
等待超時模式虛擬碼:
public synchronized Object get(long mills) throws InterruptedException { Object result = null; long future = System.currentTimeMills() + mills; long remaining = mills; while (result == null && remaining > 0) { wait(remaining);//釋放鎖,阻塞 mills 毫秒 remaining = future - System.currentTimeMills(); } return result;//如果超時之後獲取到result則不返回null }
超時等待的作用就是不會永遠阻塞呼叫者,但是 超時之後被喚醒,知識將執行緒從等待佇列移動至阻塞佇列,繼續向下進行返回result還是要重新獲取鎖,如果一直獲取不到鎖,那麼result也不會列印。只是增加了靈活性。
2.可以掌握的技能
實戰
使用等待超時模式擼一個簡單資料庫連線池,在示例中模擬:
- 從連線池獲取連線 RunnerThread
- 使用連線 RunnerThread
- 釋放連線 RunnerThread
注意:客戶端獲取(消費)連線的過程被設定為等待超時、等待/通知兩種模式
ConnectionPool.java-資料庫連線池
package org.seckill.DBConnection; import java.sql.Connection; import java.util.LinkedList; /** * 資料庫連線池物件 */ public class ConnectionPool { //連結串列list(池)維護 connection 連線物件 private LinkedList<Connection> pool = new LinkedList<Connection>(); //構造方法 初始化池中連線 public ConnectionPool(int initialSize) { if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { pool.addLast(ConnectionDriver.createConnection());//建立initialSize個代理Connection物件 } } } //釋放connection ,相當於-生產者 public void releaseConnection(Connection connection) { if (connection != null) {//有效歸還連線 synchronized (pool) { pool.addLast(connection); //生產者動作完畢後,需要喚醒所有消費者 pool.notifyAll(); } } } //獲取connection控制代碼,相當於消費者,採用超時等待與等待/通知兩種策略 public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool) { if (mills < 0) {//非超時等待模式,採用等待/通知模式 while (pool.isEmpty()) { pool.wait();//本示例中不演示這種模式下獲取連線的情景 } return pool.removeFirst(); } else {//超時等待模式 long future = System.currentTimeMillis() + mills; long remaining = mills; while (pool.isEmpty() && remaining > 0) { pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection connection = null; if (!pool.isEmpty()) { connection = pool.removeFirst();//返回頭結點物件 } return connection; } } } }
ConnectionDriver.java-動態生成Connection代理物件
package org.seckill.DBConnection;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
/**
* 資料庫連線驅動,
* 動態代理獲取實現java.sql.Connection 介面的代理物件
*/
public class ConnectionDriver {
static class ConnectionHandler implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName() == "commit") {
Thread.sleep(100);
}
return null;
}
}
//獲取Connection的動態代理類
public static final Connection createConnection() {
return (Connection) Proxy.newProxyInstance(
ConnectionDriver.class.getClassLoader(),//類載入器
new Class<?>[]{Connection.class},//Connection實現的介面列表,包含Connection介面
new ConnectionHandler());//與代理物件繫結的handler
}
}
ConnectionPoolTest.java--測試類
package org.seckill.DBConnection;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class ConnectionPoolTest {
//執行緒池中初始化10個連線
static ConnectionPool connectionPool = new ConnectionPool(10);
//保證所有的ConnectionRunner 能夠同時開始
static CountDownLatch start = new CountDownLatch(1);
//main執行緒將等待所有的Connection Runner結束後才開始執行
static CountDownLatch end;
public static void main(String[] args) throws Exception {
//ConnectionRunner 執行緒數量,可以修改執行緒數量進行觀察
int threadCount = 50;
end = new CountDownLatch(threadCount);
int count = 20;//每個執行緒進行20次fetchConnetion動作
AtomicInteger got = new AtomicInteger();
AtomicInteger notGot = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
thread.start();
}
start.countDown();//使所有執行緒同時執行
end.await();//主執行緒等待所有執行緒執行完
System.out.println("總的請求次數" + threadCount * count);
System.out.println("獲取到的連線總數" + got);
System.out.println("未獲取到的連線總數" + notGot);
}
static class ConnectionRunner implements Runnable {
int count;//每個執行緒fetchConnetion的次數
AtomicInteger got;//記錄fetchConnection 成功的次數
AtomicInteger notGot;//記錄fetchConnetion 未成功的次數
public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
try {
start.await();//等待 所有ConnectionRunner 初始化成功且處於Runnable狀態,同時開始執行,由主執行緒控制的
} catch (InterruptedException e) {
e.printStackTrace();
}
while (count > 0) {
try {
//從連線池中獲取連線,如果1000ms內無法獲取到,將會返回null。
Connection connection = connectionPool.fetchConnection(1000);
if (connection != null) {
try {
connection.createStatement();
connection.commit();
} finally {
//歸還連線
connectionPool.releaseConnection(connection);
got.incrementAndGet();//對獲取次數狀態進行更改
}
} else {
notGot.incrementAndGet();//對未獲取次數狀態進行更改
}
} catch (Exception e) {
} finally {
count--;//執行次數遞減
}
}
end.countDown();
}
}
}
執行結果
threadCount = 101.設定RunnerConnection threadCount數為10
threadCount = 202.設定RunnerConnection threadCount數為20
threadCount = 503.設定RunnerConnection threadCount數為50
threadCount = 504.設定RunnerConnection threadCount數為100
可以看到隨著 runnerConnection 連線執行緒數的遞增,連線的穩定性是越來越低的。但使用者呼叫不會長時間阻塞到connect fetch 上,而是按時返回。