1. 程式人生 > >在 C/C++ 非同步 I/O 中使用 MariaDB 的非阻塞介面

在 C/C++ 非同步 I/O 中使用 MariaDB 的非阻塞介面

對 C/C++,MySQL 提供的庫傳統上都是阻塞操作,因此適合多執行緒 / 程序伺服器架構程式設計。但是如果用 C/C++ 編寫伺服器,往往對效能會有極致要求,此時採用非阻塞的非同步 I/O 才是更好的框架。

所幸,從 MySQL fork 出來的 MariaDB 提供了非同步的 C/C++ MySQL client 介面。下面是本人對官方文件的翻譯。後續我會在本人設計的 libcoevent 庫中新增非同步 MariaDB client 的支援。

概述

MariaDB 非阻塞 API 是基於普通的阻塞式的庫呼叫設計的,這就使得這些 PIA 便於學習和記憶;這也使得將使用阻塞式的程式碼改寫為非阻塞式的工作變得簡單許多(反之亦然)。同時,這也便於在同一個程式碼目錄中混合使用阻塞和非阻塞呼叫架構。

針對每一個可能阻塞套接字 I/O 的庫函式,比如 int mysql_real_query(mysql, query, query_length),我們會引入兩個非阻塞呼叫:

int mysql_real_query_start(&status, MYSQL, query, query_length)
int mysql_real_query_cont(&status, MYSQL, query_status)

為了做到非阻塞的操作,應用程式首先呼叫 mysql_real_query_start() 而不是 mysql_real_query(),除了第一個引數之外,剩餘引數兩者相同。

如果 mysql_real_query_start() 返回 0,則表示函式操作完成了,同時 status 變數被設定為通常 mysql_real_query() 的返回值。否則如果 mysql_real_query_start() 返回非零,則返回值表示一個位掩碼值,表示當前庫正在等待中的標誌位。這些標誌可以是 MYSQL_WAIT_READ, MYSQL_WAIT_WRITE或者 MYSQL_WAIT_EXEP,對應於 select() 或者 poll() 等系統呼叫中的類似標誌位。同時,當正在等待超時的時候,也可以包含 MYSQL_WAIT_TIMEOUT 標誌。

這種情況下,應用程式可以繼續處理其他事件,並且定期檢查在套接字上的適當條件標誌或超時標誌。當事件發生時,應用程式可以通過呼叫 mysql_real_query_cont()

來恢復操作,並在 wait_status 變數中傳入實際發生的位掩碼。

正如 mysql_real_query_start() 一樣,當 mysql_real_query_cont() 操作結束時,返回 0,否則返回器需要繼續等待著的標誌位掩碼。因此,應用程式同樣需要繼續呼叫 mysql_real_query_cont(),並根據需要,混合處理其他事件,直到返回 0 為止。同樣地,返回值儲存在 status 變數中。

有些呼叫並不會做任何套接字 I/O 操作,也不會阻塞,比如 mysql_option()。對於這些介面,並不會新增獨立的 _start()_cont()函式。參見 “Non-blocking API reference” 頁面,檢視完整的阻塞與不阻塞函式的列表。

可以使用 select()poll() 等類似機制來檢查套接字或超時事件。不過實際上往往是用更高一層封裝的、提供註冊和處理這類事件的工具的框架中去完成這些工作(比如 libevent)。

可以通過呼叫 mysql_get_socket() 函式來獲得需要檢查的時間的套接字,超時時間則可以通過 mysql_get_timeout_value() 來獲得。

下面是一個使用非阻塞 API 進行一次查詢的簡單(但完整)的示例。這個例子在 MariaDB 程式碼樹中的 client/async_example.c 中;另一個比較大、但是更加貼近實際的、使用 libevent 的例子則是 tests/asyny_queries.c

static void run_query(const char *host, const char *user, const char *password)
{
  int err, status;
  MYSQL mysql, *ret;
  MYSQL_RES *res;
  MYSQL_ROW row;

  mysql_init(&mysql);
  mysql_options(&mysql, MYSQL_OPT_NONBLOCK, 0);

  status = mysql_real_connect_start(&ret, &mysql, host, user, password, NULL, 0, NULL, 0);
  while (status) {
    status = wait_for_mysql(&mysql, status);
    status = mysql_real_connect_cont(&ret, &mysql, status);
  }

  if (!ret)
    fatal(&mysql, "Failed to mysql_real_connect()");

  status = mysql_real_query_start(&err, &mysql, SL("SHOW STATUS"));
  while (status) {
    status = wait_for_mysql(&mysql, status);
    status = mysql_real_query_cont(&err, &mysql, status);
  }
  if (err)
    fatal(&mysql, "mysql_real_query() returns error");

  /* This method cannot block. */
  res= mysql_use_result(&mysql);
  if (!res)
    fatal(&mysql, "mysql_use_result() returns error");

  for (;;) {
    status= mysql_fetch_row_start(&row, res);
    while (status) {
      status= wait_for_mysql(&mysql, status);
      status= mysql_fetch_row_cont(&row, res, status);
    }
    if (!row)
      break;
    printf("%s: %s\n", row[0], row[1]);
  }
  if (mysql_errno(&mysql))
    fatal(&mysql, "Got error while retrieving rows");
  mysql_free_result(res);
  mysql_close(&mysql);
}

