【譯】StackExchange.Redis 中文文件(八)流
概述
Stream 資料型別是在 Redis 版本 5.0 中新增的,它表示訊息的僅追加日誌。redis.io 上記錄的所有 stream related commands 已在 StackExchange.Redis 客戶端庫中實現。閱讀"Introduction to Redis Streams",以獲取有關原始 Redis 命令以及如何使用流的更多資訊。
寫入流
流中的每條訊息或條目均由 StreamEntry
型別表示。每個流的條目包含一個唯一的ID和一個名稱/值對陣列。名稱/值對由 NameValueEntry
型別表示。
使用以下命令將具有單個名稱/值對的簡單訊息新增到流中:
var db = redis.GetDatabase(); var messageId = db.StreamAdd("event_stream", "foo_name", "bar_value"); // messageId = 1518951480106-0
由 StreamAdd
返回的訊息ID由將訊息新增到流中的毫秒時間和序列號組成。如果在同一毫秒時間建立了兩個或更多訊息,則序列號用於防止ID衝突。
可以使用以下方法將多個名稱/值對寫入流:
var values = new NameValueEntry[] { new NameValueEntry("sensor_id", "1234"), new NameValueEntry("temp", "19.8") }; var db = redis.GetDatabase(); var messageId = db.StreamAdd("sensor_stream", values);
You also have the option to override the auto-generated message ID by passing your own ID to the StreamAdd
method. Other optional parameters allow you to trim the stream's length.
您還可以選擇通過將自己的ID傳遞給StreamAdd方法來覆蓋自動生成的訊息ID。其他可選引數使您可以調整流的長度。
db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
讀取流
通過使用 StreamRead
或 StreamRange
方法從流中進行讀取。
var messages = db.StreamRead("event_stream", "0-0");
上面的程式碼將從ID "0-0"
到流的末尾讀取所有訊息。你可以選擇使用可選的 count
引數來限制返回的訊息數。
StreamRead
方法還允許你一次從多個流中讀取:
var streams = db.StreamRead(new StreamPosition[]
{
new StreamPosition("event_stream", "0-0"),
new StreamPosition("score_stream", "0-0")
});
Console.WriteLine($"Stream = {streams.First().Key}");
Console.WriteLine($"Length = {streams.First().Entries.Length}");
你可以使用 countPerStream
可選引數來限制每個流返回的訊息數。
StreamRange
方法允許你返回流中的一系列條目。
var messages = db.StreamRange("event_stream", minId: "-", maxId: "+");
-
和 +
特殊字元表示可能的最小和最大ID。這些值是沒有設定引數時的預設值。你還可以選擇通過使用 messageOrder
引數來反向讀取流。StreamRange
方法還提供了通過使用 count
引數來限制返回的條目數的功能。
var messages = db.StreamRange("event_stream",
minId: "0-0",
maxId: "+",
count: 100,
messageOrder: Order.Descending);
流的資訊
StreamInfo
方法提供了讀取有關流的基本資訊的能力:流的第一個和最後一個條目,流的長度,使用者組的數量等。此資訊可用於以更有效的方式處理流。
var info = db.StreamInfo("event_stream");
Console.WriteLine(info.Length);
Console.WriteLine(info.FirstEntry.Id);
Console.WriteLine(info.LastEntry.Id);
消費者組
使用使用者組可讓你擴充套件跨多個工作人員或使用者的流的處理。請閱讀“Introduction to Redis Streams”,以獲取有關消費者群體的詳細資訊。
以下內容建立了一個使用者組,並告訴 Redis 從流中的哪個位置開始讀取。如果你在第一次建立流之前呼叫該方法,則預設情況下,StreamCreateConsumerGroup
方法將為你建立流。你可以通過為createStream可選引數傳遞false來覆蓋此預設行為。
// Returns true if created, otherwise false.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "$");
// or
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", StreamPosition.NewMessages);
特殊字元 "$"
表示使用者組將只讀取在建立使用者組之後建立的訊息。如果要閱讀流中已經存在的訊息,則可以提供流中的任何位置。
// Begin reading from the first position in the stream.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "0-0");
使用 StreamReadGroup
方法將訊息讀入使用者。此方法接受訊息ID作為引數之一。當ID傳遞給 StreamReadGroup
時,Redis 將僅返回給定使用者的待處理訊息,換句話說,它將僅返回使用者已讀取的訊息。
要將新訊息讀入使用者,請使用特殊字元 ">"
或 StreamPosition.NewMessages
。 ">"
特殊字元表示從未讀取過的訊息,從未傳遞給其他消費者。請注意,消費者組中的消費者是在呼叫 StreamReadGroup
方法時首次使用時自動建立的。
// Read 5 messages into two consumers.
var consumer_1_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", ">", count: 5);
var consumer_2_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_2", ">", count: 5);
消費者讀取了一條訊息後,其狀態對於該消費者變為“待處理”狀態,其他任何消費者都無法通過 StreamReadGroup
方法讀取該訊息。可以通過使用 StreamReadGroup
方法併為消費者提供在待處理訊息範圍內的ID來讀取消費者的待處理訊息。
// Read the first pending message for the "consumer_1" consumer.
var message = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", "0-0", count: 1);
還可以通過呼叫 StreamPending
和 StreamPendingMessages
方法來檢索待處理的訊息資訊。StreamPending
返回有關待處理訊息數,每個使用者的待處理訊息以及最高和最低待處理訊息ID的高階資訊。
var pendingInfo = db.StreamPending("events_stream", "events_cg");
Console.WriteLine(pendingInfo.PendingMessageCount);
Console.WriteLine(pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingInfo.HighestPendingMessageId);
Console.WriteLine($"Consumer count: {pendingInfo.Consumers.Length}.");
Console.WriteLine(pendingInfo.Consumers.First().Name);
Console.WriteLine(pendingInfo.Consumers.First().PendingMessageCount);
使用 StreamPendingMessages
方法檢索有關給定使用者的待處理訊息的詳細資訊。
// Read the first pending message for the consumer.
var pendingMessages = db.StreamPendingMessages("events_stream",
"events_cg",
count: 1,
consumerName: "consumer_1",
minId: pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingMessages.Single().MessageId);
Console.WriteLine(pendingMessages.Single().IdleTimeInMilliseconds);
訊息在等待消費者處理之前,直到通過呼叫 StreamAcknowledge
方法得到確認為止。訊息被確認後, StreamReadGroup
不能再訪問。
// Returns the number of messages acknowledged.
db.StreamAcknowledge("events_stream", "events_cg", pendingMessage.MessageId);
StreamClaim
方法可用於將消費者使用的訊息所有權更改為其他消費者。
// Change ownership to consumer_2 for the first 5 messages pending for consumer_1.
var pendingMessages = db.StreamPendingMessages("events_stream",
"events_cg",
count: 5,
consumerName: "consumer_1",
minId: "0-0");
db.StreamClaim("events_stream",
"events_cg",
claimingConsumer: "consumer_2",
minIdleTimeInMs: 0,
messageIds: pendingMessages.Select(m => m.MessageId).ToArray());
還有其他幾種使用使用者組處理流的方法。請參考 Streams 單元測試以瞭解這些方法及其用法。
原文地址:Stream