[Apache Pulsar] 企業級分散式訊息系統-Pulsar快速上手
Pulsar快速上手
前言
如果你還不瞭解Pulsar訊息系統,可以先看上一篇文章 企業級分散式訊息系統-Pulsar入門基礎
Pulsar客戶端支援多個語言,包括Java,Go,Pytho和C++,本篇文章只講述Java客戶端。
Pulsar Java客戶端既可用於建立訊息的producers、consumers和readers ,也可用於執行管理任務。Java 客戶端的當前版本為 2.4.0。
1. 安裝
最新版本的Pulsar Java 客戶端庫可通過 Maven中央倉庫 使用。 要使用最新版本, 請將 pulsar-client 庫新增到構建配置中。
1.1 Maven
如果你使用maven,新增以下內容到你的 pom.xml 中:
<!-- 在你的 <properties> 部分--> <pulsar.version>2.4.0</pulsar.version> <!-- 在你的 <dependencies> 部分--> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency>
1.2 Gradle
如果你使用Gradle,新增以下內容到你的 build.gradle 中:
def pulsarVersion = '2.4.0' dependencies { compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion }
1.3 本地安裝Pulsar
Pulsar目前只支援MacOS和Linux系統,JDK版本1.8及以上。
下載地址見下載說明及配置,Windows的小夥伴們就不用下載了。
2.連線URL
要使用客戶端連線到Pulsar,你需要指定Pulsar 協議URL。
Pulsar協議URL分配給特定的叢集,使用pulsar
pulsar://localhost:6650
如果有多個broker,那麼URL如下:
pulsar://localhost:6550,localhost:6651,localhost:6652
生產環境的Pulsar 叢集URL如下:
pulsar://pulsar.us-west.example.com:6650
如果需要TLS認證,URL如下:
pulsar+ssl://pulsar.us-west.example.com:6651
3.客戶端配置
你可以用一個URL來例項化一個連線到指定的Pulsar 叢集的 PulsarClient 物件,像這樣:
PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();
如果有多個brokers,例項化客戶端如下:
PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652") .build();
預設的broker URL是單機叢集。 如果你使用單機模式執行一個叢集,broker將預設使用pulsar://localhost:6650
3.1 生產者
在Pulsar中,生產者寫訊息到topic中。 一旦你例項化一個Pulsar Client物件,你可以建立一個Producer 用於特定的topic。
Producer<byte[]> producer = client.newProducer() .topic("my-topic") .create(); // 然後你就可以傳送訊息到指定的broker 和topic上: producer.send("My message".getBytes());
預設情況下,生產者生成由位元組陣列組成的訊息。當然,你也可以指定訊息型別,例如下面的String型別:
Producer<String> stringProducer = client.newProducer(Schema.STRING) .topic("my-topic") .create(); stringProducer.send("My message");
在不再使用時,你需要確保關閉生產者、消費者和客戶端
producer.close(); consumer.close(); client.close();
關閉操作也可以是非同步的: //...業務程式碼 producer.closeAsync() .thenRun(() -> System.out.println("Producer closed")); .exceptionally((ex) -> { System.err.println("Failed to close producer: " + ex); return ex; });
3.1.1 生產者配置
如果例項化生產者物件時僅指定topic名稱 (如上面的示例所示), 則生產者將使用預設配置。 要使用非預設配置, 你可以設定多種可配置的引數。詳情見ProducerBuilder的文件說明,下面是一個示例:
Producer<byte[]> producer = client.newProducer() .topic("my-topic") //主題名稱 .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) //最大發布延遲時間 .sendTimeout(10, TimeUnit.SECONDS) //超時時間 .blockIfQueueFull(true) //佇列滿了,是否阻塞 .create();
3.1.2 訊息路由 #####
使用分割槽主題時,當你使用生產者釋出訊息時你可以指定路由模式。
3.1.3 非同步傳送
你可以使用Java客戶端非同步釋出訊息。 使用非同步傳送,生產者將訊息放入阻塞佇列並立即返回。 然後,客戶端將在後臺將訊息傳送給broker。 如果佇列已滿(配置的最大值),則在呼叫API時,生產者可能會被阻塞或立即失敗,具體取決於傳遞給生產者的引數。
以下是非同步傳送操作的示例:
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> { System.out.printf("Message with ID %s successfully sent", msgId); });
3.1.4 訊息配置
除了value之外, 還可以在特定訊息上設定其他選項:
producer.newMessage() .key("my-message-key") //訊息的key .value("my-async-message".getBytes()) //訊息內容的位元組陣列 .property("my-key", "my-value") //自定義的key/value .property("my-other-key", "my-other-value") .send();
3.2 消費者
在Pulsar中,消費者訂閱topic並處理生產者釋出到這些topic的訊息。 你可以首先例項化一個PulsarClient物件並傳給他一個borker URL(和生產樣的一樣)來例項化一個消費者。
一旦例項化一個PulsarClient 物件,你可以指定一個主題和一個訂閱來建立一個 Consumer 消費者。
Consumer consumer = client.newConsumer() .topic("my-topic") //生產者定義的topic .subscriptionName("my-subscription") //消費者自定義的訂閱名稱 .subscribe();
subscribe()方法將自動將訂閱消費者指定的主題, 一種讓消費者監聽主題的方法是使用while迴圈,示例如下:
while (true) { // 等待一個訊息 Message msg = consumer.receive(); try { // 對這個訊息的處理(業務) System.out.printf("Message received: %s", new String(msg.getData())); // 消費者確認訊息已消費,同時broker刪除該訊息 consumer.acknowledge(msg); } catch (Exception e) { // 訊息處理失敗,否定確認,該訊息稍後會重發 consumer.negativeAcknowledge(msg); } }
3.2.1 消費者配置
如果例項化 消費者物件, 僅指定主題和訂閱名稱, 如上面的示例所示, 消費者將採用預設配置。 要使用非預設配置, 你可以設定多種可配置的引數。詳情見ConsumerBuilder的說明,下面是一個示例:
Consumer consumer = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) //確認超時時間 .subscriptionType(SubscriptionType.Exclusive) //訂閱模式 .subscribe();
3.2.2 非同步接收
receive方法將非同步接受訊息(消費者處理器將被阻塞,直到有訊息到達)。 你也可以使用非同步接收方法,這將在一個新訊息到達時立即返回一個CompletableFuture物件。示例如下:
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
3.2.3 多主題訂閱
消費者除了訂閱單個Pulsar主題外,你還可以使用多主題訂閱訂閱多個主題。 若要使用多主題訂閱, 可以提供一個topic正則表示式 (regex) 或 主題List 。 如果通過 regex 選擇主題, 則所有主題都必須位於同一Pulsar名稱空間中。
下面是一些示例:
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; ConsumerBuilder consumerBuilder = pulsarClient.newConsumer() .subscriptionName(subscription); // 訂閱名稱空間中的所有主題 Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*"); Consumer allTopicsConsumer = consumerBuilder .topicsPattern(allTopicsInNamespace) .subscribe(); // 使用regex訂閱名稱空間中的主題子集 Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*"); Consumer allTopicsConsumer = consumerBuilder .topicsPattern(someTopicsInNamespace) .subscribe();
你還可以訂閱明確的主題列表 (可跨名稱空間):
List<String> topics = Arrays.asList( "topic-1", "topic-2", "topic-3" ); Consumer multiTopicConsumer = consumerBuilder .topics(topics) .subscribe(); // 或者: Consumer multiTopicConsumer = consumerBuilder .topics( "topic-1", "topic-2", "topic-3" ) .subscribe();
你也可以使用subscribeAsync 方法非同步訂閱多主題,下面是一個示例:
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*"); consumerBuilder .topics(topics) .subscribeAsync() .thenAccept(this::receiveMessageFromConsumer); private void receiveMessageFromConsumer(Consumer consumer) { consumer.receiveAsync().thenAccept(message -> { // 業務處理 receiveMessageFromConsumer(consumer); }); }
3.2.4 訂閱模型
Pulsar有多種訂閱模型來適用不同的場景,訂閱模型見Pulsar基礎概念,下面講述如何使用。
為了更好的描述他們之間的不同,假設你建立了一個topic,命名為"my-topic",生產者釋出了10條訊息,示例如下:
//建立生產者 Producer<String> producer = client.newProducer(Schema.STRING) .topic("my-topic") .enableBatch(false) .create(); // "key-1"的訊息有3條 // "key-2"的訊息有3條 // "key-3"的訊息有2條 // "key-4"的訊息有2條 producer.newMessage().key("key-1").value("message-1-1").send(); producer.newMessage().key("key-1").value("message-1-2").send(); producer.newMessage().key("key-1").value("message-1-3").send(); producer.newMessage().key("key-2").value("message-2-1").send(); producer.newMessage().key("key-2").value("message-2-2").send(); producer.newMessage().key("key-2").value("message-2-3").send(); producer.newMessage().key("key-3").value("message-3-1").send(); producer.newMessage().key("key-3").value("message-3-2").send(); producer.newMessage().key("key-4").value("message-4-1").send(); producer.newMessage().key("key-4").value("message-4-2").send();
Exclusive(獨佔模式):
建立一個消費者,以Exclusive模式訂閱訊息,程式碼如下:
Consumer consumer = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Exclusive) //獨佔模式 .subscribe()
只有第一個消費者可以訂閱,其他消費者訂閱會報錯。這就意味著第一個消費者可以收到所有的10條訊息,訊息消費的順序和生產的順序是一樣的。
Failover(災備):
建立一個消費者,以Exclusive模式訂閱訊息,程式碼如下:
//建立消費者1 Consumer consumer1 = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Failover) //災備模式 .subscribe() //建立消費者2 Consumer consumer2 = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Failover) //災備模式 .subscribe()
conumser1是起作用的消費者, consumer2是備用消費者。假設consumer1收到的5條訊息後突然崩了, 那麼consumer2接替,成了起作用的消費者。
當然多個消費者都可以訂閱,但是隻有第一個是可用,第一個消費者斷開連線後,下一個備用的消費者就起作用了。
Shared(共享):
建立一個消費者,以Exclusive模式訂閱訊息,程式碼如下:
Consumer consumer1 = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) //共享模式 .subscribe() Consumer consumer2 = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) .subscribe() //這兩個消費者都是可用的
在共享模式,多個消費者都可以訂閱,訊息在多個消費者之間是以輪詢的方式分發。
如果broke同一時間只發送一個訊息,那麼consume1收到5條訊息:
("key-1", "message-1-1") ("key-1", "message-1-3") ("key-2", "message-2-2") ("key-3", "message-3-1") ("key-4", "message-4-1")
消費者2收到另外5條訊息。
總之,共享模式和其他兩種模式不同,共享模式有更好的靈活性,但是不能保證訊息的順序。
Key_share
這是2.4.0版本後新出的訂閱模式,程式碼如下:
Consumer consumer1 = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Key_Shared) //key共享模式 .subscribe() Consumer consumer2 = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Key_Shared) .subscribe()
KeyShared和Shared模式類似,區別在於KeyShared模式下,具有相同key的訊息分發到同一個消費者。
消費者1最後收到5條訊息:
("key-1", "message-1-1") ("key-1", "message-1-2") ("key-1", "message-1-3") ("key-3", "message-3-1") ("key-3", "message-3-2")
消費者2收到另外5條。
如果該模式下訊息的key沒有指定,那麼所有的訊息預設分發到同一消費者。
3.2.5 Reader介面
使用 reader 介面, Pulsar客戶可以在topic中“手動定位”,從指定的訊息開始向前讀取所有訊息。Pulsar Java API 可以建立Reader物件,同時指定一個 topic, 一個MessageId ,和ReaderConfiguration。
下面是一個示例:
ReaderConfiguration conf = new ReaderConfiguration(); byte[] msgIdBytes = // 一些訊息ID 的位元組陣列 MessageId id = MessageId.fromByteArray(msgIdBytes); Reader reader = pulsarClient.newReader() .topic(topic) .startMessageId(id) .create(); while (true) { Message message = reader.readNext(); // 處理訊息 }
在上面的示例中,例項化一個Reader物件指定的主題和訊息(ID); reader將遍歷主題中msgIdBytes(取值方式取決於應用程式) 之後的訊息。
上面的示例程式碼展示了Reader物件指向特定的訊息(ID),但你也可以使用MessageId.earliest來指向topic上最早可用的訊息,使用MessageId.latest指向最新的訊息。
3.3 Schema
在Pulsar中,所有的訊息資料都在位元組陣列中,訊息schema允許在構造和處理訊息時使用其他型別的資料(從簡單型別(如String)到更復雜的型別)。如果在不指定schema的情況下構造生產者,則生產者只能生成型別為 byte[]的訊息。 下面是一個示例:
Producer<byte[]> producer = client.newProducer() .topic(topic) .create();
以下schema格式目前可用於 Java:
- 無schema 或者位元組陣列schema(使用Schema.BYTES)
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES) .topic("some-raw-bytes-topic") .create();
- String,UTF-8編碼,使用Schema.STRING
Producer<String> stringProducer = client.newProducer(Schema.STRING) .topic("some-string-topic") .create();
- JSON 模式,建立POJO
Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class); Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema) .topic("some-pojo-topic") .create();
結語
Pulsar的特性還有很多,這裡重點介紹了Java客戶端的快速上手教程,後面有時間的話會繼續更新Pulsar系列。
參考文件 http://pulsar.apache.org/docs/en/client-libraries-java/
&n