1. 程式人生 > >BeeGFS原始碼分析1-元資料服務概要分析

BeeGFS原始碼分析1-元資料服務概要分析

元資料服務是BeeGFS中用來維護檔案和目錄關係及其屬性配置的服務,其多執行緒epoll設計實現非常高效,主要流程如下:

  1. ConnAcceptor(PThread)類(一個執行緒)負責監聽埠,並接受客戶端連線,然後把連線資訊(包含接收的套接字)寫入管道;
  2. StreamListenerV2(PThread)類(多個執行緒,可配置)從管道讀取連線資訊,使用epoll輪詢接收資料,然後生成IncomingPreprocessedMsgWork(Work),寫入MultiWorkQueue先進先出佇列;
  3. Worker(PThread)類(多個執行緒,可配置)從MultiWorkQueue佇列取出訊息進行處理。

程式初始化

主函式

  • 建立App物件,App物件是程式的主要載體:
// fhgfs_meta\source\program\main.cpp
#include "Program.h"

/**
 * Process entry point: forwards the command line to Program::main() and uses its return
 * value as the process exit code.
 */
int main(int argc, char** argv)
{
   const int exitCode = Program::main(argc, argv);

   return exitCode;
}

// fhgfs_meta\source\program\Program.cpp

#include <common/toolkit/BuildTypeTk.h>
#include "Program.h"

#include <testing/TestRunner.h>

App* Program::app = NULL; // assigned in Program::main() and deleted again before it returns

/**
 * Creates the App object, starts it via startInCurrentThread() and, once that call returns,
 * fetches the app result and destroys the App object.
 *
 * @return the value of App::getAppResult()
 */
int Program::main(int argc, char** argv)
{
   BuildTypeTk::checkDebugBuildTypes();

   // must be called before creating a new App
   AbstractApp::runTimeInitsAndChecks();

   app = new App(argc, argv);
   app->startInCurrentThread();

   const int result = app->getAppResult();

   delete app;

   return result;
}

建立ConnAcceptor

  • 主程式中會初始化一個執行緒,監聽服務埠,由ConnAcceptor類負責:
// fhgfs_meta\source\app\App.cpp

/**
 * Creates the app's components in this order: datagram listener, stream listeners, connection
 * acceptor, stats collector, buddy resyncer, internode syncer, timer queue, modification event
 * flusher, workers and comm slaves.
 *
 * @param initialConsistencyState passed on to the InternodeSyncer constructor
 */
void App::initComponents(TargetConsistencyState initialConsistencyState)
   throw(ComponentInitException)
{
   log->log(Log_DEBUG, "Initializing components...");

   // UDP side
   dgramListener = new DatagramListener(
      netFilter, localNicList, ackStore, cfg->getConnMetaPortUDP() );

   if(cfg->getTuneListenerPrioShift() )
      dgramListener->setPriorityShift(cfg->getTuneListenerPrioShift() );

   // TCP side: stream listeners first, then the acceptor for the configured TCP port
   streamListenersInit();

   const unsigned short tcpPort = cfg->getConnMetaPortTCP();
   connAcceptor = new ConnAcceptor(this, localNicList, tcpPort);

   // remaining components
   statsCollector = new StatsCollector(workQueue, STATSCOLLECTOR_COLLECT_INTERVAL_MS,
      STATSCOLLECTOR_HISTORY_LENGTH);

   buddyResyncer = new BuddyResyncer();

   internodeSyncer = new InternodeSyncer(initialConsistencyState);

   timerQueue = new TimerQueue(1, 1);

   modificationEventFlusher = new ModificationEventFlusher();

   workersInit();
   commSlavesInit();

   log->log(Log_DEBUG, "Components initialized.");
}

建立StreamListener

  • 根據配置建立多個StreamListener例項,每個例項對應一個執行緒,用於從ConnAcceptor接收新連線,以及從連線讀取資料,生成Work:
// fhgfs_meta\source\app\App.cpp

void App::streamListenersInit() throw(ComponentInitException)
{
   this->numStreamListeners = cfg->getTuneNumStreamListeners();

   for(unsigned i=0; i < numStreamListeners; i++)
   {
      StreamListenerV2* listener = new StreamListenerV2(
         std::string("StreamLis") + StringTk::uintToStr(i+1), this, workQueue);

      if(cfg->getTuneListenerPrioShift() )
         listener->setPriorityShift(cfg->getTuneListenerPrioShift() );

      if(cfg->getTuneUseAggressiveStreamPoll() )
         listener->setUseAggressivePoll();

      streamLisVec.push_back(listener);
   }
}

建立WorkQueue

  • 建立WorkQueue,用於儲存StreamListener生成的Work:
// fhgfs_meta\source\app\App.cpp

/**
 * Init basic shared objects like work queues, node stores etc.
 *
 * Note: only the queue setup is shown here; the "..." lines mark code that was left out of
 * this excerpt.
 */
void App::initDataObjects() throw(InvalidConfigException)
{
...
   // two independent queues: workQueue is the one later handed to the stream listeners and
   // workers; commSlaveQueue is presumably consumed by the comm slaves (TODO confirm)
   this->workQueue = new MultiWorkQueue();
   this->commSlaveQueue = new MultiWorkQueue();

   // optional: install a UserWorkContainer as the indirect work list of the worker queue
   if(cfg->getTuneUsePerUserMsgQueues() )
      workQueue->setIndirectWorkList(new UserWorkContainer() );

...
}

建立Worker

  • 根據配置建立Worker執行緒,從WorkQueue讀取Work並進行處理:
// fhgfs_meta\source\app\App.cpp

void App::workersInit() throw(ComponentInitException)
{
   unsigned numWorkers = cfg->getTuneNumWorkers();

   for(unsigned i=0; i < numWorkers; i++)
   {
      Worker* worker = new Worker(
         std::string("Worker") + StringTk::uintToStr(i+1), workQueue, QueueWorkType_INDIRECT);

      worker->setBufLens(cfg->getTuneWorkerBufSize(), cfg->getTuneWorkerBufSize() );

      workerList.push_back(worker);
   }

   for(unsigned i=0; i < APP_WORKERS_DIRECT_NUM; i++)
   {
      Worker* worker = new Worker(
         std::string("DirectWorker") + StringTk::uintToStr(i+1), workQueue, QueueWorkType_DIRECT);

      worker->setBufLens(cfg->getTuneWorkerBufSize(), cfg->getTuneWorkerBufSize() );

      workerList.push_back(worker);
   }
}

連線監聽

監聽類ConnAcceptor

  • ConnAcceptor類的定義:
// fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.h

/**
 * PThread-based component that holds one listen socket per transport (TCP, SDP, RDMA) plus an
 * epoll file descriptor, and declares handlers for incoming standard and RDMA connections.
 */
class ConnAcceptor : public PThread
{
   public:
      ConnAcceptor(AbstractApp* app, NicAddressList& localNicList, unsigned short listenPort)
         throw(ComponentInitException);
      virtual ~ConnAcceptor();

   private:
      AbstractApp*      app;
      LogContext        log;

      // listen sockets, one per transport type
      StandardSocket*   tcpListenSock;
      StandardSocket*   sdpListenSock;
      RDMASocket*       rdmaListenSock;

