RabbitMQ的訂閱和釋出步驟詳解
阿新 • • 發佈:2019-01-08
一、關於RabbitMQ搭建和基本概念這裡不做介紹,下面給出實用的參考部落格
RabbitMQ基礎概念及詳細介紹參考文件:http://blog.csdn.net/whycold/article/details/41119807
RabbitMQ入門及環境的搭建:http://m.blog.csdn.net/article/details?id=50487028
RabbitMQ網頁控制檯開啟方式:http://blog.csdn.net/spyiu/article/details/24697221
二、RabbitMQ的釋出
(1)釋出端的連線方法
釋出端的連線只需要建立一個ConnectionFactory然後建立一個連線,然後建立一個頻道, 宣告一個路由器,指定名稱、模式、及其是否durable
public int BaseConnection(){
/**
* 建立連線連線到RabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設定MabbitMQ所在主機ip或者主機名
factory.setHost(IP);
factory.setPort(5672);
factory.setUsername(user);
factory.setPassword(password);
//建立一個連線
try {
connection = factory.newConnection();
} catch (IOException e) {
System.out.println("[x] 請確認輸入的IP地址、使用者名稱、密碼是否準確!");
return -1;
} catch (Exception e) {
System.out.println("[x] 連線RabbitMQ超時,請重試!");
return -1;
}
//建立一個頻道
try {
channel = connection.createChannel();
} catch (IOException e) {
System.out.println("[x] 建立頻道出錯,請重試!");
return -1;
}
//宣告一個路由器,指定名稱、模式、及其是否durable
try {
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
} catch (IOException e) {
System.out.println("[x] 路由器宣告失敗,請重試!");
return -1;
}
System.out.println("[x] 釋出訊息者成功連線至RabbitMQ !");
return 0;
}
(2)釋出端釋出訊息的方法
釋出端釋出訊息往指定的exchange(路由)傳送的時候需要指定一個routingKey,topic型別時當接收端的bindingKey和routingKey相匹配的時候才能接收到訂閱的訊息
/**
* 釋出訊息
* @param routingKey routingKey
* @param msg 訊息
* @return
*/
public boolean Publish(String routingKey,byte[] msg){
try {
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg);
System.out.println("[x] Sender Sent " + routingKey + " : " + msg);
return true;
} catch (IOException e) {
System.out.println("[y] Sender failed to basic publish.");
return false;
}
}
三、RabbitMQ的訂閱
(1)訂閱端的連線方法
訂閱端的連線在釋出端連線的基礎上還需要給該exchange繫結
/**
* 建立連線連線到RabbitMQ
* @return 0 means success
* -1 means failure
*/
public int BaseConnection(){
ConnectionFactory factory = new ConnectionFactory();
//設定MabbitMQ所在主機ip或者主機名
factory.setHost(IP);
factory.setPort(5672);
factory.setUsername(user);
factory.setPassword(password);
//建立一個連線
try {
connection = factory.newConnection();
} catch (IOException e) {
System.out.println("[y] 請確認輸入的IP地址、使用者名稱、密碼是否準確!");
return -1;
} catch (Exception e) {
System.out.println("[y] 連線RabbitMQ超時,請重試!");
return -1;
}
//建立一個頻道
try {
channel = connection.createChannel();
} catch (IOException e) {
System.out.println("[y] 建立頻道出錯,請重試!");
return -1;
}
//宣告一個路由器,指定名稱、模式、及其是否durable
try {
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
} catch (IOException e) {
System.out.println("[y] 路由器宣告失敗,請重試!");
return -1;
}
//指定一個佇列,隨機佇列名,non-durable,exclusive,not auto-delete
// QUEUE_NAME = MqConfig.QUEUE_NAME;
Map<String,Object> args = new HashMap();
args.put("x-message-ttl",message_ttl);
try {
channel.queueDeclare(QUEUE_NAME,QUEUE_DURABLE,QUEUE_EXCLUSIVE,QUEUE_AUTO_DELETE,args);
} catch (IOException e) {
System.out.println("[y] 佇列宣告失敗,請重試!");
}
try {
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,bindingKey);
} catch (IOException e) {
System.out.println("[y] 佇列繫結路由關鍵字 " + bindingKey + " 時出錯!");
}
System.out.println("[y] 訂閱訊息者成功連線至RabbitMQ !");
return 0;
}
(2)訂閱端的訂閱方法
/**
* 訂閱訊息函式
*/
public boolean Consume(){
System.out.println("[y] Receiver " + QUEUE_NAME + " Waiting for messages.To exit press CTRL+C.");
//訊息訂閱
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
team.iOceanPlus.PB.Config.PBConfig rec=team.iOceanPlus.PB.Config.PBConfig.parseFrom(body);
// rec.getConfigTargetDistribution();
// rec.getConfigTargetDistribution().getConfigType();
System.out.println(rec.getConfigTargetDistribution(0));
}
};
//訊息反饋
try {
channel.basicConsume(QUEUE_NAME,true,consumer);
return true;
} catch (IOException e) {
System.out.println("[y] Receiver "+ QUEUE_NAME + " failed to basic consume." );
return false;
}
}