1. 程式人生 > >rabbitmq--helloworld案例(收-發模型)

rabbitmq--helloworld案例(收-發模型)

rabbitmq 官網教程 http://www.rabbitmq.com/getstarted.html

rabbitmq是什麼,按照官網的說法

RabbitMQ is the most widely deployed open source message broker.

With more than 35,000 production deployments of RabbitMQ world-wide at small startups and large enterprises, RabbitMQ is the most popular open source message broker.

RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

RabbitMQ是最廣泛部署的開源訊息代理。

隨著RabbitMQ在全球範圍內在小型初創企業和大型企業中的35000多個生產部署,RabbitMQ是最受歡迎的開源訊息代理。

RabbitMQ是輕量級的,易於部署在本地和雲上。它支援多個訊息傳遞協議。RabbitMQ可以部署在分散式和聯邦配置中,以滿足高規模、高可用性需求。

關於rabbitmq的安裝,我們在上一篇的文章中已經介紹了,本篇我們來寫一個以下rabbitmq的入門案例。

rabbitmq中最簡單的訊息收發模型就是:一方傳送一方直接接收(exchange為"",收發雙方根據相同的queue名稱關聯),如圖:

傳送方程式碼 Sender.java:

package com.tingcream.rabbitmq.helloworld;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 *  (AMQP DEFAULT)   
 * @author jelly
 *
 */
public class Sender {
	
	private final static String QUEUE_NAME = "helloWorld";
	
	public static void main(String[] args) throws  Exception {
		
		ConnectionFactory factory = new ConnectionFactory();
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
		
		
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
 
		/*queueDeclare 
		 * 第一個引數 queue: 佇列名稱
		 * 第二個引數 durable: 是否持久,  true則當rabbitmq broker重啟後佇列仍在
		 * 第三個引數 exclusive: 是否是獨佔佇列(建立者可以使用的私有佇列,斷開後自動刪除)
		 * 第四個引數 autoDelete ,當所有消費者連線斷開時,是否自動刪除佇列  ,true是
		 * 第五個引數 arguments  ,可選引數map
		 */
		//channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
		channel.queueDeclare(QUEUE_NAME, false, false,false, null);    
		String message = "Hello rabbitmq";
		
		//exchagne名稱預設為 (AMQP DEFAULT)  direct
		//channel.basicPublish(exchange, routingKey, props, body);
		
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		
		//關閉連線
		channel.close();
		connection.close();
		
	}

}

接收方程式碼

package com.tingcream.rabbitmq.helloworld;


import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
	
	private final static String QUEUE_NAME = "helloWorld";

	  public static void main(String[] argv)
	      throws Exception {

		    ConnectionFactory factory = new ConnectionFactory();
			//主機  埠  vhost 使用者名稱 密碼
			factory.setHost("192.168.9.102");
			factory.setUsername("rabbitmq");
			factory.setPassword("rabbitmq123");
			factory.setPort(AMQP.PROTOCOL.PORT);
			factory.setVirtualHost("/");
			
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();

		//	channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
		    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		    
		    Consumer consumer = new DefaultConsumer(channel) { 
		    	  @Override
		    	  public void handleDelivery(String consumerTag, Envelope envelope,
		    	                             AMQP.BasicProperties properties, byte[] body)
		    	      throws IOException {
		    	    String message = new String(body, "UTF-8");
		    	    System.out.println(" [x] Received '" + message + "'");
		    	  }
		    };
		    	
		    
		    //收取訊息  自動ack  autoAck為true
		    //channel.basicConsume(queue, autoAck, callback)
		    channel.basicConsume(QUEUE_NAME, true, consumer);
		    
	      
	    }

}

注意:

1、在我們這個案例中,沒有使用到交換機exchange,實際上exchange的名稱為空字元"",交換機型別為direct,收發雙方是通過一個相同的queue名稱——helloworld來產生關聯的。

2、 channel.queueDeclare(QUEUE_NAME, false, false,false, null); 這句宣告queue的程式碼很重要,並且這句程式碼在收發雙方中必須保持一致。若收發雙方宣告queue的特性不同,不僅無法正確地收發訊息,而且在收、發程式啟動時會丟擲異常,因為rabbitmq伺服器上不允許佇列(名稱、特性)的重複宣告。

3  收、發雙方程式碼中都有個channel.queueDeclare(QUEUE_NAME, false, false,false, null)這句程式碼很重要,裡面每個引數(最後一個引數除外)都具有著重要的作用,不同引數值的組合所宣告的佇列的特性也不一樣:

          第一個引數 queue: 佇列名稱
          第二個引數 durable: 是否持久,  true則當rabbitmq broker重啟後佇列仍在
          第三個引數 exclusive: 是否是獨佔佇列(建立者可以使用的私有佇列,斷開後自動刪除)
         第四個引數 autoDelete ,當所有消費者連線斷開時,是否自動刪除佇列  ,true是
        第五個引數 arguments  ,可選引數map,附加引數

4 、在接收方程式碼中有一句 channel.basicConsume(QUEUE_NAME, true, consumer) ,這句程式碼很重要,其中第二個引數true表示是否自動確認(autoAck)為true (這是示例程式,我們為了簡便起見就自動ack了)。然而在大多數情況下一般應該設定為false,讓我們的程式業務處理完畢後,再手動確認,告知rabbitmq伺服器這條訊息我們已確認處理(消費)完畢了。

最後我們啟動rabbitmq伺服器,執行傳送方和接收方程式:

./rabbitmq-server -detached   #後臺程序啟動服務  

http://192.168.9.102:15672/  #訪問管理後臺