      // epoll instance that listenLoop() waits on
      int               epollFD;

      // socket setup
      bool initSocks(unsigned short listenPort, NicListCapabilities* localNicCaps);

      // thread body and its event loop
      virtual void run();
      void listenLoop();

      // handlers for events on the listen sockets
      void onIncomingStandardConnection(StandardSocket* sock);
      void onIncomingRDMAConnection(RDMASocket* sock);

      void applySocketOptions(StandardSocket* sock);

   public:
      // getters & setters

};

連線監聽迴圈

  • 使用epoll來輪詢監聽埠,並建立新連線:
// fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp

void ConnAcceptor::run()
{
   try
   {
      registerSignalHandler();

      listenLoop();

      log.log(Log_DEBUG, "Component stopped.");
   }
   catch(std::exception& e)
   {
      PThread::getCurrentThreadApp()->handleComponentException(e);
   }
}

void ConnAcceptor::listenLoop()
{
   const int epollTimeoutMS = 3000;

   struct epoll_event epollEvents[EPOLL_EVENTS_NUM];

   // (just to have these values on the stack...)
   const int epollFD = this->epollFD;
   RDMASocket* rdmaListenSock = this->rdmaListenSock;
   StandardSocket* sdpListenSock = this->sdpListenSock;
   StandardSocket* tcpListenSock = this->tcpListenSock;

   // wait for incoming events and handle them...

   while(!getSelfTerminate() )
   {
      //log.log(Log_DEBUG, std::string("Before poll(). pollArrayLen: ") +
      //   StringTk::uintToStr(pollArrayLen) );

      int epollRes = epoll_wait(epollFD, epollEvents, EPOLL_EVENTS_NUM, epollTimeoutMS);

      if(unlikely(epollRes < 0) )
      { // error occurred
         if(errno == EINTR) // ignore interruption, because the debugger causes this
            continue;

         log.logErr(std::string("Unrecoverable epoll_wait error: ") + System::getErrString() );
         break;
      }

      // handle incoming connection attempts
      for(size_t i=0; i < (size_t)epollRes; i++)
      {
         struct epoll_event* currentEvent = &epollEvents[i];
         Pollable* currentPollable = (Pollable*)currentEvent->data.ptr;

         //log.log(Log_DEBUG, std::string("Incoming data on FD: ") +
         //   StringTk::intToStr(pollArray[i].fd) ); // debug in

         if(currentPollable == rdmaListenSock)
            onIncomingRDMAConnection(rdmaListenSock);
         else
         if(currentPollable == tcpListenSock)
            onIncomingStandardConnection(tcpListenSock);
         else
         if(currentPollable == sdpListenSock)
            onIncomingStandardConnection(sdpListenSock);
         else
         { // unknown connection => should never happen
            log.log(Log_WARNING, "Should never happen: Ignoring event for unknown connection. "
               "FD: " + StringTk::uintToStr(currentPollable->getFD() ) );
         }
      }

   }
}

套接字監聽處理(派發給流)

  • 把建立的套接字傳送給指定的StreamListener:
// fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp

/**
 * Accepts the pending connection on the given listen socket, applies the socket options and
 * hands the accepted socket to a stream listener by writing a SockPipeReturn_NEWCONN info to
 * that listener's sock return pipe. The listener is chosen by the accepted socket's FD.
 *
 * Note: This is for standard sockets like TCP and SDP.
 *
 * A SocketException is logged and not propagated.
 */
void ConnAcceptor::onIncomingStandardConnection(StandardSocket* sock)
{
   try
   {
      struct sockaddr_in remoteAddr;
      socklen_t remoteAddrLen = sizeof(remoteAddr);

      StandardSocket* newSock =
         (StandardSocket*)sock->accept( (struct sockaddr*)&remoteAddr, &remoteAddrLen);

      // (note: level Log_DEBUG to avoid spamming the log until we have log topics)
      std::string acceptMsg = std::string("Accepted new connection from " +
         Socket::endpointAddrToString(&remoteAddr.sin_addr, ntohs(remoteAddr.sin_port) ) );
      acceptMsg += std::string(" [SockFD: ") + StringTk::intToStr(newSock->getFD() );
      acceptMsg += std::string("]");

      log.log(Log_DEBUG, acceptMsg);

      applySocketOptions(newSock);

      // pass the socket on to the stream listener that is responsible for this FD

      StreamListenerV2* targetListener = app->getStreamListenerByFD(newSock->getFD() );
      StreamListenerV2::SockReturnPipeInfo newConnInfo(
         StreamListenerV2::SockPipeReturn_NEWCONN, newSock);

      targetListener->getSockReturnFD()->write(&newConnInfo, sizeof(newConnInfo) );
   }
   catch(SocketException& se)
   {
      log.logErr(std::string("Trying to continue after connection accept error: ") +
         se.what() );
   }
}

流處理的選擇

  • 選擇StreamListener時,是根據fd的數值取模運算得來:
// fhgfs_meta\source\app\App.h

class App : public AbstractApp
{
   public:

      /**
       * Get one of the available stream listeners based on the socket file descriptor number.
       * This is to load-balance the sockets over all available stream listeners and ensure that
       * sockets are not bouncing between different stream listeners.
       *
       * Note that IB connections eat two fd numbers, so 2 and multiples of 2 might not be a good
       * value for number of stream listeners.
       *
       * Note: numStreamListeners is used as the modulo divisor, so it must not be zero when this
       * is called.
       *
       * @param fd socket file descriptor number
       * @return the element of streamLisVec at index (fd % numStreamListeners)
       */
      virtual StreamListenerV2* getStreamListenerByFD(int fd)
      {
         return streamLisVec[fd % numStreamListeners];
      }

}; // (a class definition must be terminated by a semicolon)

資料包流處理

流處理類StreamListenerV2

  • StreamListener的定義:
// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.h

class StreamListenerV2 : public PThread
{
   public:

      /**
       * This is what we will send over the socket return pipe: a socket pointer together with
       * the reason (returnType) why it is passed to the stream listener.
       */
      struct SockReturnPipeInfo
      {
         /**
          * Standard constructor for creating/sending a returnInfo.
          */
         SockReturnPipeInfo(SockPipeReturnType returnType, Socket* sock) :
            returnType(returnType), sock(sock) {}

         /**
          * For receiving only (no initialization of members); the members are expected to be
          * overwritten by the bytes read from the pipe.
          */
         SockReturnPipeInfo() {}

         SockPipeReturnType returnType;
         Socket* sock;
      };
}; // (a class definition must be terminated by a semicolon)

流處理迴圈

  • StreamListener使用epoll同時處理新連線以及資料接收:
// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp

void StreamListenerV2::run()
{
   try
   {
      registerSignalHandler();

      listenLoop();

      log.log(Log_DEBUG, "Component stopped.");
   }
   catch(std::exception& e)
   {
      PThread::getCurrentThreadApp()->handleComponentException(e);
   }
}