/* Helper function to do the waiting for events on the socket. */
static int wait_for_mysql(MYSQL *mysql, int status) {
  struct pollfd pfd;
  int timeout, res;

  pfd.fd = mysql_get_socket(mysql);
  pfd.events =
    (status & MYSQL_WAIT_READ ? POLLIN : 0) |
    (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) |
    (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0);
  if (status & MYSQL_WAIT_TIMEOUT)
    timeout = 1000*mysql_get_timeout_value(mysql);
  else
    timeout = -1;
  res = poll(&pfd, 1, timeout);
  if (res == 0)
    return MYSQL_WAIT_TIMEOUT;
  else if (res < 0)
    return MYSQL_WAIT_TIMEOUT;
  else {
    int status = 0;
    if (pfd.revents & POLLIN) status |= MYSQL_WAIT_READ;
    if (pfd.revents & POLLOUT) status |= MYSQL_WAIT_WRITE;
    if (pfd.revents & POLLPRI) status |= MYSQL_WAIT_EXCEPT;
    return status;
  }
}

設定 MySQL 非阻塞標誌

在使用任意一個非阻塞操作之前,有必要通過設定 MYSQL_OPT_NONBLOCK選項來啟用非阻塞功能:

mysql_options(&mysql, MYSQL_OPTION_NONBLOCK, 0)

這個呼叫可以在任何時候呼叫,不過典型情況下是在最開始的時候完成,也就是在 mysql_real_connect() 之前。不過這依然可以在任何開始使用非阻塞操作的時候呼叫。如果在沒有使用 MYSQL_OPT_NONBLOCK 的情況下嘗試任何非阻塞操作,應用程式一般情況下會因為空指標異常崩潰。

MYSQL_OPTION_NONBLOCK 的引數是正在等待 I/O、並且應用程式正在做其他操作時用於儲存非阻塞操作的狀態(state)的棧大小。正常情況下,應用程式不需要修改這個值,可以傳入 0 以使用預設值。

混合阻塞和非阻塞操作

在同一個 MYSQL 連線中混合使用阻塞和非阻塞操作是完全可行的。

因此,應用程式可以做普通的阻塞式的 mysql_real_connect(),然後依序執行一個非阻塞的 mysql_real_query_start()。反之亦然:先做一個非阻塞的 mysql_real_connect_start(),然後晚些時間執行後續的 mysql_real_query()

混合操作允許程式碼在發生忙等待也影響不大的地方使用較為簡單的的阻塞式 API 時非常有用。比如在程式啟動的時候建立連線,或者是在多個大型的、長耗時的查詢中,執行短且快的小型查詢。

唯一的限制是,在開始一個新的阻塞式(或非阻塞)操作之前,上一個的非阻塞式操作必須已經完成。參見下一章節:”儘早終止非阻塞操作“。

提前終止非阻塞過程

當使用 mysql_real_query_start()或其他 _start() 函式啟動了一個非阻塞操作之後,它必須在啟動一個新的操作之前完成。因此,應用程式必須繼續呼叫 `mysql_real_query_cont() 直到返回 0 —— 表示目前操作已經完成。不允許在流程的中間掛起一個操作不管,然後啟動一個新的。

儘管如此,允許在出列非阻塞操作的流程的中途呼叫通過 mysql_close() 來完全中止連線。一個新的連線在發起查詢操作之前必須以 mysql_real_connect() 開始,這個連線可以使用新的 MYSQL 物件或者是複用舊的。

未來我們可能會實現一個 abort 機制,用於強制一個正在進行中的操作儘可能快地中止掉(不過疼然需要在 abort 之後呼叫一次 mysql_real_query_cont()),並且允許其進行清理操作並且立即返回合適的錯誤碼。

限制

DNS

當傳遞一個主機名給 mysql_real_connect_start() 時(相對於一個本地 unix 套接字或者是 IP 地址),它可能會需要在 DNS 中查詢這個主機名,取決於本地的配置(比如該名字不在 /etc/hosts 或快取中)。這一個 DNS 查詢並不會以非阻塞方式來完成。這就意味著 mysql_real_connect_start() 在等待 DNS 響應的時候可能不會將 CPU 控制權交還給應用程式。因此,如果 DNS 查詢很慢或不可用的時候,應用程式會 “掛起” 一段時間。

如果這是一個大問題的話,應用程式可以傳遞一個 IP 地址給 mysql_real_connect_start()而不是主機名以避免該情況的發生。應用程式可以採用作業系統或事件框架提供的任何非阻塞的 DNS 查詢機制來實現主機名的解析以實現 IP 地址的獲取。又或者一個簡單的解決方法是,將主機名新增到本地的主機查詢檔案中(在 Posix / Unix / Linux 機器中則是 /etc/hosts 檔案)。

Windows 命名管道和共享記憶體連線

對使用 Windows 命名管道和共享記憶體的連線,目前沒有非阻塞 API 可支援。

使用阻塞或者是非阻塞的 API,命名管道和共享記憶體連線依然是可用的。儘管如此,需要阻塞在命名管道的 I/O 的操作,仍然不會(想上文那樣)將 CPU 控制權交回給應用程式;相反,它們會 “掛起” 並等待操作完成,就像普通的阻塞 API 一樣。