分散式 PostgreSQL 叢集(Citus)官方示例 - 實時儀表盤
Citus
提供對大型資料集的實時查詢。我們在 Citus
常見的一項工作負載涉及為事件資料的實時儀表板提供支援。
例如,您可以是幫助其他企業監控其 HTTP
流量的雲服務提供商。每次您的一個客戶端收到 HTTP
請求時,您的服務都會收到一條日誌記錄。您想要攝取所有這些記錄並建立一個 HTTP
分析儀表板,為您的客戶提供洞察力,例如他們的網站服務的 HTTP 錯誤數量
。 重要的是,這些資料以儘可能少的延遲顯示出來,這樣您的客戶就可以解決他們網站的問題。 儀表板顯示歷史趨勢圖也很重要。
或者,也許您正在建立一個廣告網路,並希望向客戶展示其廣告系列的點選率。 在此示例中,延遲也很關鍵,原始資料量也很高,歷史資料和實時資料都很重要。
在本節中,我們將演示如何構建第一個示例的一部分,但該架構同樣適用於第二個和許多其他用例。
- real-time-analytics-Hands-On-Lab-Hyperscale-Citus
- Architecting Real-Time Analytics for your Customers
資料模型
我們正在處理的資料是不可變的日誌資料流。我們將直接插入 Citus
Kafka
之類的東西進行路由也很常見。 這樣做具有通常的優勢,並且一旦資料量變得難以管理,就可以更容易地預先聚合資料。
我們將使用一個簡單的 schema
來攝取 HTTP
事件資料。 這個 schema
作為一個例子來展示整體架構;一個真實的系統可能會使用額外的列。
-- this is run on the coordinator CREATE TABLE http_request ( site_id INT, ingest_time TIMESTAMPTZ DEFAULT now(), url TEXT, request_country TEXT, ip_address TEXT, status_code INT, response_time_msec INT ); SELECT create_distributed_table('http_request', 'site_id');
當我們呼叫 create_distributed_table 時,我們要求 Citus
使用 site_id
列對 http_request
進行 hash
分配。 這意味著特定站點的所有資料都將存在於同一個分片中。
- create_distributed_table
UDF
使用分片計數的預設配置值。我們建議在叢集中使用 2-4
倍於 CPU
核的分片。使用這麼多分片可以讓您在新增新的工作節點後重新平衡叢集中的資料。
Azure Database for PostgreSQL
— 超大規模 (Citus
) 使用流式複製來實現高可用性,因此維護分片副本將是多餘的。在任何流複製不可用的生產環境中,您應該將 citus.shard_replication_factor
設定為 2
或更高以實現容錯。
- Azure Database for PostgreSQL
- 流式複製
有了這個,系統就可以接受資料並提供查詢了! 在繼續執行本文中的其他命令時,讓以下迴圈在後臺的 psql
控制檯中執行。 它每隔一兩秒就會生成假資料。
DO $$
BEGIN LOOP
INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec
) VALUES (
trunc(random()*32), clock_timestamp(),
concat('http://example.com/', md5(random()::text)),
('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
concat(
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2), '.',
trunc(random()*250 + 2)
)::inet,
('{200,404}'::int[])[ceil(random()*2)],
5+trunc(random()*150)
);
COMMIT;
PERFORM pg_sleep(random() * 0.25);
END LOOP;
END $$;
攝取資料後,您可以執行儀表板查詢,例如:
SELECT
site_id,
date_trunc('minute', ingest_time) as minute,
COUNT(1) AS request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
WHERE date_trunc('minute', ingest_time) > now() - '5 minutes'::interval
GROUP BY site_id, minute
ORDER BY minute ASC;
上述設定有效,但有兩個缺點:
- 每次需要生成圖表時,您的 HTTP 分析儀表板都必須遍歷每一行。 例如,如果您的客戶對過去一年的趨勢感興趣,您的查詢將從頭開始彙總過去一年的每一行。
- 您的儲存成本將隨著攝取率和可查詢歷史的長度成比例增長。 在實踐中,您可能希望將原始事件保留較短的時間(一個月)並檢視較長時間(年)的歷史圖表。
彙總
您可以通過將原始資料彙總為預聚合形式來克服這兩個缺點。 在這裡,我們將原始資料彙總到一個表中,該表儲存 1 分鐘
間隔的摘要。 在生產系統中,您可能還需要類似 1 小時
和 1 天
的間隔,這些都對應於儀表板中的縮放級別。 當用戶想要上個月的請求時間時,儀表板可以簡單地讀取並繪製過去 30 天
每一天的值。
CREATE TABLE http_request_1min (
site_id INT,
ingest_time TIMESTAMPTZ, -- which minute this row represents
error_count INT,
success_count INT,
request_count INT,
average_response_time_msec INT,
CHECK (request_count = error_count + success_count),
CHECK (ingest_time = date_trunc('minute', ingest_time))
);
SELECT create_distributed_table('http_request_1min', 'site_id');
CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);
這看起來很像前面的程式碼塊。最重要的是:它還在 site_id
上進行分片,並對分片計數和複製因子使用相同的預設配置。因為這三個都匹配,所以 http_request
分片和 http_request_1min
分片之間存在一對一的對應關係,Citus
會將匹配的分片放在同一個 worker
上。這稱為協同定位(co-location)
; 它使諸如聯接(join)
之類的查詢更快,並使我們的彙總成為可能。
為了填充 http_request_1min
,我們將定期執行 INSERT INTO SELECT
。 這是可能的,因為這些表位於同一位置。為方便起見,以下函式將彙總查詢包裝起來。
-- single-row table to store when we rolled up last
CREATE TABLE latest_rollup (
minute timestamptz PRIMARY KEY,
-- "minute" should be no more precise than a minute
CHECK (minute = date_trunc('minute', minute))
);
-- initialize to a time long ago
INSERT INTO latest_rollup VALUES ('10-10-1901');
-- function to do the rollup
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
curr_rollup_time timestamptz := date_trunc('minute', now());
last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
) SELECT
site_id,
date_trunc('minute', ingest_time),
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
-- roll up only data new since last_rollup_time
WHERE date_trunc('minute', ingest_time) <@
tstzrange(last_rollup_time, curr_rollup_time, '(]')
GROUP BY 1, 2;
-- update the value in latest_rollup so that next time we run the
-- rollup it will operate on data newer than curr_rollup_time
UPDATE latest_rollup SET minute = curr_rollup_time;
END;
$$ LANGUAGE plpgsql;
上述函式應該每分鐘呼叫一次。您可以通過在 coordinator
節點上新增一個 crontab
條目來做到這一點:
* * * * * psql -c 'SELECT rollup_http_request();'
或者,諸如 pg_cron 之類的擴充套件允許您直接從資料庫安排週期性查詢。
之前的儀表板查詢現在好多了:
SELECT site_id, ingest_time as minute, request_count,
success_count, error_count, average_response_time_msec
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
過期的舊資料
彙總使查詢更快,但我們仍然需要使舊資料過期以避免無限的儲存成本。 只需決定您希望為每個粒度保留資料多長時間,然後使用標準查詢刪除過期資料。 在以下示例中,我們決定將原始資料保留一天,將每分鐘的聚合保留一個月:
DELETE FROM http_request WHERE ingest_time < now() - interval '1 day';
DELETE FROM http_request_1min WHERE ingest_time < now() - interval '1 month';
在生產中,您可以將這些查詢包裝在一個函式中,並在 cron job
中每分鐘呼叫一次。
通過在 Citrus
雜湊分佈之上使用表範圍分割槽,資料過期可以更快。 有關詳細示例,請參閱時間序列資料部分。
這些是基礎!我們提供了一種架構,可以攝取 HTTP 事件
,然後將這些事件彙總到它們的預聚合形式中。 這樣,您既可以儲存原始事件,也可以通過亞秒級查詢
為您的分析儀表板提供動力。
接下來的部分將擴充套件基本架構,並向您展示如何解決經常出現的問題。
近似不同計數
HTTP 分析
中的一個常見問題涉及近似的不同計數:上個月有多少獨立訪問者訪問了您的網站?準確地回答這個問題需要將所有以前見過的訪問者的列表儲存在彙總表中,這是一個令人望而卻步的資料量。然而,一個近似的答案更易於管理。
一種稱為 hyperloglog
或 HLL
的資料型別可以近似地回答查詢; 要告訴您一個集合中大約有多少個獨特元素,需要的空間非常小。其精度可以調整。 我們將使用僅使用 1280
位元組的那些,將能夠以最多 2.2%
的錯誤計算多達數百億的唯一訪問者。
如果您要執行全域性查詢,則會出現類似的問題,例如在上個月訪問您客戶的任何站點的唯一 IP
地址的數量。在沒有 HLL
的情況下,此查詢涉及將 IP
地址列表從 worker
傳送到 coordinator
以進行重複資料刪除。這既是大量的網路流量,也是大量的計算。 通過使用 HLL
,您可以大大提高查詢速度。
首先你必須安裝 HLL
擴充套件;github repo 有說明。接下來,您必須啟用它:
- postgresql-hll
CREATE EXTENSION hll;
這在 Hyperscale
上是不必要的,它已經安裝了 HLL
以及其他有用的擴充套件。
現在我們準備好在 HLL
彙總中跟蹤 IP
地址。首先向彙總表新增一列。
ALTER TABLE http_request_1min ADD COLUMN distinct_ip_addresses hll;
接下來使用我們的自定義聚合來填充列。 只需將它新增到我們彙總函式中的查詢中:
@@ -1,10 +1,12 @@
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
+ , distinct_ip_addresses
) SELECT
site_id,
minute,
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
+ , hll_add_agg(hll_hash_text(ip_address)) AS distinct_ip_addresses
FROM http_request
儀表板查詢稍微複雜一些,您必須通過呼叫 hll_cardinality
函式讀出不同數量的 IP
地址:
SELECT site_id, ingest_time as minute, request_count,
success_count, error_count, average_response_time_msec,
hll_cardinality(distinct_ip_addresses) AS distinct_ip_address_count
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - interval '5 minutes';
HLL
不僅速度更快,還可以讓你做以前做不到的事情。 假設我們進行了彙總,但我們沒有使用 HLL
,而是儲存了確切的唯一計數。 這很好用,但您無法回答諸如在過去的一週內,我們丟棄了原始資料有多少不同的會話?
之類的問題。
使用 HLL
,這很容易。您可以使用以下查詢計算一段時間內的不同 IP
計數:
SELECT hll_cardinality(hll_union_agg(distinct_ip_addresses))
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;
您可以在專案的 GitHub 儲存庫中找到有關 HLL
的更多資訊。
- postgresql-hll
- https://github.com/aggregateknowledge/postgresql-hll
使用 JSONB 的非結構化資料
Citus
與 Postgres
對非結構化資料型別的內建支援配合得很好。 為了證明這一點,讓我們跟蹤來自每個國家/地區
的訪客數量。 使用半結構資料型別可以讓您不必為每個國家新增一列,並最終得到具有數百個稀疏填充列的行。我們有一篇博文解釋了半結構化資料使用哪種格式。這篇文章推薦使用 JSONB
,在這裡我們將演示如何將 JSONB
列合併到您的資料模型中。
首先,將新列新增到我們的彙總表中:
ALTER TABLE http_request_1min ADD COLUMN country_counters JSONB;
接下來,通過修改彙總函式將其包含在彙總中:
@@ -1,14 +1,19 @@
INSERT INTO http_request_1min (
site_id, ingest_time, request_count,
success_count, error_count, average_response_time_msec
+ , country_counters
) SELECT
site_id,
minute,
COUNT(1) as request_count,
SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count
SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count
SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
- FROM http_request
+ , jsonb_object_agg(request_country, country_count) AS country_counters
+ FROM (
+ SELECT *,
+ count(1) OVER (
+ PARTITION BY site_id, date_trunc('minute', ingest_time), request_country
+ ) AS country_count
+ FROM http_request
+ ) h
現在,如果您想在儀表板中獲取來自美國的請求數量,您可以將儀表板查詢修改為如下所示:
SELECT
request_count, success_count, error_count, average_response_time_msec,
COALESCE(country_counters->>'USA', '0')::int AS american_visitors
FROM http_request_1min
WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;