RabbitMQ的路由模式Demo
阿新 • • 發佈:2018-12-29
生產者:
package com.xuecheng.rabbitmq.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: 星仔
* @Date: 2018/12/24 21:31
* @Description:
*/
public class ProducerTest03 {
//宣告佇列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//宣告交換機的名稱
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try {
//建立連線工廠,建立連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory. setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//建立虛擬主機,rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
factory.setVirtualHost("/");
//創建於MQ服務的TCP連線
connection = factory.newConnection();
//創建於EXchange的通道,每一個通道都相當於一個會話事務
channel = connection.createChannel();
//宣告交換機 String exchange,BuiltinExchangeType type
/**
* 引數明細:
* exchange:交換機名稱
* type:交換機的型別
* fanout,topic,direct,headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//宣告佇列,如果RabbitMQ中沒有該佇列,則會建立
/**
*引數:String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> params
*引數明細:
* 1. queue:佇列名稱
* 2. durable:是否持久化,如果持久化,將MQ重啟之後佇列還在
* 3. exclusive: 是否獨佔連線,佇列只允許在該佇列中訪問,一旦連線關閉,該佇列將自動刪除,如果將此引數設定為true,那麼可用於臨時佇列的建立
* 4. autoDelete:自動刪除,佇列不再使用時是否關閉,如果將此引數設定為true將exclusive設定為true,可用於建立臨時佇列
* 5. params: 可以設定佇列的一些擴充套件引數,比如設定存活時間等等
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//交換機和佇列進行繫結String queue, String exchange, String routingKey
/**
* 引數明細:
* queue:佇列的名稱
* exchange:交換機的名稱
* routinfkey:路由key
*/
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform02");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform01");
//傳送訊息
/**
* 引數:String exchange,String routingKey,String props,Byte[] body
* 引數明細:
* 1、exchange: 交換機,如果不使用,將使用MQ的預設交換機
* 2、routingKey: 路由key,交換機根據路由key將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
* 3、props:訊息的屬性
* 4、body: 訊息內容
*/
for (int i=0; i<5;i++){
String msg = "helllo world"+i;
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform02",null,msg.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally{
//關閉連線,先關閉通道,在關閉連線
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
}
}
}
消費者:e-mail
package com.xuecheng.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: 星仔
* @Date: 2018/12/24 22:43
* @Description:
*/
public class ConsumeTestEmail {
//宣告佇列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_email";
//宣告交換機的名稱
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠,建立連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//建立虛擬主機,rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
factory.setVirtualHost("/");
//創建於MQ服務的TCP連線
Connection connection = factory.newConnection();
//創建於EXchange的通道,每一個通道都相當於一個會話事務
Channel channel = connection.createChannel();
/**
* 引數明細:
* exchange:交換機名稱
* type:交換機的型別
* fanout,topic,direct,headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//宣告佇列
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告消費訊息的方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 消費者接收訊息呼叫此方法
* @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
* @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException{
//交換機
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//訊息id,mq在channel中用來標識訊息的id,用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
}
};
/**
* 監聽佇列
* 引數:String queue, boolean autoAck,Consumer callback
* 引數明細:
* 1.queue: 佇列名稱
* 2.autoAck: 自動回覆,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動回覆
* 3.callback: 消費訊息的方法,消費者接收到訊息後呼叫此方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
}
}
消費者-sms:
package com.xuecheng.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: 星仔
* @Date: 2018/12/24 22:43
* @Description:
*/
public class ConsumeTestSms {
//宣告佇列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//宣告交換機的名稱
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠,建立連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//建立虛擬主機,rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
factory.setVirtualHost("/");
//創建於MQ服務的TCP連線
Connection connection = factory.newConnection();
//創建於EXchange的通道,每一個通道都相當於一個會話事務
Channel channel = connection.createChannel();
/**
* 引數明細:
* exchange:交換機名稱
* type:交換機的型別
* fanout,topic,direct,headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//宣告佇列
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//宣告消費訊息的方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 消費者接收訊息呼叫此方法
* @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
* @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException{
//交換機
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//訊息id,mq在channel中用來標識訊息的id,用於確認訊息已接收
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
}
};
/**
* 監聽佇列
* 引數:String queue, boolean autoAck,Consumer callback
* 引數明細:
* 1.queue: 佇列名稱
* 2.autoAck: 自動回覆,設定為true為表示訊息接收到自動向mq回覆接收到了,mq接收到回覆會刪除訊息,設定為false則需要手動回覆
* 3.callback: 消費訊息的方法,消費者接收到訊息後呼叫此方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
}
}