Dremio Arrow Flight protocol server implementation: a brief walkthrough of the DremioFlightProducer code
DremioFlightProducer contains the core of Dremio's Arrow Flight implementation.
FlightProducer interface definition
Implementing a producer mainly means implementing FlightProducer, which contains the following methods
What each method means
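How Dremio implements FlightProducer
Dremio is primarily a query engine (although for some storage types, such as NAS, HDFS and S3, it also supports CREATE TABLE), so several Flight protocol methods currently just report that they are unimplemented.
The unimplemented methods are shown below. Note that this does not hold for every producer: some producers that support the Flight protocol have to implement more, CoordinatorFlightProducer being one example.
In the direct Dremio implementation the location is simply the machine being accessed, so the implementation is relatively simple. CoordinatorFlightProducer has to implement considerably more; see its implementation for details.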
@Override
public Runnable acceptPut(CallContext callContext, FlightStream flightStream, StreamListener<PutResult> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("acceptPut is unimplemented").toRuntimeException();
}

@Override
public void doAction(CallContext callContext, Action action, StreamListener<Result> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("doAction is unimplemented").toRuntimeException();
}

@Override
public void listActions(CallContext callContext, StreamListener<ActionType> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("listActions is unimplemented").toRuntimeException();
}

@Override
public void listFlights(CallContext callContext, Criteria criteria, StreamListener<FlightInfo> streamListener) {
  throw CallStatus.UNIMPLEMENTED.withDescription("listFlights is unimplemented").toRuntimeException();
}
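That leaves four methods for Dremio to deal with. Of these, doExchange (not supported for now) and getSchema are declared as default methods in the interface, so
only two really have to be implemented: getStream and getFlightInfo. Judging from the API definition, both mainly deal with data stream information.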
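The effect of default methods on how much a producer has to write can be sketched in plain Java. This is only an illustration under assumptions: MiniProducer, QueryOnlyProducer and UnimplementedException are hypothetical names, the String parameters stand in for the real Flight types, and the default bodies are loosely modeled on the idea described above rather than copied from Arrow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Flight "unimplemented" status error. */
class UnimplementedException extends RuntimeException {
  UnimplementedException(String method) {
    super(method + " is unimplemented");
  }
}

/** Simplified, hypothetical producer interface (not the real FlightProducer). */
interface MiniProducer {
  // Abstract: every producer must supply these two.
  List<String> getStream(String ticket);

  String getFlightInfo(String descriptor);

  // Default: inherited as-is unless a producer chooses to override it.
  default String getSchema(String descriptor) {
    // Derive the schema from the flight info; here, the text before the first '|'.
    String info = getFlightInfo(descriptor);
    return info.substring(0, info.indexOf('|'));
  }

  // Default: reports "unimplemented" unless a producer supports it.
  default void doExchange(String descriptor) {
    throw new UnimplementedException("doExchange");
  }
}

/** A producer that only writes the two abstract methods. */
class QueryOnlyProducer implements MiniProducer {
  @Override
  public List<String> getStream(String ticket) {
    List<String> rows = new ArrayList<>();
    rows.add("row for " + ticket);
    return rows;
  }

  @Override
  public String getFlightInfo(String descriptor) {
    // Assumed toy format: "<schema>|<ticket>".
    return "schema(id:int)|ticket-for:" + descriptor;
  }
}
```

With this shape, calling getSchema on a QueryOnlyProducer works without any extra code in the producer, while doExchange fails with the "unimplemented" error until someone overrides it.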
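Both getStream and getFlightInfo depend on session handling. The session mainly carries account information, which is used for permission checks and for looking up other settings such as engine and routing configuration; for details see
sabot/kernel/src/main/java/com/dremio/sabot/rpc/user/UserSession.java. Both methods also depend on FlightWorkManager, Dremio's own wrapper.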
Code for getStream and getFlightInfo
@Override
public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener serverStreamListener) {
  try {
    final CallHeaders headers = retrieveHeadersFromCallContext(callContext);
    final UserSession session = sessionsManager.getUserSession(callContext.peerIdentity(), headers);
    final TicketContent.PreparedStatementTicket preparedStatementTicket = TicketContent.PreparedStatementTicket.parseFrom(ticket.getBytes());
    // Wraps the actual execution on top of UserWorker and receives the data through callbacks
    flightWorkManager.runPreparedStatement(preparedStatementTicket, serverStreamListener, allocator, session);
  } catch (InvalidProtocolBufferException ex) {
    final RuntimeException error = CallStatus.INVALID_ARGUMENT.withCause(ex).withDescription("Invalid ticket used in getStream").toRuntimeException();
    serverStreamListener.error(error);
    throw error;
  }
}

@Override
public FlightInfo getFlightInfo(CallContext callContext, FlightDescriptor flightDescriptor) {
  final CallHeaders headers = retrieveHeadersFromCallContext(callContext);
  final UserSession session = sessionsManager.getUserSession(callContext.peerIdentity(), headers);
  final FlightPreparedStatement flightPreparedStatement = flightWorkManager
    .createPreparedStatement(flightDescriptor, callContext::isCancelled, session);
  // Getting the Flight info is also an RPC call through UserWorker, just a fast one. The wrapper
  // blocks (a while loop that returns based on a timeout) so that the required data is available
  return flightPreparedStatement.getFlightInfo(location);
}
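The ticket that getStream parses is an opaque byte payload that the client simply hands back to the server. A rough sketch of that round trip is below. It is written under assumptions: SketchTicket is a hypothetical class, it uses java.io data streams instead of the protobuf encoding Dremio actually uses, and the handle is a plain String only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for PreparedStatementTicket (the real one is a protobuf message). */
final class SketchTicket {
  final String query;
  final String handle;

  SketchTicket(String query, String handle) {
    this.query = query;
    this.handle = handle;
  }

  /** Serialize to the opaque bytes a client would receive and later send back. */
  byte[] toByteArray() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(query);
      out.writeUTF(handle);
    } catch (IOException e) {
      throw new IllegalStateException(e); // in-memory streams are not expected to fail
    }
    return bytes.toByteArray();
  }

  /** Parse bytes sent by a client; malformed input is reported as an invalid argument. */
  static SketchTicket parseFrom(byte[] raw) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
      return new SketchTicket(in.readUTF(), in.readUTF());
    } catch (IOException e) {
      throw new IllegalArgumentException("Invalid ticket used in getStream", e);
    }
  }
}
```

The point of the sketch is that the server trusts nothing about the bytes it gets back: parsing can fail, and that failure is turned into an invalid-argument error, which mirrors the catch block in getStream.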
How FlightPreparedStatement blocks while fetching data
public FlightInfo getFlightInfo(Location location) {
  // Blocks until the create-prepared-statement response is available
  final UserProtos.CreatePreparedStatementArrowResp createPreparedStatementResp = responseHandler.get();
  final Schema schema = buildSchema(createPreparedStatementResp.getPreparedStatement().getArrowSchema());
  final PreparedStatementTicket preparedStatementTicketContent = PreparedStatementTicket.newBuilder()
    .setQuery(query)
    .setHandle(createPreparedStatementResp.getPreparedStatement().getServerHandle())
    .build();
  final Ticket ticket = new Ticket(preparedStatementTicketContent.toByteArray());
  final FlightEndpoint flightEndpoint = new FlightEndpoint(ticket, location);
  // -1, -1: total bytes and record count are unknown at this point
  return new FlightInfo(schema, flightDescriptor, ImmutableList.of(flightEndpoint), -1, -1);
}
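The blocking get() used above can be pictured as a loop that waits in short slices and checks for client cancellation between slices. The following is a minimal sketch under assumptions, not Dremio's CreatePreparedStatementResponseHandler: BlockingResponseHandler, its method names and the 50 ms poll interval are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical handler: a callback completes it, a caller blocks on get(). */
final class BlockingResponseHandler<T> {
  private static final long POLL_MILLIS = 50; // assumed poll interval

  private final CompletableFuture<T> future = new CompletableFuture<>();
  private final Supplier<Boolean> isRequestCancelled;

  BlockingResponseHandler(Supplier<Boolean> isRequestCancelled) {
    this.isRequestCancelled = isRequestCancelled;
  }

  /** Called from the worker thread when the response arrives. */
  void completed(T response) {
    future.complete(response);
  }

  /** Called from the worker thread when the request fails. */
  void failed(Throwable cause) {
    future.completeExceptionally(cause);
  }

  /** Wait in short slices so that client cancellation is noticed between polls. */
  T get() {
    while (true) {
      if (isRequestCancelled.get()) {
        throw new IllegalStateException("request cancelled by client");
      }
      try {
        return future.get(POLL_MILLIS, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // Not ready yet: loop again and re-check cancellation.
      } catch (ExecutionException e) {
        throw new IllegalStateException("request failed", e.getCause());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      }
    }
  }
}
```

Polling with a timeout instead of a single unbounded wait is what lets a supplier such as callContext::isCancelled stop the wait early when the client goes away.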
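FlightWorkManager depends on UserWorker and OptionManager. UserWorker mainly handles job submission (its implementation will be covered in a later post), while OptionManager
mainly holds Flight-related configuration. FlightWorkManager has two main methods: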
public FlightPreparedStatement createPreparedStatement(FlightDescriptor flightDescriptor,
    Supplier<Boolean> isRequestCancelled, UserSession userSession)
public void runPreparedStatement(TicketContent.PreparedStatementTicket ticket, FlightProducer.ServerStreamListener listener,
    BufferAllocator allocator, UserSession userSession)
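runPreparedStatement executes the statement directly and handles results through a callback mechanism: RunQueryResponseHandler implements UserResponseHandler.
For a submitted job, the result data is processed through the UserResponseHandler callbacks.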
public void runPreparedStatement(TicketContent.PreparedStatementTicket ticket, FlightProducer.ServerStreamListener listener,
    BufferAllocator allocator, UserSession userSession) {
  final UserBitShared.ExternalId runExternalId = ExternalIdHelper.generateExternalId();
  final UserRequest userRequest =
    new UserRequest(UserProtos.RpcType.RUN_QUERY,
      UserProtos.RunQuery.newBuilder()
        .setType(UserBitShared.QueryType.PREPARED_STATEMENT)
        .setPriority(UserProtos.QueryPriority.newBuilder()
          .setWorkloadType(UserBitShared.WorkloadType.FLIGHT)
          .setWorkloadClass(UserBitShared.WorkloadClass.GENERAL))
        .setSource(UserProtos.SubmissionSource.FLIGHT)
        .setPreparedStatementHandle(ticket.getHandle())
        .build());
  // Wrap the listener in a responseHandler
  final UserResponseHandler responseHandler = runQueryResponseHandlerFactory.getHandler(runExternalId, userSession,
    workerProvider, optionManagerProvider, listener, allocator);
  workerProvider.get().submitWork(runExternalId, userSession, responseHandler, userRequest, TerminationListenerRegistry.NOOP);
}
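The callback flow in runPreparedStatement (wrap the listener in a handler, submit the work, return immediately) can be sketched with plain Java types. Everything here is an assumption for illustration: SketchResponseHandler, RecordingListener, SketchWorker and SketchWorkManager are hypothetical stand-ins, and String batches replace Arrow record batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback contract, standing in for UserResponseHandler. */
interface SketchResponseHandler {
  void sendData(String batch);

  void completed();
}

/** Hypothetical stream listener, standing in for ServerStreamListener. */
final class RecordingListener {
  final List<String> received = new ArrayList<>();
  boolean done;

  synchronized void putNext(String batch) {
    received.add(batch);
  }

  synchronized void completed() {
    done = true;
  }
}

/** Hypothetical worker: runs the job on another thread and reports via callbacks. */
final class SketchWorker {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  void submitWork(String handle, SketchResponseHandler handler) {
    pool.submit(() -> {
      for (int i = 0; i < 3; i++) {
        handler.sendData(handle + "-batch-" + i);
      }
      handler.completed();
    });
  }

  /** Stop accepting work and wait for submitted jobs; false on timeout or interrupt. */
  boolean shutdownAndWait() {
    pool.shutdown();
    try {
      return pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}

final class SketchWorkManager {
  /** Wrap the listener in a handler, then submit; the call returns immediately. */
  static void runPreparedStatement(String handle, RecordingListener listener, SketchWorker worker) {
    SketchResponseHandler handler = new SketchResponseHandler() {
      @Override
      public void sendData(String batch) {
        listener.putNext(batch);
      }

      @Override
      public void completed() {
        listener.completed();
      }
    };
    worker.submitWork(handle, handler);
  }
}
```

The caller never receives a result object. Data only reaches the client because the worker invokes the handler, and the handler forwards each batch to the listener.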
createPreparedStatement is in fact also a scheduled job execution; the result is simply wrapped as a FlightPreparedStatement.
public FlightPreparedStatement createPreparedStatement(FlightDescriptor flightDescriptor,
    Supplier<Boolean> isRequestCancelled, UserSession userSession) {
  final String query = getQuery(flightDescriptor);
  final UserProtos.CreatePreparedStatementArrowReq createPreparedStatementReq =
    UserProtos.CreatePreparedStatementArrowReq.newBuilder()
      .setSqlQuery(query)
      .build();
  final UserBitShared.ExternalId prepareExternalId = ExternalIdHelper.generateExternalId();
  final UserRequest userRequest =
    new UserRequest(UserProtos.RpcType.CREATE_PREPARED_STATEMENT_ARROW, createPreparedStatementReq);
  final CreatePreparedStatementResponseHandler createPreparedStatementResponseHandler =
    new CreatePreparedStatementResponseHandler(prepareExternalId, userSession, workerProvider, isRequestCancelled);
  workerProvider.get().submitWork(prepareExternalId, userSession, createPreparedStatementResponseHandler,
    userRequest, TerminationListenerRegistry.NOOP);
  return new FlightPreparedStatement(flightDescriptor, query, createPreparedStatementResponseHandler);
}
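Notes
DremioFlightProducer is an important piece of Dremio's Flight service implementation, yet it does not contain much code. Dremio actually has quite a few implementations of the Flight protocol.
The figure below shows the classes that currently implement Flight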
References
https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/
https://github.com/apache/arrow/blob/master/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/example/FlightSqlClientDemoApp.java
https://arrow.apache.org/docs/java/reference/index.html
https://github.com/apache/arrow/blob/master/format/Flight.proto