1. 程式人生 > >RabbitMQ系列—Java使用RabbitMQ

RabbitMQ系列—Java使用RabbitMQ

RabbitMQ官網介紹了,它支援六種應用場景:簡單佇列、工作佇列、釋出/訂閱、路由模式、Topic主題模式、RPC,接下來分別介紹。

建立一個Maven專案命名rabbitmq,並引入rabbitmq依賴。

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.2.0</version>
</dependency>

簡單佇列

其中P是生產者,紅色部分是佇列,C是消費者。邏輯就是生產者生產訊息,將訊息放到佇列裡,消費者負責在佇列取出訊息進行消費,其中佇列是rabbitmq實現的,所以我們需要實現生產者和消費者。

建立連線工具類ConnectionUtils。

package com.rabbitmq.util;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * rabbitmq連線工具類
 * @author Administrator
 *
 */
public class ConnectionUtils {
	/**
	 * 獲取連線
	 * @return
	 * @throws IOException
	 * @throws TimeoutException
	 */
	public static Connection getConnection() throws IOException, TimeoutException{
		ConnectionFactory factory = new ConnectionFactory();
		// 設定服務地址
		factory.setHost("127.0.0.1");
		// 埠
		factory.setPort(5672);
		// vhost
		factory.setVirtualHost("/vhost_test");
		// 使用者名稱
		factory.setUsername("admin");
		// 密碼
		factory.setPassword("123456");
		return factory.newConnection();
	}
	/**
	 * 關閉連線
	 * @param channel
	 * @param con
	 */
	public static void close(Channel channel,Connection con){
		if(channel != null){
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
		}
		if(con != null){
			try {
				con.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

建立生產者Sender

package com.rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
/**
 * 簡單訊息佇列——生產者
 * @author Administrator
 *
 */
public class Sender {
	/**
	 * 佇列名稱
	 */
	private static final String QUEUE = "test_simple_queue";
	
	public static void main(String[] args) {
		Connection con = null;
		Channel channel = null;
		try {
			// 獲取連線
			con = ConnectionUtils.getConnection();
			// 從連線中建立通道
			channel = con.createChannel();
			// 宣告一個佇列
			channel.queueDeclare(QUEUE, false, false, false, null);
			// 訊息內容
			String msg = "simple queue hello!";
			// 傳送訊息
			channel.basicPublish("", QUEUE, null, msg.getBytes());
			System.out.println("send success");
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		} finally {
			// 關閉連線
			ConnectionUtils.close(channel, con);
		}
		
	}

}

channel.queueDeclare()方法的作用是宣告一個佇列,它只在所宣告的佇列不存在的情況下生效,如果佇列已經存在在不做任何操作,在此方法中具有如下引數:

  • String queue:佇列名稱。
  • boolean durable:是否持久化。佇列模式是在記憶體中的,如果重啟rabbitmq訊息會丟失,如果設定為true,會儲存到erlang自帶的資料庫,重啟後可以恢復。
  • boolean exclusive:是否排外。作用一,連線關閉後是否自動刪除當前佇列;作用二,是否私有佇列,如果為true,則其他通道不能訪問當前佇列。
  • boolean autoDelete:當所有消費者客戶端斷開連線時是否自動刪除佇列。
  • Map<String, Object> arguments:其他引數。

channel.basicPublish()方法的作用是傳送訊息到佇列,它具有如下引數:

  • String exchange:交換機名稱,簡單佇列用不到交換機,此處寫""空字串即可。
  • String routingKey:佇列對映的路由key,此處就是佇列名稱。
  • BasicProperties props:訊息的其他屬性。
  • byte[] body:傳送資訊的主體。rabbitmq一般不用來發送大資料型別的訊息。

接下來執行Sender生產者。

點選此佇列,點選Get Message,就可以消費這個訊息了,這說明訊息就成功傳送到隊列了。

接下來構建消費者,消費者有兩種寫法,一種是舊的API,一種是新的API,我當前使用的rabbitmq的jar包是5.2.0的,已經不支援舊的API了,就不介紹了,只介紹新的API。

消費者

package com.rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.util.ConnectionUtils;

/**
 * 簡單佇列——消費者
 * 
 * @author Administrator
 *
 */
public class Recver {
	/**
	 * 佇列名稱,和生產者的佇列名稱必須保持一致
	 */
	private static final String QUEUE = "test_simple_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 獲取連線
		Connection con = ConnectionUtils.getConnection();
		// 從連線中建立通道
		Channel channel = con.createChannel();
		// 宣告佇列
		channel.queueDeclare(QUEUE, false, false, false, null);
		// 獲取訊息
		Consumer consumer = new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg = new String(body, "utf-8");
				System.out.println("接收到訊息——" + msg);
			}

		};
		// 監聽佇列
		channel.basicConsume(QUEUE, true, consumer);
	}
}

接下來啟動消費者,再啟動生產者,發現消費者控制檯列印了訊息

在接下來關掉消費者,啟動兩次生產者生產兩條訊息,再次啟動消費者控制檯列印

簡單佇列缺點:

耦合度高,佇列名在一端改動,另一端也要改動。生產者和消費者一一對應,不支援多個消費者。