1. 程式人生 > >RabbitMQ Java "Hello World" 點對點模式

RabbitMQ Java "Hello World" 點對點模式

本系列教程主要來自於官網入門教程的翻譯,然後自己進行了部分的修改與實驗,內容僅供參考。

Java入門例項

一個producer傳送訊息,一個接收者接收訊息,並在控制檯打印出來。如下圖:

                (P) -> [|||] -> (C)

傳送端:Send.java 連線到RabbitMQ(此時服務需要啟動),傳送一條資料,然後退出。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send
{
	//佇列名稱
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws java.io.IOException
	{
		/**
		 * 建立連線連線到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		//設定MabbitMQ所在主機ip或者主機名
		factory.setHost("localhost");
		//建立一個連線
		Connection connection = factory.newConnection();
		//建立一個頻道
		Channel channel = connection.createChannel();
		//指定一個佇列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//傳送的訊息
		String message = "hello world!";
		//往佇列中發出一條訊息
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		//關閉頻道和連線
		channel.close();
		connection.close();
	 }
}

值得注意的是佇列只會在它不存在的時候建立,多次宣告並不會重複建立。資訊的內容是位元組陣列,也就意味著你可以傳遞任何資料。

接收端:Recv.java 不斷等待伺服器推送訊息,然後在控制檯輸出。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv
{
	//佇列名稱
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws java.io.IOException,
			java.lang.InterruptedException
	{
		//開啟連線和建立頻道,與傳送端一樣
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		//宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		//建立佇列消費者
		QueueingConsumer consumer = new QueueingConsumer(channel);
		//指定消費佇列
		channel.basicConsume(QUEUE_NAME, true, consumer);
		while (true)
		{
			//nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}

	}
}

分別執行Send.java和Recv.java 順序無所謂。前提RabbitMQ服務開啟。

執行結果:

[x]Sent 'hello world!'

----------------------------------------

[*] Waiting for messages. To exitpress CTRL+C

[x] Received 'hello world!'