BeeGFS Source Code Analysis 4: Metadata Synchronization

Sync Task Initialization

// fhgfs_meta\source\app\App.cpp

void App::initComponents(TargetConsistencyState initialConsistencyState)
   throw(ComponentInitException)
{
...
    
   this->buddyResyncer = new BuddyResyncer();
    
   this->internodeSyncer = new InternodeSyncer(initialConsistencyState);

...
}

void App::startComponents()
{
...
    
    this->internodeSyncer->start();
    
...
}
// fhgfs_meta\source\app\App.h
class App : public AbstractApp
{
...
      InternodeSyncer* getInternodeSyncer() const
      {
         return internodeSyncer;
      }

      BuddyResyncer* getBuddyResyncer()
      {
         return this->buddyResyncer;
      }
...
};

Node State Changes

// fhgfs_common\source\common\toolkit\OfflineWaitTimeoutTk.h

/**
 * Calculates the offline wait timeout from config variables. It consists of:
 *  5 sec InternodeSyncer syncLoop interval.
 *  3 * update interval:
 *      until target gets pofflined (2x)
 *      end of offline timeout until next target state update (worst case)
 *  plus the actual target offline timeout.
 *
 * Templated for the Config type because Storage and Meta server have different Config classes.
 */
template<typename Cfg>
class OfflineWaitTimeoutTk
{
public:
   static unsigned int calculate(Cfg* cfg)
   {
      const unsigned updateTargetStatesSecs = cfg->getSysUpdateTargetStatesSecs();

      if (updateTargetStatesSecs != 0)
      {
         // If sysUpdateTargetStatesSecs is set in config, use that value.
         return (
            ( 5
              + 3 * updateTargetStatesSecs
              + cfg->getSysTargetOfflineTimeoutSecs()
            ) * 1000);
      }
      else
      {
         // If sysUpdateTargetStatesSecs hasn't been set in config, it defaults to 1/3 the value
         // of sysTargetOfflineTimeoutSecs -> we use 3 * 1/3 sysTargetOfflineTimeoutSecs.
         return (
            ( 5
              + 2 * cfg->getSysTargetOfflineTimeoutSecs()
            ) * 1000);
      }
   }
};
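For a concrete feel of the formula (the values are illustrative, not defaults): with sysTargetOfflineTimeoutSecs = 180 and sysUpdateTargetStatesSecs unset, the wait is (5 + 2 * 180) * 1000 = 365000 ms; with sysUpdateTargetStatesSecs = 30, it is (5 + 3 * 30 + 180) * 1000 = 275000 ms.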

// fhgfs_meta\source\storage\NodeOfflineWait.h
class NodeOfflineWait
{
   public:
      NodeOfflineWait(Config* cfg)
         : waitTimeoutMS(OfflineWaitTimeoutTk<Config>::calculate(cfg) ),
           active(false)
      { }
};


// fhgfs_meta\source\components\InternodeSyncer.cpp

InternodeSyncer::InternodeSyncer(TargetConsistencyState initialConsistencyState)
   throw(ComponentInitException)
   : PThread("XNodeSync"),
     log("XNodeSync"),
     forcePoolsUpdate(true),
     forceTargetStatesUpdate(true),
     forcePublishCapacities(true),
     offlineWait(Program::getApp()->getConfig() ),
     nodeConsistencyState(initialConsistencyState),
     buddyResyncInProgress(false)
{
   MirrorBuddyGroupMapper* mbg = Program::getApp()->getMetaBuddyGroupMapper();
   MirrorBuddyState buddyState = mbg->getBuddyState(Program::getApp()->getLocalNodeNumID().val() );

   if ((buddyState == BuddyState_PRIMARY)
         && (nodeConsistencyState == TargetConsistencyState_NEEDS_RESYNC))
      offlineWait.startTimer();
}

The periodic checks themselves run in InternodeSyncer::syncLoop(), which is reproduced in full in the "Triggered by Periodic Updates" section below. The part relevant to node state changes: while the offline wait armed by the constructor above is still running, each target-state pass only sets the local consistency state to needs-resync and reports nothing to the mgmtd.



The Node Synchronization Class (InternodeSyncer)

// fhgfs_meta\source\components\InternodeSyncer.h

class InternodeSyncer : public PThread
{
   public:
      InternodeSyncer(TargetConsistencyState initialConsistencyState) throw(ComponentInitException);
      virtual ~InternodeSyncer() { }

      static bool registerNode(AbstractDatagramListener* dgramLis);

      static bool updateMetaStatesAndBuddyGroups(TargetConsistencyState& outConsistencyState,
         bool publish);
      static void syncClients(const std::vector<NodeHandle>& clientsList, bool allowRemoteComm);
      static bool downloadAndSyncNodes();
      static bool downloadAndSyncTargetMappings();
      static bool downloadAndSyncTargetStatesAndBuddyGroups();
      static bool downloadAndSyncCapacityPools();

      static void downloadAndSyncClients(bool requeue);

      static bool downloadAllExceededQuotaLists();
      static bool downloadExceededQuotaList(QuotaDataType idType, QuotaLimitType exType,
         UIntList* outIDList, FhgfsOpsErr& error);

      static void printSyncResults(NodeType nodeType, NumNodeIDList* addedNodes,
         NumNodeIDList* removedNodes);

   private:
      LogContext log;

      Mutex forcePoolsUpdateMutex;
      Mutex forceTargetStatesUpdateMutex;
      Mutex forcePublishCapacitiesMutex;
      bool forcePoolsUpdate; // true to force update of capacity pools
      bool forceTargetStatesUpdate; // true to force update of node state
      bool forcePublishCapacities; // true to force publishing free capacity

      // Keeps track of the timeout during which the node may not send state reports because it is
      // waiting to be offlined by the mgmtd.
      NodeOfflineWait offlineWait;

      Mutex nodeConsistencyStateMutex;
      TargetConsistencyState nodeConsistencyState; // Node's own consistency state.
      // Note: This is initialized when updateMetaStates... is called from App::downloadMgmtInfo.
      AtomicUInt32 buddyResyncInProgress;

      virtual void run();
      void syncLoop();

      static bool updateMetaCapacityPools();
      static bool updateMetaBuddyCapacityPools();
      static bool updateStorageCapacityPools();
      static bool updateTargetBuddyCapacityPools();
      static bool downloadCapacityPools(CapacityPoolQueryType poolType, UInt16List* outListNormal,
         UInt16List* outListLow, UInt16List* outListEmergency);
      void publishNodeCapacity();

      void forceMgmtdPoolsRefresh();

      void dropIdleConns();
      unsigned dropIdleConnsByStore(NodeStoreServersEx* nodes);

      void getStatInfo(int64_t* outSizeTotal, int64_t* outSizeFree, int64_t* outInodesTotal,
         int64_t* outInodesFree);

      static TargetConsistencyState decideResync(const CombinedTargetState newState);
      static bool publishNodeStateChange(const TargetConsistencyState oldState,
         const TargetConsistencyState newState);

   public:
      // inliners
      void setForcePoolsUpdate()
      {
         std::lock_guard<Mutex> lock(forcePoolsUpdateMutex);
         forcePoolsUpdate = true;
      }

      void setForceTargetStatesUpdate()
      {
         std::lock_guard<Mutex> lock(forceTargetStatesUpdateMutex);
         forceTargetStatesUpdate = true;
      }

      void setForcePublishCapacities()
      {
         std::lock_guard<Mutex> lock(forcePublishCapacitiesMutex);
         forcePublishCapacities = true;
      }

      TargetConsistencyState getNodeConsistencyState()
      {
         std::lock_guard<Mutex> lock(nodeConsistencyStateMutex);
         return nodeConsistencyState;
      }

      void setNodeConsistencyState(TargetConsistencyState newState)
      {
         std::lock_guard<Mutex> lock(nodeConsistencyStateMutex);
         nodeConsistencyState = newState;
      }

      void setResyncInProgress(bool resyncInProgress)
      {
         this->buddyResyncInProgress.set(resyncInProgress);
      }

      bool getResyncInProgress()
      {
         return this->buddyResyncInProgress.read();
      }

   private:
      // inliners
      bool getAndResetForcePoolsUpdate()
      {
         std::lock_guard<Mutex> lock(forcePoolsUpdateMutex);

         bool retVal = forcePoolsUpdate;
         forcePoolsUpdate = false;

         return retVal;
      }

      bool getAndResetForceTargetStatesUpdate()
      {
         std::lock_guard<Mutex> lock(forceTargetStatesUpdateMutex);

         bool retVal = forceTargetStatesUpdate;
         forceTargetStatesUpdate = false;

         return retVal;
      }

      bool getAndResetForcePublishCapacities()
      {
         std::lock_guard<Mutex> lock(forcePublishCapacitiesMutex);

         bool retVal = forcePublishCapacities;
         forcePublishCapacities = false;

         return retVal;
      }
};
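The force flags give other components a cheap way to trigger out-of-band work: a setter flips the flag under its mutex, and the next syncLoop iteration consumes it through the getAndReset... inliners. A minimal caller sketch (the notification function is hypothetical; getInternodeSyncer() and setForcePoolsUpdate() are the real accessors shown above):

// Hypothetical caller, not BeeGFS source: ask the syncer to refresh the
// capacity pools on its next loop iteration instead of waiting for
// updateCapacityPoolsMS to elapse.
void onCapacityPoolsChanged()
{
   InternodeSyncer* syncer = Program::getApp()->getInternodeSyncer();
   syncer->setForcePoolsUpdate(); // consumed by getAndResetForcePoolsUpdate()
}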

Triggering Sync Tasks

Triggered at Initialization

// fhgfs_meta\source\app\App.cpp

/**
 * @throw InvalidConfigException on error
 */
void App::runNormal()
{
...
    
   bool downloadRes = downloadMgmtInfo(initialConsistencyState);
   if (!downloadRes)
   {
      log->log(1, "Downloading target states from management node failed. Shutting down...");
      appResult = APPCODE_INITIALIZATION_ERROR;
      return;
   }

...
}


/**
 * Downloads the list of nodes, targets and buddy groups (for meta and storage servers) from the
 * mgmtd.
 *
 * @param outInitialConsistencyState The consistency state the local meta node has on the mgmtd
 *                                   before any state reports are sent.
 */
bool App::downloadMgmtInfo(TargetConsistencyState& outInitialConsistencyState)
{
   Config* cfg = this->getConfig();

   int retrySleepTimeMS = 10000; // 10sec
   unsigned udpListenPort = cfg->getConnMetaPortUDP();
   bool allSuccessful = false;

   // start temporary registration datagram listener
   RegistrationDatagramListener regDGramLis(netFilter, localNicList, ackStore, udpListenPort);
   regDGramLis.start();

   // loop until we're registered and everything is downloaded (or until we got interrupted)
   do
   {
      // register ourselves
      // (note: node registration needs to be done before downloads to get notified of updates)
      if (!InternodeSyncer::registerNode(&regDGramLis) )
         continue;

      // download all mgmt info the HBM cares for
      if (!InternodeSyncer::downloadAndSyncNodes() ||
          !InternodeSyncer::downloadAndSyncTargetMappings() ||
          !InternodeSyncer::downloadAndSyncTargetStatesAndBuddyGroups() ||
          !InternodeSyncer::downloadAndSyncCapacityPools())
         continue;

      InternodeSyncer::downloadAndSyncClients(false);

      // ...and then the InternodeSyncer's part.
      if (!InternodeSyncer::updateMetaStatesAndBuddyGroups(outInitialConsistencyState, false) )
         continue;

      if(!InternodeSyncer::downloadAllExceededQuotaLists() )
         continue;

      allSuccessful = true;
      break;

   } while(!waitForSelfTerminateOrder(retrySleepTimeMS) );

   // stop temporary registration datagram listener
   regDGramLis.selfTerminate();
   regDGramLis.sendDummyToSelfUDP(); // for faster termination

   regDGramLis.join();

   if(allSuccessful)
      log->log(Log_NOTICE, "Registration and management info download complete.");

   return allSuccessful;
}


Triggered by Periodic Updates

// fhgfs_meta\source\components\InternodeSyncer.cpp

void InternodeSyncer::run()
{
   try
   {
      registerSignalHandler();

      syncLoop();

      log.log(Log_DEBUG, "Component stopped.");
   }
   catch(std::exception& e)
   {
      PThread::getCurrentThreadApp()->handleComponentException(e);
   }
}

void InternodeSyncer::syncLoop()
{
   App* app = Program::getApp();
   Config* cfg = app->getConfig();

   const int sleepIntervalMS = 3*1000; // 3sec

   // If (undocumented) sysUpdateTargetStatesSecs is set in config, use that value, otherwise
   // default to 1/6 sysTargetOfflineTimeoutSecs.
   const unsigned updateTargetStatesMS =
      (cfg->getSysUpdateTargetStatesSecs() != 0)
      ? cfg->getSysUpdateTargetStatesSecs() * 1000
      : cfg->getSysTargetOfflineTimeoutSecs() * 166;

   const unsigned updateCapacityPoolsMS = 4 * updateTargetStatesMS;

   const unsigned metaCacheSweepNormalMS = 5*1000; // 5sec
   const unsigned metaCacheSweepStressedMS = 2*1000; // 2sec
   const unsigned idleDisconnectIntervalMS = 70*60*1000; /* 70 minutes (must be less than half the
      streamlis idle disconnect interval to avoid cases where streamlis disconnects first) */
   const unsigned updateIDTimeMS = 60 * 1000; // 1 min
   const unsigned downloadNodesIntervalMS = 300000; // 5 min

   Time lastCapacityUpdateT;
   Time lastMetaCacheSweepT;
   Time lastIdleDisconnectT;
   Time lastTimeIDSet;
   Time lastTargetStatesUpdateT;
   Time lastDownloadNodesT;
   Time lastCapacityPublishedT;

   unsigned currentCacheSweepMS = metaCacheSweepNormalMS; // (adapted inside the loop below)


   while(!waitForSelfTerminateOrder(sleepIntervalMS) )
   {
      bool doCapacityPoolsUpdate = getAndResetForcePoolsUpdate()
            || (lastCapacityUpdateT.elapsedMS() > updateCapacityPoolsMS);
      bool doTargetStatesUpdate = getAndResetForceTargetStatesUpdate()
            || (lastTargetStatesUpdateT.elapsedMS() > updateTargetStatesMS);
      bool doPublishCapacities = getAndResetForcePublishCapacities()
            || (lastCapacityPublishedT.elapsedMS() > updateTargetStatesMS);

      // download & sync nodes
      if(lastDownloadNodesT.elapsedMS() > downloadNodesIntervalMS)
      {
         downloadAndSyncNodes();
         downloadAndSyncTargetMappings();

         lastDownloadNodesT.setToNow();
      }

      if(doCapacityPoolsUpdate)
      {
         downloadAndSyncCapacityPools();

         lastCapacityUpdateT.setToNow();
      }

      if(lastMetaCacheSweepT.elapsedMS() > currentCacheSweepMS)
      {
         bool flushTriggered = app->getMetaStore()->cacheSweepAsync();
         currentCacheSweepMS = (flushTriggered ? metaCacheSweepStressedMS : metaCacheSweepNormalMS);

         lastMetaCacheSweepT.setToNow();
      }

      if(lastIdleDisconnectT.elapsedMS() > idleDisconnectIntervalMS)
      {
         dropIdleConns();
         lastIdleDisconnectT.setToNow();
      }

      if(lastTimeIDSet.elapsedMS() > updateIDTimeMS)
      {
         StorageTk::resetIDCounterToNow();
         lastTimeIDSet.setToNow();
      }

      if(doTargetStatesUpdate)
      {
         if (this->offlineWait.hasTimeout() )
         {
            // if we're waiting to be offlined, set our local state to needs-resync and don't report
            // anything to the mgmtd
            setNodeConsistencyState(TargetConsistencyState_NEEDS_RESYNC);
         }
         else
         {
            TargetConsistencyState newConsistencyState;
            updateMetaStatesAndBuddyGroups(newConsistencyState, true);
            setNodeConsistencyState(newConsistencyState);
            downloadAndSyncTargetStatesAndBuddyGroups();
         }

         lastTargetStatesUpdateT.setToNow();
      }

      if (doPublishCapacities)
      {
         publishNodeCapacity();
         lastCapacityPublishedT.setToNow();
      }
   }
}
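Note the factor 166: since 1000 / 6 ≈ 166, sysTargetOfflineTimeoutSecs * 166 is roughly one sixth of the offline timeout expressed in milliseconds, which matches the "1/6 sysTargetOfflineTimeoutSecs" default mentioned in the comment. With an illustrative sysTargetOfflineTimeoutSecs = 180, target states are updated about every 29.9 s and capacity pools (4x that) about every 119.5 s.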

Updating Metadata States

// fhgfs_meta\source\components\InternodeSyncer.cpp

/**
 * Download and sync metadata server target states and mirror buddy groups.
 *
 * @param outConsistencyState The new node consistency state.
 */
bool InternodeSyncer::updateMetaStatesAndBuddyGroups(TargetConsistencyState& outConsistencyState,
   bool publish)
{
   LOG_TOP(STATESYNC, DEBUG, "Starting state update.");

   App* app = Program::getApp();
   NodeStore* mgmtNodes = app->getMgmtNodes();
   TargetStateStore* metaStateStore = app->getMetaStateStore();
   MirrorBuddyGroupMapper* buddyGroupMapper = app->getMetaBuddyGroupMapper();

   static bool downloadFailedLogged = false; // to avoid log spamming
   static bool publishFailedLogged = false;

   NumNodeID localNodeID = app->getLocalNodeNumID();

   auto node = mgmtNodes->referenceFirstNode();
   if(!node)
   {
      LOG_TOP(STATESYNC, ERR, "Management node not defined.");
      return false;
   }

   UInt16List buddyGroupIDs;
   UInt16List primaryNodeIDs;
   UInt16List secondaryNodeIDs;

   UInt16List nodeIDs; // this should actually be targetIDs, but MDS doesn't have targets yet
   UInt8List reachabilityStates;
   UInt8List consistencyStates;

   unsigned numRetries = 10; // If publishing states fails 10 times, give up (-> POFFLINE).

   // Note: Publishing fails if between downloadStatesAndBuddyGroups and
   // publishLocalTargetStateChanges, a state on the mgmtd is changed (e.g. because the primary
   // sets NEEDS_RESYNC for the secondary). In that case, we will retry.

   LOG_TOP(STATESYNC, DEBUG, "Beginning target state update...");
   bool publishSuccess = false;

   while (!publishSuccess && (numRetries--) )
   {
      // In case we're already retrying, clear out leftover data from the lists.
      buddyGroupIDs.clear();
      primaryNodeIDs.clear();
      secondaryNodeIDs.clear();
      nodeIDs.clear();
      reachabilityStates.clear();
      consistencyStates.clear();


      bool downloadRes = NodesTk::downloadStatesAndBuddyGroups(*node, NODETYPE_Meta, &buddyGroupIDs,
         &primaryNodeIDs, &secondaryNodeIDs, &nodeIDs, &reachabilityStates, &consistencyStates,
         true);

      if (!downloadRes)
      {
         if (!downloadFailedLogged)
         {
            LOG_TOP(STATESYNC, WARNING,
               "Downloading target states from management node failed. "
               "Setting all target states to probably-offline.");
            downloadFailedLogged = true;
         }

         metaStateStore->setAllStates(TargetReachabilityState_POFFLINE);

         break;
      }

      downloadFailedLogged = false;

      UInt8List oldConsistencyStates = consistencyStates;

      // Sync buddy groups here, because decideResync depends on it.
      metaStateStore->syncStatesAndGroupsFromLists(buddyGroupMapper, nodeIDs, reachabilityStates,
         consistencyStates, buddyGroupIDs, primaryNodeIDs, secondaryNodeIDs, localNodeID);

      CombinedTargetState newStateFromMgmtd;
      // Find local state which was sent by mgmtd
      for (ZipIterRange<UInt16List, UInt8List, UInt8List>
           statesFromMgmtdIter(nodeIDs, reachabilityStates, consistencyStates);
           !statesFromMgmtdIter.empty(); ++statesFromMgmtdIter)
      {
         if (*(statesFromMgmtdIter()->first) == localNodeID.val())
         {
            newStateFromMgmtd = CombinedTargetState(
               TargetReachabilityState(*(statesFromMgmtdIter()->second) ),
               TargetConsistencyState(*(statesFromMgmtdIter()->third) ) );
         }
      }

      TargetConsistencyState localChangedState = decideResync(newStateFromMgmtd);

      if (!publish)
      {
         outConsistencyState = localChangedState;
         metaStateStore->setState(localNodeID.val(),
            CombinedTargetState(TargetReachabilityState_ONLINE, localChangedState) );

         return true;
      }


      // Note: In this case "old" means "before we changed it locally".
      TargetConsistencyState oldState = newStateFromMgmtd.consistencyState;

      publishSuccess = publishNodeStateChange(oldState, localChangedState);

      if (publishSuccess)
      {
         outConsistencyState = localChangedState;

         metaStateStore->setState(localNodeID.val(),
            CombinedTargetState(TargetReachabilityState_ONLINE, localChangedState) );

         BuddyCommTk::checkBuddyNeedsResync();
      }
   }

   if (!publishSuccess)
   {
      if (!publishFailedLogged)
      {
         LOG_TOP(STATESYNC, WARNING, "Pushing local state to management node failed.");
         publishFailedLogged = true;
      }
   }
   else
      publishFailedLogged = false;

   return true;
}

Checking Metadata States

Only the primary of a buddy group performs the resync check.

// fhgfs_meta\source\toolkit\BuddyCommTk.cpp

namespace BuddyCommTk
{
   static const std::string BUDDY_NEEDS_RESYNC_FILENAME        = "buddyneedsresync";

   void checkBuddyNeedsResync()
   {
      App* app = Program::getApp();
      MirrorBuddyGroupMapper* metaBuddyGroups = app->getMetaBuddyGroupMapper();
      TargetStateStore* metaNodeStates = app->getMetaStateStore();
      InternodeSyncer* internodeSyncer = app->getInternodeSyncer();
      BuddyResyncer* buddyResyncer = app->getBuddyResyncer();

      NumNodeID localID = app->getLocalNodeNumID();
      bool isPrimary;
      NumNodeID buddyID = NumNodeID(metaBuddyGroups->getBuddyTargetID(localID.val(), &isPrimary) );

      if (isPrimary) // Only do the check if we are the primary.
      {
         const bool buddyNeedsResyncFileExists = getBuddyNeedsResync();

         if (buddyNeedsResyncFileExists)
         {
            LOG_DEBUG(__func__, Log_NOTICE, "buddyneedsresync file found.");

            CombinedTargetState state = CombinedTargetState(TargetReachabilityState_ONLINE,
               TargetConsistencyState_NEEDS_RESYNC);
            metaNodeStates->getState(buddyID.val(), state);

            // Only send message if buddy was still reported as GOOD before (otherwise the mgmtd
            // already knows it needs a resync, or it's BAD and shouldn't be resynced anyway).
            if (state.consistencyState == TargetConsistencyState_GOOD)
            {
               setBuddyNeedsResyncState(true);
               LogContext(__func__).log(Log_NOTICE, "Set needs-resync state for buddy node.");
            }
         }

         // check if the secondary is set to needs-resync by the mgmtd.
         TargetConsistencyState consistencyState = internodeSyncer->getNodeConsistencyState();

         // If our own state is not good, don't start resync (wait until InternodeSyncer sets us
         // good again).
         if (consistencyState != TargetConsistencyState_GOOD)
         {
            LOG_DEBUG(__func__, Log_DEBUG,
               "Local node state is not good, won't check buddy state.");
            return;
         }

         CombinedTargetState buddyState;
         if (!metaNodeStates->getState(buddyID.val(), buddyState) )
         {
            LOG_DEBUG(__func__, Log_DEBUG, "Buddy state is invalid for node ID "
               + buddyID.str() + ".");
            return;
         }

         if (buddyState == CombinedTargetState(TargetReachabilityState_ONLINE,
             TargetConsistencyState_NEEDS_RESYNC) )
         {
            FhgfsOpsErr resyncRes = buddyResyncer->startResync();

            if (resyncRes == FhgfsOpsErr_SUCCESS)
            {
               LOG(WARNING, "Starting buddy resync job.", as("Buddy node ID", buddyID.val()));
            }
            else if (resyncRes == FhgfsOpsErr_INUSE)
            {
               LOG(WARNING, "Resync job currently running.", as("Buddy node ID", buddyID.val()));
            }
            else
            {
               LOG(WARNING, "Starting buddy resync job failed.", as("Buddy node ID", buddyID.val()));
            }
         }
      }
   }

}

Starting the Metadata Resync

// fhgfs_meta\source\components\buddyresyncer\BuddyResyncer.cpp

FhgfsOpsErr BuddyResyncer::startResync()
{
   std::lock_guard<Mutex> lock(jobMutex);

   if (noNewResyncs)
      return FhgfsOpsErr_INTERRUPTED;

   if (!job)
   {
      job = new BuddyResyncJob();
      job->start();
      return FhgfsOpsErr_SUCCESS;
   }

   switch (job->getState())
   {
      case BuddyResyncJobState_NOTSTARTED:
      case BuddyResyncJobState_RUNNING:
         return FhgfsOpsErr_INUSE;

      default:
         // a job must never be started more than once. to ensure this, we create a new job for
         // every resync process, because doing so allows us to use NOTSTARTED and RUNNING as
         // "job is currently active" values. otherwise, a second resync may see state SUCCESS and
         // allow duplicate resyncer activity.
         // if a job is still active, don't wait for very long - it may take a while to finish. the
         // internode syncer will retry periodically, so this will work fine.
         if (!job->timedjoin(10))
            return FhgfsOpsErr_INUSE;

         delete job;
         job = new BuddyResyncJob();
         job->start();
         return FhgfsOpsErr_SUCCESS;
   }
}

The Resync Job Class (BuddyResyncJob)

// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.h

class BuddyResyncJob : public PThread
{
   public:
      BuddyResyncJob();
      ~BuddyResyncJob();

      virtual void run();

      void abort();
      MetaBuddyResyncJobStatistics getJobStats();

   private:
      BuddyResyncJobState state;
      Mutex stateMutex;

      int64_t startTime;
      int64_t endTime;

      NumNodeID buddyNodeID;

      MetaSyncCandidateStore syncCandidates;

      std::unique_ptr<BuddyResyncerGatherSlave> gatherSlave;
      std::vector<std::unique_ptr<BuddyResyncerBulkSyncSlave>> bulkSyncSlaves;
      std::unique_ptr<BuddyResyncerModSyncSlave> modSyncSlave;
      std::unique_ptr<SessionStoreResyncer> sessionStoreResyncer;

      bool startGatherSlaves();
      bool startSyncSlaves();
      void joinGatherSlaves();

   public:
      BuddyResyncJobState getState()
      {
         std::lock_guard<Mutex> lock(stateMutex);
         return state;
      }

      bool isRunning()
      {
         std::lock_guard<Mutex> lock(stateMutex);
         return state == BuddyResyncJobState_RUNNING;
      }

      void enqueue(MetaSyncCandidateFile syncCandidate, PThread* caller)
      {
         syncCandidates.add(std::move(syncCandidate), caller);
      }

   private:
      void setState(const BuddyResyncJobState state)
      {
         LOG_DEBUG(__func__, Log_DEBUG, "Setting state: "
            + StringTk::uintToStr(static_cast<int>(state) ) );
         std::lock_guard<Mutex> lock(stateMutex);
         this->state = state;
      }

      TargetConsistencyState newBuddyState();
      void informBuddy(const TargetConsistencyState newTargetState);
      void informMgmtd(const TargetConsistencyState newTargetState);

      void stopAllWorkersOn(Barrier& barrier);
};
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.cpp

BuddyResyncJob::BuddyResyncJob() :
   PThread("BuddyResyncJob"),
   state(BuddyResyncJobState_NOTSTARTED),
   startTime(0), endTime(0),
   gatherSlave(boost::make_unique<BuddyResyncerGatherSlave>(&syncCandidates))
{
   App* app = Program::getApp();
   Config* cfg = app->getConfig();
   buddyNodeID =
      NumNodeID(app->getMetaBuddyGroupMapper()->getBuddyTargetID(app->getLocalNodeNumID().val()));

   const unsigned numSyncSlaves = std::max<unsigned>(cfg->getTuneNumResyncSlaves(), 1);

   for (size_t i = 0; i < numSyncSlaves; i++)
      bulkSyncSlaves.emplace_back(
            boost::make_unique<BuddyResyncerBulkSyncSlave>(*this, &syncCandidates, i, buddyNodeID));

   sessionStoreResyncer = boost::make_unique<SessionStoreResyncer>(buddyNodeID);
   modSyncSlave = boost::make_unique<BuddyResyncerModSyncSlave>(*this, &syncCandidates, 1, buddyNodeID);
}

void BuddyResyncJob::run()
{
   const char* logContext = "Run resync job";

   InternodeSyncer* internodeSyncer = Program::getApp()->getInternodeSyncer();
   App* app = Program::getApp();
   WorkerList* workers = app->getWorkers();
   NodeStore* metaNodes = app->getMetaNodes();
   const std::string metaPath = app->getMetaPath();
   const std::string metaBuddyMirPath = app->getMetaPath() + "/" + CONFIG_BUDDYMIRROR_SUBDIR_NAME;
   Barrier workerBarrier(workers->size() + 1);
   bool workersStopped = false;

   startTime = time(NULL);

   syncCandidates.clear();

   char* respBuf = NULL;
   NetMessage* respMsg = NULL;
   auto buddyNode = metaNodes->referenceNode(buddyNodeID);

   if (!buddyNode)
   {
      LOG(ERR, "Unable to resolve buddy node. Resync will not start.");
      setState(BuddyResyncJobState_FAILURE);
      goto cleanup;
   }

   DEBUG_ENV_VAR(unsigned, DIE_AT_RESYNC_N, 0, "BEEGFS_RESYNC_DIE_AT_N");
   if (DIE_AT_RESYNC_N) {
      static unsigned resyncs = 0;
      // for #479: terminating a server at this point caused the workers to terminate before the
      // resyncer had communicated with them, causing a deadlock on shutdown
      if (++resyncs == DIE_AT_RESYNC_N) {
         ::kill(0, SIGTERM);
         sleep(4);
      }
   }
   stopAllWorkersOn(workerBarrier);
   {
      // Notify buddy that resync started and wait for confirmation
      StorageResyncStartedMsg msg(buddyNodeID.val());
      const bool commRes = MessagingTk::requestResponse(*buddyNode, &msg,
            NETMSGTYPE_StorageResyncStartedResp, &respBuf, &respMsg);

      if (!commRes)
      {
         LogContext(logContext).logErr("Unable to notify buddy about resync attempt. "
               "Resync will not start.");
         setState(BuddyResyncJobState_FAILURE);
         workerBarrier.wait();
         goto cleanup;
      }

      SAFE_DELETE(respMsg);
      SAFE_FREE(respBuf);

      // resync could have been aborted before we got here. if so, exit as soon as possible without
      // setting the resync job state to something else.
      {
         std::unique_lock<Mutex> lock(stateMutex);

         if (state == BuddyResyncJobState_INTERRUPTED)
         {
            lock.unlock();
            workerBarrier.wait();
            goto cleanup;
         }

         state = BuddyResyncJobState_RUNNING;
      }
      internodeSyncer->setResyncInProgress(true);

      const bool startGatherSlaveRes = startGatherSlaves();
      if (!startGatherSlaveRes)
      {
         setState(BuddyResyncJobState_FAILURE);
         workerBarrier.wait();
         goto cleanup;
      }

      const bool startResyncSlaveRes = startSyncSlaves();
      if (!startResyncSlaveRes)
      {
         setState(BuddyResyncJobState_FAILURE);
         workerBarrier.wait();
         goto cleanup;
      }
   }
   workerBarrier.wait();

   LOG_DEBUG(__func__, Log_DEBUG, "Going to join gather slaves.");
   joinGatherSlaves();
   LOG_DEBUG(__func__, Log_DEBUG, "Joined gather slaves.");

   LOG_DEBUG(__func__, Log_DEBUG, "Going to join sync slaves.");

   // gather slaves have finished. Tell sync slaves to stop when work packages are empty and wait.
   for (auto it = bulkSyncSlaves.begin(); it != bulkSyncSlaves.end(); ++it)
   {
      (*it)->setOnlyTerminateIfIdle(true);
      (*it)->selfTerminate();
   }

   for (auto it = bulkSyncSlaves.begin(); it != bulkSyncSlaves.end(); ++it)
      (*it)->join();

   // here we can be in one of two situations:
   //  1. bulk resync has succeeded. we then totally stop the workers: the session store must be in
   //     a quiescent state for resync, so for simplicity, we suspend all client operations here.
   //     we do not want to do this any earlier than this point, because bulk syncers may take a
   //     very long time to complete.
   //  2. bulk resync has failed. in this case, the bulk syncers have aborted the currently running
   //     job, and the mod syncer is either dead or in the process of dying. here we MUST NOT stop
   //     the workers, because they are very likely blocked on the mod sync queue already and will
   //     not unblock before the queue is cleared.
   if (getState() == BuddyResyncJobState_RUNNING)
   {
      stopAllWorkersOn(workerBarrier);
      workersStopped = true;
   }

   modSyncSlave->setOnlyTerminateIfIdle(true);
   modSyncSlave->selfTerminate();
   modSyncSlave->join();

   // gatherers are done and the workers have been stopped, we can safely resync the session now.

   LOG_DEBUG(__func__, Log_DEBUG, "Joined sync slaves.");

   // Perform session store resync
   // the job may have been aborted or terminated by errors. in this case, do not resync the session
   // store. end the sync as quickly as possible.
   if (getState() == BuddyResyncJobState_RUNNING)
      sessionStoreResyncer->doSync();

   // session store is now synced, and future actions can be forwarded safely. we do not restart
   // the workers here because the resync may still enter FAILED state, and we don't want to forward
   // to the secondary in this case.

cleanup:
   bool syncErrors = false;

   {
      std::lock_guard<Mutex> lock(gatherSlave->stateMutex);
      while (gatherSlave->isRunning)
         gatherSlave->isRunningChangeCond.wait(&gatherSlave->stateMutex);

      syncErrors |= gatherSlave->getStats().errors != 0;
   }

   for (auto it = bulkSyncSlaves.begin(); it != bulkSyncSlaves.end(); ++it)
   {
      BuddyResyncerBulkSyncSlave* slave = it->get();
      std::lock_guard<Mutex> lock(slave->stateMutex);
      while (slave->isRunning)
         slave->isRunningChangeCond.wait(&slave->stateMutex);

      syncErrors |= slave->getStats().dirErrors != 0;
      syncErrors |= slave->getStats().fileErrors != 0;
   }

   syncErrors |= sessionStoreResyncer->getStats().errors;

   {
      std::lock_guard<Mutex> lock(modSyncSlave->stateMutex);
      while (modSyncSlave->isRunning)
         modSyncSlave->isRunningChangeCond.wait(&modSyncSlave->stateMutex);

      syncErrors |= modSyncSlave->getStats().errors != 0;
   }


   if (getState() == BuddyResyncJobState_RUNNING || getState() == BuddyResyncJobState_INTERRUPTED)
   {
      if (syncErrors)
         setState(BuddyResyncJobState_ERRORS);
      else if (getState() == BuddyResyncJobState_RUNNING)
         setState(BuddyResyncJobState_SUCCESS);

      // delete timestamp override file if it exists.
      BuddyCommTk::setBuddyNeedsResync(metaPath, false, buddyNodeID);

      const TargetConsistencyState buddyState = newBuddyState();
      informBuddy(buddyState);
      informMgmtd(buddyState);

      const bool interrupted = getState() != BuddyResyncJobState_SUCCESS;
      LOG(WARNING, "Resync finished.", interrupted, syncErrors);
   }

   internodeSyncer->setResyncInProgress(false);
   endTime = time(NULL);

   // restart all the worker threads
   if (workersStopped)
      workerBarrier.wait();

   // if the resync was aborted, the mod sync queue may still contain items. additionally, workers
   // may be waiting for a changeset slot, or they may have started executing after the resync was
   // aborted by the sync slaves, but before the resync was officially set to "not running".
   // we cannot set the resync to "not running" in abort() because we have no upper bound for the
   // number of worker threads. even if we did set the resync to "not running" in abort() and
   // cleared the sync queues at the same time, there may still be an arbitrary number of threads
   // waiting for a changeset slot.
   // instead, we have to wait for each thread to "see" that the resync is over, and periodically
   // clear the sync queue to unblock those workers that are still waiting for slots.
   if (syncErrors)
   {
      SynchronizedCounter counter;

      for (auto it = workers->begin(); it != workers->end(); ++it)
      {
         auto& worker = **it;

         worker.getWorkQueue()->addPersonalWork(
               new IncSyncedCounterWork(&counter),
               worker.getPersonalWorkQueue());
      }

      while (!counter.timedWaitForCount(workers->size(), 100))
      {
         while (!syncCandidates.isFilesEmpty())
         {
            MetaSyncCandidateFile candidate;
            syncCandidates.fetch(candidate, this);
            candidate.signal();
         }
      }
   }
}

Directory Gathering (BuddyResyncerGatherSlave)

(Note: the class declaration below is the storage-server variant from fhgfs_storage; the metadata server has its own, analogous BuddyResyncerGatherSlave, whose constructor and directory-crawling implementation follow it.)

// fhgfs_storage\source\components\buddyresyncer\BuddyResyncerGatherSlave.h

class BuddyResyncerGatherSlave : public PThread
{
   friend class BuddyResyncer; // (to grant access to internal mutex)
   friend class BuddyResyncJob; // (to grant access to internal mutex)

   public:
      BuddyResyncerGatherSlave(uint16_t targetID, ChunkSyncCandidateStore* syncCandidates,
         BuddyResyncerGatherSlaveWorkQueue* workQueue, uint8_t slaveID);
      virtual ~BuddyResyncerGatherSlave();

      void workLoop();

   private:
      AtomicSizeT onlyTerminateIfIdle; // atomic quasi-bool

      Mutex statusMutex; // protects isRunning
      Condition isRunningChangeCond;

      uint16_t targetID;

      AtomicUInt64 numChunksDiscovered;
      AtomicUInt64 numChunksMatched;

      AtomicUInt64 numDirsDiscovered;
      AtomicUInt64 numDirsMatched;

      bool isRunning; // true if an instance of this component is currently running

      ChunkSyncCandidateStore* syncCandidates;
      BuddyResyncerGatherSlaveWorkQueue* workQueue;

      // nftw() callback needs access to the slave threads
      static Mutex staticGatherSlavesMutex;
      static std::map<std::string, BuddyResyncerGatherSlave*> staticGatherSlaves;

      virtual void run();

      static int handleDiscoveredEntry(const char* path, const struct stat* statBuf,
         int ftwEntryType, struct FTW* ftwBuf);

   public:
      // getters & setters
      bool getIsRunning()
      {
         SafeMutexLock safeLock(&statusMutex);

         bool retVal = this->isRunning;

         safeLock.unlock();

         return retVal;
      }

      uint16_t getTargetID()
      {
         return targetID;
      }

      void getCounters(uint64_t& outNumChunksDiscovered, uint64_t& outNumChunksMatched,
         uint64_t& outNumDirsDiscovered, uint64_t& outNumDirsMatched)
      {
         outNumChunksDiscovered = numChunksDiscovered.read();
         outNumChunksMatched = numChunksMatched.read();
         outNumDirsDiscovered = numDirsDiscovered.read();
         outNumDirsMatched = numDirsMatched.read();
      }

      void setOnlyTerminateIfIdle(bool value)
      {
         if (value)
            onlyTerminateIfIdle.set(1);
         else
            onlyTerminateIfIdle.setZero();
      }

      bool getOnlyTerminateIfIdle()
      {
         if (onlyTerminateIfIdle.read() == 0)
            return false;
         else
            return true;
      }

   private:
      // getters & setters

      void setIsRunning(bool isRunning)
      {
         SafeMutexLock safeLock(&statusMutex);

         this->isRunning = isRunning;
         isRunningChangeCond.broadcast();

         safeLock.unlock();
      }

      bool getSelfTerminateNotIdle()
      {
         return ( (getSelfTerminate() && (!getOnlyTerminateIfIdle())) );
      }
};

// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerGatherSlave.cpp

BuddyResyncerGatherSlave::BuddyResyncerGatherSlave(MetaSyncCandidateStore* syncCandidates) :
   PThread("BuddyResyncerGatherSlave"),
   isRunning(false),
   syncCandidates(syncCandidates)
{
   metaBuddyPath = Program::getApp()->getMetaPath() + "/" CONFIG_BUDDYMIRROR_SUBDIR_NAME;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.cpp

bool BuddyResyncJob::startGatherSlaves()
{
   try
   {
      gatherSlave->resetSelfTerminate();
      gatherSlave->start();
      gatherSlave->setIsRunning(true);
   }
   catch (PThreadCreateException& e)
   {
      LogContext(__func__).logErr(std::string("Unable to start thread: ") + e.what());
      return false;
   }

   return true;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerGatherSlave.cpp

void BuddyResyncerGatherSlave::run()
{
   setIsRunning(true);

   try
   {
      LOG(DEBUG, "Component started");
      registerSignalHandler();
      workLoop();
      LOG(DEBUG, "Component stopped");
   }
   catch (std::exception& e)
   {
      PThread::getCurrentThreadApp()->handleComponentException(e);
   }

   setIsRunning(false);
}

void BuddyResyncerGatherSlave::workLoop()
{
   crawlDir(metaBuddyPath + "/" META_INODES_SUBDIR_NAME, MetaSyncDirType::InodesHashDir);
   crawlDir(metaBuddyPath + "/" META_DENTRIES_SUBDIR_NAME, MetaSyncDirType::DentriesHashDir);
}

void BuddyResyncerGatherSlave::crawlDir(const std::string& path, const MetaSyncDirType type,
   const unsigned level)
{
   LOG_DBG(DEBUG, "Entering hash dir.", level, path);

   std::unique_ptr<DIR, StorageTk::CloseDirDeleter> dirHandle(::opendir(path.c_str()));

   if (!dirHandle)
   {
      LOG(ERR, "Unable to open path", path, sysErr());
      numErrors.increase();
      return;
   }

   while (!getSelfTerminate())
   {
      struct dirent* entry;

#if USE_READDIR_R
      struct dirent buffer;
      const int readRes = ::readdir_r(dirHandle.get(), &buffer, &entry);
#else
      errno = 0;
      entry = ::readdir(dirHandle.get());
      const int readRes = entry ? 0 : errno;
#endif
      if (readRes != 0)
      {
         LOG(ERR, "Could not read dir entry.", path, sysErr(readRes));
         numErrors.increase();
         return;
      }

      if (!entry)
         break;

      if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
         continue;

      const std::string& candidatePath = path + "/" + entry->d_name;

      struct stat statBuf;
      const int statRes = ::stat(candidatePath.c_str(), &statBuf);
      if (statRes)
      {
         // in a 2nd level dentry hashdir, content directories may disappear - this is not an error,
         // it was most likely caused by an rmdir issued by a user.
         if (!(errno == ENOENT && type == MetaSyncDirType::DentriesHashDir && level == 2))
         {
            LOG(ERR, "Could not stat dir entry.", candidatePath, sysErr());
            numErrors.increase();
         }

         continue;
      }

      if (!S_ISDIR(statBuf.st_mode))
      {
         LOG(ERR, "Found a non-dir where only directories are expected.", candidatePath,
               oct(statBuf.st_mode));
         numErrors.increase();
         continue;
      }

      // layout is: (dentries|inodes)/l1/l2/...
      //  -> level 0 correlates with type
      //  -> level 1 is not very interesting, except for reporting
      //  -> level 2 must be synced. if it is a dentry hashdir, its contents must also be crawled.
      if (level == 0)
      {
         crawlDir(candidatePath, type, level + 1);
         continue;
      }

      if (level == 1)
      {
         LOG_DBG(DEBUG, "Adding hashdir sync candidate.", candidatePath);
         addCandidate(candidatePath, type);

         if (type == MetaSyncDirType::DentriesHashDir)
            crawlDir(candidatePath, type, level + 1);

         continue;
      }

      // so here we read a 2nd level dentry hashdir. crawl that too, add sync candidates for each
      // entry we find - non-directories have already been reported, and the bulk resyncer will
      // take care of the fsids directories.
      numDirsDiscovered.increase();
      LOG_DBG(DEBUG, "Adding contdir sync candidate.", candidatePath);
      addCandidate(candidatePath, MetaSyncDirType::ContentDir);
   }
}
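For illustration, the buddy-mirrored metadata tree crawled above has the following shape (the hash-dir names here are made up; the real names are hexadecimal hash values):

// buddymir/
//    inodes/                  <- crawl root (MetaSyncDirType::InodesHashDir)
//       51/                   <- first-level hash dir: recursed into
//          2A/                <- second-level hash dir: queued as sync candidate
//    dentries/                <- crawl root (MetaSyncDirType::DentriesHashDir)
//       13/                   <- first-level hash dir: recursed into
//          0B/                <- second-level hash dir: queued, then crawled deeper
//             <dirInodeID>/   <- content dir: queued as a ContentDir candidate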

Candidate Storage (MetaSyncCandidateStore)

A SyncCandidateStore holds two std::list queues, one of MetaSyncCandidateDir entries and one of MetaSyncCandidateFile entries. The directory queue feeds the bulk resync that runs when primary and secondary have diverged; the file queue feeds the real-time synchronization of individual metadata modifications. (A usage sketch follows the class listing below.)

// fhgfs_meta\source\components\buddyresyncer\SyncCandidate.h

typedef SyncCandidateStore<MetaSyncCandidateDir, MetaSyncCandidateFile> MetaSyncCandidateStore;
// fhgfs_common\source\common\storage\mirroring\SyncCandidateStore.h

template <typename SyncCandidateDir, typename SyncCandidateFile>
class SyncCandidateStore
{
   public:
       SyncCandidateStore()
          : numQueuedFiles(0), numQueuedDirs(0)
       { }

   private:
      typedef std::list<SyncCandidateFile> CandidateFileList;
      CandidateFileList candidatesFile;
      Mutex candidatesFileMutex;
      Condition filesAddedCond;
      Condition filesFetchedCond;

      typedef std::list<SyncCandidateDir> CandidateDirList;
      CandidateDirList candidatesDir;
      Mutex candidatesDirMutex;
      Condition dirsAddedCond;
      Condition dirsFetchedCond;

      // mainly used to avoid constant calling of size() method of lists
      unsigned numQueuedFiles;
      unsigned numQueuedDirs;

      static const unsigned MAX_QUEUE_SIZE = 50000;

   public:
      void add(SyncCandidateFile entry, PThread* caller)
      {
         static const unsigned waitTimeoutMS = 1000;

         std::lock_guard<Mutex> mutexLock(candidatesFileMutex);

         // wait if list is too big
         while (numQueuedFiles > MAX_QUEUE_SIZE)
         {
            if (caller && unlikely(caller->getSelfTerminate() ) )
               break; // ignore limit if selfTerminate was set to avoid hanging on shutdown

            filesFetchedCond.timedwait(&candidatesFileMutex, waitTimeoutMS);
         }

         this->candidatesFile.push_back(std::move(entry));
         numQueuedFiles++;

         filesAddedCond.signal();
      }

      void fetch(SyncCandidateFile& outCandidate, PThread* caller)
      {
         static const unsigned waitTimeMS = 3000;

         std::lock_guard<Mutex> mutexLock(candidatesFileMutex);

         while (candidatesFile.empty() )
         {
            if(caller && unlikely(caller->getSelfTerminate() ) )
            {
               outCandidate = SyncCandidateFile();
               return;
            }

            filesAddedCond.timedwait(&candidatesFileMutex, waitTimeMS);
         }

         outCandidate = std::move(candidatesFile.front());
         candidatesFile.pop_front();
         numQueuedFiles--;
         filesFetchedCond.signal();
      }

      void add(SyncCandidateDir entry, PThread* caller)
      {
         static const unsigned waitTimeoutMS = 3000;

         std::lock_guard<Mutex> mutexLock(candidatesDirMutex);

         // wait if list is too big
         while (numQueuedDirs > MAX_QUEUE_SIZE)
         {
            if (caller && unlikely(caller->getSelfTerminate() ) )
               break; // ignore limit if selfTerminate was set to avoid hanging on shutdown

            dirsFetchedCond.timedwait(&candidatesDirMutex, waitTimeoutMS);
         }

         this->candidatesDir.push_back(std::move(entry));
         numQueuedDirs++;

         dirsAddedCond.signal();
      }

      bool waitForFiles(PThread* caller)
      {
         static const unsigned waitTimeoutMS = 3000;

         std::lock_guard<Mutex> mutexLock(candidatesFileMutex);

         while (numQueuedFiles == 0)
         {
            if (caller && caller->getSelfTerminate())
               return false;

            filesAddedCond.timedwait(&candidatesFileMutex, waitTimeoutMS);
         }

         return true;
      }

      void fetch(SyncCandidateDir& outCandidate, PThread* caller)
      {
         static const unsigned waitTimeMS = 3000;

         std::lock_guard<Mutex> mutexLock(candidatesDirMutex);

         while (candidatesDir.empty() )
         {
            if(caller && unlikely(caller->getSelfTerminate() ) )
            {
               outCandidate = SyncCandidateDir();
               return;
            }

            dirsAddedCond.timedwait(&candidatesDirMutex, waitTimeMS);
         }

         outCandidate = std::move(candidatesDir.front());
         candidatesDir.pop_front();
         numQueuedDirs--;
         dirsFetchedCond.signal();
      }

      bool isFilesEmpty()
      {
         std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
         return candidatesFile.empty();
      }

      bool isDirsEmpty()
      {
         std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
         return candidatesDir.empty();
      }

      void waitForFiles(const unsigned timeoutMS)
      {
         std::lock_guard<Mutex> mutexLock(candidatesFileMutex);

         if (candidatesFile.empty() )
         {
            if (timeoutMS == 0)
               filesAddedCond.wait(&candidatesFileMutex);
            else
               filesAddedCond.timedwait(&candidatesFileMutex, timeoutMS);
         }
      }

      void waitForDirs(const unsigned timeoutMS)
      {
         std::lock_guard<Mutex> mutexLock(candidatesDirMutex);

         if (candidatesDir.empty() )
         {
            if (timeoutMS == 0)
               dirsAddedCond.wait(&candidatesDirMutex);
            else
               dirsAddedCond.timedwait(&candidatesDirMutex, timeoutMS);
         }
      }

      size_t getNumFiles()
      {
         std::lock_guard<Mutex> mutexLock(candidatesFileMutex);
         return candidatesFile.size();
      }

      size_t getNumDirs()
      {
         std::lock_guard<Mutex> mutexLock(candidatesDirMutex);
         return candidatesDir.size();
      }

      void clear()
      {
         {
            std::lock_guard<Mutex> dirMutexLock(candidatesDirMutex);
            candidatesDir.clear();
            numQueuedDirs = 0;
         }

         {
            std::lock_guard<Mutex> fileMutexLock(candidatesFileMutex);
            candidatesFile.clear();
            numQueuedFiles = 0;
         }
      }
};
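A minimal sketch of the store's producer/consumer flow (the two free functions are hypothetical; add(), fetch() and the MetaSyncCandidateDir constructor arguments mirror the code shown in this article):

// Hypothetical usage sketch, not BeeGFS source.

// Producer side, cf. BuddyResyncerGatherSlave::crawlDir():
void produceDir(MetaSyncCandidateStore& store, PThread* self)
{
   MetaSyncCandidateDir candidate("dentries/13/0B", MetaSyncDirType::DentriesHashDir);
   store.add(std::move(candidate), self); // blocks while MAX_QUEUE_SIZE is exceeded
}

// Consumer side, cf. BuddyResyncerBulkSyncSlave::syncLoop():
void consumeDir(MetaSyncCandidateStore& store, PThread* self)
{
   MetaSyncCandidateDir candidate;
   store.fetch(candidate, self); // blocks until a candidate arrives; yields an
                                 // empty candidate if self-terminate was ordered
}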

Bulk Synchronization (BuddyResyncerBulkSyncSlave)

// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerModSyncSlave.h

class BuddyResyncerModSyncSlave : public SyncSlaveBase
{
   friend class BuddyResyncer;
   friend class BuddyResyncJob;

   public:
      BuddyResyncerModSyncSlave(BuddyResyncJob& parentJob, MetaSyncCandidateStore* syncCanditates,
            uint8_t slaveID, const NumNodeID& buddyNodeID);

      struct Stats
      {
         uint64_t objectsSynced;
         uint64_t errors;
      };

      Stats getStats()
      {
         return Stats{ numObjectsSynced.read(), numErrors.read() };
      }

   private:
      MetaSyncCandidateStore* syncCandidates;

      AtomicUInt64 numObjectsSynced;
      AtomicUInt64 numErrors;

      void syncLoop();

      FhgfsOpsErr streamCandidates(Socket& socket);

   private:
      static FhgfsOpsErr streamCandidates(Socket* socket, void* context)
      {
         return static_cast<BuddyResyncerModSyncSlave*>(context)->streamCandidates(*socket);
      }
};
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncJob.cpp

bool BuddyResyncJob::startSyncSlaves()
{
   App* app = Program::getApp();
   const NumNodeID localNodeID = app->getLocalNodeNumID();
   const NumNodeID buddyNodeID(
      app->getMetaBuddyGroupMapper()->getBuddyTargetID(localNodeID.val(), NULL) );

   for (size_t i = 0; i < bulkSyncSlaves.size(); i++)
   {
      try
      {
         bulkSyncSlaves[i]->resetSelfTerminate();
         bulkSyncSlaves[i]->start();
         bulkSyncSlaves[i]->setIsRunning(true);
      }
      catch (PThreadCreateException& e)
      {
         LogContext(__func__).logErr(std::string("Unable to start thread: ") + e.what() );

         for (size_t j = 0; j < i; j++)
            bulkSyncSlaves[j]->selfTerminate();

         return false;
      }
   }

   try
   {
      modSyncSlave->resetSelfTerminate();
      modSyncSlave->start();
      modSyncSlave->setIsRunning(true);
   }
   catch (PThreadCreateException& e)
   {
      LogContext(__func__).logErr(std::string("Unable to start thread: ") + e.what() );

      for (size_t j = 0; j < bulkSyncSlaves.size(); j++)
         bulkSyncSlaves[j]->selfTerminate();

      return false;
   }

   return true;
}
// fhgfs_meta\source\components\buddyresyncer\BuddyResyncerBulkSyncSlave.cpp

void BuddyResyncerBulkSyncSlave::syncLoop()
{
   EntryLockStore* const lockStore = Program::getApp()->getMirroredSessions()->getEntryLockStore();

   while (!getSelfTerminateNotIdle())
   {
      MetaSyncCandidateDir candidate;
      syncCandidates->fetch(candidate, this);

      // the sync candidate we have retrieved may be invalid if this thread was ordered to
      // terminate and the sync candidate store has no more directories queued for us.
      // in this case, we may end the sync because we have no more candidates, and the resync job
      // guarantees that all gather threads have completed before the bulk syncers are ordered to
      // finish.
      if (syncCandidates->isDirsEmpty() && candidate.getRelativePath().empty() &&
            getSelfTerminate())
         return;

      if (candidate.getType() == MetaSyncDirType::InodesHashDir ||
            candidate.getType() == MetaSyncDirType::DentriesHashDir)
      {
         // lock the hash path in accordance with MkLocalDir, RmLocalDir and RmDir.
         const auto& hashDir = candidate.getRelativePath();
         auto slash1 = hashDir.find('/');
         auto slash2 = hashDir.find('/', slash1 + 1);
         auto hash1 = StringTk::strHexToUInt(hashDir.substr(slash1 + 1, slash2 - slash1 - 1));
         auto hash2 = StringTk::strHexToUInt(hashDir.substr(slash2 + 1));
         HashDirLock hashLock = {lockStore, {hash1, hash2}};

         const FhgfsOpsErr resyncRes = resyncDirectory(candidate, "");
         if (resyncRes == FhgfsOpsErr_SUCCESS)
            continue;

         numDirErrors.increase();
         parentJob->abort();
         return;
      }

      // not a hash dir, so it must be a content directory. sync the #fSiDs# first, then the actual
      // content directory. we lock the directory inode the content directory belongs to because we
      // must not allow a concurrent meta action to delete the content directory while we are
      // resyncing it. concurrent modification of directory contents could be allowed, though.

      const std::string dirInodeID = Path(candidate.getRelativePath()).back();
      const std::string fullPath = META_BUDDYMIRROR_SUBDIR_NAME "/" + candidate.getRelativePath();

      DirIDLock dirLock(lockStore, dirInodeID, false);

      // first ensure that the directory still exists - a concurrent modification may have deleted
      // it. this would not be an error; bulk resync should not touch it, a modification sync
      // would remove it completely.
      if (::access(fullPath.c_str(), F_OK) != 0 && errno == ENOENT)
      {
         numDirsSynced.increase(); // Count it anyway, so the sums match up.
         continue;
      }

      MetaSyncCandidateDir fsIDs(
            candidate.getRelativePath() + "/" + META_DIRENTRYID_SUB_STR,
            MetaSyncDirType::InodesHashDir);

      FhgfsOpsErr resyncRes = resyncDirectory(fsIDs, dirInodeID);
      if (resyncRes == FhgfsOpsErr_SUCCESS)
         resyncRes = resyncDirectory(candidate, dirInodeID);

      if (resyncRes != FhgfsOpsErr_SUCCESS)
      {
         numDirErrors.increase();
         parentJob->abort();
         return;
      }
      else
      {
         numDirsSynced.increase();
      }
   }
}

FhgfsOpsErr BuddyResyncerBulkSyncSlave::resyncDirectory(const MetaSyncCandidateDir& root,
   const std::string& inodeID)
{
   StreamCandidateArgs args(*this, root, inodeID);

   return resyncAt(Path(root.getRelativePath()), true, streamCandidateDir, &args);
}

FhgfsOpsErr BuddyResyncerBulkSyncSlave::streamCandidateDir(Socket& socket,
   const MetaSyncCandidateDir& candidate, const std::string& inodeID)
{
   EntryLockStore* const lockStore = Program::getApp()->getMirroredSessions()->getEntryLockStore();

   Path candidatePath(META_BUDDYMIRROR_SUBDIR_NAME "/" + candidate.getRelativePath());

   std::unique_ptr<DIR, StorageTk::CloseDirDeleter> dir(opendir(candidatePath.str().c_str()));

   if (!dir)
   {
      LOG(ERR, "Could not open candidate directory.", candidatePath, sysErr());
      return FhgfsOpsErr_INTERNAL;
   }

   int dirFD = ::dirfd(dir.get());
   if (dirFD < 0)
   {
      LOG(ERR, "Could not open candidate directory.", candidatePath, sysErr());
      return FhgfsOpsErr_INTERNAL;
   }

   while (true)
   {
      struct dirent* entry;

#if USE_READDIR_R
      struct dirent entryBuf;
      int err = ::readdir_r(dir.get(), &entryBuf, &entry);
#else
      errno = 0;
      entry = readdir(dir.get());
      int err = entry ? 0 : errno;
#endif
      if (err > 0)
      {
         LOG(ERR, "Could not read candidate directory.", candidatePath, sysErr());
         numDirErrors.increase();
         break;
      }

      if (!entry)
         break;

      if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
         continue;

      struct stat statData;
      if (::fstatat(dirFD, entry->d_name, &statData, AT_SYMLINK_NOFOLLOW) < 0)
      {
         // the file/directory may have gone away. this is not an error, and the secondary will
         // delete the file/directory as well.
         if (errno == ENOENT)
            continue;

         LOG(ERR, "Could not stat resync candidate.", candidatePath, entry->d_name, sysErr());
         numFileErrors.increase();
         continue;
      }

      if (!S_ISDIR(statData.st_mode) && !S_ISREG(statData.st_mode))
      {
         LOG(ERR, "Resync candidate is neither file nor directory.",
               candidatePath, entry->d_name, statData.st_mode);
         numFileErrors.increase();
         continue;
      }

      if (candidate.getType() == MetaSyncDirType::ContentDir)
      {
         // if it's in a content directory and a directory, it can really only be the fsids dir.
         // locking for this case is already sorted, so we only have to transfer the (empty)
         // inode metadata to tell the secondary that the directory may stay.
         if (S_ISDIR(statData.st_mode))
         {
            const FhgfsOpsErr streamRes = streamInode(socket, Path(entry->d_name), true);
            if (streamRes != FhgfsOpsErr_SUCCESS)
               return streamRes;
         }
         else
         {
            ParentNameLock dentryLock(lockStore, inodeID, entry->d_name);

            const auto streamRes = streamDentry(socket, Path(), entry->d_name);
            if (streamRes != FhgfsOpsErr_SUCCESS)
            {
               numFileErrors.increase();
               return streamRes;
            }
            else
            {
               numFilesSynced.increase();
            }
         }

         continue;
      }

      // we are now either in a fsids (file inode) directory or a second-level inode hash-dir,
      // which may contain either file or directory inodes. taking a lock unnecessarily is still
      // cheaper than reading the inode from disk to determine its type, so just lock the inode id
      // as both file and directory
      FileIDLock fileLock(lockStore, entry->d_name);
      DirIDLock dirLock(lockStore, entry->d_name, true);

      // access the file once more, because it may have been deleted in the meantime. a new entry
      // with the same name cannot appear in a sane filesystem (that would indicate an ID being
      // reused).
      if (faccessat(dirFD, entry->d_name, F_OK, 0) < 0 && errno == ENOENT)
         continue;

      const FhgfsOpsErr streamRes = streamInode(socket, Path(entry->d_name),
            S_ISDIR(statData.st_mode));
      if (streamRes != FhgfsOpsErr_SUCCESS)
      {
         numFileErrors.increase();
         return streamRes;
      }
      else
      {
         numFilesSynced.increase();
      }
   }

   return sendResyncPacket(socket, std::tuple<>());
}
// fhgfs_meta\source\components\buddyresyncer\SyncSlaveBase.cpp

FhgfsOpsErr SyncSlaveBase::resyncAt(const Path& basePath, bool wholeDirectory,
   FhgfsOpsErr (*streamFn)(Socket*, void*), void* context)
{
   const bool sendXAttrs = Program::getApp()->getConfig()->getStoreClientXAttrs();

   this->basePath = META_BUDDYMIRROR_SUBDIR_NAME / basePath;

   ResyncRawInodesMsgEx msg(basePath, sendXAttrs, wholeDirectory);

   RequestResponseNode rrNode(buddyNodeID, Program::getApp()->getMetaNodes());
   RequestResponseArgs rrArgs(nullptr, &msg, NETMSGTYPE_ResyncRawInodesResp,
         streamFn, context);

   // resync processing may take a very long time for each step, eg if a very large directory must
   // be cleaned out on the secondary. do not use timeouts for resync communication right now.
   // TODO move long-running tasks on the secondary into own threads and have the secondary notify
   //      the primary on completion instead of having the primary poll the secondary
   rrArgs.minTimeoutMS = -1;

   const auto commRes = MessagingTk::requestResponseNode(&rrNode, &rrArgs);

   if (commRes != FhgfsOpsErr_SUCCESS)
   {
      LOG(ERR, "Error during communication with secondary.", commRes);
      return commRes;
   }

   const auto resyncRes = static_cast<ResyncRawInodesRespMsg*>(rrArgs.outRespMsg)->getResult();

   if (resyncRes != FhgfsOpsErr_SUCCESS)
      LOG(ERR, "Error while resyncing directory.", basePath, resyncRes);

   return resyncRes;
}

Modification Synchronization (BuddyResyncerModSyncSlave)

  • BuddyResyncerModSyncSlave definition:
// fhgfs_meta\source\components\buddyresyncer\BuddyResync