RabbitMQ例項詳解
阿新 • • 發佈:2018-12-30
Shell程式碼
- git clone https://github.com/stephansun/samples.git
- samples-jms-plain:使用JMS原生API;
- samples-jms-spring:使用Spring對JMS原生API封裝後的spring-jms;
- samples-jms-spring-remoting:使用spring-jms實現JMS的請求/響應模式,需要用到spring提供的遠端呼叫框架;
- samples-spring-remoting:介紹spring的遠端呼叫框架;
- samples-amqp-plain:使用RabbitMQ提供的AMQP Java客戶端;
- samples-amqp-spring:使用spring對AMQP Java客戶端封裝後的spring-amqp-rabbit;
- samples-amqp-spring-remoting:使用spring-amqp-rabbit實現AMQP的請求/響應模式,需要用到spring提供的遠端呼叫框架;
samples-amqp-plain
pom.xml Xml程式碼
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>2.5.0</version>
- <exclusions>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- com.rabbitmq.client.BasicProperties
- com.rabbitmq.client.Channel
- com.rabbitmq.client.Connection
- com.rabbitmq.client.ConnectionFactory
- com.rabbitmq.client.Consumer
- com.rabbitmq.client.MessageProperties
- com.rabbitmq.client.QueueingConsumer
helloworld
Send.java Java程式碼- package stephansun.github.samples.amqp.plain.helloworld;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- publicclass Send {
- privatefinalstatic String QUEUE_NAME = "hello";
- publicstaticvoid main(String[] args) throws IOException {
- // AMQP的連線其實是對Socket做的封裝, 注意以下AMQP協議的版本號,不同版本的協議用法可能不同。
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- // 下一步我們建立一個channel, 通過這個channel就可以完成API中的大部分工作了。
- Channel channel = connection.createChannel();
- // 為了傳送訊息, 我們必須宣告一個佇列,來表示我們的訊息最終要發往的目的地。
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- // 然後我們將一個訊息發往這個佇列。
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("[" + message + "]");
- // 最後,我們關閉channel和連線,釋放資源。
- channel.close();
- connection.close();
- }
- }
Recv.java Java程式碼
- package stephansun.github.samples.amqp.plain.helloworld;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.ConsumerCancelledException;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.ShutdownSignalException;
- publicclass Recv {
- privatefinalstatic String QUEUE_NAME = "hello";
- publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 注意我們也在這裡聲明瞭一個queue,因為我們有可能在傳送者啟動前先啟動接收者。
- // 我們要確保當從這個queue消費訊息時,這個queue是存在的。
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println("CRTL+C");
- // 這個另外的QueueingConsumer類用來快取服務端推送給我們的訊息。
- // 下面我們準備告訴服務端給我們傳遞存放在queue裡的訊息,因為訊息是由服務端推送過來的。
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println("[" + message + "]");
- }
- }
- }
work queues
Worker.java Java程式碼- package stephansun.github.samples.amqp.plain.workqueues;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.ConsumerCancelledException;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.ShutdownSignalException;
- publicclass Worker {
- privatefinalstatic String QUEUE_NAME = "task_queue";
- publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println("CRTL+C");
- // 這條語句告訴RabbitMQ在同一時間不要給一個worker一個以上的訊息。
- // 或者換一句話說, 不要將一個新的訊息分發給worker知道它處理完了並且返回了前一個訊息的通知標誌(acknowledged)
- // 替代的,訊息將會分發給下一個不忙的worker。
- channel.basicQos(1);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 自動通知標誌
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME, autoAck, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println("r[" + message + "]");
- doWord(message);
- System.out.println("r[done]");
- // 發出通知標誌
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- privatestaticvoid doWord(String task) throws InterruptedException {
- for (char ch : task.toCharArray()) {
- if (ch == '.') {
- Thread.sleep(1000);
- }
- }
- }
- }
- Channel.basicAck(long deliveryTag, boolean multiple) Parameters:
- deliveryTag the tag from the received com.rabbitmq.client.AMQP.Basic.GetOk or com.rabbitmq.client.AMQP.Basic.Deliver
- multiple true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
NewTask.java Java程式碼
- package stephansun.github.samples.amqp.plain.workqueues;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- publicclass NewTask {
- // 使用Work Queues (也稱為Task Queues)最主要的想法是分流那些耗時,耗資源的任務,不至於使佇列擁堵。
- privatestatic String getMessage(String[] strings) {
- if (strings.length < 1) {
- return"Hello World!";
- }
- return joinStrings(strings, " ");
- }
- privatestatic String joinStrings(String[] strings, String delimiter) {
- int length = strings.length;
- if (length == 0) {
- return"";
- }
- StringBuilder words = new StringBuilder(strings[0]);
- for (int i = 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- privatefinalstatic String QUEUE_NAME = "task_queue";
- publicstaticvoid main(String[] args) throws IOException {
- String[] strs = new String[] { "First message." };
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 跟helloworld的不同點
- boolean durable = true;
- // 下面這個宣告佇列的佇列名字改了,所以生產者和消費者兩邊的程式都要改成統一的佇列名字。
- channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
- // 有了durable為true,我們可以保證名叫task_queue的佇列即使在RabbitMQ重啟的情況下也不會消失。
- String message = getMessage(strs);
- // 現在我們需要將訊息標記成可持久化的。
- // 如果你需要更強大的保證訊息傳遞,你可以將釋出訊息的程式碼打包到一個事務裡。
- channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
- System.out.println("s[" + message + "]");
- channel.close();
- connection.close();
- }
- }
publish subscribe
EmitLog.java Java程式碼- package stephansun.github.samples.amqp.plain.publishsubscribe;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- publicclass EmitLog {
- // 在前面,我們使用queue,都給定了一個指定的名字。能夠對一個queue命名對於我們來說是很嚴肅的
- // 下面我們需要將worker指定到同一個queue。
- // echange的型別有: direct, topic, headers and fanout.
- privatestaticfinal String EXCHANGE_NAME = "logs";
- publicstaticvoid main(String[] args) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // fanout exchange 將它收的所有訊息廣播給它知道的所有佇列。
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String message = getMessage(new String[] { "test" });
- // 如果routingkey存在的話,訊息通過一個routingkey指定的名字路由至佇列
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- System.out.println("sent [" + message + "]");
- channel.close();
- connection.close();
- }
- privatestatic String getMessage(String[] strings) {
- if (strings.length < 1) {
- return"Hello World!";
- }
- return joinStrings(strings, " ");
- }
- privatestatic String joinStrings(String[] strings, String delimiter) {
- int length = strings.length;
- if (length == 0) {
- return"";
- }
- StringBuilder words = new StringBuilder(strings[0]);
- for (int i = 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
ReceiveLogs.java Java程式碼
- package stephansun.github.samples.amqp.plain.publishsubscribe;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.ConsumerCancelledException;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.ShutdownSignalException;
- publicclass ReceiveLogs {
- // 就像你看到的, 建立了連線後,我們聲明瞭一個exchange,這一步是必須的,因為將訊息傳送到一個並不存在的exchange上是不允許的。
- // 如果還沒有queue繫結到exchange上,訊息將會丟失。
- // 但那對我們來說是ok的。
- // 如果沒有消費者在監聽,我們可以安全地丟棄掉訊息。
- // RabbitMQ中有關訊息模型地核心觀點是,生產者永遠不會直接將訊息發往佇列。
- // 事實上,相當多的生產者甚至根本不知道一個訊息是否已經傳遞給了一個佇列。
- // 相反,生產者只能將訊息傳送給一個exchange。
- // exchange是一個很簡單的東西。
- // 一邊它接收來自生產者的訊息,另一邊它將這些訊息推送到佇列。
- // exchagne必須明確地知道拿它收到的訊息來做什麼。把訊息附在一個特定的佇列上?把訊息附在很多佇列上?或者把訊息丟棄掉。
- // 這些規則在exchange型別裡都有定義。
- privatestaticfinal String EXCHANGE_NAME = "logs";
- publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 建立fanout型別的exchange, 我們叫它logs:
- // 這種型別的exchange將它收到的所有訊息廣播給它知道的所有佇列。
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 臨時佇列(temporary queue)
- // 首先,無論什麼時候連線Rabbit時,我們需要一個fresh的,空的佇列
- // First, whenever we connect to Rabbit we need a fresh, empty queue.
- // 為了做到這一點,我們可以建立一個隨機命名的佇列,或者更好的,就讓服務端給我們選擇一個隨機的佇列名字。
- // 其次,一旦我們關閉消費者的連線,這個臨時佇列應該自動銷燬。
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, EXCHANGE_NAME, "");
- System.out.println("CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println("r[" + message + "]");
- }
- }
- }
routing
EmitLogDirect.java Java程式碼- package stephansun.github.samples.amqp.plain.routing;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- publicclass EmitLogDirect {
- privatestaticfinal String EXCHANGE_NAME = "direct_logs";
- publicstaticvoid main(String[] args) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- // diff
- String serverity = getServerity(new String[] { "test" });
- String message = getMessage(new String[] { "test" });
- channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes());
- System.out.println("s[" + serverity + "]:[" + message + "]");
- channel.close();
- connection.close();
- }
- privatestatic String getServerity(String[] strings) {
- return"info";
- }
- privatestatic String getMessage(String[] strings) {
- if (strings.length < 1) {
- return"Hello World!";
- }
- return joinStrings(strings, " ");
- }
- privatestatic String joinStrings(String[] strings, String delimiter) {
- int length = strings.length;
- if (length == 0) {
- return"";
- }
- StringBuilder words = new StringBuilder(strings[0]);
- for (int i = 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
ReceiveLogsDirect.java Java程式碼
- package stephansun.github.samples.amqp.plain.routing;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.ConsumerCancelledException;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.ShutdownSignalException;
- publicclass ReceiveLogsDirect {
- privatestaticfinal String EXCHANGE_NAME = "direct_logs";
- publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- String queueName = channel.queueDeclare().getQueue();
- String[] strs = new String[] { "info", "waring", "error" };
- for (String str : strs) {
- channel.queueBind(queueName, EXCHANGE_NAME, str);
- }
- System.out.println("CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println("r:[" + routingKey + "]:[" + message + "]");
- }
- }
- }
topics
EmitLogTopic.java Java程式碼- package stephansun.github.samples.amqp.plain.topics;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- publicclass EmitLogTopic {
- privatestaticfinal String EXCHANGE_NAME = "topic_logs";
- publicstaticvoid main(String[] args) throws IOException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // diff
- String routingKey = getServerity(new String[] { "test" });
- String message = getMessage(new String[] { "test" });
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
- System.out.println("s[" + routingKey + "]:[" + message + "]");
- channel.close();
- connection.close();
- }
- privatestatic String getServerity(String[] strings) {
- return"kern.critical";
- }
- privatestatic String getMessage(String[] strings) {
- if (strings.length < 1) {
- return"Hello World!";
- }
- return joinStrings(strings, " ");
- }
- privatestatic String joinStrings(String[] strings, String delimiter) {
- int length = strings.length;
- if (length == 0) {
- return"";
- }
- StringBuilder words = new StringBuilder(strings[0]);
- for (int i = 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
ReceiveLogsTopic.java Java程式碼
- package stephansun.github.samples.amqp.plain.topics;
- import java.io.IOException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.ConsumerCancelledException;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.ShutdownSignalException;
- publicclass ReceiveLogsTopic {
- // FIXME
- // Some teasers:
- // Will "*" binding catch a message sent with an empty routing key?
- // Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
- // How different is "a.*.#" from "a.#"?
- privatestaticfinal String EXCHANGE_NAME = "topic_logs";
- publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String queueName = channel.queueDeclare().getQueue();
- String[] strs = new String[] { "kern.critical", "A critical kernel error" };
- for (String str : strs) {
- channel.queueBind(queueName, EXCHANGE_NAME, str);
- }
- System.out.println("CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println("r:[" + routingKey + "]:[" + message + "]");
- }
- }
- }
RPC
RPCClient.java Java程式碼- package stephansun.github.samples.amqp.plain.rpc;
- import java.io.IOException;
- import java.util.UUID;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.ConsumerCancelledException;
- import com.rabbitmq.client.QueueingConsumer;
- import com.rabbitmq.client.ShutdownSignalException;
- publicclass RPCClient {
- // FIXME
- // AMQP協議預定義了14種伴隨著訊息的屬性。大多數屬性很少使用到。除了以下這些異常情況:
- // deliveryMode:
- // contentType:
- // replyTo:
- // correlationId:
- // FIXME
- // 為什麼我們忽略掉callback佇列裡的訊息,而不是丟擲錯誤?
- // 這取決於服務端的競爭條件的可能