快速瞭解安卓RabbitMQ
阿新 • • 發佈:2019-01-04
RabbitMQ 是一個訊息代理。這主要的原理十分簡單,就是通過接受和轉發訊息。RabbitMQ不處理檔案,而是接受,並存儲和以二進位制形式將訊息轉發。在訊息的傳送過程中,我們使用一些標準稱呼:傳送訊息的程式就是一個生產者,我們使用“P”來描述它;接收訊息的程式是消費者,消費過程與接收相似,一個消費者通常是一個等著接受訊息的程式,我們使用"C"來描述。來源:http://blog.csdn.net/a704755096/article/details/45969717
Java 客戶端庫 RabbitMQ 遵循AMQP協議,那是一個開放的,並且通用的訊息協議。接下來看下java Android RabbitMQ怎麼傳送和接收訊息:
傳送端:生產者
package com.lenovo.app.mq; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class SendDirect{ private final static String QUEUE_NAME = "queue"; //佇列名稱 public static void main(String[] arg) throws java.io.IOException{ //1.連線MabbitMQ所在主機ip或者主機名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");factory.setRequestedHeartbeat(3);//連線心跳 //factory.setHost("110.80.10.26"); //factory.setPort(5672); //factory.setUsername("123"); //factory.setPassword("123"); //建立一個連線 建立一個頻道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //2.指定一個佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; //3.往佇列中發出一條訊息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[Send]" + message ); //4.關閉頻道和連線 channel.close(); connection.close(); } }
接收端:消費者
package com.lenovo.app.mq; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveDirect{ private final static String QUEUE_NAME = "queue";//佇列名稱 public static void main(String[] arg) throws java.io.IOException, java.lang.InterruptedException{ //1.開啟連線和建立頻道,與傳送端一樣 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");factory.setRequestedHeartbeat(3);//連線心跳 //factory.setHost("110.80.10.26"); //factory.setPort(5672); //factory.setUsername("123"); //factory.setPassword("123"); //建立一個連線 建立一個頻道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //2.宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Waiting for messages……"); //3.建立佇列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer);//指定消費佇列 while (true){ //4.開啟nextDelivery阻塞方法(內部實現其實是阻塞佇列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[Received]" + message ); } } }
關閉連線
private void closeConn(){
if(connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
isConnect=false;
}