基於memcached for java 實現通用分散式快取和叢集分散式快取
阿新 • • 發佈:2019-01-01
前提:基於memcached client for java 的基礎進行的二次封裝,實現快取儲存的兩種模式:通用分散式快取和叢集分散式快取。以下是對於memcached client for Java 二次封裝的UML圖。
memcached的客戶端初始化在CacheFactory中通過讀取配置檔案cacheConfig.xml完成。通用分散式快取只是一個簡單的封裝,利用memcached client for java提供的分散式支援來實現。這裡主要說一下ClusterCache的實現思想:存入快取時,先對快取物件的key值進行一次hash,找到對應的伺服器(主伺服器)存入;然後根據一定的規則再次進行hash,找到另外一臺不同的伺服器(從伺服器)再存入一份。取快取時,先對要取的key值進行一次hash,找到主伺服器;如果獲取失敗或者獲取到的值為null,就對key再次進行hash,找到其從伺服器,從這臺伺服器取快取結果(如果取到結果,就非同步地更新到主伺服器)。這樣就形成了主從式叢集快取。特點是:沒有絕對的主節點和從節點,正常情況下所有伺服器共同承擔快取服務;在一臺伺服器出現異常時,其他伺服器共同承擔增加的訪問壓力。
拓撲結構如下:
原始碼
package com.yx.cache;

/**
 * Contract shared by the cache implementations in this package: values of
 * type {@code T} are stored and retrieved under string keys.
 *
 * @param <T> type of the cached values
 */
public interface Cache<T> {
    /**
     * Gets the data stored in the cache.
     *
     * @param key cache key
     * @return the value cached under {@code key}
     */
    T get(String key);

    /**
     * Puts data into the cache. If a value already exists for {@code key},
     * the call reports failure.
     *
     * @param key cache key
     * @param value value to store
     * @return whether the value was stored
     */
    boolean add(String key, T value);

    /**
     * Puts data into the cache. If a value already exists for {@code key},
     * it is overwritten.
     *
     * @param key cache key
     * @param value value to store
     * @return whether the value was stored
     */
    boolean set(String key, T value);

    /**
     * Updates a cached value. If no cached value exists for {@code key},
     * nothing is updated.
     *
     * @param key cache key
     * @param value replacement value
     * @return whether the value was updated
     */
    boolean update(String key, T value);

    /**
     * Deletes a cached value.
     *
     * @param key cache key
     * @return whether the delete succeeded
     */
    boolean delete(String key);
}
通用分散式快取實現類:
叢集分散式快取實現類package com.yx.cache; import com.danga.MemCached.MemCachedClient; public class CommonCache<T> implements Cache<T> { private static MemCachedClient memCachedClient = null; private String base = null; CommonCache(Class<T> t, MemCachedClient client) { memCachedClient = client; base = t.getSimpleName() + "-"; } public T get(String key) { return (T) memCachedClient.get(base + key); } public boolean set(String key, T value) { return memCachedClient.set(base + key, value); } @Override public boolean update(String key, T value) { return memCachedClient.replace(base + key, value); } @Override public boolean delete(String key) { return memCachedClient.delete(base + key); } @Override public boolean add(String key, T value) { return memCachedClient.add(base + key, value); } }
package com.yx.cache;
import com.danga.MemCached.MemCachedClient;
import com.schooner.MemCached.SchoonerSockIOPool;
import com.yx.cache.util.HashCodeUtil;
import com.yx.task.ThreadPoolManager;
/**
 * {@link Cache} implementation that writes every entry to two servers.
 *
 * <p>The first server is selected by passing {@code HashCodeUtil.getHash(key)}
 * to the client; the second by a hash from {@link #getRehashCode} that the
 * pool maps to a different host. Reads try the first hash and fall back to
 * the second; a value found only via the second hash is written back under
 * the first hash asynchronously. When the pool reports fewer than two servers
 * all operations are plain single-server client calls.
 *
 * <p>Keys are prefixed with {@code "i-" + simpleName + "-"}. A {@code null}
 * key yields {@code null} / {@code false} without touching the client.
 *
 * @param <T> type of the cached values
 */
public class ClusterCache<T> implements Cache<T> {
    /**
     * Upper bound on rehash attempts in {@link #getRehashCode}. Without a
     * bound the search spins forever whenever every hash resolves to the same
     * host (presumably the case when failover routes all traffic to a single
     * surviving server - confirm against the pool's failover behavior).
     */
    private static final int MAX_REHASH_TRIES = 20;

    // NOTE(review): static, yet reassigned by every constructor call. It is
    // left static because the static nested UpdateTask reads it.
    private static MemCachedClient memCachedClient = null;

    /** Executor used for the asynchronous write-back of recovered values. */
    private static ThreadPoolManager taskManager = ThreadPoolManager
            .getInstance("cache");

    /** Key prefix applied to every key handled by this instance. */
    private String base = null;

    private SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance();

    /**
     * @param t cached value type; its simple name is part of the key prefix
     * @param client memcached client to delegate to
     */
    ClusterCache(Class<T> t, MemCachedClient client) {
        memCachedClient = client;
        base = "i-" + t.getSimpleName() + "-";
    }

    /**
     * Reads a value, trying the first hash and then the second.
     *
     * @param key cache key without prefix
     * @return the cached value, or {@code null} when the key is {@code null}
     *         or neither lookup returned a value
     */
    @Override
    @SuppressWarnings("unchecked") // the client API returns Object; values are written as T by this class
    public T get(String key) {
        if (key == null) {
            return null;
        }
        String fullKey = base + key;
        if (isSingleServer()) {
            return (T) memCachedClient.get(fullKey);
        }
        int primaryHash = HashCodeUtil.getHash(fullKey);
        T value = (T) memCachedClient.get(fullKey, primaryHash);
        if (value != null) {
            return value;
        }
        int replicaHash = getRehashCode(fullKey, primaryHash);
        value = (T) memCachedClient.get(fullKey, replicaHash);
        if (value != null) {
            // Found only via the second hash: restore the first copy in the background.
            taskManager.submit(new UpdateTask(fullKey, value));
        }
        return value;
    }

    /**
     * Stores a value under both hashes, overwriting existing values.
     *
     * @return the result of the first write only; the second write is
     *         attempted regardless and its result is not reported
     */
    @Override
    public boolean set(String key, T value) {
        if (key == null) {
            return false;
        }
        String fullKey = base + key;
        if (isSingleServer()) {
            return memCachedClient.set(fullKey, value);
        }
        int primaryHash = HashCodeUtil.getHash(fullKey);
        boolean result = memCachedClient.set(fullKey, value, primaryHash);
        int replicaHash = getRehashCode(fullKey, primaryHash);
        memCachedClient.set(fullKey, value, replicaHash);
        return result;
    }

    /**
     * Finds a hash code that the pool maps to a different host than
     * {@code oldHashcode}, by hashing the key with an increasing numeric
     * prefix ("0" + key, "1" + key, ...).
     *
     * <p>The search stops after {@link #MAX_REHASH_TRIES} increments, or
     * immediately when the pool reports no host for the original hash; in
     * both cases the last computed hash is returned even though it may map to
     * the same host.
     *
     * @param key full (prefixed) cache key
     * @param oldHashcode hash code used for the first copy
     * @return hash code to use for the second copy
     */
    private int getRehashCode(String key, int oldHashcode) {
        String host = pool.getHost(key, oldHashcode);
        int rehashTries = 0;
        int hashCode = HashCodeUtil.getHash(rehashTries + key);
        while (host != null && rehashTries < MAX_REHASH_TRIES
                && host.equals(pool.getHost(key, hashCode))) {
            rehashTries++;
            hashCode = HashCodeUtil.getHash(rehashTries + key);
        }
        return hashCode;
    }

    /**
     * Replaces a value under both hashes via the client's {@code replace}.
     *
     * @return the result of the first replace only; the second is attempted
     *         regardless and its result is not reported
     */
    @Override
    public boolean update(String key, T value) {
        if (key == null) {
            return false;
        }
        String fullKey = base + key;
        if (isSingleServer()) {
            return memCachedClient.replace(fullKey, value);
        }
        int primaryHash = HashCodeUtil.getHash(fullKey);
        boolean result = memCachedClient.replace(fullKey, value, primaryHash);
        int replicaHash = getRehashCode(fullKey, primaryHash);
        memCachedClient.replace(fullKey, value, replicaHash);
        return result;
    }

    /**
     * Deletes the value under both hashes.
     *
     * @return the result of the first delete only; the second is attempted
     *         regardless and its result is not reported
     */
    @Override
    public boolean delete(String key) {
        if (key == null) {
            return false;
        }
        String fullKey = base + key;
        if (isSingleServer()) {
            return memCachedClient.delete(fullKey);
        }
        int primaryHash = HashCodeUtil.getHash(fullKey);
        boolean result = memCachedClient.delete(fullKey, primaryHash, null);
        int replicaHash = this.getRehashCode(fullKey, primaryHash);
        memCachedClient.delete(fullKey, replicaHash, null);
        return result;
    }

    /**
     * Adds a value under both hashes via the client's {@code add}.
     *
     * @return the result of the first add only; the second is attempted
     *         regardless and its result is not reported
     */
    @Override
    public boolean add(String key, T value) {
        if (key == null) {
            return false;
        }
        String fullKey = base + key;
        if (isSingleServer()) {
            return memCachedClient.add(fullKey, value);
        }
        int primaryHash = HashCodeUtil.getHash(fullKey);
        boolean result = memCachedClient.add(fullKey, value, primaryHash);
        int replicaHash = getRehashCode(fullKey, primaryHash);
        memCachedClient.add(fullKey, value, replicaHash);
        return result;
    }

    /** Returns whether the pool currently lists fewer than two servers. */
    private boolean isSingleServer() {
        return pool.getServers().length < 2;
    }

    /**
     * Background task that writes a value back under the hash of its key,
     * i.e. to the location {@link ClusterCache#get} tries first.
     */
    static class UpdateTask implements Runnable {
        private final String key;
        private final Object value;

        /**
         * @param key full (prefixed) cache key
         * @param value value to store
         */
        UpdateTask(String key, Object value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public void run() {
            memCachedClient.set(key, value, HashCodeUtil.getHash(key));
        }
    }
}
基於工廠模式建立memcached 儲存模式(通用模式還是叢集模式)
package com.yx.cache;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.yx.cache.util.ConfigUtil;
/**
 * Factory that configures the memcached connection pool from
 * {@link ConfigUtil} once, at class initialization, and hands out one cached
 * {@link Cache} instance per value type and mode (common or cluster).
 *
 * <p>Common caches are registered under the type's class name; cluster caches
 * under {@code "i-"} plus the class name, so the two modes never share a map
 * entry.
 */
public class CacheFactory {
    /** Map-key prefix that separates cluster caches from common caches. */
    private static final String CLUSTER_PREFIX = "i-";

    @SuppressWarnings("rawtypes")
    private static final Map<String, Cache> map = new ConcurrentHashMap<String, Cache>();

    private static final MemCachedClient memCachedClient;

    static {
        initPool();
        memCachedClient = new MemCachedClient();
    }

    /**
     * Reads the pool settings from the configuration and initializes the
     * default {@link SockIOPool}.
     *
     * @throws RuntimeException when the "servers" setting yields no server
     */
    private static void initPool() {
        List<String> servers = splitCsv(ConfigUtil.getConfigValue("servers", ""));
        if (servers.size() < 1) {
            throw new RuntimeException("cache 初始化失敗!");
        }
        SockIOPool pool = SockIOPool.getInstance();
        pool.setServers(servers.toArray(new String[] {}));
        pool.setFailover(boolConfig("failover", "true"));
        pool.setInitConn(intConfig("initConn", "100"));
        pool.setMinConn(intConfig("minConn", "25"));
        pool.setMaxConn(intConfig("maxConn", "250"));
        pool.setMaintSleep(intConfig("maintSleep", "30"));
        pool.setNagle(boolConfig("nagle", "false")); // Nagle's algorithm off by default
        pool.setSocketTO(intConfig("socketTO", "3000"));
        pool.setAliveCheck(boolConfig("aliveCheck", "true"));
        pool.setHashingAlg(intConfig("hashingAlg", "0"));
        pool.setSocketConnectTO(intConfig("socketConnectTO", "3000"));

        List<Integer> weights = new ArrayList<Integer>();
        for (String w : splitCsv(ConfigUtil.getConfigValue("weights", ""))) {
            weights.add(Integer.valueOf(w));
        }
        // Weights are applied only when there is exactly one per server;
        // any other count is ignored.
        if (weights.size() == servers.size()) {
            pool.setWeights(weights.toArray(new Integer[] {}));
        }
        pool.initialize();
    }

