一個簡單的資料庫連線池示例
一個簡單的資料庫連線池, 即一個通過建構函式初始化連線的最大上限,並通過一個雙向佇列來維護連線,呼叫方需要先呼叫fetchConnection(long)方法來指定在多少毫秒內超時獲取連線,連線使用完成後,需要呼叫releaseConnection(Connection)方法將連線放回執行緒池,連線池的程式碼例項如下:
/**
* 簡易連線池
*/
package com.lbbywyt.concurrent;
import java.sql.Connection;
import java.util.LinkedList;
/**
* @author Administrator
*
*/
public class ConnectionPool {
//
private LinkedList<Connection> pool = new LinkedList<Connection>();
//構造耗時,初始化連線池大小.
public ConnectionPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(ConnectionDriver.createConnection());
}
}
}
/**
* 釋放連線.
* @param connection connection
*/
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool) {
// 連線釋放後需要進行通知,這樣其他消費者能夠感知到連線池中已經歸還了一個連線
pool.addLast(connection);
pool.notifyAll();
}
}
}
// 在mills內無法獲取到連線,將會返回null
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
// 不設超時
if (mills <= 0) {
//不設定等待時間時,只要連線池不為空,即返回一個連線
while (pool.isEmpty()) {
pool.wait();
}
return pool.removeFirst();
} else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if (!pool.isEmpty()) {
result = pool.removeFirst();
}
return result;
}
}
}
}
由於java.sql.Connection是一個介面,最終的實現是由資料庫驅動提供方來實現的,書中通過動態代理構造了一個Connection,該Connection的代理實現僅僅是在commit()方法呼叫時休眠100毫秒,示例如程式碼:
/**
*該代理類用於建立資料庫連線.
*/
package com.lbbywyt.concurrent;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;
/**
* @author Administrator
*/
public class ConnectionDriver {
// 每一個動態代理類都必須要實現InvocationHandler這個介面,並且每個代理類的例項都關聯到了一個handler,當我們通過代理物件呼叫一個方法的時候,這個方法的呼叫就會被轉發為由InvocationHandler這個介面的
// invoke 方法來進行呼叫。
static class ConnectionHandler implements InvocationHandler {
// proxy: 指代我們所代理的那個真實物件
// method: 指代的是我們所要呼叫真實物件的某個方法的Method物件
// args: 指代的是呼叫真實物件某個方法時接受的引數
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
if (method.getName().equals("commit")) {
TimeUnit.MILLISECONDS.sleep(100);
}
return null;
}
}
// 建立一個Connection的代理,在commit時休眠100毫秒
public static final Connection createConnection() {
// newProxyInstance返回一個動態代理物件。
// loader: 一個ClassLoader物件,定義了由哪個ClassLoader物件來對生成的代理物件進行載入
// interfaces: 一個Interface物件的陣列,表示的是我將要給我需要代理的物件提供一組什麼介面,如果我提供了一組介面給它,那麼這個代理物件就宣稱實現了該介面(多型),這樣我就能呼叫這組介面中的方法了
// h: 一個InvocationHandler物件,表示的是當我這個動態代理物件在呼叫方法的時候,會關聯到哪一個InvocationHandler物件上
// 在newProxyInstance這個方法的第二個引數上,我們給這個代理物件提供了一組什麼介面,那麼我這個代理物件就會實現了這組介面,這個時候我們當然可以將這個代理物件強制型別轉化為這組介面中的任意一個,
// 例項中即當我們呼叫connection物件的commit方法時sleep 1秒.
return (Connection) Proxy.newProxyInstance(
ConnectionDriver.class.getClassLoader(),
new Class<?>[] { Connection.class }, new ConnectionHandler());
}
}
最後建立一個客戶端類來模擬從連線池中獲取連線,客戶端類中使用了同步工具類CountDownLatch來保證每個執行緒獲取執行緒時的公平性。CountDownLatch允許一個或多個執行緒一直等待,直到其他執行緒的操作執行完後再執行,它通過一個計數器來實現,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務。
/**
* 連線池客戶端.
*/
package com.lbbywyt.concurrent;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author libaobao
*
*/
public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
// CountDownLatch,同步工具類,它允許一個或多個執行緒一直等待,直到其他執行緒的操作執行完後再執行。
// CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務
// 保證所有ConnectionRunner能夠同時開始
static CountDownLatch start = new CountDownLatch(1);
// main執行緒將會等待所有ConnectionRunner結束後才能繼續執行
static CountDownLatch end;
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
//執行緒數量10
int threadCount = 50;
end = new CountDownLatch(threadCount);
//每個執行緒20次嘗試
int count = 20;
AtomicInteger got = new AtomicInteger();
AtomicInteger notGot = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
"ConnectionRunnerThread");
thread.start();
}
//start 使所有執行緒建立完之後才去獲取連線,保證公平性。
start.countDown();
end.await();//等待所有執行緒都執行完之後才執行下面3個輸出語句
System.out.println("total invoke: " + (threadCount * count));
System.out.println("got connection: " + got);
System.out.println("not got connection " + notGot);
}
static class ConnetionRunner implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public ConnetionRunner(int count, AtomicInteger got,
AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
try {
Profiler.begin();
start.await();
} catch (Exception ex) {
}
while (count > 0) {
try {
// 從執行緒池中獲取連線,如果1000ms內無法獲取到,將會返回null
// 分別統計連接獲取的數量got和未獲取到的數量notGot
Connection connection = pool.fetchConnection(1000);
if (connection != null) {
try {
connection.createStatement();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
notGot.incrementAndGet();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
count--;
}
}
Profiler.end();
end.countDown();
}
}
}
上述客戶端類,共建立了50個執行緒,每個執行緒分別嘗試獲取20次,測試輸出如下。
……
Cost: 8204 mills
Cost: 8304 mills
Cost: 8305 mills
Cost: 8504 mills
Cost: 8905 mills
total invoke: 1000
got connection: 824
not got connection: 176
Profiler.begin()和Profiler.end();用來記錄每個執行緒獲取連線的耗時,具體程式碼如下:
/**
* 使用ThreadLocal實現計算方法呼叫耗時.
*/
package com.lbbywyt.concurrent;
import java.util.concurrent.TimeUnit;
/**
* @author libaobao
*
*/
public class Profiler {
// 第一次get()方法呼叫時會進行初始化(如果set方法沒有呼叫),每個執行緒會呼叫一次
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
protected Long initialValue() {
return System.currentTimeMillis();
}
};
public static final void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end() {
long millis = System.currentTimeMillis() - TIME_THREADLOCAL.get();
System.out.println("Cost: " + millis+ " mills");
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) throws Exception {
Profiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("Cost: " + Profiler.end() + " mills");
}
}