1. 程式人生 > 其它 >RabbitMQ基礎理解

RabbitMQ基礎理解

RabbitMQ訊息中介軟體:

hello world模式:

不使用交換機(或者說使用了預設交換機),訊息生產者和訊息消費者通過Queue佇列關聯,生產者將訊息放入佇列,消費者取出進行消費。

 1 package Queue;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 ConnectionFactory connectionFactory=new ConnectionFactory(); 14 15 connectionFactory.setHost("1.117.107.150"); 16 // connectionFactory.setConnectionTimeout(5000);
17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("test"); 20 connectionFactory.setPassword("test"); 21 22 Connection connection = connectionFactory.newConnection(); 23 24 Channel channel = connection.createChannel();
25 26 channel.queueDeclare("test",false,false,false,null); 27 28 for(int i=1;i<21;i++){ 29 channel.basicPublish("","test",null,(i+"hello").getBytes()); 30 } 31 32 33 channel.close(); 34 connection.close(); 35 } 36 }
 1 package Queue;
 2 
 3 import com.rabbitmq.client.*;
 4 
 5 import java.io.IOException;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 public class Customer1 {
 9 
10     public static void main(String[] args) throws IOException, TimeoutException {
11         ConnectionFactory connectionFactory=new ConnectionFactory();
12 
13         connectionFactory.setHost("1.117.107.150");
14 //        connectionFactory.setConnectionTimeout(5000);
15         connectionFactory.setPort(5672);
16         connectionFactory.setVirtualHost("/test");
17         connectionFactory.setUsername("test");
18         connectionFactory.setPassword("test");
19 
20         Connection connection = connectionFactory.newConnection();
21 
22         final Channel channel = connection.createChannel();
23 
24         channel.basicQos(1);//每次只接受一條訊息進行消費,否則將會把所有訊息放入佇列,如果消費過程中宕機,訊息將會丟失
25         channel.queueDeclare("test",false,false,false,null);//channel.queueDeclare(佇列名,是否持久化,是否獨佔佇列,是否自動刪除,其他引數)
26 
27         channel.basicConsume("test",false,new DefaultConsumer(channel){//這裡的false決定不自動應答,採用手動應答。
28             @Override
29             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
30                 System.out.println("new String = "+new String(body));
36                 channel.basicAck(envelope.getDeliveryTag(),false);//手動應答,告訴交換機,我佇列中的訊息處理完成。
37             }
38         });
39     }
40 }

好處:結構單一,耦合度一般,容易理解。

壞處:吞吐量小,只適合小型專案。

其實就是一個簡單的生產者/消費者模式。

工作佇列模式:

不使用交換機(或者說使用了預設交換機),訊息生產者和訊息消費者通過Queue佇列關聯,生產者將訊息放入佇列,消費者取出進行消費,但與hello world不同的是,訊息消費者可以是多個,他們共同消費同一個佇列中的訊息:預設情況下訊息以輪詢方式平均分配給各消費者;若設定basicQos(1)並採用手動應答,則處理較快的消費者會分到較多訊息(能者多勞)。

 1 package Queue;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         ConnectionFactory connectionFactory=new ConnectionFactory();
14 
15         connectionFactory.setHost("1.117.107.150");
16 //        connectionFactory.setConnectionTimeout(5000);
17         connectionFactory.setPort(5672);
18         connectionFactory.setVirtualHost("/test");
19         connectionFactory.setUsername("test");
20         connectionFactory.setPassword("test");
21 
22         Connection connection = connectionFactory.newConnection();
23 
24         Channel channel = connection.createChannel();
25 
26         channel.queueDeclare("test",false,false,false,null);
27 
28         for(int i=1;i<21;i++){
29             channel.basicPublish("","test",null,(i+"hello").getBytes());
30         }
31 
32 
33         channel.close();
34         connection.close();
35     }
36 }
 1 package Queue;
 2 
 3 import com.rabbitmq.client.*;
 4 
 5 import java.io.IOException;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 public class Customer1 {
 9 
10     public static void main(String[] args) throws IOException, TimeoutException {
11         ConnectionFactory connectionFactory=new ConnectionFactory();
12 
13         connectionFactory.setHost("1.117.107.150");
14 //        connectionFactory.setConnectionTimeout(5000);
15         connectionFactory.setPort(5672);
16         connectionFactory.setVirtualHost("/test");
17         connectionFactory.setUsername("test");
18         connectionFactory.setPassword("test");
19 
20         Connection connection = connectionFactory.newConnection();
21 
22         final Channel channel = connection.createChannel();
23 
24         channel.basicQos(1);
25         channel.queueDeclare("test",false,false,false,null);
26 
27         channel.basicConsume("test",false,new DefaultConsumer(channel){
28             @Override
29             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
30                 try {
31                     Thread.sleep(3000);
32                 } catch (InterruptedException e) {
33                     e.printStackTrace();
34                 }
35                 System.out.println("new String = "+new String(body));
36                 channel.basicAck(envelope.getDeliveryTag(),false);
37             }
38         });
39     }
40 }
package Queue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory=new ConnectionFactory();

        connectionFactory.setHost("1.117.107.150");
