janusgraph原始碼分析8-底層互動
反向分析
cassandra 寫資料 API
cassandra 的結構類似 bigtable ,資料實際上是多層巢狀的 map,第一個 key 是 rowkey,第二層key 是 columnFamily,第三層key 是 column,第四層(也可以忽略) 是 timestamp,然後是 value。
寫資料的 API 如下:
CTConnection conn = null;
try {
conn = pool.borrowObject(keySpaceName);
Cassandra.Client client = conn.getClient();
if (atomicBatch) {
client.atomic_batch_mutate(batch, consistency);
} else {
client.batch_mutate(batch, consistency);
}
} catch (Exception ex) {
throw CassandraThriftKeyColumnValueStore.convertException(ex);
} finally {
pool.returnObjectUnsafe(keySpaceName, conn) ;
}
這裡的 batch 就是一個多層巢狀的map。final Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch = new HashMap<>(size);
這裡看起來只有兩層,第一層的 ByteBuffer 當然是 rowKey,第二層是 String 是 columnFamily。而 List<org.apache.cassandra.thrift.Mutation>
很明顯就是新增或者刪除的 key:value。
寫入 cassandra 的資料格式
上面是寫 cassandra 的 API,而最終呼叫這段程式碼的位置在 CassandraThriftStoreManager.mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh)
方法。
我們需要了解的就是 Map<String, Map<StaticBuffer, KCVMutation>> mutations
和 Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch
的對應關係。
從程式碼可以看出:
final Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch = new HashMap<>(size);
for (final Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) {
// mutations 的 key 是 columnFamily
final String columnFamily = keyMutation.getKey();
for (final Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) {
// mutations 的第二層 key 是 rowKey
ByteBuffer keyBB = mutEntry.getKey().asByteBuffer();
// Get or create the single Cassandra Mutation object responsible for this key
// Most mutations only modify the edgeStore and indexStore
final Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation
= batch.computeIfAbsent(keyBB, k -> new HashMap<>(3));
final KCVMutation mutation = mutEntry.getValue();
final List<org.apache.cassandra.thrift.Mutation> thriftMutation = new ArrayList<>(mutations.size());
// 省略刪除的程式碼。
if (mutation.hasAdditions()) {
for (final Entry ent : mutation.getAdditions()) {
final ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
// mutations 的第三層 key 是 column
final Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY));
// mutations 的 value 是 value
column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY));
column.setTimestamp(commitTime.getAdditionTime(times));
final Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL);
if (null != ttl && ttl > 0) {
column.setTtl(ttl);
}
columnOrSuperColumn.setColumn(column);
org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
m.setColumn_or_supercolumn(columnOrSuperColumn);
thriftMutation.add(m);
}
}
cfmutation.put(columnFamily, thriftMutation);
}
}
我們可以看出 mutateMany 方法的引數和寫到 cassandra 的結果不是完全一致,主要是 rowkey 和 columnFamily 的位置是反的。
傳入 mutateMany 的資料
通過除錯可以看出,呼叫 mutateMany 的地方主要是 CacheTransation.persist
,而呼叫 persist 的就是 flushInternal 方法。相應程式碼:
// 成員變數: Map<KCVSCache, Map<StaticBuffer, KCVEntryMutation>> mutations
// 新建Map,這個 map 就是上面 mutateMany 的引數,key 分別是 columnFamily 和 rowKey ,
final Map<String, Map<StaticBuffer, KCVMutation>> subMutations = new HashMap<>(mutations.size());
int numSubMutations = 0;
// 遍歷 mutations
for (Map.Entry<KCVSCache,Map<StaticBuffer, KCVEntryMutation>> storeMutations : mutations.entrySet()) {
final Map<StaticBuffer, KCVMutation> sub = new HashMap<>();
// KCVSCache 的 getKey().getName() 就是 columnFamily
subMutations.put(storeMutations.getKey().getName(),sub);
// mutations 的 value
for (Map.Entry<StaticBuffer,KCVEntryMutation> mutationsForKey : storeMutations.getValue().entrySet()) {
if (mutationsForKey.getValue().isEmpty()) continue;
// 將 mutationsForKey 放進去,這個 convert 做了啥沒有具體研究,可能只是一個適配。
sub.put(mutationsForKey.getKey(), convert(mutationsForKey.getValue()));
numSubMutations+=mutationsForKey.getValue().getTotalMutations();
if (numSubMutations>= persistChunkSize) {
numSubMutations = persist(subMutations);
sub.clear();
subMutations.put(storeMutations.getKey().getName(),sub);
}
}
}
mutations 的構造
上面我們看出了,其實基本上沒複雜處理,接下來我們看看 mutations 資料哪裡來的。
對於 mutations 的修改操作,來自於 mutate 方法,程式碼:
// 傳入的是 store(包含了columnFamily) key(rowKey) additions 和 deletions
void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry> deletions) throws BackendException {
Preconditions.checkNotNull(store);
if (additions.isEmpty() && deletions.isEmpty()) return;
// 構造 KCVEntryMutation
KCVEntryMutation m = new KCVEntryMutation(additions, deletions);
// 這幾步就是簡單的合併所以的 additions 和 deletions
final Map<StaticBuffer, KCVEntryMutation> storeMutation = mutations.computeIfAbsent(store, k -> new HashMap<>());
KCVEntryMutation existingM = storeMutation.get(key);
if (existingM != null) {
existingM.merge(m);
} else {
storeMutation.put(key, m);
}
numMutations += m.getTotalMutations();
if (batchLoading && numMutations >= persistChunkSize) {
flushInternal();
}
}
mutate 方法引數來源
mutate 方法傳入的是 store(包含了columnFamily) key(rowKey) additions 和 deletions,這幾個引數哪裡來的呢? KCVSCache 的 mutateEntries, mutateEdges 呼叫時機呢? edgeStore.mutateEntries(key, additions, deletions, storeTx); indexStore.mutateEntries(key, additions, deletions, storeTx); 我們先以 edgeStore 為例,在 StandardJanusGraph 的 prepareCommit 方法中,呼叫了 mutator.mutateEdges(vertexKey, additions, deletions); 程式碼如下,我們刪掉了部分程式碼,包括 索引和資料刪除。
ListMultimap<Long, InternalRelation> mutations = ArrayListMultimap.create();
ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
List<IndexSerializer.IndexUpdate> indexUpdates = Lists.newArrayList();
//2) Collect added edges and their index updates and acquire edge locks
// add 是 InternalRelation ,包括 VertexProperty 和 Edge,前面分析過,VertexProperty 實際上就是頂點和一個 schema 的訂單建一條邊,Edge 就是兩個頂點建一條邊。
for (InternalRelation add : Iterables.filter(addedRelations,filter)) {
Preconditions.checkArgument(add.isNew());
// getLen 返回這個 Relation 的長度,如果是 VertexProperty 是1,Edge 是需要根據方向進行判斷
for (int pos = 0; pos < add.getLen(); pos++) {
// 得到對應的 vertex
InternalVertex vertex = add.getVertex(pos);
if (pos == 0 || !add.isLoop()) {
// mutatedProperties 的 key: InternalVertex,value:InternalRelation,mutatedProperties 是用於更新索引的,在我們這裡實際上沒什麼用。
if (add.isProperty()) mutatedProperties.put(vertex,add);
// mutations 的 key : vertexId, value : InternalRelation
mutations.put(vertex.longId(), add);
}
if (!vertex.isNew() && acquireLock(add,pos,acquireLocks)) {
Entry entry = edgeSerializer.writeRelation(add, pos, tx);
mutator.acquireEdgeLock(idManager.getKey(vertex.longId()), entry.getColumn());
}
}
}
//5) Add relation mutations
for (Long vertexId : mutations.keySet()) {
Preconditions.checkArgument(vertexId > 0, "Vertex has no id: %s", vertexId);
final List<InternalRelation> edges = mutations.get(vertexId);
final List<Entry> additions = new ArrayList<>(edges.size());
final List<Entry> deletions = new ArrayList<>(Math.max(10, edges.size() / 10));
for (final InternalRelation edge : edges) {
// 得到 InternalRelationType ,分為 PropertyKey 和 EdgeLabel 兩類
final InternalRelationType baseType = (InternalRelationType) edge.getType();
assert baseType.getBaseType()==null;
for (InternalRelationType type : baseType.getRelationIndexes()) {
if (type.getStatus()== SchemaStatus.DISABLED) continue;
// getArity 和 getLen 不一樣,
for (int pos = 0; pos < edge.getArity(); pos++) {
if (!type.isUnidirected(Direction.BOTH) && !type.isUnidirected(EdgeDirection.fromPosition(pos)))
continue; //Directionality is not covered
// 如果是起始頂點
if (edge.getVertex(pos).longId()==vertexId) {
// 根據 edge type pos tx 得到應該序列化的 StaticArrayEntry
StaticArrayEntry entry = edgeSerializer.writeRelation(edge, type, pos, tx);
if (edge.isRemoved()) {
deletions.add(entry);
} else {
Preconditions.checkArgument(edge.isNew());
int ttl = getTTL(edge);
if (ttl > 0) {
entry.setMetaData(EntryMetaData.TTL, ttl);
}
additions.add(entry);
}
}
}
}
}
StaticBuffer vertexKey = idManager.getKey(vertexId);
mutator.mutateEdges(vertexKey, additions, deletions);
}
edgeSerializer.writeRelation 到底做了什麼
我們現在就想知道,資料是怎麼被序列化話 entry 的,程式碼如下:
public StaticArrayEntry writeRelation(InternalRelation relation, InternalRelationType type, int position,
TypeInspector tx) {
assert type==relation.getType() || (type.getBaseType() != null
&& type.getBaseType().equals(relation.getType()));
// 得到方向,pos 是 0 就是 out,是 1 就是 in
Direction dir = EdgeDirection.fromPosition(position);
// 方向驗證
Preconditions.checkArgument(type.isUnidirected(Direction.BOTH) || type.isUnidirected(dir));
// 得到 typeId, 這個 type 是 VertexLabel 或者 PropertyKey
long typeId = type.longId();
// 得到 dirID
DirectionID dirID = getDirID(dir, relation.isProperty() ? RelationCategory.PROPERTY : RelationCategory.EDGE);
// 得到 一個 out
DataOutput out = serializer.getDataOutput(DEFAULT_CAPACITY);
// key 和 value 的分割地址。
int valuePosition;
// 寫 typeId 和 dirID
IDHandler.writeRelationType(out, typeId, dirID, type.isInvisibleType());
// 得到 multiplicity 和 sortKey
Multiplicity multiplicity = type.multiplicity();
long[] sortKey = type.getSortKey();
assert !multiplicity.isConstrained() || sortKey.length==0: type.name();
int keyStartPos = out.getPosition();
if (!multiplicity.isConstrained()) {
// 如果 multiplicity 是 沒有限制,也就是為 MULTI,必須要有 sortKey,寫出 sortKey。
writeInlineTypes(sortKey, relation, out, tx, InlineType.KEY);
}
// 到這裡 key 就寫完了,得到 key 的 pos
int keyEndPos = out.getPosition();
long relationId = relation.longId();
//How multiplicity is handled for edges and properties is slightly different
if (relation.isEdge()) {
// 得到另一個 vertex 的 id
long otherVertexId = relation.getVertex((position + 1) % 2).longId();
// 如果 multiplicity 有限制
if (multiplicity.isConstrained()) {
// isUnique
if (multiplicity.isUnique(dir)) {
// 得到 valuePosition ,寫出 otherVertexId
valuePosition = out.getPosition();
VariableLong.writePositive(out, otherVertexId);
} else {
// 反方向寫 otherVertexId ,記下 valuePosition
VariableLong.writePositiveBackward(out, otherVertexId);
valuePosition = out.getPosition();
}
// 寫下 relationId
VariableLong.writePositive(out, relationId);
} else {
// 沒有限制,反方向寫出 otherVertexId 和 relationId ,記下 valuePosition
VariableLong.writePositiveBackward(out, otherVertexId);
VariableLong.writePositiveBackward(out, relationId);
valuePosition = out.getPosition();
}
} else { // PropertyKey
assert relation.isProperty();
Preconditions.checkArgument(relation.isProperty());
// 得到 property 的值。
Object value = ((JanusGraphVertexProperty) relation).value();
Preconditions.checkNotNull(value);
PropertyKey key = (PropertyKey) type;
assert key.dataType().isInstance(value);
// 寫出 value 得到 valuePosition
if (multiplicity.isConstrained()) { // 沒有限制的 property
if
相關推薦
janusgraph原始碼分析8-底層互動
反向分析
cassandra 寫資料 API
cassandra 的結構類似 bigtable ,資料實際上是多層巢狀的 map,第一個 key 是 rowkey,第二層key 是 columnFamily,第三層key 是 column,第四層(也可以忽略)
elasticSearch6原始碼分析(8)RepositoriesModule模組
1.RepositoriesModule概述
Sets up classes for Snapshot/Restore
1.1 snapshot概述
A snapshot is a backup taken from a running Elasticsearch cluster. Yo
Leveldb原始碼分析--8
6 SSTable之2
6.4 建立sstable檔案
瞭解了sstable檔案的儲存格式,以及Data Block的組織,下面就可以分析如何建立sstable檔案了。相關程式碼在table_builder.h/.cc以及block_builder.h/.cc(
Android 7.0 Gallery相簿原始碼分析8
在Android 7.0 Gallery相簿原始碼分析3 - 資料載入及顯示流程一文最後講了AlbumSetSlidingWindow的onContentChanged方法,專輯縮圖和縮圖下面的label的載入就是在此方法中完成的
public
dubbo原始碼分析8 -- DubboProtocol 之提供端釋出服務export
在前面提到,在RegistryProtocol釋出服務時,首先會dubbo對外提供介面
根據url的地址,協議是dubbo,呼叫protocol.export(…), 但是根據ExtensionLoader.getExtensionLoader獲取的到的pro
ffdshow 原始碼分析 8: 視訊解碼器類(TvideoCodecDec)
=====================================================ffdshow原始碼分析系列文章列表:=====================================================前面兩篇文章介紹了ffds
RTMPdump(libRTMP) 原始碼分析 8: 傳送訊息(Message)
=====================================================RTMPdump(libRTMP) 原始碼分析系列文章:=====================================================函式呼叫
lucene原始碼分析---8
lucene原始碼分析—查詢過程
本章開始介紹lucene的查詢過程,即IndexSearcher的search函式,
IndexSearcher::search
public TopDocs search(Query query, int n)
[Abp vNext 原始碼分析] - 8. 審計日誌
一、簡要說明
ABP vNext 當中的審計模組早在 依賴注入與攔截器一文中有所提及,但沒有詳細的對其進行分析。
審計模組是 ABP vNext 框架的一個基本元件,它能夠提供一些實用日誌記錄。不過這裡的日誌不是說系統日誌,而是說介面每次呼叫之後的執行情況(執行時間、傳入引數、異常資訊、請求 IP)。
除了常
Java中HashMap底層實現原理(JDK1.8)原始碼分析
在JDK1.6,JDK1.7中,HashMap採用位桶+連結串列實現,即使用連結串列處理衝突,同一hash值的連結串列都儲存在一個連結串列裡。但是當位於一個桶中的元素較多,即hash值相等的元素較多時,通過key值依次查詢的效率較低。而JDK1.8中,HashMap採用位桶+
(轉載)Java中HashMap底層實現原理(JDK1.8)原始碼分析
近期在看一些java底層的東西,看到一篇分析hashMap不錯的文章,跟大家分享一下。
在JDK1.6,JDK1.7中,HashMap採用位桶+連結串列實現,即使用連結串列處理衝突,同一hash值的連結串列都儲存在一個連結串列裡。但是當位於一個桶中的元素較多,即hash值
圖解Janusgraph系列-圖資料底層序列化原始碼分析(Data Serialize)
# 圖解Janusgraph系列-圖資料底層序列化原始碼分析(Data Serialize)
大家好,我是`洋仔`,JanusGraph圖解系列文章,`實時更新`~
#### 圖資料庫文章總目錄:
* **整理所有圖相關文章,請移步(超鏈):**[圖資料庫系列-文章總目錄 ](https://li
菜鳥帶你看原始碼——看不懂你打我ArrayList原始碼分析(基於java 8)
文章目錄
看原始碼並不難
軟體環境
成員變數:
構造方法
核心方法
get方法
remove方法
add方法
結束
看原始碼並不難
如何學好程式設計?如何寫出優質的程式碼?如
Mybatis 原始碼分析(8)—— 一二級快取
一級快取
其實關於 Mybatis 的一級快取是比較抽象的,並沒有什麼特別的配置,都是在程式碼中體現出來的。
當呼叫 Configuration 的 newExecutor 方法來建立 executor:
public Executor newExecutor(Transac
Java -- 基於JDK1.8的ArrayList原始碼分析
1,前言
很久沒有寫部落格了,很想念大家,18年都快過完了,才開始寫第一篇,爭取後面每週寫點,權當是記錄,因為最近在看JDK的Collection,而且ArrayList原始碼這一塊也經常被面試官問道,所以今天也就和大家一起來總結一下
2,原始碼解讀
當我們一般提到ArrayLi
java中排序原始碼分析(JDK1.8)
List排序
在開發過程中常用的是jdk自帶的排序
Collections.sort(List<T> list, Comparator<? super T> c);
開啟原始碼如下:
@SuppressWarnings({"unchecked",
HashMap的實現原理和底層結構 圖解+原始碼分析
雜湊表(hash table)也叫散列表,是一種非常重要的資料結構,應用場景及其豐富,許多快取技術(比如memcached)的核心其實就是在記憶體中維護一張大的雜湊表,而HashMap的實現原理也常常出現在各類的面試題中,重要性可見一斑。本文會對java集合框架中的對應實現HashMap的實現原理
Java_59_陣列_模擬ArrayList容器的底層實現JDK原始碼分析程式碼
package cn.pmcse.myCollection;
public class ArrayList { private Object[] value; private in
Tomcat 原始碼分析 WebappClassLoader 分析 (基於8.0.5)
0. 疑惑
在剛接觸 Tomcat 中的ClassLoader時心中不免冒出的疑惑: "Tomcat 裡面是怎麼樣設計ClassLoader的, 這樣設計有什麼好處?"; 我們先把這個問題留著, 到最後在看 !
1. Java 中 ClassLoader 類別
1. BootstrapC
【8】netty4原始碼分析-flush
轉自 http://xw-z1985.iteye.com/blog/1971904
Netty的寫操作由兩個步驟組成:
Write:將msg儲存到ChannelOutboundBuffer中 Flush:將msg從ChannelOutboundBuffer中flush到套接字的傳送緩