void StreamListenerV2::listenLoop()
{
   const int epollTimeoutMS = useAggressivePoll ? 0 : 3000;

   struct epoll_event epollEvents[EPOLL_EVENTS_NUM];

   // (just to have these values on the stack...)
   const int epollFD = this->epollFD;
   FileDescriptor* sockReturnPipeReadEnd = this->sockReturnPipe->getReadFD();

   bool runRDMAConnIdleCheck = false; // true just means we call the method (not enforce the check)

   // wait for incoming events and handle them...

   while(!getSelfTerminate() )
   {
      //log.log(Log_DEBUG, std::string("Before poll(). pollArrayLen: ") +
      //   StringTk::uintToStr(pollArrayLen) );

      int epollRes = epoll_wait(epollFD, epollEvents, EPOLL_EVENTS_NUM, epollTimeoutMS);

      if(unlikely(epollRes < 0) )
      { // error occurred
         if(errno == EINTR) // ignore interruption, because the debugger causes this
            continue;

         log.logErr(std::string("Unrecoverable epoll_wait error: ") + System::getErrString() );
         break;
      }
      else
      if(unlikely(!epollRes || (rdmaCheckForceCounter++ > RDMA_CHECK_FORCE_POLLLOOPS) ) )
      { // epollRes==0 is nothing to worry about, just idle

         // note: we can't run idle check here directly because the check might modify the
         //    poll set, which will be accessed in the loop below
         runRDMAConnIdleCheck = true;
      }

      // handle incoming data & connection attempts
      for(size_t i=0; i < (size_t)epollRes; i++)
      {
         struct epoll_event* currentEvent = &epollEvents[i];
         Pollable* currentPollable = (Pollable*)currentEvent->data.ptr;

         if(currentPollable == sockReturnPipeReadEnd)
            onSockReturn();
         else
            onIncomingData( (Socket*)currentPollable);
      }

      if(unlikely(runRDMAConnIdleCheck) )
      { // note: whether check actually happens depends on elapsed time since last check
         runRDMAConnIdleCheck = false;
         rdmaConnIdleCheck();
      }

   }
}

新連線處理

  • 如果是新連線,則加入epoll的fd中:
// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp

/**
 * Receive pointers to returned (or new) sockets through the sockReturnPipe and handle each of
 * them depending on its return type:
 *  - SockPipeReturn_MSGDONE_NOIMMEDIATE: re-arm the socket in the epoll set (EPOLL_CTL_MOD);
 *    if the socket is not part of the epoll set yet (ENOENT), it is handled like a new conn.
 *  - SockPipeReturn_NEWCONN: add the socket to the epoll set (EPOLL_CTL_ADD).
 *  - SockPipeReturn_MSGDONE_WITHIMMEDIATE: directly continue with onIncomingData().
 *
 * Sockets that cannot be (re-)added to the epoll set are deleted.
 *
 * Up to SOCKRETURN_SOCKS_NUM infos are fetched from the pipe with a single read; a trailing
 * partial info is completed with readExact().
 */
void StreamListenerV2::onSockReturn()
{
   SockReturnPipeInfo returnInfos[SOCKRETURN_SOCKS_NUM];

   // try to get multiple returnInfos at once (if available)
   /* (note: the read size has to be the size of the whole array. with the size of a single
      element, only one info would be fetched per call, no matter how many are queued.) */
   // NOTE(review): a negative read result is not handled here; this assumes read() reports
   //    errors by throwing - confirm against FileDescriptor::read().

   ssize_t readRes = sockReturnPipe->getReadFD()->read(&returnInfos, sizeof(returnInfos) );

   // loop: walk over each info and handle the contained socket

   for(size_t i=0; ; i++)
   {
      SockReturnPipeInfo& currentReturnInfo = returnInfos[i];

      // make sure we have a complete SockReturnPipeInfo

      if(unlikely(readRes < (ssize_t)sizeof(SockReturnPipeInfo) ) )
      { // only got a partial SockReturnPipeInfo => recv the rest
         char* currentReturnInfoChar = (char*)&currentReturnInfo;

         sockReturnPipe->getReadFD()->readExact(
            &currentReturnInfoChar[readRes], sizeof(SockReturnPipeInfo)-readRes);

         readRes = sizeof(SockReturnPipeInfo);
      }

      // handle returnInfo depending on contained returnType...

      Socket* currentSock = currentReturnInfo.sock;
      SockPipeReturnType returnType = currentReturnInfo.returnType;

      switch(returnType)
      {
         case SockPipeReturn_MSGDONE_NOIMMEDIATE:
         { // most likely case: worker is done with a msg and now returns the sock to the epoll set

            struct epoll_event epollEvent;
            epollEvent.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
            epollEvent.data.ptr = currentSock;

            int epollRes = epoll_ctl(epollFD, EPOLL_CTL_MOD, currentSock->getFD(), &epollEvent);

            if(likely(!epollRes) )
            { // sock was successfully re-armed in epoll set
               pollList.add(currentSock);

               break; // break out of switch
            }
            else
            if(errno != ENOENT)
            { // error
               log.logErr("Unable to re-arm sock in epoll set. "
                  "FD: " + StringTk::uintToStr(currentSock->getFD() ) + "; "
                  "SockTypeNum: " + StringTk::uintToStr(currentSock->getSockType() ) + "; "
                  "SysErr: " + System::getErrString() );
               log.log(Log_NOTICE, "Disconnecting: " + currentSock->getPeername() );

               delete(currentSock);

               break; // break out of switch
            }

            /* for ENOENT, we fall through to NEWCONN, because this socket apparently wasn't
               used with this stream listener yet, so we need to add it (instead of modify it) */

         }
         // fall through (intentional, only reached on ENOENT)

         case SockPipeReturn_NEWCONN:
         { // new conn from ConnAcceptor (or wasn't used with this stream listener yet)

            // add new socket file descriptor to epoll set

            struct epoll_event epollEvent;
            epollEvent.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
            epollEvent.data.ptr = currentSock;

            int epollRes = epoll_ctl(epollFD, EPOLL_CTL_ADD, currentSock->getFD(), &epollEvent);
            if(likely(!epollRes) )
            { // socket was successfully added to epoll set
               pollList.add(currentSock);
            }
            else
            { // adding to epoll set failed => unrecoverable error
               log.logErr("Unable to add sock to epoll set. "
                  "FD: " + StringTk::uintToStr(currentSock->getFD() ) + " "
                  "SockTypeNum: " + StringTk::uintToStr(currentSock->getSockType() ) + " "
                  "SysErr: " + System::getErrString() );
               log.log(Log_NOTICE, "Disconnecting: " + currentSock->getPeername() );

               delete(currentSock);
            }

         } break;

         case SockPipeReturn_MSGDONE_WITHIMMEDIATE:
         { // special case: worker detected that immediate data is available after msg processing
            // data immediately available => recv header and so on
            onIncomingData(currentSock);
         } break;

         default:
         { // should never happen: unknown/unhandled returnType
            log.logErr("Should never happen: "
               "Unknown socket return type: " + StringTk::uintToStr(returnType) );
         } break;

      } // end of switch(returnType)


      // one complete info has been consumed; stop when no received bytes are left

      readRes -= sizeof(SockReturnPipeInfo);
      if(!readRes)
         break; // all received returnInfos have been processed

   } // end of "for each received SockReturnPipeInfo" loop

}

資料包處理(生成工作)

  • 生成Work(IncomingPreprocessedMsgWork),並放進佇列(MultiWorkQueue):
// fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp

/**
 * Receive msg header and add the socket to the work queue.
 *
 * The header is received with a 5 second timeout and deserialized without verification. An
 * IncomingPreprocessedMsgWork is then created from it and added as direct or indirect work
 * (depending on sock->getIsDirect() ) to the queue returned by getWorkQueue() for the
 * message's target ID. Afterwards the socket is removed from pollList.
 *
 * If receiving the header fails with a socket exception, the socket is removed from pollList
 * and the connection is invalidated (which deletes the socket).
 *
 * @param sock socket for which epoll reported an event (or which was returned with immediate
 *    data available)
 */
void StreamListenerV2::onIncomingData(Socket* sock)
{
   // check whether this is just a false alarm from a RDMASocket
   if( (sock->getSockType() == NICADDRTYPE_RDMA) &&
      isFalseAlarm( (RDMASocket*)sock) )
   {
      return;
   }

   try
   {
      const int recvTimeoutMS = 5000;

      char msgHeaderBuf[NETMSG_HEADER_LENGTH];
      NetMessageHeader msgHeader;

      // receive & deserialize message header

      sock->recvExactT(msgHeaderBuf, NETMSG_HEADER_LENGTH, 0, recvTimeoutMS);

      NetMessage::deserializeHeader(msgHeaderBuf, NETMSG_HEADER_LENGTH, &msgHeader);

      /* (note on header verification: we leave header verification work to the worker threads to
         save CPU cycles in the stream listener and instead just take what we need to know here, no
         matter whether the header is valid or not.) */

      // create work and add it to queue

      //log.log(Log_DEBUG, "Creating new work for to the queue");

      // (the work constructor copies the header, so passing the address of the local is fine)
      IncomingPreprocessedMsgWork* work = new IncomingPreprocessedMsgWork(app, sock, &msgHeader);

      int sockFD = sock->getFD(); /* note: we store this here for delayed pollList removal, because
            worker thread might disconnect, so the sock gets deleted by the worker and thus "sock->"
            pointer becomes invalid */

      sock->setHasActivity(); // mark sock as active (for idle disconnect check)

      // (note: userID intToStr (not uint) because default userID (~0) looks better this way)
      LOG_DEBUG("StreamListenerV2::onIncomingData", Log_DEBUG,
         "Incoming message: " + NetMsgStrMapping().defineToStr(msgHeader.msgType) + "; "
         "from: " + sock->getPeername() + "; "
         "userID: " + StringTk::intToStr(msgHeader.msgUserID) +
         (msgHeader.msgTargetID
            ? "; targetID: " + StringTk::uintToStr(msgHeader.msgTargetID)
            : "") );

      // hand the work over to the workers; "sock" must not be dereferenced after this point
      if (sock->getIsDirect())
         getWorkQueue(msgHeader.msgTargetID)->addDirectWork(work, msgHeader.msgUserID);
      else
         getWorkQueue(msgHeader.msgTargetID)->addIndirectWork(work, msgHeader.msgUserID);

      /* notes on sock handling:
         *) no need to remove sock from epoll set, because we use edge-triggered mode with
            oneshot flag (which disables further events after first one has been reported).
         *) a sock that is closed by a worker is not a problem, because it will automatically be
            removed from the epoll set by the kernel.
         *) we just need to re-arm the epoll entry upon sock return. */

      // (uses the FD value saved above instead of the possibly invalid sock pointer)
      pollList.removeByFD(sockFD);

      return;

   }
   catch(SocketTimeoutException& e)
   {
      log.log(Log_NOTICE, "Connection timed out: " + sock->getPeername() );
   }
   catch(SocketDisconnectException& e)
   {
      // (note: level Log_DEBUG here to avoid spamming the log until we have log topics)
      log.log(Log_DEBUG, std::string(e.what() ) );
   }
   catch(SocketException& e)
   {
      log.log(Log_NOTICE,
         "Connection error: " + sock->getPeername() + ": " + std::string(e.what() ) );
   }

   // socket exception occurred => cleanup
   // (only reached through one of the catch blocks above, because the try block returns)

   pollList.removeByFD(sock->getFD() );

   IncomingPreprocessedMsgWork::invalidateConnection(sock); // also includes delete(sock)
}

工作處理

工人類(Worker)

// fhgfs_common\source\common\components\worker\Worker.h

#define WORKER_BUFIN_SIZE     (1024*1024*4)
#define WORKER_BUFOUT_SIZE    WORKER_BUFIN_SIZE


/**
 * PThread-based component that is attached to a MultiWorkQueue with a given work type and
 * holds an input and an output buffer, which are freed in the destructor.
 */
class Worker : public PThread
{
   public:
      Worker(std::string workerID, MultiWorkQueue* workQueue, QueueWorkType workType)
         throw(ComponentInitException);

      virtual ~Worker()
      {
         SAFE_FREE(bufIn);
         SAFE_FREE(bufOut);
      }

   private:
      LogContext log;
      bool terminateWithFullQueue; // allow self-termination when queue not empty (see setter notes)

      // buffers and their lengths
      size_t bufInLen;
      char* bufIn;
      size_t bufOutLen;
      char* bufOut;

      // the queue this worker is attached to and the type of work it handles
      MultiWorkQueue* workQueue;
      QueueWorkType workType;

      HighResolutionStats stats;

      virtual void run();

      void workLoopAnyWork();
      void workLoopDirectWork();

      void initBuffers();

      // inliners

      /**
       * @return true if termination with a non-empty queue is allowed or if both the direct
       *    and the indirect work list of the queue are empty
       */
      bool maySelfTerminateNow()
      {
         if(terminateWithFullQueue)
            return true;

         const bool queueEmpty =
            !workQueue->getDirectWorkListSize() && !workQueue->getIndirectWorkListSize();

         return queueEmpty;
      }

   public:
      // setters & getters

      /**
       * Sets the lengths of the in and out buffers.
       *
       * Note: Do not use this after the run method of this component has been called!
       */
      void setBufLens(size_t bufInLen, size_t bufOutLen)
      {
         this->bufInLen = bufInLen;
         this->bufOutLen = bufOutLen;
      }

      /**
       * Makes maySelfTerminateNow() require empty work lists.
       *
       * WARNING: This will only work if there is only a single worker attached to a queue.
       * Otherwise the queue would need a getWorkAndDontWait() method that is used during the
       * termination phase of the worker, because the queue might become empty before the worker
       * calls waitForWork() after the maySelfTerminateNow check.
       */
      void disableTerminationWithFullQueue()
      {
         this->terminateWithFullQueue = false;
      }
};

工作類(Work)

// fhgfs_common\source\common\components\worker\Work.h

class Work
{
   public:
      Work()
      {
         HighResolutionStatsTk::resetStats(&stats);
      }

      virtual ~Work() {}

      Work(const Work&) = delete;
      Work(Work&&) = delete;
      Work& operator=(const Work&) = delete;
      Work& operator=(Work&&) = delete;

      virtual void process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen) = 0;

   protected:
      HighResolutionStats stats;

   public:
      HighResolutionStats* getHighResolutionStats()
      {
         return &stats;
      }

#ifdef BEEGFS_DEBUG_PROFILING
      TimeFine* getAgeTime()
      {
         return &age;
      }

   private:
      TimeFine age;
#endif
};
// fhgfs_common\source\common\components\streamlistenerv2\IncomingPreprocessedMsgWork.h

/**
 * Work for an incoming message whose header was already received: holds the app, the socket
 * and a copy of the message header.
 */
class IncomingPreprocessedMsgWork : public Work
{
   public:
      /**
       * Note: Be aware that this class is only for stream connections that need to be returned
       * to a StreamListenerV2 after processing.
       *
       * @param msgHeader contents will be copied
       */
      IncomingPreprocessedMsgWork(AbstractApp* app, Socket* sock, NetMessageHeader* msgHeader) :
         app(app), sock(sock), msgHeader(*msgHeader)
      {
         // (all members are set by the initializer list)
      }

      virtual void process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen);

      // static socket helpers
      static void releaseSocket(AbstractApp* app, Socket** sock, NetMessage* msg);
      static void invalidateConnection(Socket* sock);
      static bool checkRDMASocketImmediateData(AbstractApp* app, Socket* sock);

   private:
      AbstractApp* app;
      Socket* sock;
      NetMessageHeader msgHeader; // own copy of the header given to the constructor
};

工作迴圈

  • 從WorkQueue獲取Work並進行處理:
// fhgfs_common\source\common\components\worker\Worker.cpp

/**
 * Work loop: registers this worker at the queue stats, then repeatedly waits for work of the
 * given type, processes it with the worker's buffers, adds the work's stats to the worker's
 * stats and deletes the work object.
 *
 * The loop is only left when self-termination was requested and maySelfTerminateNow() allows
 * it. In BEEGFS_DEBUG_PROFILING builds, processing time and total latency of each work are
 * logged (in ms if >= 10ms, otherwise in us).
 *
 * @param workType type of work to wait for
 */
void Worker::workLoop(QueueWorkType workType)
{
   LOG(DEBUG, "Ready", as("TID", System::getTID()), workType);

   workQueue->incNumWorkers(); // add this worker to queue stats

   for( ; ; )
   {
      if(getSelfTerminate() && maySelfTerminateNow() )
         break;

      Work* currentWork = waitForWorkByType(stats, personalWorkQueue, workType);

#ifdef BEEGFS_DEBUG_PROFILING
      TimeFine workStartTime;
#endif

      // start with clean stats for this work packet
      HighResolutionStatsTk::resetStats(&stats);

      currentWork->process(bufIn, bufInLen, bufOut, bufOutLen);

      // count the request and merge in what the work collected
      stats.incVals.workRequests = 1;
      HighResolutionStatsTk::addHighResIncStats(*currentWork->getHighResolutionStats(), stats);

#ifdef BEEGFS_DEBUG_PROFILING
      TimeFine workEndTime;
      const auto workElapsedMS = workEndTime.elapsedSinceMS(&workStartTime);
      const auto workLatencyMS = workEndTime.elapsedSinceMS(currentWork->getAgeTime());

      const bool elapsedInMS = (workElapsedMS >= 10);
      const bool latencyInMS = (workLatencyMS >= 10);

      if(elapsedInMS && latencyInMS)
      {
         LOG_TOP(WORKQUEUES, DEBUG, "Work processed.",
               as("Elapsed ms", workElapsedMS), as("Total latency (ms)", workLatencyMS));
      }
      else
      if(elapsedInMS)
      {
         LOG_TOP(WORKQUEUES, DEBUG, "Work processed.", as("Elapsed ms", workElapsedMS),
               as("Total latency (us)", workEndTime.elapsedSinceMicro(currentWork->getAgeTime())));
      }
      else
      if(latencyInMS)
      {
         LOG_TOP(WORKQUEUES, DEBUG, "Work processed.",
               as("Elapsed us", workEndTime.elapsedSinceMicro(&workStartTime)),
               as("Total latency (ms)", workEndTime.elapsedSinceMS(currentWork->getAgeTime())));
      }
      else
      {
         LOG_TOP(WORKQUEUES, DEBUG, "Work processed.",
               as("Elapsed us", workEndTime.elapsedSinceMicro(&workStartTime)),
               as("Total latency (us)", workEndTime.elapsedSinceMicro(currentWork->getAgeTime())));
      }
#endif

      // the worker owns the work object after taking it from the queue
      delete(currentWork);
   }
}

工作處理(訊息生成和處理)

  • 處理Work時,在Work的process虛擬函式中生成訊息,再呼叫訊息(NetMessage)的processIncoming虛擬函式進行處理:
// fhgfs_common\source\common\components\streamlistenerv2\IncomingPreprocessedMsgWork.cpp

/**
 * Receive the payload of the message whose header was already received by the stream listener,
 * create the message object through the app's net message factory and process it.
 *
 * Outcome for the socket:
 *  - payload too large for bufIn, invalid message type, or processIncoming() returned false
 *    (this includes messages rejected because the peer is not authenticated):
 *    the connection is invalidated.
 *  - the message reports that no socket release is needed afterwards: nothing is done with the
 *    socket here.
 *  - otherwise: the socket is released via releaseSocket().
 *  - socket exception: the connection is invalidated, unless a message object exists and
 *    reports that no socket release is needed.
 *
 * @param bufIn receive buffer for the message payload
 * @param bufInLen length of bufIn
 * @param bufOut buffer that is passed to the message's response context
 * @param bufOutLen length of bufOut
 */
void IncomingPreprocessedMsgWork::process(char* bufIn, unsigned bufInLen,
   char* bufOut, unsigned bufOutLen)
{
   const char* logContextStr = "Work (process incoming msg)";

   const int recvTimeoutMS = 5000;

   unsigned numReceived = NETMSG_HEADER_LENGTH; // (header actually received by stream listener)
   NetMessage* msg = NULL;
   bool msgDone = false; // true after msg was processed and deleted (=> sock handling is
      // decided by the code after that point, not by the exception cleanup)


   try
   {
      // attach stats to sock (stream listener already received the msg header)

      stats.incVals.netRecvBytes += NETMSG_HEADER_LENGTH;
      sock->setStats(&stats);


      // make sure msg length fits into our receive buffer

      unsigned msgLength = msgHeader.msgLength;
      unsigned msgPayloadLength = msgLength - numReceived;

      /* (note: a msgLength smaller than the header length makes the unsigned subtraction wrap
         around to a huge value, so we check for that explicitly instead of relying on the wrapped
         value being larger than bufInLen.) */
      if(unlikely( (msgLength < numReceived) || (msgPayloadLength > bufInLen) ) )
      { // message too big
         LogContext(logContextStr).log(Log_NOTICE,
            std::string("Received a message that is too large. Disconnecting: ") +
            sock->getPeername() );

         sock->unsetStats();
         invalidateConnection(sock);

         return;
      }

      // receive the message payload

      if(msgPayloadLength)
         sock->recvExactT(bufIn, msgPayloadLength, 0, recvTimeoutMS);

      // we got the complete message buffer => create msg object

      AbstractApp* app = PThread::getCurrentThreadApp();
      ICommonConfig* cfg = app->getCommonConfig();
      AbstractNetMessageFactory* netMessageFactory = app->getNetMessageFactory();

      msg = netMessageFactory->createFromPreprocessedBuf(&msgHeader, bufIn, msgPayloadLength);

      if(unlikely(msg->getMsgType() == NETMSGTYPE_Invalid) )
      { // message invalid
         LogContext(logContextStr).log(Log_NOTICE,
            std::string("Received an invalid message. Disconnecting: ") + sock->getPeername() );

         sock->unsetStats();
         invalidateConnection(sock);
         delete(msg);

         return;
      }

      // process the received msg

      bool processRes = false;

      if(likely(!cfg->getConnAuthHash() ||
         sock->getIsAuthenticated() ||
         (msg->getMsgType() == NETMSGTYPE_AuthenticateChannel) ) )
      { // auth disabled or channel is auth'ed or this is an auth msg => process
         NetMessage::ResponseContext rctx(NULL, sock, bufOut, bufOutLen, &stats);
         processRes = msg->processIncoming(rctx);
      }
      else
         LogContext(logContextStr).log(Log_NOTICE,
            std::string("Rejecting message from unauthenticated peer: ") + sock->getPeername() );

      // processing completed => cleanup

      bool needSockRelease = msg->getReleaseSockAfterProcessing();

      delete(msg);
      msg = NULL;
      msgDone = true;

      if(!needSockRelease)
         return; // sock release was already done within msg->processIncoming() method

      if(unlikely(!processRes) )
      { // processIncoming encountered messaging error => invalidate connection
         LogContext(logContextStr).log(Log_NOTICE,
            std::string("Problem encountered during processing of a message. Disconnecting: ") +
            sock->getPeername() );

         invalidateConnection(sock);

         return;
      }

      releaseSocket(app, &sock, NULL);

      return;

   }
   catch(SocketTimeoutException& e)
   {
      LogContext(logContextStr).log(Log_NOTICE,
         std::string("Connection timed out: ") + sock->getPeername() );
   }
   catch(SocketDisconnectException& e)
   {
      // (note: level Log_DEBUG here to avoid spamming the log until we have log topics)
      LogContext(logContextStr).log(Log_DEBUG, std::string(e.what() ) );
   }
   catch(SocketException& e)
   {
      LogContext(logContextStr).log(Log_NOTICE,
         std::string("Connection error: ") + sock->getPeername() + std::string(": ") +
         std::string(e.what() ) );
   }

   // socket exception occurred => cleanup

   /* who is responsible for the sock at this point?
      *) msg exists: the msg tells us whether the sock still needs to be released by us.
      *) no msg was created yet (e.g. timeout or disconnect while receiving the payload): the
         sock is still ours, so we have to invalidate the connection here. otherwise the sock
         would neither be returned to a stream listener nor be deleted.
      *) msg was already processed and deleted: leave the sock alone (handled above). */

   bool needSockCleanup = msg ? msg->getReleaseSockAfterProcessing() : !msgDone;

   if(needSockCleanup)
   {
      sock->unsetStats();
      invalidateConnection(sock);
   }

   SAFE_DELETE(msg);
}

訊息工廠

訊息工廠類(NetMessageFactory)

  • Worker處理Work(IncomingPreprocessedMsgWork::process)時使用訊息工廠類生成各種型別的訊息:
// fhgfs_meta\source\net\message\NetMessageFactory.h

/**
 * Net message factory of the metadata service: provides the createFromMsgType()
 * implementation for AbstractNetMessageFactory.
 */
class NetMessageFactory : public AbstractNetMessageFactory
{
   public:
      NetMessageFactory()
      {
         // nothing to initialize
      }

   protected:
      /**
       * @param msgType numeric type of the message to create
       * @return newly created message object for the given type
       */
      virtual NetMessage* createFromMsgType(unsigned short msgType);
};

訊息工廠初始化

// fhgfs_meta\source\app\App.cpp

/**
 * Init basic networking data structures.
 *
 * Note: no RDMA is detected here, because this needs to be done later
 */
void App::initBasicNetwork()
{
   // check if management host is defined
   if(!cfg->getSysMgmtdHost().length() )
      throw InvalidConfigException("Management host undefined");

   // prepare filter for outgoing packets/connections
   this->netFilter = new NetFilter(cfg->getConnNetFilterFile() );
   this->tcpOnlyFilter = new NetFilter(cfg->getConnTcpOnlyFilterFile() );

   // prepare filter for interfaces
   StringList allowedInterfaces;
   std::string interfacesList = cfg->getConnInterfacesList();
   if(!interfacesList.empty() )
   {
      log->log(Log_DEBUG, "Allowed interfaces: " + interfacesList);
      StringTk::explodeEx(interfacesList, ',', true, &allowedInterfaces);
   }

   // discover local NICs and filter them
   NetworkInterfaceCard::findAllInterfaces(allowedInterfaces, cfg->getConnUseSDP(), localNicList);

   if(localNicList.empty() )
      throw InvalidConfigException("Couldn't find any usable NIC");

   localNicList.sort(&NetworkInterfaceCard::nicAddrPreferenceComp);

   // prepare factory for incoming messages
   this->netMessageFactory = new NetMessageFactory();
}

生成訊息

  • 訊息例項的生成均根據msgType來確定:
// fhgfs_meta\source\net\message\NetMessageFactory.cpp

/**
 * @return NetMessage that must be deleted by the caller
 * (msg->msgType is NETMSGTYPE_Invalid on error)
 */
