rabbitmq 工作模式 一
學習工作模式前,先看一下rabbitmq 給的helloworld案例
這是傳統的一對一,,,, 也就是一臺機器生產,一臺機器接收....
為了更好的瞭解程式碼....我這裡演示的話用底層的程式碼來演示....不整合框架了
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.3</version> </dependency>
這是rabbitmq 提供的依賴......匯入一下就可以測試了
下面是我寫的生產類...
測試是沒問題的
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer01 { //佇列名稱 private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; //與 Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); //設定ip factory.setHost("localhost"); //設定埠 factory.setPort(5672); //設定賬號密碼 factory.setUsername("guest"); //預設賬號密碼都是guest factory.setPassword("guest"); //設定虛擬空間 factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器 //建立與RabbitMQ服務的TCP連線 connection = factory.newConnection(); //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話 channel = connection.createChannel(); //這有五個引數 /*** * 宣告佇列,如果Rabbit中沒有此佇列將自動建立* * param1:佇列名稱* * param2:是否持久化* rabbit 關閉了該佇列是否存在.. * param3:佇列是否獨佔此連線* 如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式 * param4:佇列不再使用時是否自動刪除此佇列* 該佇列不使用了就會刪除該佇列 * * param5:佇列引數*/ channel.queueDeclare(QUEUE, true, false, false,null ); String message = "你愛到極致的人,不會愛你"; //釋出訊息 /** * String exchange, String routingKey, BasicProperties props, byte[] body * * param1 : 交換機 後面我會講這裡是指定交換機,使用預設的交換機 * param2 : 路由key,這也是先不寫,,後面講 大概作用是用於Exchange(交換機)將訊息轉發到指定的訊息佇列 * param3 :訊息包含的屬性 * 訊息體 * */ channel.basicPublish("", QUEUE,null ,message.getBytes() ); System.out.println("Send Message is:'" + message + "'"); }catch (Exception e){ }finally { if (channel != null){ channel.close(); } if (connection != null){ connection.close(); } } } }
然後下面是我寫的消費類 ,, 連線mq程式碼都一樣來著...關注傳送訊息和接收訊息的方法就行 ...
同樣測試過,程式碼是可執行的
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer01 { //佇列名稱 private static final String QUEUE = "helloworld"; public static void main(String[] args){ Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); //設定ip factory.setHost("localhost"); //設定埠 factory.setPort(5672); //設定賬號密碼 factory.setUsername("guest"); //預設賬號密碼都是guest factory.setPassword("guest"); //設定虛擬空間 factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器 //建立與RabbitMQ服務的TCP連線 connection = factory.newConnection(); //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話 channel = connection.createChannel(); //這有五個引數 /*** * 宣告佇列,如果Rabbit中沒有此佇列將自動建立* * param1:佇列名稱* * param2:是否持久化* rabbit 關閉了該佇列是否存在.. * param3:佇列是否獨佔此連線* 如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式 * param4:佇列不再使用時是否自動刪除此佇列* 該佇列不使用了就會刪除該佇列 * * param5:佇列引數*/ channel.queueDeclare(QUEUE, true, false, false,null ); //這裡其實可以不用宣告佇列的,因為 生產者已經宣告過了,但是如果生產者後釋出服務,佇列沒有宣告,消費者去監聽佇列..會報錯 // 建立預設消費方法 DefaultConsumer consumer = new DefaultConsumer(channel) { // 重寫監聽方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("receive message.."+new String(body,"utf-8")); } }; // 監聽佇列 /** * String queue, boolean autoAck, Consumer callback * * param1 : 佇列名稱 * param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在 * param3 : 消費物件,,包含消費方法 * */ channel.basicConsume(QUEUE,true , consumer); }catch (Exception e){ } } }
以上的話就是rabbitmq提供的案例
一臺生產者 , 一臺消費者
emmm,,,
這裡講的工作的模式是 workqueues
WorkQueues
對比helloword案例,這裡多了個消費者..
應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。
測試
我們啟動倆次消費者
然後用剛剛寫的生產者,傳送五條資訊
然後我們看消費者列印的資訊
結果 :
mq workqueues 使用的是輪詢方式講資訊平均發給消費者 ,
消費者會在處理完訊息後 接收下一條訊息
2.Publish/subscribe 釋出訂閱模式
特點
生產者將訊息傳送給broker.由交換機將訊息發給每個跟綁定了交換機繫結的訊息佇列,每個佇列都能收到生產者傳送的每一條訊息
生產者 :
宣告Exchange_fanout_inform交換機。
宣告兩個佇列並且繫結到此交換機,
繫結時不需要指定routingkey傳送訊息時不需要指定routingkey
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer02 {
//訊息佇列名稱
public static final String QUEUE_INFORM_Test1 = "queue_inform_1";
public static final String QUEUE_INFORM_Test2 = "queue_inform_2";
//交換機名稱
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
//宣告交換機
/*
String exchange, String type
param1 : 交換機
param2 : 交換機 型別 fanout 、 topic、direct、headers
FANOUT 對應的模式是 釋出訂閱模式 publish/subscribe 模式
其他的工作模式以後會將
DIRECT 對應的是路由的工作模式
TOPIC 對應的是萬用字元工作模式
HEADERS 對應了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//宣告佇列
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*
* param1: 佇列名稱
* param2: 是否持久化
* param3 : 是否獨佔此佇列
* param4 : 佇列不用是否自動刪除
* param5 : 引數
*/
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );
//交換機和佇列繫結
/**
* String queue, String exchange, String routingKey
* param1 : 佇列名稱
* exchange : 交換機
* routingKey : 路由key 後面講,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );
// 傳送訊息
String message = "";
for (int i = 0; i < 9; i++) {
message = "故事的開頭總是這樣,適逢其會,猝不及防。故事的結局總是這樣,花開兩朵,天各一方。"+ i;
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
*
* param1 交換機名稱
* param2 路由key,後面講,先用 "" 代替 ,
* param3 引數
* param4 傳遞的字串
*
*
*/
channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
}catch (Exception e){
}finally{
if(channel!=null){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
下面是我寫的倆個消費者
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest1 {
//佇列名稱
private static final String QUEUE_INFORM_Test1 = "queue_inform_1";
//交換機名稱
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
//這有五個引數
/***
* 宣告佇列,如果Rabbit中沒有此佇列將自動建立*
* param1:佇列名稱*
* param2:是否持久化* rabbit 關閉了該佇列是否存在..
* param3:佇列是否獨佔此連線* 如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式
* param4:佇列不再使用時是否自動刪除此佇列* 該佇列不使用了就會刪除該佇列
*
* param5:佇列引數*/
channel.queueDeclare(QUEUE_INFORM_Test1, true, false, false,null ); //這裡其實可以不用宣告佇列的,因為 生產者已經宣告過了,但是如果生產者後釋出服務,佇列沒有宣告,消費者去監聽佇列..會報錯
//宣告交換機
/*
String exchange, String type
param1 : 交換機
param2 : 交換機 型別 fanout 、 topic、direct、headers
FANOUT 對應的模式是 釋出訂閱模式 publish/subscribe 模式
其他的工作模式以後會將
DIRECT 對應的是路由的工作模式
TOPIC 對應的是萬用字元工作模式
HEADERS 對應了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//交換機和佇列繫結
/**
* String queue, String exchange, String routingKey
* param1 : 佇列名稱
* exchange : 交換機
* routingKey : 路由key 後面講,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽佇列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 佇列名稱
* param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
* param3 : 消費物件,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_Test1,true , consumer);
}catch (Exception e){
}
}
}
消費者二
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest2 {
//佇列名稱
public static final String QUEUE_INFORM_Test2 = "queue_inform_2";
//交換機名稱
public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
//設定ip
factory.setHost("localhost");
//設定埠
factory.setPort(5672);
//設定賬號密碼
factory.setUsername("guest"); //預設賬號密碼都是guest
factory.setPassword("guest");
//設定虛擬空間
factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器
//建立與RabbitMQ服務的TCP連線
connection = factory.newConnection();
//建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
channel = connection.createChannel();
//這有五個引數
/***
* 宣告佇列,如果Rabbit中沒有此佇列將自動建立*
* param1:佇列名稱*
* param2:是否持久化* rabbit 關閉了該佇列是否存在..
* param3:佇列是否獨佔此連線* 如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式
* param4:佇列不再使用時是否自動刪除此佇列* 該佇列不使用了就會刪除該佇列
*
* param5:佇列引數*/
channel.queueDeclare(QUEUE_INFORM_Test2, true, false, false,null ); //這裡其實可以不用宣告佇列的,因為 生產者已經宣告過了,但是如果生產者後釋出服務,佇列沒有宣告,消費者去監聽佇列..會報錯
//宣告交換機
/*
String exchange, String type
param1 : 交換機
param2 : 交換機 型別 fanout 、 topic、direct、headers
FANOUT 對應的模式是 釋出訂閱模式 publish/subscribe 模式
其他的工作模式以後會將
DIRECT 對應的是路由的工作模式
TOPIC 對應的是萬用字元工作模式
HEADERS 對應了 headers 的工作模式
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);
//交換機和佇列繫結
/**
* String queue, String exchange, String routingKey
* param1 : 佇列名稱
* exchange : 交換機
* routingKey : 路由key 後面講,先 用 ""代替
*/
channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );
// 建立預設消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 重寫監聽方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive message.."+new String(body,"utf-8"));
}
};
// 監聽佇列
/**
* String queue, boolean autoAck, Consumer callback
*
* param1 : 佇列名稱
* param2 : 是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
* param3 : 消費物件,,包含消費方法
*
*/
channel.basicConsume(QUEUE_INFORM_Test2,true , consumer);
}catch (Exception e){
}
}
}
細心的同學可能發現了,,生產者跟消費者的不同程式碼其實就是傳送訊息,接收訊息的方法而已 ....
生產者宣告佇列跟交換機,消費者也宣告各自的佇列跟交換機... 其實就是為了怕先啟動消費者沒有發現佇列跟交換機報錯而已...
核心程式碼的話 其實 就是 生成交換機,生成佇列 繫結交換機跟佇列
測試的話
我們先啟動倆個消費者
然後我們啟動生產者
我們看列印結果,,倆臺消費者各自都處理了9條資訊
其實這種方法比WORKQUEUES 工作模式強,因為多臺機器可以監聽一個佇列,也就是下圖所示,我們可以要倆個佇列,當然也可以建立一個佇列...建立多少佇列跟一個佇列多少消費者完全取決與我們
因為時間問題,,,,還有4種工作模式下次寫- -