1. 程式人生 > 其它 >MySQL 如何處理監聽連線的

MySQL 如何處理監聽連線的

以一個連線一個執行緒為例,也就是thread_handling為one-thread-per-connection模式,這也是社群版本的唯一模式。


[dba_yix@127.0.0.1][(none)]> show variables like '%thread_hand%';
+-----------------+---------------------------+
| Variable_name   | Value                     |
+-----------------+---------------------------+
| thread_handling | one-thread-per-connection |
+-----------------+---------------------------+
1 row in set (0.01 sec)

  1. 監聽連線

關鍵字:listen_for_connection_event,process_new_connection,poll

/**                                                                           
  連線接受器迴圈以接受來自客戶端的連線。             
*/                                                                            
void connection_event_loop()                                                  
{                                                                             
  Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
  while (!abort_loop)                                                          
  {                                                                           
    Channel_info *channel_info= m_listener->listen_for_connection_event();    
    /* 重點在這個listen_for_connection_event方法裡面,呼叫了poll處理這個監聽描述符,poll傳遞的引數timeout傳遞的是-1,也就是堵塞等待 */
   
    if (channel_info != NULL)      
    /* 處理一個新的連線,channel_info裡面包含了連線資訊。
    不出錯的情況下,會使用mysql_thread_create建立一個執行緒,執行緒的入口函式是handle_connection。見3
    如果有空閒的執行緒會複用。 */
      mgr->process_new_connection(channel_info);                              
  }                                                                           
}                                                                             


  1. 處理監聽
Channel_info* Mysqld_socket_listener::listen_for_connection_event()
{
    int retval= poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
    /* setup_listener 中設定了和tcp socket監聽和unix socket監聽,存在map型別m_socket_map中。
    所以m_socket_map.size()為2,監聽兩個描述符的POLLIN事件。*/
    
    for (uint i= 0; i < m_socket_map.size(); ++i)  
    {                                              
      if (m_poll_info.m_fds[i].revents & POLLIN)   
      {                                            
        listen_sock= m_poll_info.m_pfs_fds[i];     
        is_unix_socket= m_socket_map[listen_sock]; 
        /* 遍歷監聽描述符,判斷是否是unix套接字。*/
        break;                                     
      }                                            
    }
    
    for (uint retry= 0; retry < MAX_ACCEPT_RETRY; retry++)                        
    {                                                                             
      socket_len_t length= sizeof(struct sockaddr_storage);   
      /* 由於觸發了POLLIN事件,所以這裡accept不會被堵塞。 */
      connect_sock= mysql_socket_accept(key_socket_client_connection, listen_sock,
                                        (struct sockaddr *)(&cAddr), &length);    
    }    
    
    
     Channel_info* channel_info= NULL;     
     /* 封裝連線通道,隱藏UNIX socket和tcp socket的差異。 */
     if (is_unix_socket)                                                        
       channel_info= new (std::nothrow) Channel_info_local_socket(connect_sock);
     else                                                                       
       channel_info= new (std::nothrow) Channel_info_tcpip_socket(connect_sock);
                                                                      
     return channel_info;                                                       
    
}
  1. 處理連線

程式碼路徑conn_handler/connection_handler_per_thread.cc:249

關鍵字:channel_info,thd_manager,handler_manager,init_new_thd,do_command,thd

程式碼概要:

void *handle_connection(void *arg)
{
    Global_THD_manager *thd_manager= Global_THD_manager::get_instance(); 
    // 獲取一個全域性的執行緒管理器(單例模式)
    // thd_manager類似一個門衛,對每個來訪的客人(連線)進行登記,註冊, 銷燬等操作。
    // 此外還包括其他後臺執行緒
    
    Connection_handler_manager *handler_manager= Connection_handler_manager::get_instance();
    // 獲取一個連線處理器(單例模式)。與上面不同的是,這個只處理連線執行緒。
    // 例如增加全域性的connection記數,通過reset_max_used_connections響應set max_collections = 2000這樣的請求等。
    
    Channel_info* channel_info= static_cast<Channel_info*>(arg); 
    // Channel_info抽象基類表示有關新連線的連線通道資訊。子類封裝了不同連線通道型別之間的差異。
    // 所謂的不同連線型別比如,本地套接字,tcp/ip連結、命名管道,共享記憶體等。(後兩者只存在windows平臺)
    
    
    
    for (;;){
    
        THD *thd= init_new_thd(channel_info); /* 使用連線通道初始化一個新的執行緒,
        這個執行緒裡面攜帶了連線資訊,以後每次處理query的時候,
        實際上就是這個thd裡面攜帶了客戶端以及客戶端請求的各種資訊。
        如果這裡出錯了,會增加Aborted_connects的狀態值。同時減少全域性的連線計數。
        */
        
        
        PSI_thread *psi= PSI_THREAD_CALL(get_thread)();/* 。psi用於performance監控。(PSI_THREAD_CALL為ps慣用巨集,pfs_get_thread_v1這個函式最終呼叫了pthread庫的pthread_getspecific,用於操作執行緒中的全域性變數。
        */
        thd->set_psi(psi); // 掛到執行緒中,以便後續用這個psi進行效能監控。
        
        thd_manager->add_thd(thd);// 通過執行緒管理器將這個執行緒物件加到執行緒連結串列中。
        
        if (thd_prepare_connection(thd)) // 連線前檢查,包括握手、授權客戶端和更新執行緒ACL// 判斷是tcpip還是本地套接字連結。設定長連結等。來訪者ip是否允許訪問等等。
            handler_manager->inc_aborted_connects(); // 如果檢查失敗需要中斷連結
        else 
        {
            while (thd_connection_alive(thd))
            {
                if (do_command(thd)) // ✨✨連*狀態是ok的話,會一直執行執行緒的命令請求。這裡就是我們函式的互動入口了。
                    break;
            }
            end_connection(thd); // 連線結束處理。
        }
        close_connection(thd, 0, false, false); // 關閉連線
        
        
        thd->get_stmt_da()->reset_diagnostics_area(); // 釋放資源
        thd->release_resources();  。
        
        
        thd_manager->remove_thd(thd);  // 執行緒管理器移除這個連線執行緒        
        Connection_handler_manager::dec_connection_count(); // 減少全域性connection計數。
        
        delete thd; // 釋放記憶體。
        
        if (abort_loop) // 伺服器關閉中需要結束pthread。正常來說,不會走這個路線。
            break;       // abort_loop 是一個全域性變數,當資料庫關閉時,會設定為true。
            
        channel_info= Per_thread_connection_handler::block_until_new_connection(); 
        // 連線重用。
        
    }
                                                  
    
    
}



綜上畫了一個圖: