1. 程式人生 > >rabbitmq 工作模式 一

rabbitmq 工作模式 一

學習工作模式前,先看一下rabbitmq 給的helloworld案例

 

 

 

這是傳統的一對一,,,,  也就是一臺機器生產,一臺機器接收....

 

為了更好的瞭解程式碼....我這裡演示的話用底層的程式碼來演示....不整合框架了

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.3</version>
</dependency>

 

這是rabbitmq 提供的依賴......匯入一下就可以測試了

 

下面是我寫的生產類...

        測試是沒問題的

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {

    //佇列名稱
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {




        Connection connection = null;   //與
        Channel channel = null;


        try {

            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();

            //這有五個引數
            /***
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立*
             * param1:佇列名稱*
             * param2:是否持久化*  rabbit 關閉了該佇列是否存在..
             * param3:佇列是否獨佔此連線*    如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式
             * param4:佇列不再使用時是否自動刪除此佇列*    該佇列不使用了就會刪除該佇列
             *
             * param5:佇列引數*/
            channel.queueDeclare(QUEUE, true, false, false,null );

            String message = "你愛到極致的人,不會愛你";


            //釋出訊息
            /**
             * String exchange, String routingKey,  BasicProperties props, byte[] body
             *
             * param1  : 交換機 後面我會講這裡是指定交換機,使用預設的交換機
             * param2  :  路由key,這也是先不寫,,後面講   大概作用是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
             *  param3 :訊息包含的屬性
             *   訊息體
             *
             */
            channel.basicPublish("", QUEUE,null ,message.getBytes() );

            System.out.println("Send Message is:'" + message + "'");
        }catch (Exception e){

        }finally {
            if (channel != null){
                channel.close();
            }
            
            if (connection != null){
                connection.close();
            }
        }
    }
}

 

然後下面是我寫的消費類 ,,  連線mq程式碼都一樣來著...關注傳送訊息和接收訊息的方法就行 ...

同樣測試過,程式碼是可執行的

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {

    //佇列名稱
    private static final String QUEUE = "helloworld";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();

            //這有五個引數
            /***
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立*
             * param1:佇列名稱*
             * param2:是否持久化*  rabbit 關閉了該佇列是否存在..
             * param3:佇列是否獨佔此連線*    如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式
             * param4:佇列不再使用時是否自動刪除此佇列*    該佇列不使用了就會刪除該佇列
             *
             * param5:佇列引數*/
            channel.queueDeclare(QUEUE, true, false, false,null );  //這裡其實可以不用宣告佇列的,因為 生產者已經宣告過了,但是如果生產者後釋出服務,佇列沒有宣告,消費者去監聽佇列..會報錯


//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE,true , consumer);
        }catch (Exception e){

        } 
    }
}

 

以上的話就是rabbitmq提供的案例  

 一臺生產者 , 一臺消費者 

 

 

 

emmm,,,

這裡講的工作的模式是 workqueues 

WorkQueues

 

對比helloword案例,這裡多了個消費者..

應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。

 

測試 

     我們啟動倆次消費者 

 

 

     然後用剛剛寫的生產者,傳送五條資訊 

 

 

然後我們看消費者列印的資訊

 

 

結果 :

          mq workqueues 使用的是輪詢方式講資訊平均發給消費者 ,

         消費者會在處理完訊息後 接收下一條訊息 

 

 

 2.Publish/subscribe 釋出訂閱模式

 

 

特點 

        生產者將訊息傳送給broker.由交換機將訊息發給每個跟綁定了交換機繫結的訊息佇列,每個佇列都能收到生產者傳送的每一條訊息

 

 

生產者 :

 宣告Exchange_fanout_inform交換機。

宣告兩個佇列並且繫結到此交換機,

繫結時不需要指定routingkey傳送訊息時不需要指定routingkey

 

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class Producer02 {

    //訊息佇列名稱
    public static final String QUEUE_INFORM_Test1 = "queue_inform_1";
    public static final String QUEUE_INFORM_Test2 = "queue_inform_2";

    //交換機名稱
    public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";


    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;


        try {

            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();


            //宣告交換機

            /*
                String exchange, String type
                param1 :  交換機
                param2 : 交換機 型別   fanout 、 topic、direct、headers

                FANOUT 對應的模式是 釋出訂閱模式  publish/subscribe 模式

                其他的工作模式以後會將
                DIRECT  對應的是路由的工作模式
                TOPIC  對應的是萬用字元工作模式
                HEADERS   對應了 headers 的工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);


            //宣告佇列

            /**
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             * param1: 佇列名稱
             * param2: 是否持久化
             * param3 : 是否獨佔此佇列
             * param4 : 佇列不用是否自動刪除
             *  param5 : 引數
             */
            channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
            channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );


            //交換機和佇列繫結
            /**
             * String queue, String exchange, String routingKey
             * param1 :  佇列名稱
             * exchange :  交換機
             * routingKey : 路由key  後面講,先 用 ""代替
             */
            channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
            channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );


//            傳送訊息
            String message = "";
            for (int i = 0; i < 9; i++) {
                message = "故事的開頭總是這樣,適逢其會,猝不及防。故事的結局總是這樣,花開兩朵,天各一方。"+ i;


                /**
                 * String exchange, String routingKey, BasicProperties props, byte[] body
                 *
                 *   param1  交換機名稱
                 *   param2 路由key,後面講,先用 "" 代替 ,
                 *   param3  引數
                 *   param4 傳遞的字串
                 *
                 *
                 */
                channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());

                System.out.println("Send Message is:'" + message + "'");
            }
        }catch (Exception e){

        }finally{
            if(channel!=null){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }







        }
}

 

下面是我寫的倆個消費者

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest1 {

    //佇列名稱
    private static final String QUEUE_INFORM_Test1 = "queue_inform_1";
    //交換機名稱
    public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();

            //這有五個引數
            /***
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立*
             * param1:佇列名稱*
             * param2:是否持久化*  rabbit 關閉了該佇列是否存在..
             * param3:佇列是否獨佔此連線*    如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式
             * param4:佇列不再使用時是否自動刪除此佇列*    該佇列不使用了就會刪除該佇列
             *
             * param5:佇列引數*/
            channel.queueDeclare(QUEUE_INFORM_Test1, true, false, false,null );  //這裡其實可以不用宣告佇列的,因為 生產者已經宣告過了,但是如果生產者後釋出服務,佇列沒有宣告,消費者去監聽佇列..會報錯




            //宣告交換機

            /*
                String exchange, String type
                param1 :  交換機
                param2 : 交換機 型別   fanout 、 topic、direct、headers

                FANOUT 對應的模式是 釋出訂閱模式  publish/subscribe 模式

                其他的工作模式以後會將
                DIRECT  對應的是路由的工作模式
                TOPIC  對應的是萬用字元工作模式
                HEADERS   對應了 headers 的工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);

            //交換機和佇列繫結
            /**
             * String queue, String exchange, String routingKey
             * param1 :  佇列名稱
             * exchange :  交換機
             * routingKey : 路由key  後面講,先 用 ""代替
             */
            channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );




//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_Test1,true , consumer);
        }catch (Exception e){

        }
    }
}

 

消費者二

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest2 {

    //佇列名稱

    public static final String QUEUE_INFORM_Test2 = "queue_inform_2";

    //交換機名稱
    public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();

            //這有五個引數
            /***
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立*
             * param1:佇列名稱*
             * param2:是否持久化*  rabbit 關閉了該佇列是否存在..
             * param3:佇列是否獨佔此連線*    如果引數是true,那麼一個連線connection 只能存在這一個channel,除非關閉程式
             * param4:佇列不再使用時是否自動刪除此佇列*    該佇列不使用了就會刪除該佇列
             *
             * param5:佇列引數*/
            channel.queueDeclare(QUEUE_INFORM_Test2, true, false, false,null );  //這裡其實可以不用宣告佇列的,因為 生產者已經宣告過了,但是如果生產者後釋出服務,佇列沒有宣告,消費者去監聽佇列..會報錯




            //宣告交換機

            /*
                String exchange, String type
                param1 :  交換機
                param2 : 交換機 型別   fanout 、 topic、direct、headers

                FANOUT 對應的模式是 釋出訂閱模式  publish/subscribe 模式

                其他的工作模式以後會將
                DIRECT  對應的是路由的工作模式
                TOPIC  對應的是萬用字元工作模式
                HEADERS   對應了 headers 的工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);





            //交換機和佇列繫結
            /**
             * String queue, String exchange, String routingKey
             * param1 :  佇列名稱
             * exchange :  交換機
             * routingKey : 路由key  後面講,先 用 ""代替
             */
            channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );




//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_Test2,true , consumer);
        }catch (Exception e){

        }
    }
}

 

細心的同學可能發現了,,生產者跟消費者的不同程式碼其實就是傳送訊息,接收訊息的方法而已 ....

生產者宣告佇列跟交換機,消費者也宣告各自的佇列跟交換機...  其實就是為了怕先啟動消費者沒有發現佇列跟交換機報錯而已...

核心程式碼的話  其實  就是 生成交換機,生成佇列 繫結交換機跟佇列  

 

 

 

測試的話

我們先啟動倆個消費者

 

 

 

然後我們啟動生產者

 

 

       

我們看列印結果,,倆臺消費者各自都處理了9條資訊 

 

其實這種方法比WORKQUEUES 工作模式強,因為多臺機器可以監聽一個佇列,也就是下圖所示,我們可以要倆個佇列,當然也可以建立一個佇列...建立多少佇列跟一個佇列多少消費者完全取決與我們

 

因為時間問題,,,,還有4種工作模式下次寫- -