RabbitMQ學習總結(3)——入門例項教程詳解
阿新 • • 發佈:2019-01-08
一、起航
本章節,柯南君將從幾個層面,用官網例子講解一下RabbitMQ的實操經典程式案例,讓大家重新回到經典“Hello world!”(The simplest thing that does something )時代,RabbitMQ 支援N多種客戶端(client),這裡無法一一講解,暫定java client,有時間的情況下,在彌補一下。事先,先普及一下圖示(我們會在下面的事例中,會大量用到,所以先普及一下,便於識別,最終更好理解事例的含義)
1、圖示概念
① producting(生產者):在程式中 傳送訊息的一端,我們暫且稱之為 生產者,在這裡用“p”表示
② queue(佇列):佇列是一個郵箱的名字。它住在RabbitMQ。儘管訊息流經RabbitMQ和您的應用程式,他們只可以儲存在一個佇列中。佇列是不受任何限制,它可以儲存儘可能多的資訊(按你興趣來了),它本質上是一個無限緩衝區。許多生產商可以傳送訊息到一個佇列,許多消費者可以嘗試接收資料從一個佇列。
③ consuming(消費者):消費者和生產者是對應的,較為相似的意思;在這裡,我用“C”表示
2、The Java client library
RabbitMQ 中AMQP這是一個開放的、通用的協議訊息。有許多客戶AMQP在許多不同的語言。我們將使用提供的Java客戶機RabbitMQ。我們需要下載(Download) client library package,並要核實每個jar包,解壓到相應位置,如下圖所示:
第二步:選擇合適的下載,比如我下載了zip包,如圖所示:
第三步:Unzip it(解壓它) 到你的working directory(工作目錄)中 and grab (並且獲得)你的jar包檔案
- $ unzip rabbitmq-java-client-bin-*.zip
- $ cp rabbitmq-java-client-bin-*/*.jar ./
二、程式案例
1) "Hello World"
在這部分教程中我們將用Java寫兩個程式,傳送一個訊息的生產者,消費者接收資訊並打印出來。我們會掩蓋一些細節的Java API,集中在這個非常簡單的東西開始。這是一個“Hello World”的訊息。在下面的圖中,“P”是我們的生產和“C”是我們的消費者。中間的框是一個佇列,訊息緩衝區RabbitMQ保持代表消費者。① sending (傳送)
首先 讓The sender(訊息傳送者)傳送訊息並且讓我們的receiver (訊息接收者)接收訊息,The sender(訊息傳送者)將會connect to(連線)RabbitMQ,傳送一個single message(單一的資訊),然後exit(退出)。- 在send.java 中,我們需要import一些class ,如下所示:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
- set up(設定)類和queue的name
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
- then 我們create 一個connection (連線)到server(服務端)
onnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();備註:
- 這個connection 是抽象的socket connection連結;
- 負責協議版本(protocol version negotigation)和身份認證(authentication );
- 我們在本地機器上連線到一個代理即 localhost ,如果我們想要連線到代理不同機器上我們簡單的指定其名稱或者IP地址即可;
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");訊息內容是一個位元組陣列,所以你可以編碼任何你喜歡的。最後,我們關閉通道和連線;
channel.close(); connection.close();問題: 如果 sending doesn‘t work! 我們將怎麼辦?why? 如果這是你第一次使用RabbitMQ並且你看不到“傳送的”訊息,那麼你可能抓耳撓腮沒有足夠的空閒磁碟空間(預設情況下它需要至少1 gb免費),因此拒絕接受訊息。檢查代理日誌檔案確認,如果有必要減少限制。配置檔案的文件將向您展示如何設定disk_free_limit。 接下來的是send.java所有原始碼: [java] view plaincopyprint?
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
- publicclass Send {
- privatefinalstatic String QUEUE_NAME = "hello";
- publicstaticvoid main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- channel.close();
- connection.close();
- }
- }
② Receiving (接收)
這就是我們的傳送者。我們的接收器是將訊息從RabbitMQ,所以不像傳送方釋出一個訊息,我們將保持執行監聽訊息並打印出來- The code (in Recv.java) has almost the same imports as Send:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer;額外的QueueingConsumer是一個類,我們將使用緩衝訊息推到我們的伺服器。設定傳送者一樣,我們開啟一個連線和一個通道,並宣佈我們將使用的佇列。注意這與佇列,傳送釋出。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }注意,我們在這裡宣告佇列。因為我們可能會在傳送方之前開始啟動接收方,我們要確保佇列存在之前我們嘗試使用訊息。我們要告訴伺服器提供我們從佇列的訊息。因為它將非同步訊息,我們提供一個回撥物件的形式,將緩衝的訊息,直到我們準備使用它們。 QueueingConsumer要做什麼呢?
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
QueueingConsumer.nextDelivery()塊,直到另一個來自伺服器的訊息交付。 下面是Recv.java 原始碼:[java] view plaincopyprint?
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.QueueingConsumer;
- publicclass Recv {
- privatefinalstatic String QUEUE_NAME = "hello";
- publicstaticvoid main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
- }
- }
- }