
Dremio Arrow Flight protocol server implementation: a brief introduction to the DremioFlightProducer code

DremioFlightProducer contains the core of Dremio's Arrow Flight implementation.

FlightProducer interface definition

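A Flight server is implemented mainly by implementing the FlightProducer interface. Its methods and what each one is for:

getStream: streams the data identified by a Ticket back to the client
listFlights: lists the available flights that match a Criteria
getFlightInfo: for a FlightDescriptor, returns a FlightInfo describing the schema and the endpoints (ticket plus location) from which the data can be fetched
getSchema: returns only the schema for a FlightDescriptor
acceptPut: receives a data stream uploaded by the client
doExchange: exchanges data with the client in both directions within one call
doAction: runs a custom action and streams back its results
listActions: lists the action types the server supports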

How Dremio implements FlightProducer

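Because Dremio is essentially a query engine (although for certain storage types, namely NAS, HDFS, and S3, it also supports CREATE TABLE), some Flight protocol methods currently just report that they are unimplemented. This is not true of every Flight producer in Dremio: producers that support more of the Flight protocol, such as CoordinatorFlightProducer, have to implement additional methods. For DremioFlightProducer the implementation is simple, because the location is just the machine the client is already talking to; CoordinatorFlightProducer has considerably more to implement, and its implementation is worth reading for the details. The unimplemented methods in DremioFlightProducer: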

@Override
public Runnable acceptPut(CallContext callContext, FlightStream flightStream, StreamListener<PutResult> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("acceptPut is unimplemented").toRuntimeException();
}
@Override
public void doAction(CallContext callContext, Action action, StreamListener<Result> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("doAction is unimplemented").toRuntimeException();
}
@Override
public void listActions(CallContext callContext, StreamListener<ActionType> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("listActions is unimplemented").toRuntimeException();
}
@Override
public void listFlights(CallContext callContext, Criteria criteria, StreamListener<FlightInfo> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("listFlights is unimplemented").toRuntimeException();
}
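Each stub builds a status with CallStatus.UNIMPLEMENTED.withDescription(...) and throws it via toRuntimeException(), so the client receives an UNIMPLEMENTED error rather than a generic failure. The stdlib-only model below illustrates that pattern. StatusCode, FlightStatus and FlightRuntimeError are hypothetical stand-ins for Arrow's CallStatus and FlightRuntimeException, not the real classes.

```java
import java.util.Objects;

final class UnimplementedDemo {
  // Hypothetical subset of status codes, loosely modeled on Arrow's FlightStatusCode.
  enum StatusCode { OK, INVALID_ARGUMENT, UNIMPLEMENTED }

  // Immutable status value: withDescription returns a copy instead of mutating the shared constant.
  static final class FlightStatus {
    static final FlightStatus UNIMPLEMENTED = new FlightStatus(StatusCode.UNIMPLEMENTED, "");

    final StatusCode code;
    final String description;

    FlightStatus(StatusCode code, String description) {
      this.code = Objects.requireNonNull(code);
      this.description = Objects.requireNonNull(description);
    }

    FlightStatus withDescription(String newDescription) {
      return new FlightStatus(code, newDescription);
    }

    FlightRuntimeError toRuntimeException() {
      return new FlightRuntimeError(this);
    }
  }

  // Unchecked exception that carries the status up to the RPC layer.
  static final class FlightRuntimeError extends RuntimeException {
    final FlightStatus status;

    FlightRuntimeError(FlightStatus status) {
      super(status.code + ": " + status.description);
      this.status = status;
    }
  }

  // Same shape as DremioFlightProducer.listActions: the call always fails.
  static void listActions() {
    throw FlightStatus.UNIMPLEMENTED.withDescription("listActions is unimplemented").toRuntimeException();
  }

  public static void main(String[] args) {
    try {
      listActions();
    } catch (FlightRuntimeError e) {
      System.out.println(e.getMessage()); // prints "UNIMPLEMENTED: listActions is unimplemented"
    }
  }
}
```

Keeping the status immutable matters because UNIMPLEMENTED is a shared constant; every stub derives its own described copy from it.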

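That leaves four methods. doExchange (not supported for now) and getSchema have default implementations in the interface definition, so the two that Dremio really has to implement are getStream and getFlightInfo, which, going by the API definition, are the methods that deal with the data stream itself.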
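The point about default methods is ordinary Java interface behavior: an implementing class must supply the abstract methods and inherits the defaults. The sketch below is a hypothetical, heavily trimmed interface (MiniProducer, with Info and String standing in for Arrow's FlightInfo, descriptor and stream types), not Arrow's FlightProducer. As an assumption about the general shape of Arrow's defaults, its getSchema default is derived from getFlightInfo and its doExchange default rejects the call.

```java
import java.util.List;

final class DefaultMethodsDemo {
  // Hypothetical stand-in for FlightInfo: just a schema string and a ticket string.
  static final class Info {
    final String schema;
    final String ticket;

    Info(String schema, String ticket) {
      this.schema = schema;
      this.ticket = ticket;
    }
  }

  // Hypothetical, heavily trimmed stand-in for FlightProducer.
  interface MiniProducer {
    // Abstract: every implementation has to provide these two.
    List<String> getStream(String ticket);

    Info getFlightInfo(String query);

    // Default: derived from getFlightInfo, so implementers get it for free.
    default String getSchema(String query) {
      return getFlightInfo(query).schema;
    }

    // Default: rejects the call unless an implementation overrides it.
    default void doExchange(String query) {
      throw new UnsupportedOperationException("doExchange is unimplemented");
    }
  }

  // Like DremioFlightProducer, this producer only writes the two core methods.
  static final class QueryProducer implements MiniProducer {
    @Override
    public List<String> getStream(String ticket) {
      return List.of("row-1", "row-2"); // placeholder rows
    }

    @Override
    public Info getFlightInfo(String query) {
      return new Info("id: Int(32)", "ticket-for:" + query);
    }
  }

  public static void main(String[] args) {
    MiniProducer producer = new QueryProducer();
    System.out.println(producer.getSchema("SELECT 1")); // prints "id: Int(32)"
  }
}
```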
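Both methods depend on session handling. The heart of a session is the account information, which is used for permission checks and for looking up other configuration such as engine-related settings and routing; see sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/UserSession.java for details. Both methods also rely on FlightWorkManager, a wrapper of Dremio's own.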
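Both calls begin with sessionsManager.getUserSession(callContext.peerIdentity(), headers). A minimal sketch of what such a lookup can look like, assuming one session per authenticated peer identity that is created on first use and enriched from request headers. SessionManager, Session and the routing_tag header key are hypothetical here and do not reproduce Dremio's session manager.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SessionLookupDemo {
  // Hypothetical session: the account name plus per-session settings (e.g. a routing tag).
  static final class Session {
    final String user;
    final Map<String, String> options = new ConcurrentHashMap<>();

    Session(String user) {
      this.user = user;
    }
  }

  // Hypothetical manager: one session per authenticated peer identity, created on first use.
  static final class SessionManager {
    private final Map<String, Session> sessions = new ConcurrentHashMap<>();

    Session getUserSession(String peerIdentity, Map<String, String> headers) {
      // computeIfAbsent is atomic, so concurrent calls from one peer share a single session.
      Session session = sessions.computeIfAbsent(peerIdentity, Session::new);
      // Copy a hypothetical routing header into the session when the client sends one.
      String routingTag = headers.get("routing_tag");
      if (routingTag != null) {
        session.options.put("routing_tag", routingTag);
      }
      return session;
    }
  }

  public static void main(String[] args) {
    SessionManager manager = new SessionManager();
    Session first = manager.getUserSession("alice", Map.of("routing_tag", "etl"));
    Session again = manager.getUserSession("alice", Map.of());
    System.out.println(first == again); // prints "true"
  }
}
```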
The getStream and getFlightInfo code:

@Override
public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener serverStreamListener) {
  try {
    final CallHeaders headers = retrieveHeadersFromCallContext(callContext);
    final UserSession session = sessionsManager.getUserSession(callContext.peerIdentity(), headers);
    final TicketContent.PreparedStatementTicket preparedStatementTicket = TicketContent.PreparedStatementTicket.parseFrom(ticket.getBytes());
    // Wraps a concrete execution on top of the UserWorker; the results come back through callbacks
    flightWorkManager.runPreparedStatement(preparedStatementTicket, serverStreamListener, allocator, session);
  } catch (InvalidProtocolBufferException ex) {
    final RuntimeException error = CallStatus.INVALID_ARGUMENT.withCause(ex).withDescription("Invalid ticket used in getStream").toRuntimeException();
    serverStreamListener.error(error);
    throw error;
  }
}
@Override
public FlightInfo getFlightInfo(CallContext callContext, FlightDescriptor flightDescriptor) {
  final CallHeaders headers = retrieveHeadersFromCallContext(callContext);
  final UserSession session = sessionsManager.getUserSession(callContext.peerIdentity(), headers);
  final FlightPreparedStatement flightPreparedStatement = flightWorkManager
    .createPreparedStatement(flightDescriptor, callContext::isCancelled, session);
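  // Getting the FlightInfo is also an RPC call to the UserWorker. It completes quickly, and the wrapper
  // blocks on it (a while loop that returns based on a timeout) so that the needed data is available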
  return flightPreparedStatement.getFlightInfo(location);
}
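The comment in getFlightInfo describes a bounded wait: the UserWorker answers on another thread, and the Flight thread loops on a timed wait until the answer arrives. A stdlib sketch of that idea, assuming a CompletableFuture as the hand-off and the same kind of cancellation supplier that getFlightInfo passes in (callContext::isCancelled). ResponseHandler, complete, fail and the poll interval are hypothetical; this is not the code of Dremio's CreatePreparedStatementResponseHandler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class BlockingWaitDemo {
  // Hypothetical response handler: a worker thread completes it, the Flight thread waits on it.
  static final class ResponseHandler<T> {
    private final CompletableFuture<T> response = new CompletableFuture<>();
    private final Supplier<Boolean> isRequestCancelled;
    private final long pollMillis;

    ResponseHandler(Supplier<Boolean> isRequestCancelled, long pollMillis) {
      this.isRequestCancelled = isRequestCancelled;
      this.pollMillis = pollMillis;
    }

    // Worker side: deliver the RPC result or the failure.
    void complete(T value) {
      response.complete(value);
    }

    void fail(Throwable cause) {
      response.completeExceptionally(cause);
    }

    // Flight side: wait in short timed slices so a client cancel is noticed between slices.
    T get() {
      while (true) {
        if (isRequestCancelled.get()) {
          throw new IllegalStateException("request cancelled by client");
        }
        try {
          return response.get(pollMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // Not ready yet: loop, re-check cancellation, and wait again.
        } catch (ExecutionException e) {
          throw new IllegalStateException("worker failed", e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting", e);
        }
      }
    }
  }

  public static void main(String[] args) {
    ResponseHandler<String> handler = new ResponseHandler<>(() -> false, 10);
    // Simulate the UserWorker answering after a short delay on another thread.
    new Thread(() -> {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      handler.complete("prepared-handle-1");
    }).start();
    System.out.println(handler.get()); // prints "prepared-handle-1"
  }
}
```

Polling in short slices, rather than one long blocking get(), is what lets a client cancellation end the wait early.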

How FlightPreparedStatement handles the blocking data retrieval

public FlightInfo getFlightInfo(Location location) {
  // Blocks until the prepared-statement response from the UserWorker is available
  final UserProtos.CreatePreparedStatementArrowResp createPreparedStatementResp = responseHandler.get();
  final Schema schema = buildSchema(createPreparedStatementResp.getPreparedStatement().getArrowSchema());
  final PreparedStatementTicket preparedStatementTicketContent = PreparedStatementTicket.newBuilder()
    .setQuery(query)
    .setHandle(createPreparedStatementResp.getPreparedStatement().getServerHandle())
    .build();
  final Ticket ticket = new Ticket(preparedStatementTicketContent.toByteArray());
  final FlightEndpoint flightEndpoint = new FlightEndpoint(ticket, location);
  // -1, -1: the total byte count and record count are unknown
  return new FlightInfo(schema, flightDescriptor, ImmutableList.of(flightEndpoint), -1, -1);
}
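The ticket is the contract between the two calls: getFlightInfo serializes the query and the server handle into the Ticket bytes, and getStream parses them back, answering INVALID_ARGUMENT when parsing fails. Dremio does this with the protobuf PreparedStatementTicket; the sketch below swaps in a hand-rolled length-prefixed encoding from java.io purely so it runs without protobuf. PreparedTicket, encode and decode are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class TicketCodecDemo {
  // Hypothetical ticket payload: the SQL text plus the opaque server-side handle.
  static final class PreparedTicket {
    final String query;
    final byte[] handle;

    PreparedTicket(String query, byte[] handle) {
      this.query = query;
      this.handle = handle.clone();
    }
  }

  // getFlightInfo side: serialize into the bytes that would be wrapped in a Ticket.
  static byte[] encode(PreparedTicket ticket) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writeBlock(out, ticket.query.getBytes(StandardCharsets.UTF_8));
      writeBlock(out, ticket.handle);
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return bytes.toByteArray();
  }

  // getStream side: parse the bytes back; anything malformed is an invalid argument.
  static PreparedTicket decode(byte[] ticketBytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(ticketBytes))) {
      String query = new String(readBlock(in), StandardCharsets.UTF_8);
      byte[] handle = readBlock(in);
      if (in.available() != 0) {
        throw new IllegalArgumentException("Invalid ticket used in getStream: trailing bytes");
      }
      return new PreparedTicket(query, handle);
    } catch (IOException e) {
      throw new IllegalArgumentException("Invalid ticket used in getStream", e);
    }
  }

  // Length-prefixed block: a 4-byte length followed by that many bytes.
  private static void writeBlock(DataOutputStream out, byte[] data) throws IOException {
    out.writeInt(data.length);
    out.write(data);
  }

  private static byte[] readBlock(DataInputStream in) throws IOException {
    int length = in.readInt();
    if (length < 0 || length > in.available()) {
      throw new IllegalArgumentException("Invalid ticket used in getStream: bad length " + length);
    }
    byte[] data = new byte[length];
    in.readFully(data);
    return data;
  }
}
```

Round-tripping a PreparedTicket through encode and decode yields the same query and handle, while empty or truncated bytes are rejected with an IllegalArgumentException, the analogue of the INVALID_ARGUMENT path in getStream.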

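FlightWorkManager depends on UserWorker and OptionManager. UserWorker mainly handles job submission (its implementation will be covered in a later post), and OptionManager mainly holds Flight-related configuration. FlightWorkManager has two main methods: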

public FlightPreparedStatement createPreparedStatement(FlightDescriptor flightDescriptor,
                                                       Supplier<Boolean> isRequestCancelled, UserSession userSession)
public void runPreparedStatement(TicketContent.PreparedStatementTicket ticket, FlightProducer.ServerStreamListener listener,
                                 BufferAllocator allocator, UserSession userSession)

runPreparedStatement executes the query directly and handles the results through callbacks: RunQueryResponseHandler implements UserResponseHandler, and the results of a submitted job are delivered to that UserResponseHandler.

public void runPreparedStatement(TicketContent.PreparedStatementTicket ticket, FlightProducer.ServerStreamListener listener,
                                 BufferAllocator allocator, UserSession userSession) {
  final UserBitShared.ExternalId runExternalId = ExternalIdHelper.generateExternalId();
  final UserRequest userRequest =
    new UserRequest(UserProtos.RpcType.RUN_QUERY,
      UserProtos.RunQuery.newBuilder()
        .setType(UserBitShared.QueryType.PREPARED_STATEMENT)
        .setPriority(UserProtos.QueryPriority.newBuilder()
          .setWorkloadType(UserBitShared.WorkloadType.FLIGHT)
          .setWorkloadClass(UserBitShared.WorkloadClass.GENERAL))
        .setSource(UserProtos.SubmissionSource.FLIGHT)
        .setPreparedStatementHandle(ticket.getHandle())
        .build());
  // Wrap the Flight listener in a responseHandler that receives the query results
  final UserResponseHandler responseHandler = runQueryResponseHandlerFactory.getHandler(runExternalId, userSession,
    workerProvider, optionManagerProvider, listener, allocator);
  workerProvider.get().submitWork(runExternalId, userSession, responseHandler, userRequest, TerminationListenerRegistry.NOOP);
}
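The callback flow in runPreparedStatement can be modeled with plain java.util.concurrent types: submitWork returns immediately, and the worker later pushes batches into a handler that forwards them to the client stream. All names below (BatchSink, ResponseHandler, wrap, submitWork, collect) are hypothetical stand-ins for ServerStreamListener, UserResponseHandler, RunQueryResponseHandler and UserWorker.submitWork; the real interfaces deal in Arrow data rather than lists of integers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CallbackDemo {
  // Hypothetical stand-in for Flight's ServerStreamListener (the client-facing stream).
  interface BatchSink {
    void putNext(List<Integer> batch);
    void completed();
    void error(Throwable t);
  }

  // Hypothetical stand-in for UserResponseHandler: the worker pushes results into it.
  interface ResponseHandler {
    void sendData(List<Integer> batch);
    void completed(Throwable failure); // null means success
  }

  // Adapter in the spirit of RunQueryResponseHandler: forwards worker callbacks to the Flight stream.
  static ResponseHandler wrap(BatchSink sink) {
    return new ResponseHandler() {
      @Override
      public void sendData(List<Integer> batch) {
        sink.putNext(batch);
      }

      @Override
      public void completed(Throwable failure) {
        if (failure == null) {
          sink.completed();
        } else {
          sink.error(failure);
        }
      }
    };
  }

  // Hypothetical worker: submitWork returns immediately; results arrive later through the handler.
  static void submitWork(ExecutorService executor, int rows, int batchSize, ResponseHandler handler) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    executor.execute(() -> {
      try {
        for (int start = 0; start < rows; start += batchSize) {
          List<Integer> batch = new ArrayList<>();
          for (int i = start; i < Math.min(rows, start + batchSize); i++) {
            batch.add(i);
          }
          handler.sendData(batch);
        }
        handler.completed(null);
      } catch (RuntimeException e) {
        handler.completed(e);
      }
    });
  }

  // Demo helper: run one job and block until the sink sees completion.
  static List<List<Integer>> collect(int rows, int batchSize) {
    List<List<Integer>> received = new ArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    BatchSink sink = new BatchSink() {
      @Override public void putNext(List<Integer> batch) { received.add(batch); }
      @Override public void completed() { done.countDown(); }
      @Override public void error(Throwable t) { done.countDown(); }
    };
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      submitWork(executor, rows, batchSize, wrap(sink));
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("job did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
    // Safe to read: countDown() on the worker happens-before await() returning here.
    return received;
  }
}
```

The adapter is the only piece that knows about both sides, which is why the handler factory in runPreparedStatement takes the Flight listener as an argument.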

createPreparedStatement is also, in effect, a job scheduled for execution; the difference is that the result is wrapped in a FlightPreparedStatement.

public FlightPreparedStatement createPreparedStatement(FlightDescriptor flightDescriptor,
                                                       Supplier<Boolean> isRequestCancelled, UserSession userSession) {
  final String query = getQuery(flightDescriptor);
  final UserProtos.CreatePreparedStatementArrowReq createPreparedStatementReq =
    UserProtos.CreatePreparedStatementArrowReq.newBuilder()
      .setSqlQuery(query)
      .build();
  final UserBitShared.ExternalId prepareExternalId = ExternalIdHelper.generateExternalId();
  final UserRequest userRequest =
    new UserRequest(UserProtos.RpcType.CREATE_PREPARED_STATEMENT_ARROW, createPreparedStatementReq);
  final CreatePreparedStatementResponseHandler createPreparedStatementResponseHandler =
    new CreatePreparedStatementResponseHandler(prepareExternalId, userSession, workerProvider, isRequestCancelled);
  workerProvider.get().submitWork(prepareExternalId, userSession, createPreparedStatementResponseHandler,
    userRequest, TerminationListenerRegistry.NOOP);
  return new FlightPreparedStatement(flightDescriptor, query, createPreparedStatementResponseHandler);
}
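Read together, the two methods form a two-phase protocol: getFlightInfo triggers a CREATE_PREPARED_STATEMENT_ARROW request whose server handle ends up in the ticket, and getStream later submits RUN_QUERY with type PREPARED_STATEMENT and that handle. A minimal in-memory sketch of the server-side bookkeeping this implies, assuming the handle is an opaque id. StatementRegistry and its methods are hypothetical and say nothing about how Dremio actually stores prepared statements.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PrepareRunDemo {
  // Hypothetical server-side registry for the two-phase flow.
  static final class StatementRegistry {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, String> prepared = new ConcurrentHashMap<>();

    // Phase 1, in the role of CREATE_PREPARED_STATEMENT_ARROW: keep the query, hand back an opaque handle.
    byte[] prepare(String sql) {
      long id = nextId.incrementAndGet();
      prepared.put(id, sql);
      return ByteBuffer.allocate(Long.BYTES).putLong(id).array();
    }

    // Phase 2, in the role of RUN_QUERY with type PREPARED_STATEMENT: resolve the handle and run it.
    String run(byte[] handle) {
      if (handle.length != Long.BYTES) {
        throw new IllegalArgumentException("malformed prepared statement handle");
      }
      String sql = prepared.get(ByteBuffer.wrap(handle).getLong());
      if (sql == null) {
        throw new IllegalArgumentException("unknown prepared statement handle");
      }
      return "executed: " + sql; // placeholder for actually executing the plan
    }
  }

  public static void main(String[] args) {
    StatementRegistry registry = new StatementRegistry();
    byte[] handle = registry.prepare("SELECT 1"); // what getFlightInfo would put into the ticket
    System.out.println(registry.run(handle));      // prints "executed: SELECT 1"
  }
}
```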

Notes

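DremioFlightProducer is a fairly important piece of Dremio's Flight service implementation, even though it is not much code. Dremio actually contains quite a few Flight protocol implementations; the figure below shows the classes that currently implement Flight.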

References

https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/
https://github.com/apache/arrow/blob/master/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/example/FlightSqlClientDemoApp.java
https://arrow.apache.org/docs/java/reference/index.html
https://github.com/apache/arrow/blob/master/format/Flight.proto