ClickHouse and Friends (2): MySQL Protocol and the Read Call Stack
Original source: https://bohutang.me/2020/06/07/clickhouse-and-friends-mysql-protocol-read-stack/
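For an OLAP DBMS, two ends matter a great deal:
How users connect conveniently: this is the entry end
- Besides its own client, ClickHouse also offers MySQL/PG/gRPC/HTTP and other access methods
How data gets attached conveniently: this is the data source end
- Besides its own engines, ClickHouse can also mount external data sources such as MySQL/Kafka
With the inside and outside connected like this (more friends, more roads), ClickHouse gains orchestration capability at the "data" level.
Today's topic is the MySQL protocol on the entry end, which is also the first good friend of ClickHouse in this series. Users can connect to ClickHouse directly with a MySQL client or a compatible driver and then read and write data.
This article follows a MySQL Query request and uses call stacks to walk through the whole data-read path in ClickHouse.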
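How is it implemented?
The entry file is: MySQLHandler.cpp
Handshake
- MySQLHandler sends a Greeting (initial handshake) packet to MySQLClient
- MySQLClient replies with a Greeting-Response packet
- MySQLClient sends the authentication packet
- MySQLHandler verifies the authentication packet and returns the result
The MySQL protocol itself is implemented in: Core/MySQLProtocol.h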
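To make the handshake exchange a little more concrete, here is a minimal sketch of how a single MySQL protocol packet is framed on the wire: a 3-byte little-endian payload length, a 1-byte sequence id, then the payload. The helper names framePacket and parseHeader are made up for illustration and are not taken from Core/MySQLProtocol.h; splitting payloads that do not fit in one packet is deliberately left out.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: wrap a payload in a MySQL packet header.
// Header layout: 3-byte little-endian payload length + 1-byte sequence id.
std::vector<uint8_t> framePacket(const std::string & payload, uint8_t sequence_id)
{
    // A payload of 0xFFFFFF bytes or more must be split across packets; not handled in this sketch.
    if (payload.size() >= 0xFFFFFF)
        throw std::length_error("payload too large for a single packet");

    const uint32_t len = static_cast<uint32_t>(payload.size());
    std::vector<uint8_t> packet;
    packet.reserve(4 + payload.size());
    packet.push_back(static_cast<uint8_t>(len & 0xFF));
    packet.push_back(static_cast<uint8_t>((len >> 8) & 0xFF));
    packet.push_back(static_cast<uint8_t>((len >> 16) & 0xFF));
    packet.push_back(sequence_id);
    packet.insert(packet.end(), payload.begin(), payload.end());
    return packet;
}

struct PacketHeader
{
    uint32_t payload_length;
    uint8_t sequence_id;
};

// Hypothetical helper: read the 4-byte header back from a framed packet.
PacketHeader parseHeader(const std::vector<uint8_t> & packet)
{
    if (packet.size() < 4)
        throw std::invalid_argument("packet shorter than header");
    const uint32_t len = static_cast<uint32_t>(packet[0])
        | (static_cast<uint32_t>(packet[1]) << 8)
        | (static_cast<uint32_t>(packet[2]) << 16);
    return PacketHeader{len, packet[3]};
}
```

For example, framePacket("abc", 1) yields a 7-byte packet whose header bytes are 03 00 00 01. The sequence id starts at 0 and increases with each packet of an exchange, which is how both sides keep the handshake steps in order.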
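Query requests
Once authentication succeeds, normal data exchange can begin.
- MySQLClient sends a request:
mysql> SELECT * FROM system.numbers LIMIT 5;
- The MySQLHandler call stack:
-> MySQLHandler::comQuery -> executeQuery -> pipeline->execute -> MySQLOutputFormat::consume
- MySQLClient receives the result
In the second step above, executeQuery (executeQuery.cpp) is very important. It is the entry point between every front-end server and the ClickHouse core: its first argument is the SQL text (for example 'select 1'), and its second argument is where the result set should be sent (the network socket).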
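As an illustration of how a handler might hand a query over to the engine, the sketch below dispatches on the first byte of a command packet (0x03 is COM_QUERY in the MySQL protocol) and passes the remaining bytes as SQL text to an executor, together with an output sink. handleCommand and QueryExecutor are hypothetical names, and the "OK"/"ERR" strings are placeholders rather than real response packets; the actual MySQLHandler::comQuery and executeQuery work on ReadBuffer/WriteBuffer and do considerably more.

```cpp
#include <cstdint>
#include <functional>
#include <ostream>
#include <sstream>
#include <string>

// Command bytes defined by the MySQL client/server protocol.
enum Command : uint8_t
{
    COM_QUIT = 0x01,
    COM_INIT_DB = 0x02,
    COM_QUERY = 0x03,
    COM_PING = 0x0e
};

// Hypothetical stand-in for the engine entry point: SQL text in, result sink out.
using QueryExecutor = std::function<void(const std::string & sql, std::ostream & out)>;

// Dispatch one command payload. Returns false when the connection should be closed.
bool handleCommand(const std::string & payload, std::ostream & out, const QueryExecutor & execute)
{
    if (payload.empty())
        return false;

    const auto command = static_cast<uint8_t>(payload[0]);
    switch (command)
    {
        case COM_QUERY:
            // Everything after the command byte is the SQL text.
            execute(payload.substr(1), out);
            return true;
        case COM_PING:
            out << "OK"; // placeholder for an OK packet
            return true;
        case COM_QUIT:
            return false;
        default:
            out << "ERR unsupported command"; // placeholder for an ERR packet
            return true;
    }
}
```

The point of the design is that the handler knows nothing about SQL: it only peels off the protocol framing and forwards text plus a destination, which is why several front ends can share one entry point.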
Call stack analysis
SELECT * FROM system.numbers LIMIT 5
1. Obtaining the data source
The StorageSystemNumbers data source:
DB::StorageSystemNumbers::read(..., DB::SelectQueryInfo const &, DB::Context const &, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) StorageSystemNumbers.cpp:135
DB::ReadFromStorageStep::ReadFromStorageStep(...)
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan &, ...) memory:3028
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan &, ...) InterpreterSelectQuery.cpp:1361
DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan &, ...) InterpreterSelectQuery.cpp:791
DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan &) InterpreterSelectQuery.cpp:472
DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan &) InterpreterSelectWithUnionQuery.cpp:183
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:198
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer &, DB::WriteBuffer &, bool, DB::Context &, ...)
DB::MySQLHandler::comQuery(DB::ReadBuffer &) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
The key piece here is ReadFromStorageStep, whose constructor obtains the data source pipes from whichever storage is being read:
Pipes pipes = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams);
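The idea that every storage exposes the same read() call and hands back pipes can be sketched with a heavily reduced, hypothetical interface. IStorage, NumbersStorage, Chunk and Pipe below are toy types invented for this example (a pipe is just a callable that yields the next block); the real read() takes the longer argument list shown above.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using Chunk = std::vector<uint64_t>;   // one block of a single UInt64 column
using Pipe = std::function<Chunk()>;   // each call yields the next block
using Pipes = std::vector<Pipe>;

// Hypothetical, heavily reduced storage interface.
struct IStorage
{
    virtual ~IStorage() = default;
    virtual Pipes read(size_t max_block_size, unsigned max_streams) = 0;
};

// Toy counterpart of system.numbers: an endless 0, 1, 2, ... sequence.
struct NumbersStorage : IStorage
{
    Pipes read(size_t max_block_size, unsigned /*max_streams*/) override
    {
        // Shared counter so that copies of the pipe continue the same sequence.
        auto next = std::make_shared<uint64_t>(0);
        Pipe pipe = [next, max_block_size]()
        {
            Chunk chunk(max_block_size);
            for (auto & value : chunk)
                value = (*next)++;
            return chunk;
        };

        Pipes pipes;
        pipes.push_back(std::move(pipe)); // this toy storage always returns a single stream
        return pipes;
    }
};
```

Because the source never ends on its own, something downstream (here, the LIMIT) has to decide when to stop pulling blocks.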
2. Building the pipeline
DB::LimitTransform::LimitTransform(DB::Block const &, unsigned long, unsigned long, unsigned long, bool, bool, ...) LimitTransform.cpp:21
DB::LimitStep::transformPipeline(DB::QueryPipeline &) memory:2214
DB::LimitStep::transformPipeline(DB::QueryPipeline &) memory:2299
DB::LimitStep::transformPipeline(DB::QueryPipeline &) memory:3570
DB::LimitStep::transformPipeline(DB::QueryPipeline &) memory:4400
DB::LimitStep::transformPipeline(DB::QueryPipeline &) LimitStep.cpp:33
DB::ITransformingStep::updatePipeline(...) ITransformingStep.cpp:21
DB::QueryPlan::buildQueryPipeline() QueryPlan.cpp:154
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:200
DB::executeQueryImpl(const char *, const char *, DB::Context &, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer *) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer &, DB::WriteBuffer &, bool, DB::Context &, ...) executeQuery.cpp:722
DB::MySQLHandler::comQuery(DB::ReadBuffer &) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
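The stack above shows a query plan being walked and each step contributing its piece to the pipeline. A toy version of that idea might look as follows. The class names echo the ones in the trace, but the bodies are invented: the pipeline here is only an ordered list of labels, and the read step uses the same method as the transforming step purely to keep the sketch short.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Toy pipeline: just an ordered list of processor descriptions.
struct QueryPipeline
{
    std::vector<std::string> processors;
};

// Hypothetical reduced plan step: each step knows how to extend the pipeline.
struct IQueryPlanStep
{
    virtual ~IQueryPlanStep() = default;
    virtual void transformPipeline(QueryPipeline & pipeline) const = 0;
};

struct ReadFromStorageStep : IQueryPlanStep
{
    void transformPipeline(QueryPipeline & pipeline) const override
    {
        pipeline.processors.push_back("NumbersSource");
    }
};

struct LimitStep : IQueryPlanStep
{
    explicit LimitStep(size_t limit_) : limit(limit_) {}

    void transformPipeline(QueryPipeline & pipeline) const override
    {
        pipeline.processors.push_back("LimitTransform(limit=" + std::to_string(limit) + ")");
    }

    size_t limit;
};

struct QueryPlan
{
    std::vector<std::unique_ptr<IQueryPlanStep>> steps;

    // Apply the steps in order; each one appends its processor to the pipeline.
    QueryPipeline buildQueryPipeline() const
    {
        QueryPipeline pipeline;
        for (const auto & step : steps)
            step->transformPipeline(pipeline);
        return pipeline;
    }
};
```

Adding a ReadFromStorageStep and then a LimitStep(5) to the plan and calling buildQueryPipeline() gives a pipeline with the source first and the limit transform second, mirroring the order seen in the trace.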
3. Executing the pipeline
DB::LimitTransform::prepare(...) LimitTransform.cpp:67
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, ...) PipelineExecutor.cpp:291
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge &, ..., unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, ...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge &, ..., unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, ...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge &, ..., unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, ...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge &, ..., unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, ...) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge &, ..., unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, ...) PipelineExecutor.cpp:373
DB::PipelineExecutor::initializeExecution(unsigned long) PipelineExecutor.cpp:747
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:764
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer &, DB::WriteBuffer &, bool, DB::Context &, ...) executeQuery.cpp:833
DB::MySQLHandler::comQuery(DB::ReadBuffer &) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141
4. Sending the output
DB::MySQLOutputFormat::consume(DB::Chunk) MySQLOutputFormat.cpp:53
DB::IOutputFormat::work() IOutputFormat.cpp:62
DB::executeJob(DB::IProcessor *) PipelineExecutor.cpp:155
operator() PipelineExecutor.cpp:172
DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, ...) PipelineExecutor.cpp:630
DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) PipelineExecutor.cpp:546
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:812
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer &, DB::WriteBuffer &, bool, DB::Context &, ...) executeQuery.cpp:800
DB::MySQLHandler::comQuery(DB::ReadBuffer &) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
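To give a feel for what the output stage has to produce for a MySQL client, here is a small sketch of a text-protocol result row, in which every column value is written as a length-encoded string. This follows the MySQL wire format as I understand it, not the code in MySQLOutputFormat.cpp; the function names are made up, and NULL values (encoded as a single 0xFB byte) are not handled.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Append a MySQL length-encoded integer to `out`.
void writeLengthEncodedInt(std::string & out, uint64_t value)
{
    // Write the lowest `bytes` bytes of `v` in little-endian order.
    auto put = [&out](uint64_t v, int bytes)
    {
        for (int i = 0; i < bytes; ++i)
            out.push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
    };

    if (value < 251)
        put(value, 1);
    else if (value < (1ULL << 16))
    {
        out.push_back('\xfc');
        put(value, 2);
    }
    else if (value < (1ULL << 24))
    {
        out.push_back('\xfd');
        put(value, 3);
    }
    else
    {
        out.push_back('\xfe');
        put(value, 8);
    }
}

// Hypothetical helper: encode one result row, each value as a length-encoded string.
std::string encodeTextRow(const std::vector<std::string> & values)
{
    std::string row;
    for (const auto & value : values)
    {
        writeLengthEncodedInt(row, value.size());
        row += value;
    }
    return row;
}
```

For the query in this article, each of the five rows carries one value, so the row for the number 0 is the two bytes 01 30: a length of 1 followed by the character '0'.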
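Summary
ClickHouse is cleanly modularized, and its parts snap together like LEGO bricks. When we run:
SELECT * FROM system.numbers LIMIT 5
the core first parses the SQL statement into an AST, then uses the AST to obtain the data source Source: pipeline.Add(Source). Next it generates a QueryPlan from the AST and builds the matching Transform from the QueryPlan: pipeline.Add(LimitTransform). Then it adds the Output Sink that sends the data out: pipeline.Add(OutputSink). Finally the pipeline is executed and each Transformer starts working.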
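The Source, LimitTransform and OutputSink sequence can be mimicked end to end in a few lines. This is only a toy with invented types and a single-threaded loop in place of a scheduler; real processors exchange chunks through ports rather than single rows.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Source: endless numbers, in the spirit of system.numbers.
struct NumbersSource
{
    uint64_t next = 0;
    uint64_t generate() { return next++; }
};

// Transform: lets at most `limit` rows through, then reports that it is done.
struct LimitTransform
{
    size_t limit;
    size_t seen = 0;

    bool finished() const { return seen >= limit; }

    uint64_t transform(uint64_t row)
    {
        ++seen;
        return row;
    }
};

// Sink: stands in for the output format that writes rows to the client.
struct OutputSink
{
    std::vector<uint64_t> rows;
    void consume(uint64_t row) { rows.push_back(row); }
};

// Hypothetical single-threaded "executor": pull rows until the limit is reached.
std::vector<uint64_t> runPipeline(size_t limit)
{
    NumbersSource source;
    LimitTransform limit_transform{limit};
    OutputSink sink;

    while (!limit_transform.finished())
        sink.consume(limit_transform.transform(source.generate()));

    return sink.rows;
}
```

Calling runPipeline(5) collects the rows 0, 1, 2, 3, 4 in the sink, the same five rows the MySQL client sees. Note that it is the limit, not the source, that ends the run.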
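ClickHouse's Transformer scheduling system is called Processor. It is also one of the modules that matter most for performance; see "Pipeline Processors and Scheduler" for details. ClickHouse is a luxury sports car with a manual gearbox, and it is yours for free, folks!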
Links in this article
- https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/MySQLHandler.cpp
- https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/MySQLProtocol.h
- https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/
End of article.
Enjoy ClickHouse:)