    /** Splits a comma-separated string into trimmed, non-empty items. */
    private static List<String> splitCsv(String csv) {
        List<String> items = new ArrayList<String>();
        for (String item : csv.split(",")) {
            item = item.trim();
            if (!"".equals(item)) {
                items.add(item);
            }
        }
        return items;
    }

    /** Reads an integer setting, using {@code defaultValue} when it is unset. */
    private static int intConfig(String name, String defaultValue) {
        return Integer.parseInt(ConfigUtil.getConfigValue(name, defaultValue));
    }

    /** Reads a boolean setting, using {@code defaultValue} when it is unset. */
    private static boolean boolConfig(String name, String defaultValue) {
        return Boolean.parseBoolean(ConfigUtil.getConfigValue(name, defaultValue));
    }

    /**
     * Returns the common-mode cache for the given value type, creating it on
     * first use.
     *
     * @param t cached value type
     * @return the shared {@link CommonCache} for {@code t}
     */
    @SuppressWarnings("unchecked")
    public static <T> Cache<T> getCommonCache(Class<T> t) {
        Cache<T> cache = map.get(t.getName());
        if (cache == null) {
            cache = createCommonCache(t);
        }
        return cache;
    }

    /**
     * Returns the cluster-mode cache for the given value type, creating it on
     * first use.
     *
     * @param t cached value type
     * @return the shared {@link ClusterCache} for {@code t}
     */
    @SuppressWarnings("unchecked")
    public static <T> Cache<T> getClusterCache(Class<T> t) {
        Cache<T> cache = map.get(CLUSTER_PREFIX + t.getName());
        if (cache == null) {
            cache = createClusterCache(t);
        }
        return cache;
    }

    /** Creates and registers the common cache for {@code t} unless one exists. */
    @SuppressWarnings("unchecked")
    private static synchronized <T> Cache<T> createCommonCache(Class<T> t) {
        String mapKey = t.getName();
        Cache<T> cache = map.get(mapKey);
        if (cache == null) {
            cache = new CommonCache<T>(t, memCachedClient);
            map.put(mapKey, cache);
        }
        return cache;
    }

    /**
     * Creates and registers the cluster cache for {@code t} unless one exists.
     * Uses the same prefixed map key as {@link #getClusterCache}; registering
     * under the bare class name would collide with the common cache for the
     * same type.
     */
    @SuppressWarnings("unchecked")
    private static synchronized <T> Cache<T> createClusterCache(Class<T> t) {
        String mapKey = CLUSTER_PREFIX + t.getName();
        Cache<T> cache = map.get(mapKey);
        if (cache == null) {
            cache = new ClusterCache<T>(t, memCachedClient);
            map.put(mapKey, cache);
        }
        return cache;
    }
}
讀取配置檔案工具類封裝和生成Hash程式碼工具類
ConfigUtil.java和HashCodeUtil.java
package com.yx.cache.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
/**
 * Loads {@code cacheConfig.xml} from the class path once, at class
 * initialization, and exposes the text of each child element of the root as
 * a string setting keyed by the element name.
 */
public class ConfigUtil {
    private static final String CONFILE = "cacheConfig.xml";
    private static final Map<String, String> map = new HashMap<String, String>();