NetMessage* NetMessageFactory::createFromMsgType(unsigned short msgType)
{
   NetMessage* msg;

   switch(msgType)
   {
      // The following lines are grouped by "type of the message" and ordered alphabetically inside
      // the groups. There should always be one message per line to keep a clear layout (although
      // this might lead to lines that are longer than usual)

      // control messages
      case NETMSGTYPE_Ack: { msg = new AckMsgEx(); } break;
      case NETMSGTYPE_AuthenticateChannel: { msg = new AuthenticateChannelMsgEx(); } break;
      case NETMSGTYPE_GenericResponse: { msg = new GenericResponseMsg(); } break;
      case NETMSGTYPE_SetChannelDirect: { msg = new SetChannelDirectMsgEx(); } break;
      case NETMSGTYPE_PeerInfo: { msg = new PeerInfoMsgEx(); } break;

      // nodes messages
      case NETMSGTYPE_ChangeTargetConsistencyStatesResp: { msg = new ChangeTargetConsistencyStatesRespMsg(); } break;
      case NETMSGTYPE_GenericDebug: { msg = new GenericDebugMsgEx(); } break;
      case NETMSGTYPE_GetClientStats: { msg = new GetClientStatsMsgEx(); } break;
      case NETMSGTYPE_GetMirrorBuddyGroupsResp: { msg = new GetMirrorBuddyGroupsRespMsg(); } break;
      case NETMSGTYPE_GetNodeCapacityPools: { msg = new GetNodeCapacityPoolsMsgEx(); } break;
      case NETMSGTYPE_GetNodeCapacityPoolsResp: { msg = new GetNodeCapacityPoolsRespMsg(); } break;
      case NETMSGTYPE_GetNodes: { msg = new GetNodesMsgEx(); } break;
      case NETMSGTYPE_GetNodesResp: { msg = new GetNodesRespMsg(); } break;
      case NETMSGTYPE_GetStatesAndBuddyGroupsResp: { msg = new GetStatesAndBuddyGroupsRespMsg(); } break;
      case NETMSGTYPE_GetTargetMappings: { msg = new GetTargetMappingsMsgEx(); } break;
      case NETMSGTYPE_GetTargetMappingsResp: { msg = new GetTargetMappingsRespMsg(); } break;
      case NETMSGTYPE_GetTargetStatesResp: { msg = new GetTargetStatesRespMsg(); } break;
      case NETMSGTYPE_HeartbeatRequest: { msg = new HeartbeatRequestMsgEx(); } break;
      case NETMSGTYPE_Heartbeat: { msg = new HeartbeatMsgEx(); } break;
      case NETMSGTYPE_MapTargets: { msg = new MapTargetsMsgEx(); } break;
      case NETMSGTYPE_PublishCapacities: { msg = new PublishCapacitiesMsgEx(); } break;
      case NETMSGTYPE_RegisterNodeResp: { msg = new RegisterNodeRespMsg(); } break;
      case NETMSGTYPE_RemoveNode: { msg = new RemoveNodeMsgEx(); } break;
      case NETMSGTYPE_RemoveNodeResp: { msg = new RemoveNodeRespMsg(); } break;
      case NETMSGTYPE_RefreshCapacityPools: { msg = new RefreshCapacityPoolsMsgEx(); } break;
      case NETMSGTYPE_RefreshTargetStates: { msg = new RefreshTargetStatesMsgEx(); } break;
      case NETMSGTYPE_SetMirrorBuddyGroup: { msg = new SetMirrorBuddyGroupMsgEx(); } break;
      case NETMSGTYPE_SetRootNodeIDResp: { msg = new SetRootNodeIDRespMsg(); } break;
      case NETMSGTYPE_SetTargetConsistencyStates: { msg = new SetTargetConsistencyStatesMsgEx(); } break;
      case NETMSGTYPE_SetTargetConsistencyStatesResp: { msg = new SetTargetConsistencyStatesRespMsg(); } break;

      // storage messages
      case NETMSGTYPE_FindEntryname: { msg = new FindEntrynameMsgEx(); } break;
      case NETMSGTYPE_FindLinkOwner: { msg = new FindLinkOwnerMsgEx(); } break;
      case NETMSGTYPE_FindOwner: { msg = new FindOwnerMsgEx(); } break;
      case NETMSGTYPE_FindOwnerResp: { msg = new FindOwnerRespMsg(); } break;
      case NETMSGTYPE_GetChunkFileAttribsResp: { msg = new GetChunkFileAttribsRespMsg(); } break;
      case NETMSGTYPE_GetStorageTargetInfo: { msg = new GetStorageTargetInfoMsgEx(); } break;
      case NETMSGTYPE_GetEntryInfo: { msg = new GetEntryInfoMsgEx(); } break;
      case NETMSGTYPE_GetEntryInfoResp: { msg = new GetEntryInfoRespMsg(); } break;
      case NETMSGTYPE_GetHighResStats: { msg = new GetHighResStatsMsgEx(); } break;
      case NETMSGTYPE_GetMetaResyncStats: { msg = new GetMetaResyncStatsMsgEx(); } break;
      case NETMSGTYPE_RequestExceededQuotaResp: {msg = new RequestExceededQuotaRespMsg(); } break;
      case NETMSGTYPE_SetExceededQuota: {msg = new SetExceededQuotaMsgEx(); } break;
      case NETMSGTYPE_StorageResyncStarted: { msg = new StorageResyncStartedMsgEx(); } break;
      case NETMSGTYPE_StorageResyncStartedResp: { msg = new StorageResyncStartedRespMsg(); } break;
      case NETMSGTYPE_GetXAttr: { msg = new GetXAttrMsgEx(); } break;
      case NETMSGTYPE_Hardlink: { msg = new HardlinkMsgEx(); } break;
      case NETMSGTYPE_HardlinkResp: { msg = new HardlinkRespMsg(); } break;
      case NETMSGTYPE_ListDirFromOffset: { msg = new ListDirFromOffsetMsgEx(); } break;
      case NETMSGTYPE_ListDirFromOffsetResp: { msg = new ListDirFromOffsetRespMsg(); } break;
      case NETMSGTYPE_ListXAttr: { msg = new ListXAttrMsgEx(); } break;
      case NETMSGTYPE_LookupIntent: { msg = new LookupIntentMsgEx(); } break;
      case NETMSGTYPE_LookupIntentResp: { msg = new LookupIntentRespMsg(); } break;
      case NETMSGTYPE_MkDir: { msg = new MkDirMsgEx(); } break;
      case NETMSGTYPE_MkDirResp: { msg = new MkDirRespMsg(); } break;
      case NETMSGTYPE_MkFile: { msg = new MkFileMsgEx(); } break;
      case NETMSGTYPE_MkFileResp: { msg = new MkFileRespMsg(); } break;
      case NETMSGTYPE_MkFileWithPattern: { msg = new MkFileWithPatternMsgEx(); } break;
      case NETMSGTYPE_MkFileWithPatternResp: { msg = new MkFileWithPatternRespMsg(); } break;
      case NETMSGTYPE_MkLocalDir: { msg = new MkLocalDirMsgEx(); } break;
      case NETMSGTYPE_MkLocalDirResp: { msg = new MkLocalDirRespMsg(); } break;
      case NETMSGTYPE_MkLocalFileResp: { msg = new MkLocalFileRespMsg(); } break;
      case NETMSGTYPE_MovingDirInsert: { msg = new MovingDirInsertMsgEx(); } break;
      case NETMSGTYPE_MovingDirInsertResp: { msg = new MovingDirInsertRespMsg(); } break;
      case NETMSGTYPE_MovingFileInsert: { msg = new MovingFileInsertMsgEx(); } break;
      case NETMSGTYPE_MovingFileInsertResp: { msg = new MovingFileInsertRespMsg(); } break;
      case NETMSGTYPE_RefreshEntryInfo: { msg = new RefreshEntryInfoMsgEx(); } break;
      case NETMSGTYPE_RefreshEntryInfoResp: { msg = new RefreshEntryInfoRespMsg(); } break;
      case NETMSGTYPE_ResyncRawInodes: { msg = new ResyncRawInodesMsgEx(); } break;
      case NETMSGTYPE_ResyncRawInodesResp: { msg = new ResyncRawInodesRespMsg(); } break;
      case NETMSGTYPE_ResyncSessionStore: { msg = new ResyncSessionStoreMsgEx(); } break;
      case NETMSGTYPE_ResyncSessionStoreResp: { msg = new ResyncSessionStoreRespMsg(); } break;
      case NETMSGTYPE_RemoveXAttr: { msg = new RemoveXAttrMsgEx(); } break;
      case NETMSGTYPE_RemoveXAttrResp: { msg = new RemoveXAttrRespMsg(); } break;
      case NETMSGTYPE_Rename: { msg = new RenameV2MsgEx(); } break;
      case NETMSGTYPE_RenameResp: { msg = new RenameRespMsg(); } break;
      case NETMSGTYPE_RmChunkPathsResp: { msg = new RmChunkPathsRespMsg(); } break;
      case NETMSGTYPE_RmDirEntry: { msg = new RmDirEntryMsgEx(); } break;
      case NETMSGTYPE_RmDir: { msg = new RmDirMsgEx(); } break;
      case NETMSGTYPE_RmDirResp: { msg = new RmDirRespMsg(); } break;
      case NETMSGTYPE_RmLocalDir: { msg = new RmLocalDirMsgEx(); } break;
      case NETMSGTYPE_RmLocalDirResp: { msg = new RmLocalDirRespMsg(); } break;
      case NETMSGTYPE_SetAttr: { msg = new SetAttrMsgEx(); } break;
      case NETMSGTYPE_SetAttrResp: { msg = new SetAttrRespMsg(); } break;
      case NETMSGTYPE_SetDirPattern: { msg = new SetDirPatternMsgEx(); } break;
      case NETMSGTYPE_SetDirPatternResp: { msg = new SetDirPatternRespMsg(); } break;
      case NETMSGTYPE_SetLocalAttrResp: { msg = new SetLocalAttrRespMsg(); } break;
      case NETMSGTYPE_SetMetadataMirroring: { msg = new SetMetadataMirroringMsgEx(); } break;
      case NETMSGTYPE_SetStorageTargetInfoResp: { msg = new SetStorageTargetInfoRespMsg(); } break;
      case NETMSGTYPE_SetXAttr: { msg = new SetXAttrMsgEx(); } break;
      case NETMSGTYPE_SetXAttrResp: { msg = new SetXAttrRespMsg(); } break;
      case NETMSGTYPE_Stat: { msg = new StatMsgEx(); } break;
      case NETMSGTYPE_StatResp: { msg = new StatRespMsg(); } break;
      case NETMSGTYPE_StatStoragePath: { msg = new StatStoragePathMsgEx(); } break;
      case NETMSGTYPE_StatStoragePathResp: { msg = new StatStoragePathRespMsg(); } break;
      case NETMSGTYPE_TruncFile: { msg = new TruncFileMsgEx(); } break;
      case NETMSGTYPE_TruncFileResp: { msg = new TruncFileRespMsg(); } break;
      case NETMSGTYPE_TruncLocalFileResp: { msg = new TruncLocalFileRespMsg(); } break;
      case NETMSGTYPE_UnlinkFile: { msg = new UnlinkFileMsgEx(); } break;
      case NETMSGTYPE_UnlinkFileResp: { msg = new UnlinkFileRespMsg(); } break;
      case NETMSGTYPE_UnlinkLocalFileResp: { msg = new UnlinkLocalFileRespMsg(); } break;
      case NETMSGTYPE_UpdateBacklinkResp: { msg = new UpdateBacklinkRespMsg(); } break;
      case NETMSGTYPE_UpdateDirParent: { msg = new UpdateDirParentMsgEx(); } break;
      case NETMSGTYPE_UpdateDirParentResp: { msg = new UpdateDirParentRespMsg(); } break;

      // session messages
      case NETMSGTYPE_BumpFileVersion: { msg = new BumpFileVersionMsgEx(); } break;
      case NETMSGTYPE_BumpFileVersionResp: { msg = new BumpFileVersionRespMsg(); } break;
      case NETMSGTYPE_OpenFile: { msg = new OpenFileMsgEx(); } break;
      case NETMSGTYPE_OpenFileResp: { msg = new OpenFileRespMsg(); } break;
      case NETMSGTYPE_CloseFile: { msg = new CloseFileMsgEx(); } break;
      case NETMSGTYPE_CloseFileResp: { msg = new CloseFileRespMsg(); } break;
      case NETMSGTYPE_CloseChunkFileResp: { msg = new CloseChunkFileRespMsg(); } break;
      case NETMSGTYPE_WriteLocalFileResp: { msg = new WriteLocalFileRespMsg(); } break;
      case NETMSGTYPE_FSyncLocalFileResp: { msg = new FSyncLocalFileRespMsg(); } break;
      case NETMSGTYPE_FLockAppend: { msg = new FLockAppendMsgEx(); } break;
      case NETMSGTYPE_FLockEntry: { msg = new FLockEntryMsgEx(); } break;
      case NETMSGTYPE_FLockEntryResp: { msg = new FLockEntryRespMsg(); } break;
      case NETMSGTYPE_FLockRange: { msg = new FLockRangeMsgEx(); } break;
      case NETMSGTYPE_FLockRangeResp: { msg = new FLockRangeRespMsg(); } break;
      case NETMSGTYPE_GetFileVersion: { msg = new GetFileVersionMsgEx(); } break;
      case NETMSGTYPE_AckNotify: { msg = new AckNotifiyMsgEx(); } break;
      case NETMSGTYPE_AckNotifyResp: { msg = new AckNotifiyRespMsg(); } break;

      //admon messages
      case NETMSGTYPE_RequestMetaData: { msg = new RequestMetaDataMsgEx(); } break;
      case NETMSGTYPE_GetNodeInfo: { msg = new GetNodeInfoMsgEx(); } break;

      // fsck messages
      case NETMSGTYPE_RetrieveDirEntries: { msg = new RetrieveDirEntriesMsgEx(); } break;
      case NETMSGTYPE_RetrieveInodes: { msg = new RetrieveInodesMsgEx(); } break;
      case NETMSGTYPE_RetrieveFsIDs: { msg = new RetrieveFsIDsMsgEx(); } break;
      case NETMSGTYPE_DeleteDirEntries: { msg = new DeleteDirEntriesMsgEx(); } break;
      case NETMSGTYPE_CreateDefDirInodes: { msg = new CreateDefDirInodesMsgEx(); } break;
      case NETMSGTYPE_FixInodeOwners: { msg = new FixInodeOwnersMsgEx(); } break;
      case NETMSGTYPE_FixInodeOwnersInDentry: { msg = new FixInodeOwnersInDentryMsgEx(); } break;
      case NETMSGTYPE_LinkToLostAndFound: { msg = new LinkToLostAndFoundMsgEx(); } break;
      case NETMSGTYPE_CreateEmptyContDirs: { msg = new CreateEmptyContDirsMsgEx(); } break;
      case NETMSGTYPE_UpdateFileAttribs: { msg = new UpdateFileAttribsMsgEx(); } break;
      case NETMSGTYPE_UpdateDirAttribs: { msg = new UpdateDirAttribsMsgEx(); } break;
      case NETMSGTYPE_RemoveInodes: { msg = new RemoveInodesMsgEx(); } break;
      case NETMSGTYPE_RecreateFsIDs: { msg = new RecreateFsIDsMsgEx(); } break;
      case NETMSGTYPE_RecreateDentries: { msg = new RecreateDentriesMsgEx(); } break;
      case NETMSGTYPE_FsckSetEventLogging: { msg = new FsckSetEventLoggingMsgEx(); } break;
      case NETMSGTYPE_AdjustChunkPermissions: { msg = new AdjustChunkPermissionsMsgEx(); } break;

      default:
      {
         msg = new SimpleMsg(NETMSGTYPE_Invalid);
      } break;
   }

   re