RabbitMQ系列—Java使用RabbitMQ
阿新 • • 發佈:2018-12-14
RabbitMQ官網介紹了,它支援六種應用場景:簡單佇列、工作佇列、釋出/訂閱、路由模式、Topic主題模式、RPC,接下來分別介紹。
建立一個Maven專案命名rabbitmq,並引入rabbitmq依賴。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
簡單佇列
其中P是生產者,紅色部分是佇列,C是消費者。邏輯就是生產者生產訊息,將訊息放到佇列裡,消費者負責在佇列取出訊息進行消費,其中佇列是rabbitmq實現的,所以我們需要實現生產者和消費者。
建立連線工具類ConnectionUtils。
package com.rabbitmq.util; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * rabbitmq連線工具類 * @author Administrator * */ public class ConnectionUtils { /** * 獲取連線 * @return * @throws IOException * @throws TimeoutException */ public static Connection getConnection() throws IOException, TimeoutException{ ConnectionFactory factory = new ConnectionFactory(); // 設定服務地址 factory.setHost("127.0.0.1"); // 埠 factory.setPort(5672); // vhost factory.setVirtualHost("/vhost_test"); // 使用者名稱 factory.setUsername("admin"); // 密碼 factory.setPassword("123456"); return factory.newConnection(); } /** * 關閉連線 * @param channel * @param con */ public static void close(Channel channel,Connection con){ if(channel != null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(con != null){ try { con.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
建立生產者Sender
package com.rabbitmq.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; /** * 簡單訊息佇列——生產者 * @author Administrator * */ public class Sender { /** * 佇列名稱 */ private static final String QUEUE = "test_simple_queue"; public static void main(String[] args) { Connection con = null; Channel channel = null; try { // 獲取連線 con = ConnectionUtils.getConnection(); // 從連線中建立通道 channel = con.createChannel(); // 宣告一個佇列 channel.queueDeclare(QUEUE, false, false, false, null); // 訊息內容 String msg = "simple queue hello!"; // 傳送訊息 channel.basicPublish("", QUEUE, null, msg.getBytes()); System.out.println("send success"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 關閉連線 ConnectionUtils.close(channel, con); } } }
channel.queueDeclare()方法的作用是宣告一個佇列,它只在所宣告的佇列不存在的情況下生效,如果佇列已經存在在不做任何操作,在此方法中具有如下引數:
- String queue:佇列名稱。
- boolean durable:是否持久化。佇列模式是在記憶體中的,如果重啟rabbitmq訊息會丟失,如果設定為true,會儲存到erlang自帶的資料庫,重啟後可以恢復。
- boolean exclusive:是否排外。作用一,連線關閉後是否自動刪除當前佇列;作用二,是否私有佇列,如果為true,則其他通道不能訪問當前佇列。
- boolean autoDelete:當所有消費者客戶端斷開連線時是否自動刪除佇列。
- Map<String, Object> arguments:其他引數。
channel.basicPublish()方法的作用是傳送訊息到佇列,它具有如下引數:
- String exchange:交換機名稱,簡單佇列用不到交換機,此處寫""空字串即可。
- String routingKey:佇列對映的路由key,此處就是佇列名稱。
- BasicProperties props:訊息的其他屬性。
- byte[] body:傳送資訊的主體。rabbitmq一般不用來發送大資料型別的訊息。
接下來執行Sender生產者。
點選此佇列,點選Get Message,就可以消費這個訊息了,這說明訊息就成功傳送到隊列了。
接下來構建消費者,消費者有兩種寫法,一種是舊的API,一種是新的API,我當前使用的rabbitmq的jar包是5.2.0的,已經不支援舊的API了,就不介紹了,只介紹新的API。
消費者
package com.rabbitmq.simple;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.util.ConnectionUtils;
/**
* 簡單佇列——消費者
*
* @author Administrator
*
*/
public class Recver {
/**
* 佇列名稱,和生產者的佇列名稱必須保持一致
*/
private static final String QUEUE = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連線
Connection con = ConnectionUtils.getConnection();
// 從連線中建立通道
Channel channel = con.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE, false, false, false, null);
// 獲取訊息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("接收到訊息——" + msg);
}
};
// 監聽佇列
channel.basicConsume(QUEUE, true, consumer);
}
}
接下來啟動消費者,再啟動生產者,發現消費者控制檯列印了訊息
在接下來關掉消費者,啟動兩次生產者生產兩條訊息,再次啟動消費者控制檯列印
簡單佇列缺點:
耦合度高,佇列名在一端改動,另一端也要改動。生產者和消費者一一對應,不支援多個消費者。