RabbitMQ詳解(二)------消息通信的概念
RabbitMQ詳解(二)------消息通信的概念
消息通信,有很多種,郵箱 qq 微信 短信等,這些通信方式都有發送者,接受者,還有一個中間存儲離線消息的容器.但是這些通信方式和RabbitMQ的通信模型是不一樣的,比如郵件,郵件服務器基於POP3/SMTP協議,通信雙方需要明確指定,並且發送的郵件內容有固定的結構.而RabbitMQ服務器基於AMQP協議,這個協議是不需要明確指定發送方和接受方的,而且發送的消息也沒有固定的結構,甚至可直接存儲二進制數據,並且和郵件服務器一樣,也能存儲離線消息,最關鍵的是RabbitMQ既能夠以一對一的方式進行路由,還能夠以一對多的方式進行廣播.
生產者和消費者
在RabbitMQ的通信過程中,有兩個主要的角色:生產者和消費者.類比於郵件通信的發送方和接收方.
首先我們需要明確RabbitMQ服務器是不能夠產生數據的,它是——消息中間件,是一個用來傳遞消息的中間商.生產者產生創建消息,然後發布到代理服務器(RabbitMQ),而消費者則從代理服務器獲取消息(不是直接找生產者要消息),而且在實際應用中,生產者和消費者也是可以角色互相轉換的,所以當我們應用程序連接到RabbitMQ服務器時,必須要明確我是生產者還是消費者.
消息
生產者創建消息,然後發布到RabbitMQ服務器中.
這裏的消息分為兩部分:有效內容和內容標簽.
- 有效內容:可以是任何內容,一個數組,一個集合,甚至二進制數據都可以.RabbitMQ不會在意你發什麽數據,盡管發就可以了.
- 內容標簽:描述有效內容,是RabbitMQ用來決定誰將獲得消息.前面說的郵件通信,必須明確指定發送方地址和收件方地址,而基於AMQP協議的RabbitMQ則是通過生產者發送消息附帶的內容標簽將消息發送給感興趣的消費者.
一般來說生產者創建消息會設置標簽,但是傳輸到消費者那裏就沒有標簽了,除非在有效內容中說明誰是生產者,一般消費者是不知道誰產生的消息的.
信道
生產者產生了消息,然後發布到RabbitMQ服務器,發布之前肯定要先連接上服務器,也就是要在應用程序和RabbitMQ服務器之間建立一條TCP連接,一旦連接建立,應用程序就可以創建一條AMQP信道.
信道是建立在"真實的TCP"連接內的虛擬連接,AMQP命令都是通過信道發送出去的,每條信道都會被指派一個唯一的ID(AMQP庫會幫你記住ID),不論是發布消息,訂閱隊列或者接受消息,這些動作都是通過信道來完成的.
為什麽不直接通過TCP連接來發送AMQP命令呢?
這是因為效率問題,對於操作系統來說,每次建立和銷毀TCP會話是非常昂貴的開銷,而實際系統中,比如電商雙十一,每秒鐘高峰期成千上萬條連接,一般來說操作系統建立TCP連接是有數量限制的,那這就會遇到瓶頸.
引入信道的概念,我們可以在一條TCP連接上創建N多個信道,這樣既能發送命令,也能夠保證每條信道的私密性,我們可以將其想象為光纖電纜.
交換器和隊列
交換器和隊列都是RabbitMQ服務器的一部分,我們知道生產者會將消息發送到RabbitMQ服務器,而進入該服務器後,首先進入交換器部分,然後交換器根據消息附帶的內容標簽,將消息綁定到相應的隊列.什麽是隊列?
- 容納消息的場所,生產者發送到RabbitMQ服務器的消息會在隊列中等待消費者消費.
- 隊列是RabbitMQ服務器的終點(除非消息進入了黑洞).
- 隊列可以實現負載均衡,我們可以增加一堆消費者,然後讓RabbitMQ以循環的方式來均勻的分配消息.
消息進入RabbitMQ服務器時,會首先將消息發送到交換器,然後交換器會根據特定的路由算法以及消息的內容標簽將消息綁定到相應的隊列.在AMQP協議中有四種交換器:direct、fanout、topic、headers,每種交換器都實現了不同的路由算法,這也對應RabbitMQ工作的幾種不同工作方式.
虛擬主機
首先我們拋出一個問題,一個RabbitMQ肯定不是只服務一個應用程序,那麽多個應用程序同時使用RabbitMQ服務器,如何保證彼此之間不會沖突?
答案就是虛擬主機,虛擬主機其實就是一個迷你版的RabbitMQ服務器,它擁有自己的交換器和隊列,更重要的是虛擬主機擁有自己的權限機制,一個服務器能夠創建多個虛擬主機.那麽我們在使用RabbitMQ服務器的時候,只需要講一個應用程序對應一個虛擬主機,這種各個實例間邏輯上的分離就能夠保證不同的應用程序安全的傳遞信息.
默認的虛擬主機是"/".
簡單實例
開啟RabbitMQ服務,這裏使用的是docker,具體安裝可看上篇博文.
pom.xml
<!--RabbitMQ-client-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.2</version>
</dependency>
ConnectionUtil.java
package org.alva.Utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <一句話描述>,RabbitMQ的連接工具類
* <詳細介紹>,
*
*/
public class ConnectionUtil {
public static Connection getConnection(String host, int port, String vhost, String username, String password) throws IOException, TimeoutException {
//1.定義連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.設置服務器地址
connectionFactory.setHost(host);
//3.設置端口
connectionFactory.setPort(port);
//4.設置虛擬主機,用戶名,密碼
connectionFactory.setVirtualHost(vhost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//5.通過連接工廠獲取連接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
Producer.java
package org.alva.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.alva.Utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <一句話描述>,生產者
* <詳細介紹>,
*
*/
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//1.獲取連接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2.聲明通道
Channel channel = connection.createChannel();
//3.聲明(創建)隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4.定義消息內容
String message = "hello rabbitmq";
//5.發布消息
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("[x] send‘"+message+"‘");
//6.關閉通道和連接
channel.close();
connection.close();
}
}
Consumer.java
package org.alva.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.alva.Utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <一句話描述>,消費者
* <詳細介紹>,
*
*/
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.獲取連接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2.聲明通道
Channel channel = connection.createChannel();
//3.聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.定義隊列的消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//5.監聽隊列
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
//6.獲取消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received ‘" + message + "‘");
}
}
}
RabbitMQ詳解(二)------消息通信的概念