//        connectionFactory.setConnectionTimeout(5000);
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");

        Connection connection = connectionFactory.newConnection();

        final Channel channel = connection.createChannel();

        channel.basicQos(1);
        channel.queueDeclare("test",false,false,false,null);

        channel.basicConsume("test",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          //加入了睡眠時間模擬不同消費者處理訊息的速度,能者多勞,處理快的被分發的訊息多,反之就少
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
          //因為2比1少睡眠1秒,所以處理的訊息比1多,手動應答後交換機就會給我一條新的訊息,忽略控制檯列印的時間,同樣6秒消費者2處理3條,消費者1處理兩條。 System.out.println(
"new String = "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }

好處:結構單一,耦合度一般,容易理解,吞吐量較hello world模式有所提升。

壞處:只適合訊息吞吐量一般的小型專案。

本質上還是生產者/消費者模式,只是由多個消費者共同分擔同一個佇列中的訊息(預設以輪詢方式平均分配;上例使用了basicQos(1)加手動應答,因此是能者多勞)。

Publish/Subscribe釋出訂閱模式:

使用了交換機,訊息生產者不再直接關聯Queue佇列,而是由exchange交換機負責訊息分發,訊息消費者依舊繫結Queue佇列,Queue繫結交換機,多個Queue可以繫結到同一個交換機。生產者不再關心訊息的去向,只負責將訊息放入交換機。

 1 package Utils;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class RabbitMQUtils {
11 
12     private static ConnectionFactory connectionFactory;
13 
14     static {
15          connectionFactory = new ConnectionFactory();
16         connectionFactory.setHost("1.117.107.150");
17         connectionFactory.setPort(5672);
18         connectionFactory.setVirtualHost("/test");
19         connectionFactory.setUsername("guest");
20         connectionFactory.setPassword("guest");
21 
22     }
23 
24 
25     public static Connection getConnection(){
26         try {
27             return connectionFactory.newConnection();
28         } catch (IOException e) {
29             e.printStackTrace();
30         } catch (TimeoutException e) {
31             e.printStackTrace();
32         }
33         return null;
34     }
35 
36     public static void close(Connection conn, Channel chnn) throws IOException, TimeoutException {
37         if (chnn!=null) chnn.close();
38         if (conn!=null) conn.close();
39     }
40 
41 }
 1 package Fanout;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import Utils.RabbitMQUtils;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         Connection connection = RabbitMQUtils.getConnection();
14 
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare("logs", "Fanout");
18 
19         channel.basicPublish("logs","",null,"fanout message".getBytes());
20 
21         RabbitMQUtils.close(connection,channel);
22     }
23 }
package Fanout;

import com.rabbitmq.client.*;
import Utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Publish/subscribe consumer: binds a broker-named queue to the "logs" fanout
 * exchange and prints every message it receives.
 */
public class Customer2 {

    /**
     * Declares the exchange and a queue, binds them and starts consuming.
     * The connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if a channel operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        // Exchange type names are lowercase on the broker; "Fanout" is not a
        // registered exchange type and the declaration is rejected.
        channel.exchangeDeclare("logs", "fanout");

        // No-argument queueDeclare(): the broker picks the queue name.
        String queue = channel.queueDeclare().getQueue();

        channel.queueBind(queue, "logs", "");

        // autoAck = true: messages are acknowledged automatically on delivery.
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });

    }

}

好處:生產者消費者進一步解耦,並且吞吐量有所提升。

壞處:只能進行廣播式訊息分發,不能將訊息特定的傳送給某一個佇列。

P/S又叫廣播模式,只要是訂閱了當前交換機的所有佇列,均可收到交換機發送的訊息。分發訊息時會指定一個路由key,但在此模式下,路由key會被忽略。

Routing路由--direct(直連):

使用了交換機,訊息分發時需要指定路由key,只有繫結了該交換機並且繫結的路由key與訊息的路由key完全匹配的Queue佇列才能接收到訊息;不匹配時,交換機不會把訊息路由到該佇列。

 1 package Routing.Direct;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import Utils.RabbitMQUtils;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         Connection connection = RabbitMQUtils.getConnection();
14 
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare("routing","direct");
18 
19         for(int i=1;i<22;i++){
20             if(i % 2==0){
21                 channel.basicPublish("routing","black",null,(i+": my name is black").getBytes());//black為路由key
22             }else if(i % 3==0){
23                 channel.basicPublish("routing","green",null,(i+": my name is green").getBytes());//green為路由key
24             }
25         }
26         RabbitMQUtils.close(connection,channel);
27 
28     }
29 }
package Routing.Direct;

import com.rabbitmq.client.*;
import Utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Direct-routing consumer: receives the messages published to the "routing"
 * exchange with routing key "black" and prints them.
 */
public class CustomerBlack {

    private static final String EXCHANGE = "routing";
    private static final String ROUTING_KEY = "black";

    /** Binds a broker-named queue to the exchange and starts an auto-acking consumer. */
    public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtils.getConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE, "direct");

        String queueName = ch.queueDeclare().getQueue();
        // The binding key of this queue.
        ch.queueBind(queueName, EXCHANGE, ROUTING_KEY);

        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println(text);
            }
        };
        ch.basicConsume(queueName, true, printer);
    }

}
 1 package Routing.Direct;
 2 
 3 import com.rabbitmq.client.*;
 4 import Utils.RabbitMQUtils;
 5 
 6 import java.io.IOException;
 7 
 8 public class CustomerGreen {
 9 
10     public static void main(String[] args) throws IOException {
11         Connection connection = RabbitMQUtils.getConnection();
12 
13         Channel channel = connection.createChannel();
14 
15         channel.exchangeDeclare("routing","direct");
16 
17         String queue = channel.queueDeclare().getQueue();
18 
19         channel.queueBind(queue,"routing","green");//佇列的路由key
20 
21         channel.basicConsume(queue,true,new DefaultConsumer(channel){
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 System.out.println(new String(body));
25             }
26         });
27     }
28 
29 }

好處:可以根據路由key將訊息傳送到指定佇列,控制訊息的去向。

壞處:不夠靈活,只能路由key完全匹配的時候才能接收訊息。

Routing路由--topic(動態路由):

路由key支援 “*”與“#”兩個萬用字元,路由key由多個單片語成,單詞之間使用“.”分隔開,單詞可以使用萬用字元代替,進行動態路由。“*”代表一個單詞,“#”代表0個1個或者多個單詞。兩個“.”之間為一個單詞。例如:one.rabbitmq.two,name.rabbitmq.sex,email.rabbitmq.phone等都可以與【*.rabbitmq.*】相匹配,而face.rabbitmq.one.two則會匹配失敗。將路由規則修改為【#.rabbitmq.#】則one.rabbitmq.two,face.rabbitmq.one.two,one.two.rabbitmq.face,one.two.rabbitmq.three.face等都可以匹配成功。

 1 package Routing.Topic;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import Utils.RabbitMQUtils;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         Connection connection = RabbitMQUtils.getConnection();
14 
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare("topic","topic");
18 
19         for(int i=1;i<22;i++){
20             if(i % 2==0){
21                 channel.basicPublish("topic","s.topic.t",null,(i+": my name is *").getBytes());
22             }else if(i % 3==0){
23                 channel.basicPublish("topic","topic.one.two",null,(i+": my name is #").getBytes());
24             }
25         }
26         RabbitMQUtils.close(connection,channel);
27     }
28 }
 1 package Routing.Topic;
 2 
 3 import com.rabbitmq.client.*;
 4 import Utils.RabbitMQUtils;
 5 
 6 import java.io.IOException;
 7 
 8 public class Customer1 {
 9 
10     public static void main(String[] args) throws IOException {
11         Connection connection = RabbitMQUtils.getConnection();
12 
13         Channel channel = connection.createChannel();
14 
15         channel.exchangeDeclare("topic","topic");
16 
17         String queue = channel.queueDeclare().getQueue();
18 
19         channel.queueBind(queue,"topic","*.topic.*");//只能接受三個單詞且第二個單詞為topic的路由key訊息
20 
21         channel.basicConsume(queue,true,new DefaultConsumer(channel){
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 System.out.println(new String(body));
25             }
26         });
27     }
28 }
 1 package Routing.Topic;
 2 
 3 import com.rabbitmq.client.*;
 4 import Utils.RabbitMQUtils;
 5 
 6 import java.io.IOException;
 7 
 8 public class Customer2 {
 9 
10     public static void main(String[] args) throws IOException {
11         Connection connection = RabbitMQUtils.getConnection();
12 
13         Channel channel = connection.createChannel();
14 
15         channel.exchangeDeclare("topic","topic");
16 
17         String queue = channel.queueDeclare().getQueue();
18 
19         channel.queueBind(queue,"topic","#.topic.#");//可以接收到所有第二個單詞為topic的訊息
20 
21         channel.basicConsume(queue,true,new DefaultConsumer(channel){
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 System.out.println(new String(body));
25             }
26         });
27     }
28 }

好處:更加靈活。