1. 程式人生 > >RabbitMQ例項詳解

RabbitMQ例項詳解

Shell程式碼  收藏程式碼
  1. git clone git://github.com/stephansun/samples.git  
samples包含7個模組,分別為
  1. samples-jms-plain:使用JMS原生API;
  2. samples-jms-spring:使用Spring對JMS原生API封裝後的spring-jms;
  3. samples-jms-spring-remoting:使用spring-jms實現JMS的請求/響應模式,需要用到spring提供的遠端呼叫框架;
  4. samples-spring-remoting:介紹spring的遠端呼叫框架;
  5. samples-amqp-plain:使用RabbitMQ提供的AMQP Java客戶端;
  6. samples-amqp-spring:使用spring對AMQP Java客戶端封裝後的spring-amqp-rabbit;
  7. samples-amqp-spring-remoting:使用spring-amqp-rabbit實現AMQP的請求/響應模式,需要用到spring提供的遠端呼叫框架;
下面逐一講解

samples-amqp-plain

pom.xml Xml程式碼  收藏程式碼
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>com.rabbitmq</groupId>
  4.         <
    artifactId>amqp-client</artifactId>
  5.         <version>2.5.0</version>
  6.         <exclusions>
  7.             <exclusion>
  8.                 <groupId>commons-cli</groupId>
  9.                 <artifactId>commons-cli</artifactId>
  10.             </exclusion>
  11.         </
    exclusions>
  12.     </dependency>
  13.   </dependencies>
 amqp-client-2.5.0.jar以及它依賴的commons-io-1.2.jar載入進來了,常用的類有: Java程式碼  收藏程式碼
  1. com.rabbitmq.client.BasicProperties  
  2. com.rabbitmq.client.Channel  
  3. com.rabbitmq.client.Connection  
  4. com.rabbitmq.client.ConnectionFactory  
  5. com.rabbitmq.client.Consumer  
  6. com.rabbitmq.client.MessageProperties  
  7. com.rabbitmq.client.QueueingConsumer  

helloworld

Send.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.helloworld;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. publicclass Send {  
  7.     privatefinalstatic String QUEUE_NAME = "hello";  
  8.     publicstaticvoid main(String[] args) throws IOException {  
  9.         // AMQP的連線其實是對Socket做的封裝, 注意以下AMQP協議的版本號,不同版本的協議用法可能不同。
  10.         ConnectionFactory factory = new ConnectionFactory();  
  11.         factory.setHost("localhost");  
  12.         Connection connection = factory.newConnection();  
  13.         // 下一步我們建立一個channel, 通過這個channel就可以完成API中的大部分工作了。
  14.         Channel channel = connection.createChannel();  
  15.         // 為了傳送訊息, 我們必須宣告一個佇列,來表示我們的訊息最終要發往的目的地。
  16.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  17.         String message = "Hello World!";  
  18.         // 然後我們將一個訊息發往這個佇列。
  19.         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
  20.         System.out.println("[" + message + "]");  
  21.         // 最後,我們關閉channel和連線,釋放資源。
  22.         channel.close();  
  23.         connection.close();  
  24.     }  
  25. }  
 RabbitMQ預設有一個exchange,叫default exchange,它用一個空字串表示,它是direct exchange型別,任何發往這個exchange的訊息都會被路由到routing key的名字對應的佇列上,如果沒有對應的佇列,則訊息會被丟棄。這就是為什麼程式碼中channel執行basicPulish方法時,第二個引數本應該為routing key,卻被寫上了QUEUE_NAME。 Recv.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.helloworld;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.ConsumerCancelledException;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. import com.rabbitmq.client.ShutdownSignalException;  
  9. publicclass Recv {  
  10.     privatefinalstatic String QUEUE_NAME = "hello";  
  11.     publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  
  12.         ConnectionFactory factory = new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         Connection connection = factory.newConnection();  
  15.         Channel channel = connection.createChannel();  
  16.         // 注意我們也在這裡聲明瞭一個queue,因為我們有可能在傳送者啟動前先啟動接收者。
  17.         // 我們要確保當從這個queue消費訊息時,這個queue是存在的。
  18.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  19.         System.out.println("CRTL+C");  
  20.         // 這個另外的QueueingConsumer類用來快取服務端推送給我們的訊息。
  21.         // 下面我們準備告訴服務端給我們傳遞存放在queue裡的訊息,因為訊息是由服務端推送過來的。
  22.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  23.         channel.basicConsume(QUEUE_NAME, true, consumer);  
  24.         while (true) {  
  25.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  26.             String message = new String(delivery.getBody());  
  27.             System.out.println("[" + message + "]");  
  28.         }  
  29.     }  
  30. }  
