1. 程式人生 > >Cassandra原始碼分析:資料寫入流程

Cassandra原始碼分析:資料寫入流程

org.apache.cassandra.thrift.CassandraServer類的add方法將接受客戶端的請求,該函式定義如下:

 public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)   
           throws InvalidRequestException, UnavailableException, TimedOutException, TException   
   {   
    // 資料驗證   

       logger.debug("add");   

       state().hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);   
       String keyspace = state().getKeyspace();   

       CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);   
       ThriftValidation.validateKey(metadata, key);   

       ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);   
       ThriftValidation.validateColumnParent(metadata, column_parent);   
       // SuperColumn field is usually optional, but not when we're adding   
       if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)   

       {   
           throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);   
       }   
       ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));   

       // 建立一個 RowMutation 物件,封裝使用者插入資料資訊   
       RowMutation rm = new RowMutation(keyspace, key);   
       try   
       {   
           rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value);   
       }   
       catch (MarshalException e)   
       {   
           throw new InvalidRequestException(e.getMessage());   
       }   
       // 插入資料   
doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));   
   }   

函式內部實現上首先將kv資訊封裝成RowMutation物件,之後建立QueryPath物件(主要是對資料進行封轉)

最後呼叫doInsert方法執行插入動作,doInsert函式定義如下: 

 // 執行資料插入操作    

private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException   
{   
    // 資料驗證   
    ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level);   
    if (mutations.isEmpty())   
        return;   
    try   
    {   
        schedule(DatabaseDescriptor.getRpcTimeout());   
        try   
        {   
 StorageProxy.mutate(mutations, consistency_level);   
        }   
        finally   
        {   
            release();   
        }   
    }   
    catch (TimeoutException e)   
    {   
        throw new TimedOutException();   
    }   
}   

函式內部首先進行資料檢查,呼叫StorageProxy.mutate(mutations, consistency_level);執行資料的插入操作。

mute方法定義如下: 

public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException   

{   
    logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);   
    // 本地資料中心   
    final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());   

    long startTime = System.nanoTime();   
    // 封裝條件變數   
    List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();   

    IMutation mostRecentMutation = null;   
    try   
    {   
        for (IMutation mutation : mutations)    // 對於每個Mutation   
        {   
            mostRecentMutation = mutation;   
            // CounterMutation:首先需要被寫入到replicas leader中,之後在向其他的replicas中去分發
            if (mutation instanceof CounterMutation)   
            {   
                responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));   
            }   
            else   
            {   
                // WritePerformer:普通型別的資料分發   
                responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));   
            }   
        }   

        // wait for writes.  throws TimeoutException if necessary   
        for (IWriteResponseHandler responseHandler : responseHandlers)   
        {   
            // 等待任務結束或者是丟擲異常   
            responseHandler.get();   
        }   

    }   
    catch (TimeoutException ex)     // 捕獲異常   
    {   
        if (logger.isDebugEnabled())   
        {   
            List<String> mstrings = new ArrayList<String>();   
            for (IMutation mutation : mutations)   
                mstrings.add(mutation.toString(true));   
            logger.debug("Write timeout {} for one (or more) of: ", ex.toString(), mstrings);   
        }   
        throw ex;   
    }   
    catch (IOException e)   
    {   
        assert mostRecentMutation != null;   
        throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e);   
    }   
    finally   
    {   
        writeStats.addNano(System.nanoTime() - startTime);   
    }   
}   


對於每個Mutation物件,如果是CounterMutation型別的Mutation的話,首先要確保一個replica的寫入成功,之後在向另外的N-1個replicas寫入;其他型別的Mutation的話,沒有這個要求,做法是首先得到N個replicas節點,向這個N個節點發送命令。


這兩種型別的Mutation是通過兩個函式mutateCounter和performWrite分別生成的,這裡我們僅僅來看一下performWrite的實現:首先得到複製策略,通過複製策略得到所有replica的endpoints,將任務交給代理WritePerformer.apply執行。程式碼如下:

   public static IWriteResponseHandler performWrite(IMutation mutation,   
                                                    ConsistencyLevel consistency_level,   
                                                    String localDataCenter,   
                                                    WritePerformer performer)   
   throws UnavailableException, TimeoutException, IOException   
   {   
// 得到複製策略 
       String table = mutation.getTable();   
       AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();   

      // 得到所有replica的endpoints
       Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());   

       // 滿足一致性的條件變數   
       IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);   

       // exit early if we can't fulfill the CL at this time   
       // 如果已經能夠確定不能滿足一致性的條件,例如live的節點數量小於W,直接返回   
       responseHandler.assureSufficientLiveNodes();   

       // 代理給WritePerformer執行   
   performer.apply(mutation, writeEndpoints, responseHandler, localDataCenter, consistency_level);   


       return responseHandler;   
   }   


同時需要注意的是在檔案org.apache.cassandra.service.StorageProxy.java中有三個實現而來WritePerformer介面的類,WritePerformer介面定義如下:

private interface WritePerformer   

{   
    public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException;   

}   

也就是說最終完成資料寫入任務的是WritePerformer的apply方法。StorageProxy的三個實現該介面的型別如下: 
// 最終的資料使用實現了WritePerformer介面的standardWritePerformer,counterWritePerformer   
      // 和counterWriteOnCoordinatorPerformer   
      standardWritePerformer = new WritePerformer()  
      {  
          public void apply(IMutation mutation,  
                            Collection<InetAddress> targets,  
                            IWriteResponseHandler responseHandler,  
                            String localDataCenter,  
                            ConsistencyLevel consistency_level)  
          throws IOException, TimeoutException  
          {  
              assert mutation instanceof RowMutation;  
              sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);  
          }  
      };  
  
      /* 
       * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or 
       * in CounterMutationVerbHandler on a replica othewise. The write must be executed on the MUTATION stage 
       * but on the latter case, the verb handler already run on the MUTATION stage, so we must not execute the 
       * underlying on the stage otherwise we risk a deadlock. Hence two different performer. 
       * 執行CounterMutation 
       */  
      counterWritePerformer = new WritePerformer()  
      {  
          public void apply(IMutation mutation,  
                            Collection<InetAddress> targets,  
                            IWriteResponseHandler responseHandler,  
                            String localDataCenter,  
                            ConsistencyLevel consistency_level)   
          throws IOException  
          {  
              if (logger.isDebugEnabled())  
                  logger.debug("insert writing local & replicate " + mutation.toString(true));  
  
              Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);  
              runnable.run();  
          }  
      };  
        
      // 執行CounterMutation   
      counterWriteOnCoordinatorPerformer = new WritePerformer()  
      {  
          public void apply(IMutation mutation,  
                            Collection<InetAddress> targets,  
                            IWriteResponseHandler responseHandler,  
                            String localDataCenter,  
                            ConsistencyLevel consistency_level)  
          throws IOException  
          {  
              if (logger.isDebugEnabled())  
                  logger.debug("insert writing local & replicate " + mutation.toString(true));  
  
              Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);  
              StageManager.getStage(Stage.MUTATION).execute(runnable);  
          }  
      };  


我們分別來看上面的幾個實現,standardWritePerformer的實現方式比較簡單,對於endpoints的集合,如果該節點還live,那麼其傳送寫命令,如果該節點dead,那麼這時執行hinted-handoff策略: 

/** 
    * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node 
    * is not available. 
    * 
    * Note about hints: 
    * 
    * | Hinted Handoff | Consist. Level | 
    * | on             |       >=1      | --> wait for hints. We DO NOT notify the handler with handler.response() for hints;  
    * | on             |       ANY      | --> wait for hints. Responses count towards consistency. 
    * | off            |       >=1      | --> DO NOT fire hints. And DO NOT wait for them to complete. 
    * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete. 
    * 
    * @throws TimeoutException if the hints cannot be written/enqueued  
    */  
   private static void sendToHintedEndpoints(final RowMutation rm,   
                                             Collection<InetAddress> targets,  
                                             IWriteResponseHandler responseHandler,  
                                             String localDataCenter,  
                                             ConsistencyLevel consistency_level)  
   throws IOException, TimeoutException  
   {  
       // Multimap that holds onto all the messages and addresses meant for a specific datacenter   
       Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(targets.size());  
       MessageProducer producer = new CachingMessageProducer(rm);  
  
       for (InetAddress destination : targets)      // 對於每個endpoint   
       {  
           if (FailureDetector.instance.isAlive(destination))       // 如果endpoint還live   
           {  
               String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);  
  
               if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)  
               {  
                // 如果當前機器就是replicas中的一個,直接寫入到本地   
                   insertLocal(rm, responseHandler);  
               }  
               else  
               {  
                // 否則需要向遠端伺服器傳送命令    
                   // belongs on a different server   
                   if (logger.isDebugEnabled())  
                       logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);  
  
                   Multimap<Message, InetAddress> messages = dcMessages.get(dc);  
                   if (messages == null)  
                   {  
                      messages = HashMultimap.create();  
                      dcMessages.put(dc, messages);  
                   }  
  
                   messages.put(producer.getMessage(Gossiper.instance.getVersion(destination)), destination);  
               }  
           }  
           else     // 否則,這裡的話,可能是需要使用hinted-handoff機制   
           {  
               if (!shouldHint(destination))  
                   continue;  
  
               // Avoid OOMing from hints waiting to be written.  (Unlike ordinary mutations, hint   
               // not eligible to drop if we fall behind.)   
               if (hintsInProgress.get() > maxHintsInProgress)  
                   throw new TimeoutException();  
  
               // Schedule a local hint and let the handler know it needs to wait for the hint to complete too   
               Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);  
               responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));  
           }  
       }  
         
       // 向replicas傳送message   
       sendMessages(localDataCenter, dcMessages, responseHandler);  
   }  


到此我們已經完成了資料從StorageProxy到各個replicas的轉發工作,當然這裡還存在一些問題,會在下面的繼續:


1. 首先replicas收到命令之後的處理動作


2. cassandra中如何生成replicas,如何發現endpoints的拓撲結構,這就涉及到cassandra中snitch的實現


3. cassandra中如何實現DHT?