RabbitMQ安裝與訊息模型
阿新 • • 發佈:2020-09-19
RabbitMQ安裝與訊息模型
通過docker安裝rabbitmq
#拉起映象-management帶控制檯 docker pull rabbitmq:management #建立需要對映的目錄 mkdir -p rabbitmq/{etc,lib,var/{lib,log}} #執行容器 docker run -d --hostname "主機名" --name mq1 -e RABBITMQ_DEFAULT_USER="賬號" -e RABBITMQ_DEFAULT_PASS="密碼" -p 15672:15672 -p 5672:5672 rabbitmq:management #進入容器 docker exec -it mq1 bash #檢視服務狀態 rabbitmqctl status #檢視外掛列表 rabbitmq-plugins list #測試訪問 curl localhost:15672
訊息模型
依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
WEB操作
新建一個虛擬主機
新建一個使用者,並點選使用者名稱
設定使用者的許可權,允許其訪問ems主機
封裝工具類
private static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("122.51.70.176"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); } public static Connection getConnection() { try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } public static void close(Channel channel, Connection connection) { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } }
直連模型
生產者傳送訊息到佇列, 消費者從中取出訊息.
生產者
@Test
public void helloWorld() throws IOException{
Connection connection = MQUtils.getConnection();
//建立通道
Channel channel = connection.createChannel();
//通道繫結訊息佇列, 佇列名hello, 佇列不持久化, 非獨佔, 不自動刪除, 附加引數
channel.queueDeclare("hello", false, false, false, null);
//釋出訊息, 不指定交換機, 佇列名, 傳遞訊息的額外設定, 訊息內容
channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
//關閉通道
MQUtils.close(channel, connection);
}
消費者
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
//消費訊息, 佇列名, 訊息自動確認, 消費的回撥介面
channel.basicConsume("hello", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//列印獲得的訊息
System.out.println(new String(body));
}
});
}
工作佇列(work queues)
讓多個消費者繫結到一個佇列, 共同消費佇列中的訊息, 該模型下預設消費者獲取訊息的方式是輪詢. 消費者消費的資訊數量是平均的.
生產者
@Test
public void workQueues() throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 50; i++) {
channel.basicPublish("", "work", null, ("hello work queues" + i).getBytes());
}
MQUtils.close(channel, connection);
}
消費者
//消費者1
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("消費者1" + new String(body));
}
});
}
//消費者2
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("消費者2" + new String(body));
try {
//慢處理
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
兩個消費者獲得的訊息數量都是25條, 消費者1很快就完成了任務, 但是消費者2加班了很久. 需要優化改進,讓消費者能者多勞.
改進後的消費者
//消費者1
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
final Channel channel = connection.createChannel();
//設通道上的訊息容量為1
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
//把自動確認設定為false
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1" + new String(body));
//手動確認訊息, 不開啟多條訊息確認
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
//消費者2
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
廣播模型(釋出-訂閱模型)
可以有多個消費者, 每個消費者都有自己的消費佇列, 佇列繫結到交換機上, 交換機從生產者接收訊息. 最終可以實現一條訊息被多個消費者消費.
生產者
@Test
public void fanout() throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//指定交換機, 交換機名為news, 交換機型別為fanout
channel.exchangeDeclare("news","fanout");
for (int i = 0; i < 10; i++) {
//訊息釋出, 選擇交換機, 無路由key, 無其他設定, 訊息內容
channel.basicPublish("news","",null, ("hello fanout" + i).getBytes());
}
MQUtils.close(channel, connection);
}
n個消費者
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//繫結佇列
channel.queueBind(queue, "news", "");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消費者n" + new String(body));
}
});
}
直連模型(靜態路由模型)
在路由的直連(Direct)模式下, 佇列與交換機之間不再是任意綁定了, 而是要指定一個路由Key, 訊息的傳送也必須要指定上路由Key, 交換機根據對訊息路由Key進行判斷, 只有相匹配的情況下消費者才會接收到訊息.
生產者
@Test
public void direct() throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//指定交換機, 交換機名為mail, 交換機型別為direct
channel.exchangeDeclare("mail", "direct");
for (int i = 0; i < 10; i++) {
//訊息釋出, 選擇交換機, 無路由key, 無其他設定, 訊息內容
if (i == 4 | i == 8) {
channel.basicPublish("mail", "vip", null, ("vip訊息" + i).getBytes());
continue;
}
channel.basicPublish("mail", "user", null, ("使用者訊息" + i).getBytes());
}
MQUtils.close(channel, connection);
}
消費者
//消費者1-普通使用者
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//繫結佇列
channel.queueBind(queue, "mail", "user");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消費者1" + new String(body));
}
});
}
//消費者1-vip使用者
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//繫結多個佇列
channel.queueBind(queue, "mail", "user");
channel.queueBind(queue, "mail", "vip");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消費者2" + new String(body));
}
});
}
話題型別(動態路由模型)
在路由的話題(Topic)模式下, 也是使用路由Key來分配訊息的, 不過在繫結路由Key的時候可以使用萬用字元, 一般路由Key可以由一到多個單片語成, 單詞之間使用"."進行分割.
釋出者
@Test
public void Topic() throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//指定交換機, 交換機名為notice, 交換機型別為topic
channel.exchangeDeclare("notice", "topic");
for (int i = 0; i < 10; i++) {
//訊息釋出, 選擇交換機, 無路由key, 無其他設定, 訊息內容
if (i == 4) {
channel.basicPublish("notice", "user.vip.msg", null, ("vip訊息" + i).getBytes());
continue;
}else if (i == 8) {
channel.basicPublish("notice", "user.vip.present", null, ("vip禮物" + i).getBytes());
continue;
}
channel.basicPublish("notice", "user.msg", null, ("使用者訊息" + i).getBytes());
}
MQUtils.close(channel, connection);
}
消費者
//消費者1
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//繫結佇列, *任意一個字詞, #任意數量字詞
channel.queueBind(queue, "notice", "user.*");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消費者1" + new String(body));
}
});
}
//消費者2
public static void main(String[] args) throws IOException {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//繫結佇列, *任意一個字詞, #任意數量字詞
channel.queueBind(queue, "notice", "user.#");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消費者2" + new String(body));
}
});
}