ClickHouse and Friends (2): MySQL Protocol and the Read Call Stack

Original source: https://bohutang.me/2020/06/07/clickhouse-and-friends-mysql-protocol-read-stack/

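For an OLAP DBMS, two ends matter a great deal:

  • How users connect conveniently: this is the entry end

    • Besides its own client, ClickHouse also offers MySQL/PG/GRPC/HTTP access
  • How data gets attached conveniently: this is the data source end

    • Besides its own engines, ClickHouse can also mount external data sources such as MySQL/Kafka

With the inside and the outside connected this way, more friends mean more paths, which enables orchestration at the "data" level.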

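Today's topic is the MySQL protocol on the entry end, the first good friend of ClickHouse in this series. Users can connect to ClickHouse directly with a MySQL client or a compatible driver and then read and write data.

This article follows a MySQL Query request and uses its call stacks to walk through the whole ClickHouse read path.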

How is it implemented?

The entry file is MySQLHandler.cpp

Handshake

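  1. Once the connection is established, MySQLHandler sends a Greeting (initial handshake) packet to MySQLClient
  2. MySQLClient replies with a Greeting-Response (handshake response) packet
  3. MySQLClient's authentication data travels in that response, or in a follow-up packet if the authentication plugin asks for one
  4. MySQLHandler checks the authentication data and returns the authentication result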

The MySQL protocol is implemented in Core/MySQLProtocol.h
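For orientation, every MySQL protocol packet, including the handshake packets above, is framed by a 4-byte header: a 3-byte little-endian payload length followed by a 1-byte sequence number. The following is a minimal sketch of that framing. The names `PacketHeader`, `encodeHeader` and `decodeHeader` are hypothetical and chosen for illustration; this is not the code in Core/MySQLProtocol.h.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Hypothetical helper type for illustration; not a ClickHouse class.
struct PacketHeader
{
    uint32_t payload_length; // only the low 24 bits travel on the wire
    uint8_t sequence_id;     // incremented for each packet within one command
};

// Serialize: 3 bytes of little-endian length, then 1 byte of sequence id.
std::array<uint8_t, 4> encodeHeader(const PacketHeader & header)
{
    assert(header.payload_length <= 0xFFFFFF); // must fit in 3 bytes
    return {
        static_cast<uint8_t>(header.payload_length & 0xFF),
        static_cast<uint8_t>((header.payload_length >> 8) & 0xFF),
        static_cast<uint8_t>((header.payload_length >> 16) & 0xFF),
        header.sequence_id};
}

// Parse the 4 header bytes back into length and sequence id.
PacketHeader decodeHeader(const std::array<uint8_t, 4> & bytes)
{
    PacketHeader header;
    header.payload_length = static_cast<uint32_t>(bytes[0])
        | (static_cast<uint32_t>(bytes[1]) << 8)
        | (static_cast<uint32_t>(bytes[2]) << 16);
    header.sequence_id = bytes[3];
    return header;
}
```

For example, a payload of 0x012345 bytes with sequence id 3 is framed as the bytes 45 23 01 03, and decoding those bytes gives back the same length and sequence id.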

Query request

Once authentication succeeds, normal data exchange can begin.

  1. MySQLClient sends a request:
mysql> SELECT * FROM system.numbers LIMIT 5;
  2. The call stack in MySQLHandler:
-> MySQLHandler::comQuery -> executeQuery -> pipeline->execute -> MySQLOutputFormat::consume
  3. MySQLClient receives the result

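In step 2, executeQuery (executeQuery.cpp) is crucial. It is the entry point through which every front-end server reaches the ClickHouse core: the first argument is the SQL text ('select 1'), and the second is where the result set should be sent (the network socket).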
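The shape of such an entry point can be sketched as follows: SQL text goes in, and the result is written to whatever destination the caller supplies. This is a toy under stated assumptions, not ClickHouse's executeQuery. The names `executeQueryToy` and `handleMySQLQuery` are hypothetical, and the toy only understands the literal query `select 1`.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for the core entry point: SQL text in, result out.
// A real implementation parses and executes the query; this toy recognizes
// only the literal text "select 1" so the example stays self-contained.
void executeQueryToy(const std::string & sql, std::ostream & out)
{
    assert(!sql.empty());
    if (sql == "select 1")
        out << "1\n";
    else
        out << "unsupported query in this sketch\n";
}

// Hypothetical front end: it only decides where the result goes.
// In a server, the stream would wrap the client socket instead of a buffer.
std::string handleMySQLQuery(const std::string & sql)
{
    std::ostringstream socket_like_buffer;
    executeQueryToy(sql, socket_like_buffer);
    return socket_like_buffer.str();
}
```

Because the destination is a parameter, another front end (HTTP, for instance) could call the same function with a different stream, which is the point the paragraph above makes about a single shared entry.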

Call stack analysis

SELECT * FROM system.numbers LIMIT 5

1. Obtaining the data source

The StorageSystemNumbers data source:

DB::StorageSystemNumbers::read(std::__1::vector<...> const&, std::__1::shared_ptr<...> const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int) StorageSystemNumbers.cpp:135
DB::ReadFromStorageStep::ReadFromStorageStep(std::__1::shared_ptr<...>, std::__1::shared_ptr<...>&, DB::SelectQueryOptions,
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<...> const&, std::__1::vector<...> const&) memory:3028
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPlan&, std::__1::shared_ptr<...> const&, std::__1::vector<...> const&) InterpreterSelectQuery.cpp:1361
DB::InterpreterSelectQuery::executeImpl(DB::QueryPlan&, std::__1::shared_ptr<...> const&, std::__1::optional<...>) InterpreterSelectQuery.cpp:791
DB::InterpreterSelectQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectQuery.cpp:472
DB::InterpreterSelectWithUnionQuery::buildQueryPlan(DB::QueryPlan&) InterpreterSelectWithUnionQuery.cpp:183
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:198
DB::executeQueryImpl(const char*, const char*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<...
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141

The key piece here is ReadFromStorageStep, which obtains the data source pipes from whichever storage the query targets:

Pipes pipes = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams);
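To make the idea of "a storage hands back a set of pipes" concrete, here is a much simplified sketch. Every name in it (`ISource`, `IStorage`, `NumbersSource`, `NumbersStorage`, and the two-argument `read`) is a hypothetical reduction for illustration, not ClickHouse's real interface, whose `read` takes the longer argument list shown above. The sketch assumes that a numbers-like storage splits the sequence into blocks and interleaves those blocks across streams.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical simplifications: a block is one column of integers,
// and a "pipe" is just a source that can be asked for the next block.
using Block = std::vector<uint64_t>;

struct ISource
{
    virtual ~ISource() = default;
    virtual Block generate() = 0;
};

using Pipes = std::vector<std::unique_ptr<ISource>>;

struct IStorage
{
    virtual ~IStorage() = default;
    virtual Pipes read(size_t max_block_size, size_t num_streams) = 0;
};

// Produces consecutive integers; each call returns one block and then
// jumps ahead by `stride` so that parallel streams do not overlap.
class NumbersSource : public ISource
{
public:
    NumbersSource(uint64_t first_, uint64_t stride_, size_t block_size_)
        : next(first_), stride(stride_), block_size(block_size_) {}

    Block generate() override
    {
        Block block(block_size);
        for (size_t i = 0; i < block_size; ++i)
            block[i] = next + i;
        next += stride;
        return block;
    }

private:
    uint64_t next;
    uint64_t stride;
    size_t block_size;
};

class NumbersStorage : public IStorage
{
public:
    Pipes read(size_t max_block_size, size_t num_streams) override
    {
        assert(max_block_size > 0 && num_streams > 0);
        Pipes pipes;
        for (size_t i = 0; i < num_streams; ++i)
            pipes.push_back(std::make_unique<NumbersSource>(
                i * max_block_size, num_streams * max_block_size, max_block_size));
        return pipes;
    }
};
```

With `read(3, 2)`, the first stream yields {0, 1, 2} and then {6, 7, 8}, while the second stream starts at {3, 4, 5}. The caller never needs to know which storage produced the pipes, which is what lets the rest of the pipeline stay storage-agnostic.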

2. Building the pipeline

DB::LimitTransform::LimitTransform(DB::Block const&, unsigned long, unsigned long, unsigned long, bool, bool, std::__1::vector<...>) LimitTransform.cpp:21
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2214
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:2299
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:3570
DB::LimitStep::transformPipeline(DB::QueryPipeline&) memory:4400
DB::LimitStep::transformPipeline(DB::QueryPipeline&) LimitStep.cpp:33
DB::ITransformingStep::updatePipeline(std::__1::vector<...>) ITransformingStep.cpp:21
DB::QueryPlan::buildQueryPipeline() QueryPlan.cpp:154
DB::InterpreterSelectWithUnionQuery::execute() InterpreterSelectWithUnionQuery.cpp:200
DB::executeQueryImpl(const char*, const char*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*) executeQuery.cpp:385
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<...>) executeQuery.cpp:722
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141

3. Executing the pipeline

DB::LimitTransform::prepare(std::__1::vector<...> const&, std::__1::vector<...> const&) LimitTransform.cpp:67
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<...>&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:291
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<...>&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<...>&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<...>&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<...>&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<...>&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<...>&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<...>&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<...>&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::PipelineExecutor::Edge&, std::__1::queue<...>&, unsigned long) PipelineExecutor.cpp:264
DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<...>&, std::__1::unique_lock<std::__1::mutex>) PipelineExecutor.cpp:373
DB::PipelineExecutor::initializeExecution(unsigned long) PipelineExecutor.cpp:747
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:764
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<...>) executeQuery.cpp:833
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:307
DB::MySQLHandler::run() MySQLHandler.cpp:141

4. Sending the output

DB::MySQLOutputFormat::consume(DB::Chunk) MySQLOutputFormat.cpp:53
DB::IOutputFormat::work() IOutputFormat.cpp:62
DB::executeJob(DB::IProcessor*) PipelineExecutor.cpp:155
operator() PipelineExecutor.cpp:172
DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<...>*) PipelineExecutor.cpp:630
DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) PipelineExecutor.cpp:546
DB::PipelineExecutor::executeImpl(unsigned long) PipelineExecutor.cpp:812
DB::PipelineExecutor::execute(unsigned long) PipelineExecutor.cpp:479
DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function<...>) executeQuery.cpp:800
DB::MySQLHandler::comQuery(DB::ReadBuffer&) MySQLHandler.cpp:311
DB::MySQLHandler::run() MySQLHandler.cpp:141
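MySQLOutputFormat::consume is the frame where result chunks are turned into MySQL result rows. In the MySQL text protocol, each non-NULL column value of a row is sent as a length-encoded string: a length-encoded integer followed by the raw bytes (NULL is a single 0xFB byte and is left out here). The sketch below shows that encoding. The helper names `writeLengthEncodedInt` and `encodeTextRow` are hypothetical, and this is an illustration of the wire format rather than ClickHouse's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Append a MySQL length-encoded integer to the buffer:
//   < 251      -> 1 byte
//   < 2^16     -> 0xFC + 2 bytes little-endian
//   < 2^24     -> 0xFD + 3 bytes little-endian
//   otherwise  -> 0xFE + 8 bytes little-endian
void writeLengthEncodedInt(std::string & buf, uint64_t value)
{
    auto append_le = [&buf](uint64_t v, int bytes)
    {
        assert(bytes >= 1 && bytes <= 8);
        for (int i = 0; i < bytes; ++i)
            buf.push_back(static_cast<char>((v >> (8 * i)) & 0xFF));
    };

    if (value < 251)
    {
        append_le(value, 1);
    }
    else if (value < (1ULL << 16))
    {
        buf.push_back(static_cast<char>(0xFC));
        append_le(value, 2);
    }
    else if (value < (1ULL << 24))
    {
        buf.push_back(static_cast<char>(0xFD));
        append_le(value, 3);
    }
    else
    {
        buf.push_back(static_cast<char>(0xFE));
        append_le(value, 8);
    }
}

// Encode one text-protocol row payload: every value as a length-encoded string.
std::string encodeTextRow(const std::vector<std::string> & values)
{
    std::string payload;
    for (const auto & value : values)
    {
        writeLengthEncodedInt(payload, value.size());
        payload += value;
    }
    return payload;
}
```

For the query in this article, each of the five rows carries a single short value, so a row such as "0" becomes the two bytes 01 30 before the packet header is added.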

Summary

ClickHouse's modularity is quite clean: like Lego bricks, the pieces can be combined and assembled. When we run:

SELECT * FROM system.numbers LIMIT 5

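First the core parses the SQL statement into an AST and uses the AST to obtain the data source (Source): pipeline.Add(Source). Next it generates a QueryPlan from the AST and derives the corresponding Transform from the QueryPlan: pipeline.Add(LimitTransform). Then it adds an Output Sink as the destination for the data: pipeline.Add(OutputSink). Finally the pipeline is executed and each Transformer starts working.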
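The Source, LimitTransform and OutputSink assembly can be pictured with a toy pipeline. This is a deliberately simplified pull-based chain built from closures, assuming C++17. ClickHouse's real pipeline is a graph of processors driven by an executor, and none of the names below (`Puller`, `makeNumbersSource`, `makeLimitTransform`, `runOutputSink`, `selectNumbersLimit`) come from its code base.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

using Row = uint64_t;
// A stage is anything that returns the next row, or nullopt when finished.
using Puller = std::function<std::optional<Row>()>;

// Source: an endless stream 0, 1, 2, ... (like system.numbers).
Puller makeNumbersSource()
{
    return [next = Row{0}]() mutable -> std::optional<Row> { return next++; };
}

// Transform: passes through at most `limit` rows, then reports end of data
// without pulling from upstream again.
Puller makeLimitTransform(Puller upstream, size_t limit)
{
    assert(upstream); // the upstream stage must be callable
    return [upstream = std::move(upstream), remaining = limit]() mutable -> std::optional<Row>
    {
        if (remaining == 0)
            return std::nullopt;
        --remaining;
        return upstream();
    };
}

// Sink: drains the chain and writes one row per line to the destination.
void runOutputSink(Puller upstream, std::ostream & out)
{
    while (auto row = upstream())
        out << *row << '\n';
}

// Assemble Source -> LimitTransform -> OutputSink and run it.
std::string selectNumbersLimit(size_t limit)
{
    std::ostringstream out;
    runOutputSink(makeLimitTransform(makeNumbersSource(), limit), out);
    return out.str();
}
```

Calling `selectNumbersLimit(5)` writes the rows 0 through 4, one per line. The limit stage is what lets an endless source terminate, which mirrors why LimitTransform appears in the call stacks for this query.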

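ClickHouse's scheduling system for Transformers is called Processor, and it is also a key module for performance. For details, see "Pipeline Processors and Scheduler". ClickHouse is a luxury sports car with a manual gearbox, and it is free to own. Have fun, everyone!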

Links in this article

  • https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/MySQLHandler.cpp
  • https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/MySQLProtocol.h
  • https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/

End of article.

Enjoy ClickHouse:)
