1-RabbitMQ安裝及簡單例項
RabbitMQ安裝及簡單例項
1 RabbitMQ簡介
它其實主要就是接受和轉發訊息,你可以把他理解成郵箱,當你把你想要釋出的郵件信箱,可以肯定的是,郵差先生最終會交付收件人的郵件。在這個比喻中,RabbitMQ信箱,一個郵局和一個郵差。
RabbitMQ和郵局之間的主要區別是,它不處理,相反,它接受資料,訊息的儲存和轉發的二進位制檔案。
· 釋出者,相當於是寄信人。把你所需要傳遞的訊息丟到郵箱:
· 消費者,相當於收件人。消費者大多是一個程式,等待接收訊息:
· RabbitMQ新建一個佇列,這個佇列相當於信箱。儘管通過RabbitMQ
在下面的圖中,“P”是我們的釋出者,“C”是我們消費者。中間的框是一個佇列
2 RabbitMQ的安裝
將RabbitMQ環境變數搭好以後,訪問:,使用剛建立的使用者進行登入。
進入admin的配置介面,配置虛擬主機的讀取許可權
RabbitMQ關閉、開啟指令
net stop rabbitMQ 關閉
net start rabbitMQ 開啟
3 RabbitMQ簡單例項
3.1 所需要的jar包
Maven的pom依賴如下
<!-- RabbitMq客戶端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.4</version>
</dependency>
客戶端庫:rabbitmq-client.jar
依賴庫:slf4j-api-1.7.21.jar、slf4j-simple-1.7.22.jar
3.2 傳送
釋出者釋出訊息到佇列,程式碼如下
package MQ;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public classSend {
private final static String QUEUE_NAME ="hello";
public static void main(String[] args)throws IOException,TimeoutException {
// 建立連線連線到MabbitMQ
ConnectionFactoryfactory = newConnectionFactory();
// 設定MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
// factory.setUsername("yuanh");
// factory.setPassword("yuanh");
// factory.setPort(5672);
// factory.setVirtualHost("y_yuanh");
Connectionconnection = factory.newConnection();
// 建立一個頻道
Channelchannel = connection.createChannel();
// 建立一個佇列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Stringmessage = "Hello World!";
// 將訊息放到佇列裡面
channel.basicPublish("",QUEUE_NAME,null, message.getBytes());
System.out.println("傳送 '" + message +"'");
//關閉通道和連線
channel.close();
connection.close();
}
}
當你有配置虛擬主機,以及使用者需要密碼驗證的時候,則需要加上
factory.setUsername("yuanh");
factory.setPassword("yuanh");
factory.setPort(5672);
factory.setVirtualHost("y_yuanh");
RabbitMQ中的hello佇列接收發送訊息,後臺顯示有一條為送出的記錄,如圖
3.2.1 虛擬主機的配置
為虛擬主機新增使用者
3.3 接收
消費者從佇列中取出訊息,消費者保持執行監聽訊息並打印出來。程式碼如下
package MQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
@SuppressWarnings("deprecation")
public classRecv {
private final static String QUEUE_NAME ="hello";
public static void main(String[] argv)throws Exception {
// 建立連線連線到MabbitMQ
ConnectionFactoryfactory = newConnectionFactory();
// 設定MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
factory.setUsername("yuanh");
factory.setPassword("yuanh");
factory.setPort(5672);
factory.setVirtualHost("y_yuanh");
Connectionconnection = factory.newConnection();
Channelchannel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");
QueueingConsumerconsumer = newQueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true, consumer);
try {
while (true) {
QueueingConsumer.Deliverydelivery = consumer.nextDelivery();
Stringmessage = newString(delivery.getBody());
System.out.println(" [x] Received '"+ message +"'");
}
}catch(Exception e) {
channel.close();
connection.close();
}
}
}
hello佇列中的訊息被消化