rabbimq工作模型
RabbitMQ
上圖是rabbitmq的圖形管理介面:
rabbitmq的基本元件:
- ConCnections:客戶端連線rabbitmq伺服器都需要和伺服器建立連線(connections)
- Channels:通道,客戶端與伺服器傳送接收訊息都需要通過通道傳輸。建立連線後就可以建立通道,通道可以繫結交換機或佇列來發送生產者訊息,可以繫結交換機和佇列來消費訊息。
- Eqxchanges:交換機,相比於只用佇列來交換資訊,交換機可以實現更多種訊息消費模式。
- Queues:訊息佇列,訊息放在佇列中,等消費者來消費。
虛擬主機
為了讓各個使用者可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。其實就是一個獨立的訪問路徑,不同使用者使用不同路徑,各自有自己的佇列、交換機,互相不會影響。(就是在建立連線的時候還要新增一個VirtualHost的引數,不同的程式使用不同的虛擬主機就可以相互之間的交換機,佇列都互不影響)
可以通過下圖步驟新增虛擬主機:
建立連線
public class RabbitMqUtils { private static ConnectionFactory connectionFactory; static { //新建一個連線工程 connectionFactory=new ConnectionFactory(); //設定ip connectionFactory.setHost("172.18.1.53"); //設定埠 connectionFactory.setPort(5672); //設定虛擬主機 connectionFactory.setVirtualHost("/ems"); //設定使用者名稱 connectionFactory.setUsername("ems"); //設定密碼 connectionFactory.setPassword("123"); } //定義提供連線物件的方法 public static Connection getConnection(){ try { //通過newConnection()方法就可以建立一個連線 return connectionFactory.newConnection(); }catch (Exception e){ e.printStackTrace(); } return null; } //關閉連線方法 public static void closeConnectionAndChanel(Channel channel,Connection conn){ try { if(channel!=null){ channel.close(); } if (conn!=null){ conn.close(); } }catch (Exception e){ e.printStackTrace(); } } }
RabitMQ的訊息模型
1,hello模型(直連)
在上圖的模型中,有以下概念:
-
P:生產者,也就是要傳送訊息的程式。
-
C:消費者:訊息的接受者,會一直等待訊息到來。
-
queue:訊息佇列,圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。
provider.java
package com.example.demo.helloworld; import com.example.demo.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author zdl * @create 2020/8/6 11:41 */ public class Provider { //生產訊息 @Test public void testSendMessage() throws IOException, TimeoutException { Connection connection = RabbitMqUtils.getConnection(); //獲取連結通道 Channel channel = connection.createChannel(); //通道繫結對應的訊息佇列 //引數1,佇列名稱,不存在佇列將自動建立佇列 //引數2,用來定義佇列是否啟動持久化 //引數3,exclusive 是否獨佔佇列,只能被當前通道繫結 //引數4,autoDelete,是否在消費完成後並且消費者斷開連線將自動刪除佇列,被消費者消費完,佇列沒有其他元素就刪除佇列 //引數5,額外引數 channel.queueDeclare("hello",false,false,false,null); //釋出訊息 //引數1:交換機名稱(連佇列就為"") 引數2:佇列名稱(如果連交換級就為"") 引數3:傳遞訊息額外設定 引數4:訊息的具體內容 channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes()); RabbitMqUtils.closeConnectionAndChanel(channel,connection); } }
consumer.java
package com.example.demo.helloworld;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException
/**
* @author zdl
* @create 2020/8/6 13:40
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = RabbitMqUtils.getConnection();
//獲取連結通道
Channel channel = connection.createChannel();
//通道繫結對應的訊息佇列
//引數1,佇列名稱,不存在佇列將自動建立佇列
//引數2,用來定義佇列是否啟動持久化(僅佇列,不包含訊息)
//引數3,exclusive 是否獨佔佇列
//引數4,autoDelete,是否在消費完成後自動刪除佇列
//引數5,額外引數
channel.queueDeclare("hello",false,false,false,null);
//消費訊息
//引數1:消費佇列名稱
//引數2:開啟訊息自動確認機制
//引數3:消費時的回撥介面(當有訊息可以消費時就會呼叫該消費者的handleDelivery方法來進行消費)
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override //最後一個引數;訊息佇列中取出的訊息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body)+"=====================");
}
});
}
}
2,Work queues(任務模型)
當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大於訊息的消費速度。長此以往,訊息就會堆積越來越多,無法及時處理。此時就可以使用work 模型:讓多個消費者繫結到一個佇列,共同消費佇列中的訊息。佇列中的訊息一旦消費,就會消失,因此任務是不會被重複執行的。
角色:
- P:生產者:任務的釋出者
- C1:消費者-1,領取任務並且完成任務,假設完成速度較慢
- C2:消費者-2:領取任務並完成任務,假設完成速度快
生產者
package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 14:52
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i=0;i<20;i++){
channel.basicPublish("","work",null,(i+"hollo work quene").getBytes());
}
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者1
package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 15:07
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
//每次只消費一條資訊
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
//引數2:是否自動確認,不自動確認的話就處理完再確認然後才能再消費
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("消費者-1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消費者2
package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 15:31
*/
public class Consumer2 {
public static void main(String[] args) throws IOException{
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
IOException {
System.out.println("消費者-2:"+new String(body));
//引數1:確認佇列中具體那個訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
在面兩個消費者中,更改了1:每次只消費一條消費,2:取消訊息自動確認。這麼做是為了實現按勞分配。
如果是自動確認的話,RabbitMQ將按順序將每個訊息傳送給下一個使用者。平均而言,每個消費者都會收到相同數量的訊息。但是我們的消費者1的消費能力是比較差的,消費者2都消費完10條了,消費者1一條還沒消費完,所以這是不符合我們的期望的,我們希望消費能力強的可以多消費點資訊。
所以我們需要關閉訊息自動確認,並在消費者消費完後呼叫channel.basicAck(envelope.getDeliveryTag(),false);來手動確認訊息,這樣才會在消費完一條訊息後才會進行下一條訊息的消費。
fanout(廣播)
在廣播模式下,訊息傳送流程是這樣的:
- 可以有多個消費者
- 每個消費者有自己的queue(佇列)
- 每個佇列都要繫結到Exchange(交換機)
- 生產者傳送的訊息,只能傳送到交換機,交換機來決定要發給哪個佇列,生產者無法決定。
- 交換機把訊息傳送給繫結過的所有佇列
- 佇列的消費者都能拿到訊息。實現一條訊息被多個消費者消費
生產者
package com.example.demo.fanout;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 16:12
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//將通道宣告指定交換機 引數1:交換機名稱 引數2:交換機型別 fanout:廣播,一條訊息多個消費者同時消費
channel.exchangeDeclare("logs","fanout");
//釋出訊息
channel.basicPublish("logs","",null,"hello".getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者1
package com.example.demo.fanout;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 16:22
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare("logs","fanout");
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//繫結交換級和佇列
channel.queueBind(queue,"logs","");
//消費訊息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1"+new String(body));
}
});
}
}
消費者2和消費者3與消費者1類似
結果:
Direct(訂閱模型)
在Fanout模式中,一條訊息,會被所有訂閱的佇列都消費。但是,在某些場景下,我們希望不同的訊息被不同的佇列消費。這時就要用到Direct型別的Exchange。
在Direct模型下:
- 佇列與交換機的繫結,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 訊息的傳送方在 向 Exchange傳送訊息時,也必須指定訊息的
RoutingKey
。 - Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的
Routing Key
進行判斷,只有佇列的Routingkey
與訊息的Routing key
完全一致,才會接收到訊息
角色
- P:生產者,向Exchange傳送訊息,傳送訊息時,會指定一個routing key。
- X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列
- C1:消費者,其所在佇列指定了需要routing key 為 error 的訊息
- C2:消費者,其所在佇列指定了需要routing key 為 info、error、warning 的訊息
生產者:
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 17:13
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
String routingKey="error";
//通過引數2指定routingKey
channel.basicPublish("logs_direct",routingKey,null,("這是directm模型釋出的基於route key:["+routingKey+"] 傳送的訊息").getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者1:
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 17:22
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName="logs_direct";
channel.exchangeDeclare(exchangeName,"direct");
//獲取臨時佇列
String queue = channel.queueDeclare().getQueue();
//該佇列接收routingKey為"info","error","warning"其中任何一個的訊息
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","warning");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
消費者2:
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 17:28
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
String queue = channel.queueDeclare().getQueue();
//基於route_key幫定佇列和交換機,第三個引數為routingKey,則該佇列只接收routingKey為"error"的訊息
channel.queueBind(queue,exchangeName,"error");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
測試生產者傳送Route key為error的訊息時
測試生產者傳送Route key為info的訊息時
Topic 模式 萬用字元訂閱
Topic
型別的Exchange
與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在繫結Routing key
的時候使用萬用字元!這種模型Routingkey
一般都是由一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert
# 統配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1個詞
# (hash) can substitute for zero or more words. 匹配一個或多個詞
# 如:
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs
生產者
package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 18:11
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//指定交換機及路由模式
channel.exchangeDeclare("topic","topic");
//動態路由key
String routekey = "user.save.fjie";
//釋出訊息
channel.basicPublish("topic",routekey,null,("這是路由中的動態訂閱模型,route key: ["+routekey+"]").getBytes());
RabbitMqUtils.closeConnectionAndChanel(channel,connection);
}
}
消費者1:
package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 18:14
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//繫結交換機
channel.exchangeDeclare("topic","topic");
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//使用萬用字元繫結routingKey
channel.queueBind(queue,"topic","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
消費者2:
package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author zdl
* @create 2020/8/6 18:21
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//繫結交換機
channel.exchangeDeclare("topic","topic");
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//使用萬用字元繫結routingKey
channel.queueBind(queue,"topic","user.#");
//消費
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
結果: