1. 程式人生 > >RabbitMQ訊息分發模式----"Hello World"模式

RabbitMQ訊息分發模式----"Hello World"模式

"Hello World"模式:即最簡單的分發模式:一個訊息生產者,一個訊息消費者。


生產者端程式碼:

<span style="font-size:14px;">public class MsgSender {  
            private final static String QUEUE_NAME = "hello";  
          
            public static void main(String[] args) throws IOException {  
                /** 
                 * 建立連線連線到RabbitMQ 
                 */  
                ConnectionFactory factory = new ConnectionFactory();  
                // 設定MabbitMQ所在主機ip或者主機名  
                factory.setHost("127.0.0.1");  
                // 建立一個連線  
                Connection connection = factory.newConnection();  
                // 建立一個頻道  
                Channel channel = connection.createChannel();  
                // 指定一個佇列  
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
                // 傳送的訊息  
                String message = "hello world!";  
                // 往佇列中發出一條訊息  
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
                System.out.println(" [x] Sent '" + message + "'");  
                // 關閉頻道和連線  
                channel.close();  
                connection.close();  
            }  
}</span>


ConnectionFactory、Connection、Channel

       ConnectionFactory、Connection、Channel,這三個都是RabbitMQ對外提供的API中最基本的物件。不管是伺服器端還是客戶端都會首先建立這三類物件。
       ConnectionFactory為Connection的製造工廠。

       Connection是與RabbitMQ伺服器的socket連結,它封裝了socket協議及身份驗證相關部分邏輯。

       Channel是我們與RabbitMQ打交道的最重要的一個介面,大部分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、繫結Queue與Exchange、釋出訊息等。

Queue

       Queue(佇列)是RabbitMQ的內部物件,用於儲存訊息,用下圖表示。


       RabbitMQ中的訊息都只能儲存在Queue中,生產者(上圖中的P---productor)生產訊息並最終投遞到Queue中,消費者(上圖中的C---Consumer)可以從這個(hello)Queue(Hello)中獲取訊息並消費。

       佇列是由Channel宣告的,而且這個操作是冪等的。同名的佇列多次宣告也只會建立一次。我們傳送訊息就是向這個宣告的佇列裡傳送訊息。

消費者端程式碼:

public class MsgReceiver {  
            private final static String QUEUE_NAME = "hello";  
          
            public static void main(String[] argv) throws IOException, InterruptedException {  
                ConnectionFactory factory = new ConnectionFactory();  
                factory.setHost("127.0.0.1");  
                // 開啟連線和建立頻道,與傳送端一樣  
                Connection connection = factory.newConnection();  
                Channel channel = connection.createChannel();  
          
                // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。  
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
                // 建立佇列消費者  
                QueueingConsumer consumer = new QueueingConsumer(channel);  
                System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
                // 指定消費佇列  <span style="font-family: Arial, Helvetica, sans-serif;">true表示不用確認即可,只要接收,不管有沒有成功處理都會當做成功 false只有當成功處理完畢並確認後才算</span>
                channel.basicConsume(QUEUE_NAME, true, consumer);  
                while (true) {  
                    // nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)  
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
                    String message = new String(delivery.getBody());  
                    System.out.println(" [x] Received '" + message + "'"); 
                    // 返回確認狀態 當channel.basicConsume()第二個引數為false時
            <span style="white-space:pre">	</span>    // 如果沒有這一行確認接收則關閉消費者再次啟動還會收到已經接收的訊息。。。。
                    // 為true時,如果加上這行報錯channel error; protocol method: #method<channel.close>(reply-code=406,
                    // reply-text=PRECONDITION_FAILED - unknown delivery tag 1,
              <span style="white-space:pre">	</span>    // class-id=60, method-id=80)
                    // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
                }
            }
}


從消費者的程式碼中,我們可以看到ConnectionFactory、Connection、Channel這三個物件都還是會建立。而佇列消費者這裡再次宣告一遍,是為了防止先啟動消費者------當為消費者指定佇列時,如果RabbitMQ伺服器上未宣告過該佇列,就會丟擲IO異常。