channel.queueDeclare:第一個引數:佇列名字,第二個引數:佇列是否可持久化即重啟後該佇列是否依然存在,第三個引數:該佇列是否時獨佔的即連線上來時它佔用整個網路連線,第四個引數:是否自動銷燬即當這個佇列不再被使用的時候即沒有消費者對接上來時自動刪除,第五個引數:其他引數如TTL(佇列存活時間)等。 channel.basicConsume:第一個引數:佇列名字,第二個引數:是否自動應答,如果為真,訊息一旦被消費者收到,服務端就知道該訊息已經投遞,從而從佇列中將訊息剔除,否則,需要在消費者端手工呼叫channel.basicAck()方法通知服務端,如果沒有呼叫,訊息將會進入unacknowledged狀態,並且當消費者連線斷開後變成ready狀態重新進入佇列,第三個引數,具體消費者類。

work queues

Worker.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.workqueues;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.ConsumerCancelledException;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. import com.rabbitmq.client.ShutdownSignalException;  
  9. publicclass Worker {  
  10.     privatefinalstatic String QUEUE_NAME = "task_queue";  
  11.     publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  
  12.         ConnectionFactory factory = new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         Connection connection = factory.newConnection();  
  15.         Channel channel = connection.createChannel();  
  16.         channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  17.         System.out.println("CRTL+C");  
  18.         // 這條語句告訴RabbitMQ在同一時間不要給一個worker一個以上的訊息。
  19.         // 或者換一句話說, 不要將一個新的訊息分發給worker知道它處理完了並且返回了前一個訊息的通知標誌(acknowledged)
  20.         // 替代的,訊息將會分發給下一個不忙的worker。
  21.         channel.basicQos(1);  
  22.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  23.         // 自動通知標誌
  24.         boolean autoAck = false;  
  25.         channel.basicConsume(QUEUE_NAME, autoAck, consumer);  
  26.         while (true) {  
  27.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  28.             String message = new String(delivery.getBody());  
  29.             System.out.println("r[" + message + "]");  
  30.             doWord(message);  
  31.             System.out.println("r[done]");  
  32.             // 發出通知標誌
  33.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  34.         }  
  35.     }  
  36.     privatestaticvoid doWord(String task) throws InterruptedException {  
  37.         for (char ch : task.toCharArray()) {  
  38.             if (ch == '.') {  
  39.                 Thread.sleep(1000);  
  40.             }  
  41.         }  
  42.     }  
  43. }  
在本程式碼中, channel執行basicConsume方法時autoAck為false,這就意味著接受者在收到訊息後需要主動通知RabbitMQ才能將該訊息從佇列中刪除,否則該在接收者跟MQ連線沒斷的情況下,訊息將會變為untracked狀態,一旦接收者斷開連線,訊息重新變為ready狀態。 通知MQ需要呼叫channel.basicAck(int, boolean),如果不呼叫,訊息永遠不會從佇列中消失。 該方法第一個引數為一個標誌,一般是delivery.getEnvelope().getDeliveryTag(),其實就是一個遞增的數字,它表示這個這個佇列中第幾個訊息。 以下解釋錯誤! 第二個引數為true表示通知所有untracked的訊息,false標誌只通知第一個引數對應的那個訊息。不管是true還是false,只要執行了channel.basicAck方法,訊息都會從佇列中刪除。 第二個引數 Java程式碼  收藏程式碼
  1. Parameters:  
  2. deliveryTag the tag from the received com.rabbitmq.client.AMQP.Basic.GetOk or com.rabbitmq.client.AMQP.Basic.Deliver  
  3. multiple true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.  
 我之前錯誤的將and作為的斷句點,認為true通知所有的untracked訊息,包含tag指定的那個,其實應該將 up to and including 作為一個整體理解,通知所有擁有相同tag的untracked訊息(暫時還沒有在程式碼中模擬出這種場景)。尼瑪英語不好害死人啊。參考這個版本的API   NewTask.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.workqueues;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.MessageProperties;  
  7. publicclass NewTask {  
  8.     // 使用Work Queues (也稱為Task Queues)最主要的想法是分流那些耗時,耗資源的任務,不至於使佇列擁堵。 
  9.     privatestatic String getMessage(String[] strings) {  
  10.         if (strings.length < 1) {  
  11.             return"Hello World!";  
  12.         }  
  13.         return joinStrings(strings, " ");  
  14.     }  
  15.     privatestatic String joinStrings(String[] strings, String delimiter) {  
  16.         int length = strings.length;  
  17.         if (length == 0) {  
  18.             return"";  
  19.         }  
  20.         StringBuilder words = new StringBuilder(strings[0]);  
  21.         for (int i = 1; i < length; i++) {  
  22.             words.append(delimiter).append(strings[i]);  
  23.         }  
  24.         return words.toString();  
  25.     }  
  26.     privatefinalstatic String QUEUE_NAME = "task_queue";  
  27.     publicstaticvoid main(String[] args) throws IOException {  
  28.         String[] strs = new String[] { "First message." };  
  29.         ConnectionFactory factory = new ConnectionFactory();  
  30.         factory.setHost("localhost");  
  31.         Connection connection = factory.newConnection();  
  32.         Channel channel = connection.createChannel();  
  33.         // 跟helloworld的不同點
  34.         boolean durable = true;  
  35.         // 下面這個宣告佇列的佇列名字改了,所以生產者和消費者兩邊的程式都要改成統一的佇列名字。
  36.         channel.queueDeclare(QUEUE_NAME, durable, falsefalsenull);  
  37.         // 有了durable為true,我們可以保證名叫task_queue的佇列即使在RabbitMQ重啟的情況下也不會消失。
  38.         String message = getMessage(strs);  
  39.         // 現在我們需要將訊息標記成可持久化的。
  40.         // 如果你需要更強大的保證訊息傳遞,你可以將釋出訊息的程式碼打包到一個事務裡。 
  41.         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
  42.         System.out.println("s[" + message + "]");  
  43.         channel.close();  
  44.         connection.close();  
  45.     }  
  46. }  

 publish subscribe

EmitLog.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.publishsubscribe;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. publicclass EmitLog {  
  7.     // 在前面,我們使用queue,都給定了一個指定的名字。能夠對一個queue命名對於我們來說是很嚴肅的
  8.     // 下面我們需要將worker指定到同一個queue。
  9.     // echange的型別有: direct, topic, headers and fanout.
  10.     privatestaticfinal String EXCHANGE_NAME = "logs";  
  11.     publicstaticvoid main(String[] args) throws IOException {  
  12.         ConnectionFactory factory = new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         Connection connection = factory.newConnection();  
  15.         Channel channel = connection.createChannel();  
  16.         // fanout exchange 將它收的所有訊息廣播給它知道的所有佇列。
  17.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  18.         String message = getMessage(new String[] { "test" });  
  19.         // 如果routingkey存在的話,訊息通過一個routingkey指定的名字路由至佇列
  20.         channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes());  
  21.         System.out.println("sent [" + message + "]");  
  22.         channel.close();  
  23.         connection.close();  
  24.     }  
  25.     privatestatic String getMessage(String[] strings) {  
  26.         if (strings.length < 1) {  
  27.             return"Hello World!";  
  28.         }  
  29.         return joinStrings(strings, " ");  
  30.     }  
  31.     privatestatic String joinStrings(String[] strings, String delimiter) {  
  32.         int length = strings.length;  
  33.         if (length == 0) {  
  34.             return"";  
  35.         }  
  36.         StringBuilder words = new StringBuilder(strings[0]);  
  37.         for (int i = 1; i < length; i++) {  
  38.             words.append(delimiter).append(strings[i]);  
  39.         }  
  40.         return words.toString();  
  41.     }  
  42. }  
 ReceiveLogs.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.publishsubscribe;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.ConsumerCancelledException;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. import com.rabbitmq.client.ShutdownSignalException;  
  9. publicclass ReceiveLogs {  
  10.     // 就像你看到的, 建立了連線後,我們聲明瞭一個exchange,這一步是必須的,因為將訊息傳送到一個並不存在的exchange上是不允許的。
  11.     // 如果還沒有queue繫結到exchange上,訊息將會丟失。
  12.     // 但那對我們來說是ok的。
  13.     // 如果沒有消費者在監聽,我們可以安全地丟棄掉訊息。
  14.     // RabbitMQ中有關訊息模型地核心觀點是,生產者永遠不會直接將訊息發往佇列。
  15.     // 事實上,相當多的生產者甚至根本不知道一個訊息是否已經傳遞給了一個佇列。
  16.     // 相反,生產者只能將訊息傳送給一個exchange。
  17.     // exchange是一個很簡單的東西。
  18.     // 一邊它接收來自生產者的訊息,另一邊它將這些訊息推送到佇列。
  19.     // exchagne必須明確地知道拿它收到的訊息來做什麼。把訊息附在一個特定的佇列上?把訊息附在很多佇列上?或者把訊息丟棄掉。
  20.     // 這些規則在exchange型別裡都有定義。
  21.     privatestaticfinal String EXCHANGE_NAME = "logs";  
  22.     publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  
  23.         ConnectionFactory factory = new ConnectionFactory();  
  24.         factory.setHost("localhost");  
  25.         Connection connection = factory.newConnection();  
  26.         Channel channel = connection.createChannel();  
  27.         // 建立fanout型別的exchange, 我們叫它logs:
  28.         // 這種型別的exchange將它收到的所有訊息廣播給它知道的所有佇列。
  29.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  30.         // 臨時佇列(temporary queue)
  31.         // 首先,無論什麼時候連線Rabbit時,我們需要一個fresh的,空的佇列
  32.         // First, whenever we connect to Rabbit we need a fresh, empty queue.
  33.         // 為了做到這一點,我們可以建立一個隨機命名的佇列,或者更好的,就讓服務端給我們選擇一個隨機的佇列名字。
  34.         // 其次,一旦我們關閉消費者的連線,這個臨時佇列應該自動銷燬。
  35.         String queueName = channel.queueDeclare().getQueue();  
  36.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
  37.         System.out.println("CTRL+C");  
  38.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  39.         channel.basicConsume(queueName, true, consumer);  
  40.         while (true) {  
  41.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  42.             String message = new String(delivery.getBody());  
  43.             System.out.println("r[" + message + "]");  
  44.         }  
  45.     }     
  46. }  
 釋出訂閱,本程式碼演示的是fanout exchange,這種型別的exchange將它收到的所有訊息直接傳送給所有跟它繫結的佇列,這裡說了直接,是因為rouring key對於fanout exchange來說沒有任何意義!不管一個佇列以怎樣的routing key和fanout exhange繫結,只要他們綁定了,訊息就會送到佇列。程式碼中傳送端將訊息發到logs名字的fanout exchange,routing key為空字串,你可以將它改成任何其他值或者null試試看。另外,接收端程式碼使用channel聲明瞭一個臨時佇列,並將這個佇列通過空字串的routing key繫結到fanout exchange。這個臨時佇列的名字的隨機取的,如:amq.gen-U0srCoW8TsaXjNh73pnVAw==,臨時佇列在後面的請求響應模式中有用到。

routing

EmitLogDirect.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.routing;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. publicclass EmitLogDirect {  
  7.     privatestaticfinal String EXCHANGE_NAME = "direct_logs";  
  8.     publicstaticvoid main(String[] args) throws IOException {  
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("localhost");  
  11.         Connection connection = factory.newConnection();  
  12.         Channel channel = connection.createChannel();  
  13.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  14.         // diff
  15.         String serverity = getServerity(new String[] { "test" });  
  16.         String message = getMessage(new String[] { "test" });  
  17.         channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes());  
  18.         System.out.println("s[" + serverity + "]:[" + message + "]");  
  19.         channel.close();  
  20.         connection.close();  
  21.     }  
  22.     privatestatic String getServerity(String[] strings) {  
  23.         return"info";  
  24.     }  
  25.     privatestatic String getMessage(String[] strings) {  
  26.         if (strings.length < 1) {  
  27.             return"Hello World!";  
  28.         }  
  29.         return joinStrings(strings, " ");  
  30.     }  
  31.     privatestatic String joinStrings(String[] strings, String delimiter) {  
  32.         int length = strings.length;  
  33.         if (length == 0) {  
  34.             return"";  
  35.         }  
  36.         StringBuilder words = new StringBuilder(strings[0]);  
  37.         for (int i = 1; i < length; i++) {  
  38.             words.append(delimiter).append(strings[i]);  
  39.         }  
  40.         return words.toString();  
  41.     }  
  42. }  
 ReceiveLogsDirect.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.routing;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.ConsumerCancelledException;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. import com.rabbitmq.client.ShutdownSignalException;  
  9. publicclass ReceiveLogsDirect {  
  10.     privatestaticfinal String EXCHANGE_NAME = "direct_logs";  
  11.     publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  
  12.         ConnectionFactory factory = new ConnectionFactory();  
  13.         factory.setHost("localhost");  
  14.         Connection connection = factory.newConnection();  
  15.         Channel channel = connection.createChannel();  
  16.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  17.         String queueName = channel.queueDeclare().getQueue();  
  18.         String[] strs = new String[] { "info""waring""error" };  
  19.         for (String str : strs) {  
  20.             channel.queueBind(queueName, EXCHANGE_NAME, str);  
  21.         }  
  22.         System.out.println("CTRL+C");  
  23.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  24.         channel.basicConsume(queueName, true, consumer);  
  25.         while (true) {  
  26.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  27.             String message = new String(delivery.getBody());  
  28.             String routingKey = delivery.getEnvelope().getRoutingKey();  
  29.             System.out.println("r:[" + routingKey + "]:[" + message + "]");  
  30.         }  
  31.     }  
  32. }  
 本程式碼演示了另外一種exchange,direct exchange,該exchange根據routing key將訊息發往使用該routing key和exchange繫結的一個或者多個佇列裡,如果沒找到,則訊息丟棄。本程式碼中可以啟動3個接收端,分別使用info,warning,error作為routing key,代表3種級別的日誌。只要將不同級別的日誌發往不同接收端只需將日誌級別當作routing key。

topics

EmitLogTopic.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.topics;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. publicclass EmitLogTopic {  
  7.     privatestaticfinal String EXCHANGE_NAME = "topic_logs";  
  8.     publicstaticvoid main(String[] args) throws IOException {  
  9.         ConnectionFactory factory = new ConnectionFactory();  
  10.         factory.setHost("localhost");  
  11.         Connection connection = factory.newConnection();  
  12.         Channel channel = connection.createChannel();  
  13.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  14.         // diff
  15.         String routingKey = getServerity(new String[] { "test" });  
  16.         String message = getMessage(new String[] { "test" });  
  17.         channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());  
  18.         System.out.println("s[" + routingKey + "]:[" + message + "]");  
  19.         channel.close();  
  20.         connection.close();  
  21.     }  
  22.     privatestatic String getServerity(String[] strings) {  
  23.         return"kern.critical";  
  24.     }  
  25.     privatestatic String getMessage(String[] strings) {  
  26.         if (strings.length < 1) {  
  27.             return"Hello World!";  
  28.         }  
  29.         return joinStrings(strings, " ");  
  30.     }  
  31.     privatestatic String joinStrings(String[] strings, String delimiter) {  
  32.         int length = strings.length;  
  33.         if (length == 0) {  
  34.             return"";  
  35.         }  
  36.         StringBuilder words = new StringBuilder(strings[0]);  
  37.         for (int i = 1; i < length; i++) {  
  38.             words.append(delimiter).append(strings[i]);  
  39.         }  
  40.         return words.toString();  
  41.     }  
  42. }  
 ReceiveLogsTopic.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.topics;  
  2. import java.io.IOException;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.ConsumerCancelledException;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. import com.rabbitmq.client.ShutdownSignalException;  
  9. publicclass ReceiveLogsTopic {  
  10.     // FIXME
  11.     // Some teasers:
  12.     // Will "*" binding catch a message sent with an empty routing key?
  13.     // Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
  14.     // How different is "a.*.#" from "a.#"?
  15.     privatestaticfinal String EXCHANGE_NAME = "topic_logs";  
  16.     publicstaticvoid main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {  
  17.         ConnectionFactory factory = new ConnectionFactory();  
  18.         factory.setHost("localhost");  
  19.         Connection connection = factory.newConnection();  
  20.         Channel channel = connection.createChannel();  
  21.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  22.         String queueName = channel.queueDeclare().getQueue();  
  23.         String[] strs = new String[] { "kern.critical""A critical kernel error" };  
  24.         for (String str : strs) {  
  25.             channel.queueBind(queueName, EXCHANGE_NAME, str);  
  26.         }  
  27.         System.out.println("CTRL+C");  
  28.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  29.         channel.basicConsume(queueName, true, consumer);  
  30.         while (true) {  
  31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  32.             String message = new String(delivery.getBody());  
  33.             String routingKey = delivery.getEnvelope().getRoutingKey();  
  34.             System.out.println("r:[" + routingKey + "]:[" + message + "]");  
  35.         }  
  36.     }  
  37. }  
 本程式碼演示了最後一種型別的exchange,topic exchange,topic exchange和direct exchange最大的不同就是它繫結的routing key是一種模式,而不是簡單的一個字串。為什麼要有模式(Patten)這個概念?模式可以理解為對事物描述的一種抽象。以程式碼種的日誌系統為例,使用direct exchange只能區別info,error,debug等等不同級別的日誌,但是實際上不光有不同級別的日誌,還有不同來源的日誌,如作業系統核心的日誌,定時指令碼等, 使用模式就可以用<level>.<source>表示,更強大的是,模式允許使用萬用字元,*代表一個單詞,#代表一個多個單詞。

RPC

RPCClient.java Java程式碼  收藏程式碼
  1. package stephansun.github.samples.amqp.plain.rpc;  
  2. import java.io.IOException;  
  3. import java.util.UUID;  
  4. import com.rabbitmq.client.AMQP.BasicProperties;  
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8. import com.rabbitmq.client.ConsumerCancelledException;  
  9. import com.rabbitmq.client.QueueingConsumer;  
  10. import com.rabbitmq.client.ShutdownSignalException;  
  11. publicclass RPCClient {  
  12.     // FIXME
  13.     // AMQP協議預定義了14種伴隨著訊息的屬性。大多數屬性很少使用到。除了以下這些異常情況:
  14.     // deliveryMode:
  15.     // contentType:
  16.     // replyTo:
  17.     // correlationId: 
  18.     // FIXME
  19.     // 為什麼我們忽略掉callback佇列裡的訊息,而不是丟擲錯誤?
  20.     // 這取決於服務端的競爭條件的可能