MySQL 如何處理監聽連線的
阿新 • • 發佈:2021-11-03
以一個連線一個執行緒為例,也就是thread_handling為one-thread-per-connection模式,這也是社群版本的唯一模式。
[dba_yix@127.0.0.1][(none)]> show variables like '%thread_hand%';
+-----------------+---------------------------+
| Variable_name | Value |
+-----------------+---------------------------+
| thread_handling | one-thread-per-connection |
+-----------------+---------------------------+
1 row in set (0.01 sec)
- 監聽連線
關鍵字:listen_for_connection_event
,process_new_connection
,poll
/**
連線接受器迴圈以接受來自客戶端的連線。
*/
void connection_event_loop()
{
Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
while (!abort_loop)
{
Channel_info *channel_info= m_listener->listen_for_connection_event();
/* 重點在這個listen_for_connection_event方法裡面,呼叫了poll處理這個監聽描述符,poll傳遞的引數timeout傳遞的是-1,也就是堵塞等待 */
if (channel_info != NULL)
/* 處理一個新的連線,channel_info裡面包含了連線資訊。
不出錯的情況下,會使用mysql_thread_create建立一個執行緒,執行緒的入口函式是handle_connection。見3
如果有空閒的執行緒會複用。 */
mgr->process_new_connection(channel_info);
}
}
- 處理監聽
Channel_info* Mysqld_socket_listener::listen_for_connection_event()
{
int retval= poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
/* setup_listener 中設定了和tcp socket監聽和unix socket監聽,存在map型別m_socket_map中。
所以m_socket_map.size()為2,監聽兩個描述符的POLLIN事件。*/
for (uint i= 0; i < m_socket_map.size(); ++i)
{
if (m_poll_info.m_fds[i].revents & POLLIN)
{
listen_sock= m_poll_info.m_pfs_fds[i];
is_unix_socket= m_socket_map[listen_sock];
/* 遍歷監聽描述符,判斷是否是unix套接字。*/
break;
}
}
for (uint retry= 0; retry < MAX_ACCEPT_RETRY; retry++)
{
socket_len_t length= sizeof(struct sockaddr_storage);
/* 由於觸發了POLLIN事件,所以這裡accept不會被堵塞。 */
connect_sock= mysql_socket_accept(key_socket_client_connection, listen_sock,
(struct sockaddr *)(&cAddr), &length);
}
Channel_info* channel_info= NULL;
/* 封裝連線通道,隱藏UNIX socket和tcp socket的差異。 */
if (is_unix_socket)
channel_info= new (std::nothrow) Channel_info_local_socket(connect_sock);
else
channel_info= new (std::nothrow) Channel_info_tcpip_socket(connect_sock);
return channel_info;
}
- 處理連線
程式碼路徑conn_handler/connection_handler_per_thread.cc:249
關鍵字:channel_info
,thd_manager
,handler_manager
,init_new_thd
,do_command
,thd
程式碼概要:
void *handle_connection(void *arg)
{
Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
// 獲取一個全域性的執行緒管理器(單例模式)
// thd_manager類似一個門衛,對每個來訪的客人(連線)進行登記,註冊, 銷燬等操作。
// 此外還包括其他後臺執行緒
Connection_handler_manager *handler_manager= Connection_handler_manager::get_instance();
// 獲取一個連線處理器(單例模式)。與上面不同的是,這個只處理連線執行緒。
// 例如增加全域性的connection記數,通過reset_max_used_connections響應set max_collections = 2000這樣的請求等。
Channel_info* channel_info= static_cast<Channel_info*>(arg);
// Channel_info抽象基類表示有關新連線的連線通道資訊。子類封裝了不同連線通道型別之間的差異。
// 所謂的不同連線型別比如,本地套接字,tcp/ip連結、命名管道,共享記憶體等。(後兩者只存在windows平臺)
for (;;){
THD *thd= init_new_thd(channel_info); /* 使用連線通道初始化一個新的執行緒,
這個執行緒裡面攜帶了連線資訊,以後每次處理query的時候,
實際上就是這個thd裡面攜帶了客戶端以及客戶端請求的各種資訊。
如果這裡出錯了,會增加Aborted_connects的狀態值。同時減少全域性的連線計數。
*/
PSI_thread *psi= PSI_THREAD_CALL(get_thread)();/* 。psi用於performance監控。(PSI_THREAD_CALL為ps慣用巨集,pfs_get_thread_v1)
這個函式最終呼叫了pthread庫的pthread_getspecific,用於操作執行緒中的全域性變數。
*/
thd->set_psi(psi); // 掛到執行緒中,以便後續用這個psi進行效能監控。
thd_manager->add_thd(thd);// 通過執行緒管理器將這個執行緒物件加到執行緒連結串列中。
if (thd_prepare_connection(thd)) // 連線前檢查,包括握手、授權客戶端和更新執行緒ACL,
// 判斷是tcpip還是本地套接字連結。設定長連結等。來訪者ip是否允許訪問等等。
handler_manager->inc_aborted_connects(); // 如果檢查失敗需要中斷連結
else
{
while (thd_connection_alive(thd))
{
if (do_command(thd)) // ✨✨連*狀態是ok的話,會一直執行執行緒的命令請求。這裡就是我們函式的互動入口了。
break;
}
end_connection(thd); // 連線結束處理。
}
close_connection(thd, 0, false, false); // 關閉連線
thd->get_stmt_da()->reset_diagnostics_area(); // 釋放資源
thd->release_resources(); 。
thd_manager->remove_thd(thd); // 執行緒管理器移除這個連線執行緒
Connection_handler_manager::dec_connection_count(); // 減少全域性connection計數。
delete thd; // 釋放記憶體。
if (abort_loop) // 伺服器關閉中需要結束pthread。正常來說,不會走這個路線。
break; // abort_loop 是一個全域性變數,當資料庫關閉時,會設定為true。
channel_info= Per_thread_connection_handler::block_until_new_connection();
// 連線重用。
}
}