    static {
        InputStream ins = ConfigUtil.class.getClassLoader()
                .getResourceAsStream(CONFILE);
        // Fail up front when the resource is missing instead of signalling it
        // from inside a finally block.
        if (ins == null) {
            throw new RuntimeException("找不到配置檔案:" + CONFILE);
        }
        try {
            Document doc = new SAXReader().read(ins);
            Element root = doc.getRootElement();
            Iterator<Element> iter = root.elementIterator();
            while (iter.hasNext()) {
                Element e = iter.next();
                map.put(e.getName(), e.getTextTrim());
            }
        } catch (DocumentException e) {
            // Keep the parser's exception as the cause so the real reason is not lost.
            throw new RuntimeException("找不到配置檔案:" + CONFILE, e);
        } finally {
            try {
                ins.close();
            } catch (IOException e) {
                // Best effort: a failed close must not mask the outcome of loading.
                e.printStackTrace();
            }
        }
    }

    /**
     * Returns a configuration value.
     *
     * @param key element name in the configuration file
     * @param defaultValue value to return when the setting is missing or empty
     * @return the configured text, or {@code defaultValue}
     */
    public static String getConfigValue(String key, String defaultValue) {
        String tmp = map.get(key);
        return isEmpty(tmp) ? defaultValue : tmp;
    }

    /** Prints the loaded settings; manual check only. */
    public static void main(String[] args) {
        System.out.println(map);
    }

    /** Returns whether {@code str} is {@code null} or the empty string. */
    private static boolean isEmpty(String str) {
        return str == null || "".equals(str);
    }
}
package com.yx.cache.util;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;
import com.schooner.MemCached.SchoonerSockIOPool;
/**
 * Computes the integer hash code for a cache key, using the hashing algorithm
 * id that the default {@link SchoonerSockIOPool} reports when this class is
 * initialized.
 */
public class HashCodeUtil {
    public static final int NATIVE_HASH = 0; // native String.hashCode();
    public static final int OLD_COMPAT_HASH = 1; // original compatibility
    public static final int NEW_COMPAT_HASH = 2; // new CRC32 based
    public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops

    /** Algorithm id read from the pool once, at class initialization. */
    private static int hashingAlg = SchoonerSockIOPool.getInstance()
            .getHashingAlg();

    /** One MD5 digest per thread; callers reset it before use. */
    private static final ThreadLocal<MessageDigest> MD5 = new ThreadLocal<MessageDigest>() {
        @Override
        protected final MessageDigest initialValue() {
            try {
                return MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                // Chain the cause so the provider failure stays visible.
                throw new IllegalStateException(" no md5 algorythm found", e);
            }
        }
    };

    /**
     * Returns a bucket to check for a given key.
     *
     * @param key
     *            String key cache is stored under
     * @return int bucket
     */
    public static final int getHash(String key) {
        switch (hashingAlg) {
        case NATIVE_HASH:
            return key.hashCode();
        case OLD_COMPAT_HASH:
            return origCompatHashingAlg(key);
        case NEW_COMPAT_HASH:
            return newCompatHashingAlg(key);
        case CONSISTENT_HASH:
            return md5HashingAlg(key);
        default:
            // use the native hash as a default
            hashingAlg = NATIVE_HASH;
            return key.hashCode();
        }
    }

    /** Multiply-by-33 hash over the key's chars. */
    private static int origCompatHashingAlg(String key) {
        int hash = 0;
        char[] cArr = key.toCharArray();
        for (int i = 0; i < cArr.length; ++i) {
            hash = (hash * 33) + cArr[i];
        }
        return hash;
    }

    /** CRC32 of the key bytes, reduced to bits 16..30 of the checksum. */
    private static int newCompatHashingAlg(String key) {
        CRC32 checksum = new CRC32();
        // NOTE(review): getBytes() uses the platform default charset, so keys
        // with non-ASCII characters may hash differently across machines.
        checksum.update(key.getBytes());
        int crc = (int) checksum.getValue();
        return (crc >> 16) & 0x7fff;
    }

    /** First four bytes of the key's MD5 digest, combined little-endian. */
    private static int md5HashingAlg(String key) {
        MessageDigest md5 = MD5.get();
        md5.reset();
        // NOTE(review): platform default charset, see newCompatHashingAlg.
        md5.update(key.getBytes());
        byte[] bKey = md5.digest();
        int res = ((bKey[3] & 0xFF) << 24) | ((bKey[2] & 0xFF) << 16)
                | ((bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF);
        return res;
    }
}
對於叢集分散式快取還缺少一個工具類ThreadPoolManager.java
package com.yx.task;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Registry of named background executors: {@link #getInstance(String)}
 * returns one manager per key, each owning a fixed pool of {@code CORE_SIZE}
 * threads.
 *
 * <p>Worker threads are daemon threads. The class offers no shutdown method,
 * so non-daemon workers would keep the JVM alive forever once a task has been
 * submitted. Callers that must not lose queued work at exit can poll
 * {@link #finished()} before returning.
 *
 * @author liuyuxiao
 * @Date 2011-5-30 16:34:16
 */
public class ThreadPoolManager {
    private static final Map<String, ThreadPoolManager> map = new HashMap<String, ThreadPoolManager>();

    /** Wraps the default factory and marks every worker as a daemon thread. */
    private static final ThreadFactory DAEMON_FACTORY = new ThreadFactory() {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable r) {
            Thread worker = delegate.newThread(r);
            worker.setDaemon(true);
            return worker;
        }
    };

    /** Number of threads in each manager's pool. */
    final int CORE_SIZE = 5;

    private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
            .newFixedThreadPool(CORE_SIZE, DAEMON_FACTORY);

    /**
     * Queues a task for execution on this manager's pool.
     *
     * @param task task to run
     */
    public void submit(Runnable task) {
        executor.submit(task);
    }

    /**
     * Reports whether the executor's completed-task count equals its
     * scheduled-task count. ThreadPoolExecutor documents both counts as
     * approximations, so treat the answer as a hint.
     *
     * @return {@code true} when the two counts are equal
     */
    public boolean finished() {
        return executor.getCompletedTaskCount() == executor.getTaskCount();
    }

    private ThreadPoolManager() {
    }

    /**
     * Returns the manager registered under {@code key}, creating it on first
     * request.
     *
     * @param key name of the pool
     * @return the single manager instance for {@code key}
     */
    public static synchronized ThreadPoolManager getInstance(String key) {
        ThreadPoolManager t = map.get(key);
        if (t == null) {
            t = new ThreadPoolManager();
            map.put(key, t);
        }
        return t;
    }
}
對於叢集快取模式和通用快取模式測試:
package com.yx.cache.test;
import com.yx.cache.Cache;
import com.yx.cache.CacheFactory;
public class TestCommonCache {
    /**
     * Manual smoke test for the common cache: reads the keys "0" to "99",
     * prints each lookup result and finally the number of lookups that
     * returned {@code null}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Cache<String> stringCache = CacheFactory.getCommonCache(String.class);
        int misses = 0;
        int i = 0;
        while (i < 100) {
            // To populate the cache first, call: stringCache.set("" + i, "Hello!" + i);
            String hit = stringCache.get(Integer.toString(i));
            if (hit == null) {
                misses++;
            }
            String line = String.format("get( %d ): %s", i, hit);
            System.out.println(line);
            i++;
        }
        System.out.println(misses);
        // A disabled stress variant (500 threads, each running a MemTask) used
        // to follow here; MemTask is not part of this listing.
    }
}
package com.yx.cache.test;
import com.yx.cache.Cache;
import com.yx.cache.CacheFactory;
public class TestClusterCache {
    /**
     * Manual smoke test for the cluster cache: reads the keys "0" to "99",
     * prints each lookup result and finally the number of lookups that
     * returned {@code null} (the same report TestCommonCache prints).
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Cache<String> cache = CacheFactory.getClusterCache(String.class);
        int count = 0;
        for (int i = 0; i < 100; i++) {
            // To populate the cache first, call: cache.set("" + i, "Hello!" + i);
            String result = cache.get("" + i);
            if (result == null) {
                count++;
            }
            System.out.println(String.format("get( %d ): %s", i, result));
        }
        // The miss counter was computed but never reported.
        System.out.println(count);
    }
}