1. 程式人生 > >Thrift筆記(五)--Thrift server源碼分析

Thrift筆記(五)--Thrift server源碼分析

get proc rim ESS cep check eth new sset

從(四)server代碼跟進

/**
 * Starts a {@code TSimpleServer} listening on port 9090 that dispatches calls to the
 * given processor. The call does not return until {@code server.serve()} returns.
 * Any exception raised during setup or serving is printed to standard error.
 *
 * @param processor the generated processor wrapping the service implementation
 */
public static void simple(MultiplicationService.Processor processor) {
    try {
        // Listening socket for incoming client connections.
        TServerTransport listenTransport = new TServerSocket(9090);

        // Server arguments: the transport to accept on and the processor to dispatch to.
        Args serverArgs = new Args(listenTransport).processor(processor);
        TServer simpleServer = new TSimpleServer(serverArgs);

        System.out.println("Starting the simple server...");
        simpleServer.serve();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

跟進server.serve()

public void serve() {
    try {
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return;
    }

    // Run the preServe event
if (eventHandler_ != null) { eventHandler_.preServe(); } setServing(true); while (!stopped_) { TTransport client = null; TProcessor processor = null; TTransport inputTransport = null; TTransport outputTransport = null; TProtocol inputProtocol = null
; TProtocol outputProtocol = null; ServerContext connectionContext = null; try { client = serverTransport_.accept(); if (client != null) { processor = processorFactory_.getProcessor(client); inputTransport = inputTransportFactory_.getTransport(client); outputTransport = outputTransportFactory_.getTransport(client); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); if (eventHandler_ != null) { connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol); } while (true) { if (eventHandler_ != null) { eventHandler_.processContext(connectionContext, inputTransport, outputTransport); } if(!processor.process(inputProtocol, outputProtocol)) { break; } } } } catch (TTransportException ttx) { // Client died, just move on } catch (TException tx) { if (!stopped_) { LOGGER.error("Thrift error occurred during processing of message.", tx); } } catch (Exception x) { if (!stopped_) { LOGGER.error("Error occurred during processing of message.", x); } } if (eventHandler_ != null) { eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol); } if (inputTransport != null) { inputTransport.close(); } if (outputTransport != null) { outputTransport.close(); } } setServing(false); }

跟進accept方法

/**
 * Waits for the next incoming connection on the underlying server socket and wraps
 * it in a {@code TSocket} configured with {@code clientTimeout_}.
 *
 * @return the transport for the newly accepted client
 * @throws TTransportException with type {@code NOT_OPEN} if there is no underlying
 *     server socket, or wrapping the {@code IOException} raised while accepting
 */
protected TSocket acceptImpl() throws TTransportException {
    if (serverSocket_ == null) {
        throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
    }

    final TSocket clientTransport;
    try {
        // accept() is the blocking call: it returns once a client has connected.
        clientTransport = new TSocket(serverSocket_.accept());
        clientTransport.setTimeout(clientTimeout_);
    } catch (IOException iox) {
        throw new TTransportException(iox);
    }
    return clientTransport;
}

由於(四)使用的是阻塞IO,從代碼也可以看到 serverSocket_.accept() 會一直阻塞,直到有客戶端連接

跟進process()方法

/**
 * Reads the header of one incoming message and dispatches it to the
 * {@code ProcessFunction} registered under the message name.
 *
 * <p>If no function is registered for that name, the argument struct is skipped and
 * a {@code TApplicationException} of type {@code UNKNOWN_METHOD} is written back to
 * the caller as an {@code EXCEPTION} message carrying the same sequence id.
 *
 * @param in protocol the request is read from
 * @param out protocol the reply (or error) is written to
 * @return always {@code true}
 * @throws TException if reading the request, writing the error reply, or the
 *     dispatched function fails
 */
public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
        // Unknown method: consume the rest of the request before replying.
        TProtocolUtil.skip(in, TType.STRUCT);
        in.readMessageEnd();
        // Plain ASCII apostrophes around the method name (the pasted text had
        // mis-encoded quote characters in this literal).
        TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" + msg.name + "'");
        out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
        x.write(out);
        out.writeMessageEnd();
        out.getTransport().flush();
        return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
}

整體流程:readMessageBegin 讀取客戶端請求的方法名;如果該方法沒有在服務端註冊,就跳過請求參數並返回異常給客戶端;如果已註冊,則交給對應的 ProcessFunction 處理。跟進 fn.process 方法

/**
 * Handles one call of this function: reads the arguments, invokes the handler via
 * {@code getResult}, and (unless the function is one-way) writes the reply.
 *
 * <p>A {@code TProtocolException} while reading the arguments is answered with a
 * {@code PROTOCOL_ERROR} exception message. A {@code TTransportException} from the
 * handler is logged and rethrown. A {@code TApplicationException} from the handler is
 * logged and sent back as the reply. Any other exception is logged and, for
 * non-one-way functions, replaced by an {@code INTERNAL_ERROR} reply.
 *
 * @param seqid sequence id of the request, echoed in the reply
 * @param iprot protocol the arguments are read from
 * @param oprot protocol the reply is written to
 * @param iface service implementation passed to {@code getResult}
 * @throws TException if protocol I/O fails or the handler raises a transport error
 */
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    final T request = getEmptyArgsInstance();
    try {
        request.read(iprot);
    } catch (TProtocolException protocolError) {
        // Malformed arguments: finish the incoming message, report the error, stop.
        iprot.readMessageEnd();
        TApplicationException failure =
            new TApplicationException(TApplicationException.PROTOCOL_ERROR, protocolError.getMessage());
        oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
        failure.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
        return;
    }
    iprot.readMessageEnd();

    TSerializable reply = null;
    byte replyType = TMessageType.REPLY;
    try {
        reply = getResult(iface, request);
    } catch (TTransportException transportError) {
        LOGGER.error("Transport error while processing " + getMethodName(), transportError);
        throw transportError;
    } catch (TApplicationException applicationError) {
        LOGGER.error("Internal application error processing " + getMethodName(), applicationError);
        // The exception itself becomes the payload of the reply.
        reply = applicationError;
        replyType = TMessageType.EXCEPTION;
    } catch (Exception unexpected) {
        LOGGER.error("Internal error processing " + getMethodName(), unexpected);
        if (!isOneway()) {
            reply = new TApplicationException(TApplicationException.INTERNAL_ERROR,
                "Internal error processing " + getMethodName());
            replyType = TMessageType.EXCEPTION;
        }
    }

    // One-way functions never send anything back.
    if (isOneway()) {
        return;
    }
    oprot.writeMessageBegin(new TMessage(getMethodName(), replyType, seqid));
    reply.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
}

跟進讀取參數方法

/**
 * Reads a {@code multiply_args} struct from the protocol, field by field, until the
 * STOP marker is reached.
 *
 * <p>Field 1 ({@code n1}) and field 2 ({@code n2}) are stored when they arrive with
 * type I32; a known id with a different type, or any other id, is skipped. After the
 * struct has been read, {@code struct.validate()} is called.
 *
 * @param iprot protocol to read from
 * @param struct instance to populate
 * @throws org.apache.thrift.TException if reading, skipping or validation fails
 */
public void read(org.apache.thrift.protocol.TProtocol iprot, multiply_args struct) throws org.apache.thrift.TException {
    iprot.readStructBegin();
    for (;;) {
        final org.apache.thrift.protocol.TField field = iprot.readFieldBegin();
        if (field.type == org.apache.thrift.protocol.TType.STOP) {
            break;
        }

        final boolean isI32 = field.type == org.apache.thrift.protocol.TType.I32;
        if (field.id == 1 && isI32) {
            // N1
            struct.n1 = iprot.readI32();
            struct.setN1IsSet(true);
        } else if (field.id == 2 && isI32) {
            // N2
            struct.n2 = iprot.readI32();
            struct.setN2IsSet(true);
        } else {
            // Unknown field id, or a known id carrying an unexpected type.
            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
        }
        iprot.readFieldEnd();
    }
    iprot.readStructEnd();

    // Run the struct's own validation once every field has been consumed.
    struct.validate();
}

服務端跟進處理

// Hand the decoded arguments and the service implementation to getResult and keep what it returns.
result = getResult(iface, args);

繼續跟進

/**
 * Invokes {@code multiply} on the service implementation with the two decoded
 * arguments and wraps the returned value in a {@code multiply_result}.
 *
 * @param iface service implementation to call
 * @param args decoded call arguments; {@code n1} and {@code n2} are read
 * @return a result whose {@code success} field holds the returned value and is marked as set
 * @throws org.apache.thrift.TException if the service call fails
 */
public multiply_result getResult(I iface, multiply_args args) throws org.apache.thrift.TException {
    final multiply_result reply = new multiply_result();
    reply.success = iface.multiply(args.n1, args.n2);
    reply.setSuccessIsSet(true);
    return reply;
}

這裏拿到結果, 最後寫回給客戶端

// Nothing is written for one-way functions; otherwise the result is sent back as one message.
if(!isOneway()) {
      // Message header: method name, message type and the request's sequence id.
      oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
      result.write(oprot);
      oprot.writeMessageEnd();
      // Flush the underlying transport after the message has been written.
      oprot.getTransport().flush();
    }

這是最簡單的一種方式:使用阻塞IO和二進制協議序列化。此外還有非阻塞IO、壓縮協議等方式。

Thrift筆記(五)--Thrift server源碼分析