1. 程式人生 > >leveldb簡介與java實現demo

leveldb簡介與java實現demo

簡介

1.簡介

Leveldb是一個google實現的非常高效的kv資料庫,目前的版本1.2能夠支援billion級別的資料量了。 在這個數量級別下還有著非常高的效能,主要歸功於它的良好的設計。特別是LSM演算法。

2特點

LevelDB 是單程序的服務,效能非常之高,在一臺4核Q6600的CPU機器上,每秒鐘寫資料超過40w,而隨機讀的效能每秒鐘超過10w。 此處隨機讀是完全命中記憶體的速度,如果是不命中 速度大大下降

3侷限

LevelDB 只是一個 C/C++ 程式語言的庫, 不包含網路服務封裝, 所以無法像一般意義的儲存伺服器(如 MySQL)那樣, 用客戶端來連線它. LevelDB 自己也宣告, 使用者應該封裝自己的網路伺服器.

4實現介紹

leveldb中有一系列引數會與讀寫的效率有關,將相關的配置以及編譯常量統一修改成可執行時配置引數,測試選取最佳配置值。

應用場景

  1. 記憶體佔用大的服務
  • leveldb是持久化型資料庫
  1. 寫多於讀
  • 寫操作要大大快於讀操作,而順序讀寫操作則大大快於隨機讀寫操作。官方網站報道其隨機寫效能達到40萬條記錄每秒,而隨機讀效能達到6萬條記錄每秒

涉及專案

  • ActiveMQ

限制和弊端

  1. key和value資料尺寸不能太大,在KB級別,如果儲存較大的key或者value,將對leveld的讀寫效能都有較大的影響。每個key所對應的內容不宜太大,超過32KB效能就會下降很快
  2. 一次只允許一個程序訪問一個特定的資料庫
  3. 沒有內建的C/S架構,但開發者可以使用LevelDB庫自己封裝一個server,不具備“分散式”叢集架構能力
  • 百度開源的分散式檔案系統BFS(開源地址:https://github.com/baidu/bfs)提 供了mount工具,可以將整個分散式檔案系統直接掛載到本地目錄,從而可以像操作本地檔案一樣來操作分散式檔案系統中的檔案,我們可以利用分散式檔案系統本身提供的資料高可靠特性來保證leveldb中資料的安全。
  1. 不支援索引
  2. 不支援事務

java呼叫

原生leveldb是基於C++開發,java語言無法直接使用;iq80對leveldb使用JAVA 語言進行了“逐句”重開發,經過很多大型專案的驗證(比如ActiveMQ),iq80開發的JAVA版leveldb在效能上損失極少(10%)。對於JAVA開發人員來說,我們直接使用即可,無需額外的安裝其他lib。

java maven依賴

<dependency>
	<groupId>org.iq80.leveldb</groupId>
	<artifactId>leveldb</artifactId>
	<version>0.7</version>
</dependency>
<dependency>
	<groupId>org.iq80.leveldb</groupId>
	<artifactId>leveldb-api</artifactId>
	<version>0.7</version>
</dependency>

備註:最新版0.10, 2017年12月

測試demo

package cn.demo.leveldb;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Map;

import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBFactory;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.Snapshot;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.WriteOptions;
import org.iq80.leveldb.impl.Iq80DBFactory;
import org.junit.Test;

/**
 * 各介面測試
 * @author lym
 *
 */
public class LevelDBDemoTest {

	private static final String PATH = "/data/leveldb";
	private static final Charset CHARSET = Charset.forName("utf-8");
	private static final File FILE = new File(PATH);
	
	@Test
	public void putTest() {
		DBFactory factory = new Iq80DBFactory();
		// 預設如果沒有則建立
		Options options = new Options();
		File file = new File(PATH);
		DB db = null;
		try {
			db = factory.open(file, options);
			byte[] keyByte1 = "key-01".getBytes(CHARSET);
			byte[] keyByte2 = "key-02".getBytes(CHARSET);
			// 會寫入磁碟中
			db.put(keyByte1, "value-01".getBytes(CHARSET));
			db.put(keyByte2, "value-02".getBytes(CHARSET));
			String value1 = new String(db.get(keyByte1), CHARSET);
			System.out.println(value1);
			System.out.println(new String(db.get(keyByte2), CHARSET));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void readFromSnapshotTest() {
		DBFactory factory = new Iq80DBFactory();
		File file = new File(PATH);
		Options options = new Options();
		DB db = null;
		try {
			db = factory.open(file, options);
			// 讀取當前快照,重啟服務仍能讀取,說明快照持久化至磁碟,
			Snapshot snapshot = db.getSnapshot();
			// 讀取操作
			ReadOptions readOptions = new ReadOptions();
			// 遍歷中swap出來的資料,不應該儲存在memtable中。
			readOptions.fillCache(false);
			// 預設snapshot為當前
			readOptions.snapshot(snapshot);
			
			DBIterator it = db.iterator(readOptions);
			while (it.hasNext()) {
				Map.Entry<byte[], byte[]> entry = (Map.Entry<byte[], byte[]>) it
						.next();
				String key = new String(entry.getKey(), CHARSET);
				String value = new String(entry.getValue(), CHARSET);
				System.out.println("key: " + key + " value: " + value);
				if (key.equals("key-01")) {
					System.out.println("".equals(value));
				}
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void snapshotTest() {
		DBFactory factory = new Iq80DBFactory();
		Options options = new Options();
		DB db = null;
		try {
			db = factory.open(FILE, options);
			
			db.put("key-04".getBytes(CHARSET), "value-04".getBytes(CHARSET));
			// 只能之前到getSnapshot之前put的值,之後的無法獲取,即讀取期間資料的變更,不會反應出來
			Snapshot snapshot = db.getSnapshot();
			db.put("key-05".getBytes(CHARSET), "value-05".getBytes(CHARSET));
			ReadOptions readOptions = new ReadOptions();
			readOptions.fillCache(false);
			readOptions.snapshot(snapshot);
			DBIterator it = db.iterator(readOptions);
			while (it.hasNext()) {
				Map.Entry<byte[],byte[]> entry = (Map.Entry<byte[],byte[]>) it
						.next();
				String key = new String(entry.getKey(), CHARSET);
				String value = new String(entry.getValue(), CHARSET);
				System.out.println("key: " + key + " value: " + value);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void writeOptionsTest() {
		DBFactory factory = new Iq80DBFactory();
		Options options = new Options();
		DB db = null;
		try {
			db = factory.open(FILE, options);
			WriteOptions writeOptions = new WriteOptions().sync(true);	// 執行緒安全
			// 沒有writeOptions時,會new一個,所以猜測這裡添加了這個引數的意義就是可以設定sync和snapshot引數,建議採用這種方式
			db.put("key-06".getBytes(CHARSET), "value-06".getBytes(CHARSET), writeOptions);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void deleteTest() {
		DBFactory factory = new Iq80DBFactory();
		Options options = new Options();
		DB db = null;
		try {
			db = factory.open(FILE, options);
			// 存在會刪除,之後查詢不出,根據說明可能不是真刪除,而是新增一個標記,待測試(大量資料之後刪除,檔案大小是否明顯變化)
			db.delete("key-02".getBytes(CHARSET));
			// 不存在不會報錯
			db.delete("key02".getBytes(CHARSET));
			
			Snapshot snapshot = db.getSnapshot();
			ReadOptions readOptions = new ReadOptions();
			readOptions.fillCache(false);
			readOptions.snapshot(snapshot);
			
			DBIterator it = db.iterator(readOptions);
			while (it.hasNext()) {
				Map.Entry<byte[], byte[]> entry = (Map.Entry<byte[], byte[]>) it
						.next();
				String key = new String(entry.getKey(), CHARSET);
				String value = new String(entry.getValue(), CHARSET);
				System.out.println("key: " + key + " value: " + value);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void writeBatchTest() {
		DBFactory factory = Iq80DBFactory.factory;
		Options options = new Options();
		DB db = null;
		try {
			db = factory.open(FILE, options);
			// 批量儲存,批量修改
			WriteBatch writeBatch = db.createWriteBatch();
			writeBatch.put("key-07".getBytes(CHARSET), "value-07".getBytes(CHARSET));
			writeBatch.put("key-08".getBytes(CHARSET), "value-08".getBytes(CHARSET));
			writeBatch.put("key-09".getBytes(CHARSET), "value-09".getBytes(CHARSET));
			// 這裡也可以新增writeOptions
			db.write(writeBatch);
			
			DBIterator it = db.iterator();
			while (it.hasNext()) {
				Map.Entry<byte[], byte[]> entry = (Map.Entry<byte[], byte[]>) it
						.next();
				String key = new String(entry.getKey(), CHARSET);
				String value = new String(entry.getValue(), CHARSET);
				System.out.println("key: " + key + " value: " + value);
			}
			it.close();
			
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void writeBatchDeleteTest() {
		DBFactory factory = Iq80DBFactory.factory;
		Options options = new Options();
		DB db = null;
		try {
			db = factory.open(FILE, options);
			WriteBatch writeBatch = db.createWriteBatch();
			writeBatch.put("key-10".getBytes(CHARSET), "value-10".getBytes(CHARSET));
			writeBatch.put("key-11".getBytes(CHARSET), "value-11".getBytes(CHARSET));
			// 會將key-01的value置為""
			writeBatch.delete("key-01".getBytes(CHARSET));
			db.write(writeBatch);
			writeBatch.close();
			DBIterator it = db.iterator();
			while (it.hasNext()) {
				Map.Entry<byte[], byte[]> entry = (Map.Entry<byte[], byte[]>) it
						.next();
				String key = new String(entry.getKey(), CHARSET);
				String value = new String(entry.getValue(), CHARSET);
				System.out.println("key: " + key + " value: " + value);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
}

效能測試

資料量 儲存 隨機讀一條 順序讀一條 順序讀所有 磁碟佔用 測試環境 速率
10w 494ms 36ms -- 174ms 9.75M windows 20.29w/s
40w 2772ms 46ms 1ms 551ms 28.6M windows 14.43w/s
100w 5533ms 44ms 1ms 457ms 71.5M windows 18.07w/s
1000w 117661ms 11m+ -- -- 715M windows --

資料型別

1. 物件

需要實現序列化介面

public class DataTest {

	private static final String PATH = "/data/leveldb";
	private static final File FILE = new File(PATH);
	private static final Charset CHARSET = Charset.forName("UTF-8");
	
	@Test
	public void writeObject() {
		Options options = new Options();
		DBFactory factory = Iq80DBFactory.factory;
		DB db = null;
		try {
			db = factory.open(FILE, options);
			User user = new User();
			user.setName("admin");
			user.setSex("男");
			WriteOptions writeOptions = new WriteOptions();
			writeOptions.snapshot(true);
			
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			ObjectOutputStream oos = new ObjectOutputStream(baos);
			oos.writeObject(user);
			
			db.put(user.getName().getBytes(CHARSET), baos.toByteArray());
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	@Test
	public void readObject() {
		Options options = new Options();
		DBFactory factory = Iq80DBFactory.factory;
		DB db = null;
		try {
			db = factory.open(FILE, options);
			byte[] valueByte = db.get("admin".getBytes(CHARSET));
			ByteArrayInputStream bais = new ByteArrayInputStream(valueByte);
			ObjectInputStream ois = new ObjectInputStream(bais);
			User user = (User) ois.readObject();
			System.out.println(user);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (db != null) {
				try {
					db.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
}

2. 物件集合

@Test
public void writeObjectList() {
	Options options = new Options();
	DBFactory factory = Iq80DBFactory.factory;
	DB db = null;
	try {
		db = factory.open(FILE, options);
		List<User> userList = new ArrayList<User>();
		User user = new User();
		user.setName("admin");
		user.setSex("男");
		User user2 = new User();
		user2.setName("root");
		user2.setSex("女");
		userList.add(user);
		userList.add(user2);
		WriteOptions writeOptions = new WriteOptions();
		writeOptions.snapshot(true);
		
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		ObjectOutputStream oos = new ObjectOutputStream(baos);
		oos.writeObject(userList);
		
		db.put("userList".getBytes(CHARSET), baos.toByteArray());
	} catch (IOException e) {
		e.printStackTrace();
	} finally {
		if (db != null) {
			try {
				db.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
@Test
public void readObjectList() {
	Options options = new Options();
	DBFactory factory = Iq80DBFactory.factory;
	DB db = null;
	try {
		db = factory.open(FILE, options);
		byte[] valueByte = db.get("userList".getBytes(CHARSET));
		ByteArrayInputStream bais = new ByteArrayInputStream(valueByte);
		ObjectInputStream ois = new ObjectInputStream(bais);
		List<User> userList = new ArrayList<User>();
		userList = (List<User>) ois.readObject();
		System.out.println(userList);
	} catch (IOException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	} catch (ClassNotFoundException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	} finally {
		if (db != null) {
			try {
				db.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

參考: