1. 程式人生 > >janusgraph原始碼分析8-底層互動

janusgraph原始碼分析8-底層互動

反向分析

cassandra 寫資料 API

cassandra 的結構類似 bigtable ,資料實際上是多層巢狀的 map,第一個 key 是 rowkey,第二層key 是 columnFamily,第三層key 是 column,第四層(也可以忽略) 是 timestamp,然後是 value。

寫資料的 API 如下:

 CTConnection conn = null;
 try {
     conn = pool.borrowObject(keySpaceName);
     Cassandra.Client client = conn.getClient();
     if
(atomicBatch) { client.atomic_batch_mutate(batch, consistency); } else { client.batch_mutate(batch, consistency); } } catch (Exception ex) { throw CassandraThriftKeyColumnValueStore.convertException(ex); } finally { pool.returnObjectUnsafe(keySpaceName, conn)
; }

這裡的 batch 就是一個多層巢狀的map。final Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch = new HashMap<>(size);

這裡看起來只有兩層,第一層的 ByteBuffer 當然是 rowKey,第二層是 String 是 columnFamily。而 List<org.apache.cassandra.thrift.Mutation> 很明顯就是新增或者刪除的 key:value。

寫入 cassandra 的資料格式

上面是寫 cassandra 的 API,而最終呼叫這段程式碼的位置在 CassandraThriftStoreManager.mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) 方法。

我們需要了解的就是 Map<String, Map<StaticBuffer, KCVMutation>> mutationsMap<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch 的對應關係。

從程式碼可以看出:

final Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch = new HashMap<>(size);

for (final Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) {
    
    // mutations 的 key 是 columnFamily
    final String columnFamily = keyMutation.getKey(); 
    
    for (final Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) {
        
        // mutations 的第二層 key 是 rowKey
        ByteBuffer keyBB = mutEntry.getKey().asByteBuffer();

        // Get or create the single Cassandra Mutation object responsible for this key
        // Most mutations only modify the edgeStore and indexStore
        
        final Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation
            = batch.computeIfAbsent(keyBB, k -> new HashMap<>(3));

        final KCVMutation mutation = mutEntry.getValue();
        final List<org.apache.cassandra.thrift.Mutation> thriftMutation = new ArrayList<>(mutations.size());
        
        // 省略刪除的程式碼。
        
        if (mutation.hasAdditions()) {
            
            for (final Entry ent : mutation.getAdditions()) {
                final ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
                
                // mutations 的第三層 key 是 column
                final Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY));
                // mutations 的 value 是 value
                column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY));

                column.setTimestamp(commitTime.getAdditionTime(times));

                final Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL);
                if (null != ttl && ttl > 0) {
                    column.setTtl(ttl);
                }

                columnOrSuperColumn.setColumn(column);
                org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
                m.setColumn_or_supercolumn(columnOrSuperColumn);
                thriftMutation.add(m);
            }
        }

        cfmutation.put(columnFamily, thriftMutation);
    }
}

我們可以看出 mutateMany 方法的引數和寫到 cassandra 的結果不是完全一致,主要是 rowkey 和 columnFamily 的位置是反的。

傳入 mutateMany 的資料

通過除錯可以看出,呼叫 mutateMany 的地方主要是 CacheTransation.persist ,而呼叫 persist 的就是 flushInternal 方法。相應程式碼:

// 成員變數: Map<KCVSCache, Map<StaticBuffer, KCVEntryMutation>> mutations

// 新建Map,這個 map 就是上面 mutateMany 的引數,key 分別是 columnFamily 和 rowKey ,
final Map<String, Map<StaticBuffer, KCVMutation>> subMutations = new HashMap<>(mutations.size());

int numSubMutations = 0;
// 遍歷 mutations
for (Map.Entry<KCVSCache,Map<StaticBuffer, KCVEntryMutation>> storeMutations : mutations.entrySet()) {
    final Map<StaticBuffer, KCVMutation> sub = new HashMap<>();
    
    // KCVSCache 的 getKey().getName() 就是 columnFamily
    subMutations.put(storeMutations.getKey().getName(),sub);
   
    // mutations 的 value
    for (Map.Entry<StaticBuffer,KCVEntryMutation> mutationsForKey : storeMutations.getValue().entrySet()) {
        if (mutationsForKey.getValue().isEmpty()) continue;
        
        // 將 mutationsForKey 放進去,這個 convert 做了啥沒有具體研究,可能只是一個適配。
        sub.put(mutationsForKey.getKey(), convert(mutationsForKey.getValue()));
        numSubMutations+=mutationsForKey.getValue().getTotalMutations();
        if (numSubMutations>= persistChunkSize) {
            numSubMutations = persist(subMutations);
            sub.clear();
            subMutations.put(storeMutations.getKey().getName(),sub);
        }
    }
}

mutations 的構造

上面我們看出了,其實基本上沒複雜處理,接下來我們看看 mutations 資料哪裡來的。

對於 mutations 的修改操作,來自於 mutate 方法,程式碼:

// 傳入的是 store(包含了columnFamily) key(rowKey) additions 和 deletions
void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry> deletions) throws BackendException {
    Preconditions.checkNotNull(store);
    if (additions.isEmpty() && deletions.isEmpty()) return;
    
    // 構造 KCVEntryMutation
    KCVEntryMutation m = new KCVEntryMutation(additions, deletions);
    
    // 這幾步就是簡單的合併所以的 additions 和 deletions
    final Map<StaticBuffer, KCVEntryMutation> storeMutation = mutations.computeIfAbsent(store, k -> new HashMap<>());
    KCVEntryMutation existingM = storeMutation.get(key);
    
    if (existingM != null) {
        existingM.merge(m);
    } else {
        storeMutation.put(key, m);
    }

    numMutations += m.getTotalMutations();

    if (batchLoading && numMutations >= persistChunkSize) {
        flushInternal();
    }
}

mutate 方法引數來源

mutate 方法傳入的是 store(包含了columnFamily) key(rowKey) additions 和 deletions,這幾個引數哪裡來的呢? KCVSCache 的 mutateEntries, mutateEdges 呼叫時機呢? edgeStore.mutateEntries(key, additions, deletions, storeTx); indexStore.mutateEntries(key, additions, deletions, storeTx); 我們先以 edgeStore 為例,在 StandardJanusGraph 的 prepareCommit 方法中,呼叫了 mutator.mutateEdges(vertexKey, additions, deletions); 程式碼如下,我們刪掉了部分程式碼,包括 索引和資料刪除。

ListMultimap<Long, InternalRelation> mutations = ArrayListMultimap.create();
ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
List<IndexSerializer.IndexUpdate> indexUpdates = Lists.newArrayList();


//2) Collect added edges and their index updates and acquire edge locks
// add 是 InternalRelation ,包括 VertexProperty 和 Edge,前面分析過,VertexProperty 實際上就是頂點和一個 schema 的訂單建一條邊,Edge 就是兩個頂點建一條邊。
for (InternalRelation add : Iterables.filter(addedRelations,filter)) {
    Preconditions.checkArgument(add.isNew());
    
    // getLen 返回這個 Relation 的長度,如果是 VertexProperty 是1,Edge 是需要根據方向進行判斷
    for (int pos = 0; pos < add.getLen(); pos++) {
        // 得到對應的 vertex 
        InternalVertex vertex = add.getVertex(pos);
        if (pos == 0 || !add.isLoop()) {
        
            // mutatedProperties 的 key: InternalVertex,value:InternalRelation,mutatedProperties 是用於更新索引的,在我們這裡實際上沒什麼用。
            if (add.isProperty()) mutatedProperties.put(vertex,add);
            // mutations 的 key : vertexId, value : InternalRelation
            mutations.put(vertex.longId(), add);
        }
        if (!vertex.isNew() && acquireLock(add,pos,acquireLocks)) {
            Entry entry = edgeSerializer.writeRelation(add, pos, tx);
            mutator.acquireEdgeLock(idManager.getKey(vertex.longId()), entry.getColumn());
        }
    }
}


//5) Add relation mutations
for (Long vertexId : mutations.keySet()) {
    Preconditions.checkArgument(vertexId > 0, "Vertex has no id: %s", vertexId);
    final List<InternalRelation> edges = mutations.get(vertexId);
    final List<Entry> additions = new ArrayList<>(edges.size());
    final List<Entry> deletions = new ArrayList<>(Math.max(10, edges.size() / 10));
    for (final InternalRelation edge : edges) {
        // 得到 InternalRelationType ,分為 PropertyKey 和 EdgeLabel 兩類
        final InternalRelationType baseType = (InternalRelationType) edge.getType();
        assert baseType.getBaseType()==null;

        for (InternalRelationType type : baseType.getRelationIndexes()) { 
            if (type.getStatus()== SchemaStatus.DISABLED) continue;
            // getArity 和 getLen 不一樣,
            for (int pos = 0; pos < edge.getArity(); pos++) {
                if (!type.isUnidirected(Direction.BOTH) && !type.isUnidirected(EdgeDirection.fromPosition(pos)))
                    continue; //Directionality is not covered
                
                // 如果是起始頂點
                if (edge.getVertex(pos).longId()==vertexId) {
                
                    // 根據 edge type pos tx 得到應該序列化的 StaticArrayEntry
                    StaticArrayEntry entry = edgeSerializer.writeRelation(edge, type, pos, tx);
                    if (edge.isRemoved()) {
                        deletions.add(entry);
                    } else {
                        Preconditions.checkArgument(edge.isNew());
                        int ttl = getTTL(edge);
                        if (ttl > 0) {
                            entry.setMetaData(EntryMetaData.TTL, ttl);
                        }
                        additions.add(entry);
                    }
                }
            }
        }
    }

    StaticBuffer vertexKey = idManager.getKey(vertexId);
    mutator.mutateEdges(vertexKey, additions, deletions);
}

edgeSerializer.writeRelation 到底做了什麼

我們現在就想知道,資料是怎麼被序列化話 entry 的,程式碼如下:

public StaticArrayEntry writeRelation(InternalRelation relation, InternalRelationType type, int position,
                                      TypeInspector tx) {
    assert type==relation.getType() || (type.getBaseType() != null
            && type.getBaseType().equals(relation.getType()));
    // 得到方向,pos 是 0 就是 out,是 1 就是 in
    Direction dir = EdgeDirection.fromPosition(position);
    
    // 方向驗證
    Preconditions.checkArgument(type.isUnidirected(Direction.BOTH) || type.isUnidirected(dir));
    
    // 得到 typeId, 這個 type 是 VertexLabel 或者 PropertyKey
    long typeId = type.longId();
    // 得到 dirID 
    DirectionID dirID = getDirID(dir, relation.isProperty() ? RelationCategory.PROPERTY : RelationCategory.EDGE);
    
    // 得到 一個 out
    DataOutput out = serializer.getDataOutput(DEFAULT_CAPACITY);
    // key 和 value 的分割地址。
    int valuePosition;
    
    // 寫 typeId 和 dirID 
    IDHandler.writeRelationType(out, typeId, dirID, type.isInvisibleType());
    
    // 得到 multiplicity 和 sortKey
    Multiplicity multiplicity = type.multiplicity();
    long[] sortKey = type.getSortKey();
    
    assert !multiplicity.isConstrained() || sortKey.length==0: type.name();
    int keyStartPos = out.getPosition();
    if (!multiplicity.isConstrained()) {
        // 如果 multiplicity 是 沒有限制,也就是為 MULTI,必須要有 sortKey,寫出 sortKey。
        writeInlineTypes(sortKey, relation, out, tx, InlineType.KEY);
    }
    
    // 到這裡 key 就寫完了,得到 key 的 pos
    int keyEndPos = out.getPosition();

    long relationId = relation.longId();

    //How multiplicity is handled for edges and properties is slightly different
    if (relation.isEdge()) {
        // 得到另一個 vertex 的 id
        long otherVertexId = relation.getVertex((position + 1) % 2).longId();
        // 如果 multiplicity 有限制
        if (multiplicity.isConstrained()) {
            // isUnique
            if (multiplicity.isUnique(dir)) {
                // 得到 valuePosition ,寫出 otherVertexId 
                valuePosition = out.getPosition();
                VariableLong.writePositive(out, otherVertexId);
            } else {
                // 反方向寫 otherVertexId ,記下 valuePosition
                VariableLong.writePositiveBackward(out, otherVertexId);
                valuePosition = out.getPosition();
            }
            // 寫下 relationId
            VariableLong.writePositive(out, relationId);
        } else {
            // 沒有限制,反方向寫出 otherVertexId 和 relationId ,記下 valuePosition
            VariableLong.writePositiveBackward(out, otherVertexId);
            VariableLong.writePositiveBackward(out, relationId);
            valuePosition = out.getPosition();
        }
    } else { // PropertyKey
        assert relation.isProperty();
        Preconditions.checkArgument(relation.isProperty());
        // 得到 property 的值。
        Object value = ((JanusGraphVertexProperty) relation).value();
        Preconditions.checkNotNull(value);
        PropertyKey key = (PropertyKey) type;
        assert key.dataType().isInstance(value);
        
        // 寫出 value 得到 valuePosition
        if (multiplicity.isConstrained()) { // 沒有限制的 property
            if 
            
           

相關推薦

janusgraph原始碼分析8-底層互動

反向分析 cassandra 寫資料 API cassandra 的結構類似 bigtable ,資料實際上是多層巢狀的 map,第一個 key 是 rowkey,第二層key 是 columnFamily,第三層key 是 column,第四層(也可以忽略)

elasticSearch6原始碼分析(8)RepositoriesModule模組

1.RepositoriesModule概述 Sets up classes for Snapshot/Restore 1.1 snapshot概述 A snapshot is a backup taken from a running Elasticsearch cluster. Yo

Leveldb原始碼分析--8

6 SSTable之2 6.4 建立sstable檔案 瞭解了sstable檔案的儲存格式,以及Data Block的組織,下面就可以分析如何建立sstable檔案了。相關程式碼在table_builder.h/.cc以及block_builder.h/.cc(

Android 7.0 Gallery相簿原始碼分析8

在Android 7.0 Gallery相簿原始碼分析3 - 資料載入及顯示流程一文最後講了AlbumSetSlidingWindow的onContentChanged方法,專輯縮圖和縮圖下面的label的載入就是在此方法中完成的 public

dubbo原始碼分析8 -- DubboProtocol 之提供端釋出服務export

在前面提到,在RegistryProtocol釋出服務時,首先會dubbo對外提供介面 根據url的地址,協議是dubbo,呼叫protocol.export(…), 但是根據ExtensionLoader.getExtensionLoader獲取的到的pro

ffdshow 原始碼分析 8: 視訊解碼器類(TvideoCodecDec)

=====================================================ffdshow原始碼分析系列文章列表:=====================================================前面兩篇文章介紹了ffds

RTMPdump(libRTMP) 原始碼分析 8: 傳送訊息(Message)

=====================================================RTMPdump(libRTMP) 原始碼分析系列文章:=====================================================函式呼叫

lucene原始碼分析---8

lucene原始碼分析—查詢過程 本章開始介紹lucene的查詢過程,即IndexSearcher的search函式, IndexSearcher::search public TopDocs search(Query query, int n)

[Abp vNext 原始碼分析] - 8. 審計日誌

一、簡要說明 ABP vNext 當中的審計模組早在 依賴注入與攔截器一文中有所提及,但沒有詳細的對其進行分析。 審計模組是 ABP vNext 框架的一個基本元件,它能夠提供一些實用日誌記錄。不過這裡的日誌不是說系統日誌,而是說介面每次呼叫之後的執行情況(執行時間、傳入引數、異常資訊、請求 IP)。 除了常

Java中HashMap底層實現原理(JDK1.8)原始碼分析

在JDK1.6,JDK1.7中,HashMap採用位桶+連結串列實現,即使用連結串列處理衝突,同一hash值的連結串列都儲存在一個連結串列裡。但是當位於一個桶中的元素較多,即hash值相等的元素較多時,通過key值依次查詢的效率較低。而JDK1.8中,HashMap採用位桶+

(轉載)Java中HashMap底層實現原理(JDK1.8)原始碼分析

近期在看一些java底層的東西,看到一篇分析hashMap不錯的文章,跟大家分享一下。 在JDK1.6,JDK1.7中,HashMap採用位桶+連結串列實現,即使用連結串列處理衝突,同一hash值的連結串列都儲存在一個連結串列裡。但是當位於一個桶中的元素較多,即hash值

圖解Janusgraph系列-圖資料底層序列化原始碼分析(Data Serialize)

# 圖解Janusgraph系列-圖資料底層序列化原始碼分析(Data Serialize) 大家好,我是`洋仔`,JanusGraph圖解系列文章,`實時更新`~ #### 圖資料庫文章總目錄: * **整理所有圖相關文章,請移步(超鏈):**[圖資料庫系列-文章總目錄 ](https://li

菜鳥帶你看原始碼——看不懂你打我ArrayList原始碼分析(基於java 8

文章目錄 看原始碼並不難 軟體環境 成員變數: 構造方法 核心方法 get方法 remove方法 add方法 結束 看原始碼並不難 如何學好程式設計?如何寫出優質的程式碼?如

Mybatis 原始碼分析8)—— 一二級快取

一級快取 其實關於 Mybatis 的一級快取是比較抽象的,並沒有什麼特別的配置,都是在程式碼中體現出來的。 當呼叫 Configuration 的 newExecutor 方法來建立 executor: public Executor newExecutor(Transac

Java -- 基於JDK1.8的ArrayList原始碼分析

1,前言   很久沒有寫部落格了,很想念大家,18年都快過完了,才開始寫第一篇,爭取後面每週寫點,權當是記錄,因為最近在看JDK的Collection,而且ArrayList原始碼這一塊也經常被面試官問道,所以今天也就和大家一起來總結一下 2,原始碼解讀   當我們一般提到ArrayLi

java中排序原始碼分析(JDK1.8

List排序 在開發過程中常用的是jdk自帶的排序 Collections.sort(List<T> list, Comparator<? super T> c); 開啟原始碼如下: @SuppressWarnings({"unchecked",

HashMap的實現原理和底層結構 圖解+原始碼分析

 雜湊表(hash table)也叫散列表,是一種非常重要的資料結構,應用場景及其豐富,許多快取技術(比如memcached)的核心其實就是在記憶體中維護一張大的雜湊表,而HashMap的實現原理也常常出現在各類的面試題中,重要性可見一斑。本文會對java集合框架中的對應實現HashMap的實現原理

Java_59_陣列_模擬ArrayList容器的底層實現JDK原始碼分析程式碼

package cn.pmcse.myCollection; public class ArrayList {     private Object[] value;     private    in

Tomcat 原始碼分析 WebappClassLoader 分析 (基於8.0.5)

0. 疑惑 在剛接觸 Tomcat 中的ClassLoader時心中不免冒出的疑惑: "Tomcat 裡面是怎麼樣設計ClassLoader的, 這樣設計有什麼好處?"; 我們先把這個問題留著, 到最後在看 ! 1. Java 中 ClassLoader 類別 1. BootstrapC

8】netty4原始碼分析-flush

轉自 http://xw-z1985.iteye.com/blog/1971904 Netty的寫操作由兩個步驟組成: Write:將msg儲存到ChannelOutboundBuffer中 Flush:將msg從ChannelOutboundBuffer中flush到套接字的傳送緩