1. 程式人生 > 實用技巧 >【譯】StackExchange.Redis 中文文件(八)流

【譯】StackExchange.Redis 中文文件(八)流

概述

Stream 資料型別是在 Redis 版本 5.0 中新增的,它表示訊息的僅追加日誌。redis.io 上記錄的所有 stream related commands 已在 StackExchange.Redis 客戶端庫中實現。閱讀"Introduction to Redis Streams",以獲取有關原始 Redis 命令以及如何使用流的更多資訊。

寫入流

流中的每條訊息或條目均由 StreamEntry 型別表示。每個流的條目包含一個唯一的ID和一個名稱/值對陣列。名稱/值對由 NameValueEntry 型別表示。

使用以下命令將具有單個名稱/值對的簡單訊息新增到流中:

var db = redis.GetDatabase();
var messageId = db.StreamAdd("event_stream", "foo_name", "bar_value");
// messageId = 1518951480106-0

StreamAdd 返回的訊息ID由將訊息新增到流中的毫秒時間和序列號組成。如果在同一毫秒時間建立了兩個或更多訊息,則序列號用於防止ID衝突。

可以使用以下方法將多個名稱/值對寫入流:

var values = new NameValueEntry[]
{
    new NameValueEntry("sensor_id", "1234"),
    new NameValueEntry("temp", "19.8")
};

var db = redis.GetDatabase();
var messageId = db.StreamAdd("sensor_stream", values);

You also have the option to override the auto-generated message ID by passing your own ID to the StreamAdd method. Other optional parameters allow you to trim the stream's length.

您還可以選擇通過將自己的ID傳遞給StreamAdd方法來覆蓋自動生成的訊息ID。其他可選引數使您可以調整流的長度。

db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);

讀取流

通過使用 StreamReadStreamRange 方法從流中進行讀取。

var messages = db.StreamRead("event_stream", "0-0");

上面的程式碼將從ID "0-0" 到流的末尾讀取所有訊息。你可以選擇使用可選的 count 引數來限制返回的訊息數。

StreamRead 方法還允許你一次從多個流中讀取:

var streams = db.StreamRead(new StreamPosition[]
{
    new StreamPosition("event_stream", "0-0"),
    new StreamPosition("score_stream", "0-0")
});

Console.WriteLine($"Stream = {streams.First().Key}");
Console.WriteLine($"Length = {streams.First().Entries.Length}");

你可以使用 countPerStream 可選引數來限制每個流返回的訊息數。

StreamRange 方法允許你返回流中的一系列條目。

var messages = db.StreamRange("event_stream", minId: "-", maxId: "+");

-+ 特殊字元表示可能的最小和最大ID。這些值是沒有設定引數時的預設值。你還可以選擇通過使用 messageOrder 引數來反向讀取流。StreamRange 方法還提供了通過使用 count 引數來限制返回的條目數的功能。

var messages = db.StreamRange("event_stream",
    minId: "0-0",
    maxId: "+",
    count: 100,
    messageOrder: Order.Descending);

流的資訊

StreamInfo 方法提供了讀取有關流的基本資訊的能力:流的第一個和最後一個條目,流的長度,使用者組的數量等。此資訊可用於以更有效的方式處理流。

var info = db.StreamInfo("event_stream");

Console.WriteLine(info.Length);
Console.WriteLine(info.FirstEntry.Id);
Console.WriteLine(info.LastEntry.Id);

消費者組

使用使用者組可讓你擴充套件跨多個工作人員或使用者的流的處理。請閱讀“Introduction to Redis Streams”,以獲取有關消費者群體的詳細資訊。

以下內容建立了一個使用者組,並告訴 Redis 從流中的哪個位置開始讀取。如果你在第一次建立流之前呼叫該方法,則預設情況下,StreamCreateConsumerGroup 方法將為你建立流。你可以通過為createStream可選引數傳遞false來覆蓋此預設行為。

// Returns true if created, otherwise false.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "$");
// or
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", StreamPosition.NewMessages);

特殊字元 "$" 表示使用者組將只讀取在建立使用者組之後建立的訊息。如果要閱讀流中已經存在的訊息,則可以提供流中的任何位置。

// Begin reading from the first position in the stream.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "0-0");

使用 StreamReadGroup 方法將訊息讀入使用者。此方法接受訊息ID作為引數之一。當ID傳遞給 StreamReadGroup 時,Redis 將僅返回給定使用者的待處理訊息,換句話說,它將僅返回使用者已讀取的訊息。

要將新訊息讀入使用者,請使用特殊字元 ">"StreamPosition.NewMessages">" 特殊字元表示從未讀取過的訊息,從未傳遞給其他消費者。請注意,消費者組中的消費者是在呼叫 StreamReadGroup 方法時首次使用時自動建立的。

// Read 5 messages into two consumers.
var consumer_1_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", ">", count: 5);
var consumer_2_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_2", ">", count: 5);

消費者讀取了一條訊息後,其狀態對於該消費者變為“待處理”狀態,其他任何消費者都無法通過 StreamReadGroup 方法讀取該訊息。可以通過使用 StreamReadGroup 方法併為消費者提供在待處理訊息範圍內的ID來讀取消費者的待處理訊息。

// Read the first pending message for the "consumer_1" consumer.
var message = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", "0-0", count: 1);

還可以通過呼叫 StreamPendingStreamPendingMessages 方法來檢索待處理的訊息資訊。StreamPending 返回有關待處理訊息數,每個使用者的待處理訊息以及最高和最低待處理訊息ID的高階資訊。

var pendingInfo = db.StreamPending("events_stream", "events_cg");

Console.WriteLine(pendingInfo.PendingMessageCount);
Console.WriteLine(pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingInfo.HighestPendingMessageId);
Console.WriteLine($"Consumer count: {pendingInfo.Consumers.Length}.");
Console.WriteLine(pendingInfo.Consumers.First().Name);
Console.WriteLine(pendingInfo.Consumers.First().PendingMessageCount);

使用 StreamPendingMessages 方法檢索有關給定使用者的待處理訊息的詳細資訊。

// Read the first pending message for the consumer.
var pendingMessages = db.StreamPendingMessages("events_stream",
    "events_cg",
    count: 1,
    consumerName: "consumer_1",
    minId: pendingInfo.LowestPendingMessageId);

Console.WriteLine(pendingMessages.Single().MessageId);
Console.WriteLine(pendingMessages.Single().IdleTimeInMilliseconds);

訊息在等待消費者處理之前,直到通過呼叫 StreamAcknowledge 方法得到確認為止。訊息被確認後, StreamReadGroup 不能再訪問。

// Returns the number of messages acknowledged.
db.StreamAcknowledge("events_stream", "events_cg", pendingMessage.MessageId);

StreamClaim 方法可用於將消費者使用的訊息所有權更改為其他消費者。

// Change ownership to consumer_2 for the first 5 messages pending for consumer_1.
var pendingMessages = db.StreamPendingMessages("events_stream",
    "events_cg",
    count: 5,
    consumerName: "consumer_1",
    minId: "0-0");

db.StreamClaim("events_stream",
    "events_cg",
    claimingConsumer: "consumer_2",
    minIdleTimeInMs: 0,
    messageIds: pendingMessages.Select(m => m.MessageId).ToArray());

還有其他幾種使用使用者組處理流的方法。請參考 Streams 單元測試以瞭解這些方法及其用法。

原文地址:Stream