1. 程式人生 > 實用技巧 >rabbimq工作模型

rabbimq工作模型

RabbitMQ

上圖是rabbitmq的圖形管理介面:

rabbitmq的基本元件:
  • ConCnections:客戶端連線rabbitmq伺服器都需要和伺服器建立連線(connections)
  • Channels:通道,客戶端與伺服器傳送接收訊息都需要通過通道傳輸。建立連線後就可以建立通道,通道可以繫結交換機或佇列來發送生產者訊息,可以繫結交換機和佇列來消費訊息。
  • Eqxchanges:交換機,相比於只用佇列來交換資訊,交換機可以實現更多種訊息消費模式。
  • Queues:訊息佇列,訊息放在佇列中,等消費者來消費。
虛擬主機

為了讓各個使用者可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。其實就是一個獨立的訪問路徑,不同使用者使用不同路徑,各自有自己的佇列、交換機,互相不會影響。(就是在建立連線的時候還要新增一個VirtualHost的引數,不同的程式使用不同的虛擬主機就可以相互之間的交換機,佇列都互不影響)

可以通過下圖步驟新增虛擬主機:

建立連線
public class RabbitMqUtils {
    private static ConnectionFactory connectionFactory;
    static {
        //新建一個連線工程
        connectionFactory=new ConnectionFactory();
        //設定ip
        connectionFactory.setHost("172.18.1.53");
        //設定埠
        connectionFactory.setPort(5672);
        //設定虛擬主機
        connectionFactory.setVirtualHost("/ems");
        //設定使用者名稱
        connectionFactory.setUsername("ems");
        //設定密碼
        connectionFactory.setPassword("123");
    }
    //定義提供連線物件的方法
    public static Connection getConnection(){
        try {
            //通過newConnection()方法就可以建立一個連線
            return connectionFactory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
    //關閉連線方法
    public static void closeConnectionAndChanel(Channel channel,Connection conn){
        try {
            if(channel!=null){
                channel.close();
            }
            if (conn!=null){
                conn.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
RabitMQ的訊息模型

1,hello模型(直連)

在上圖的模型中,有以下概念:

  • P:生產者,也就是要傳送訊息的程式。

  • C:消費者:訊息的接受者,會一直等待訊息到來。

  • queue:訊息佇列,圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。

    provider.java

package com.example.demo.helloworld;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author zdl
 * @create 2020/8/6 11:41
 */
public class Provider {

    //生產訊息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();
        //獲取連結通道
        Channel channel = connection.createChannel();
        //通道繫結對應的訊息佇列
        //引數1,佇列名稱,不存在佇列將自動建立佇列
        //引數2,用來定義佇列是否啟動持久化
        //引數3,exclusive 是否獨佔佇列,只能被當前通道繫結
        //引數4,autoDelete,是否在消費完成後並且消費者斷開連線將自動刪除佇列,被消費者消費完,佇列沒有其他元素就刪除佇列
        //引數5,額外引數
        channel.queueDeclare("hello",false,false,false,null);
        //釋出訊息
        //引數1:交換機名稱(連佇列就為"") 引數2:佇列名稱(如果連交換級就為"") 引數3:傳遞訊息額外設定 引數4:訊息的具體內容
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

consumer.java

package com.example.demo.helloworld;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException
/**
 * @author zdl
 * @create 2020/8/6 13:40
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException{
        Connection connection = RabbitMqUtils.getConnection();
        //獲取連結通道
        Channel channel = connection.createChannel();
        //通道繫結對應的訊息佇列
        //引數1,佇列名稱,不存在佇列將自動建立佇列
        //引數2,用來定義佇列是否啟動持久化(僅佇列,不包含訊息)
        //引數3,exclusive 是否獨佔佇列
        //引數4,autoDelete,是否在消費完成後自動刪除佇列
        //引數5,額外引數
        channel.queueDeclare("hello",false,false,false,null);
        //消費訊息
        //引數1:消費佇列名稱
        //引數2:開啟訊息自動確認機制
        //引數3:消費時的回撥介面(當有訊息可以消費時就會呼叫該消費者的handleDelivery方法來進行消費)
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override  //最後一個引數;訊息佇列中取出的訊息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body)+"=====================");
            }
        });
    }

}

2,Work queues(任務模型)

當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大於訊息的消費速度。長此以往,訊息就會堆積越來越多,無法及時處理。此時就可以使用work 模型:讓多個消費者繫結到一個佇列,共同消費佇列中的訊息。佇列中的訊息一旦消費,就會消失,因此任務是不會被重複執行的。

角色:

  • P:生產者:任務的釋出者
  • C1:消費者-1,領取任務並且完成任務,假設完成速度較慢
  • C2:消費者-2:領取任務並完成任務,假設完成速度快

生產者

package com.example.demo.workquene;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 14:52
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();

        channel.queueDeclare("work",true,false,false,null);
        for (int i=0;i<20;i++){
            channel.basicPublish("","work",null,(i+"hollo work quene").getBytes());
        }
        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消費者1

package com.example.demo.workquene;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 15:07
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();
        //每次只消費一條資訊
        channel.basicQos(1);
        channel.queueDeclare("work",true,false,false,null);
        //引數2:是否自動確認,不自動確認的話就處理完再確認然後才能再消費
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println("消費者-1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消費者2

package com.example.demo.workquene;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 15:31
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException{
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                    IOException {
                System.out.println("消費者-2:"+new String(body));
                //引數1:確認佇列中具體那個訊息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

在面兩個消費者中,更改了1:每次只消費一條消費,2:取消訊息自動確認。這麼做是為了實現按勞分配。

如果是自動確認的話,RabbitMQ將按順序將每個訊息傳送給下一個使用者。平均而言,每個消費者都會收到相同數量的訊息。但是我們的消費者1的消費能力是比較差的,消費者2都消費完10條了,消費者1一條還沒消費完,所以這是不符合我們的期望的,我們希望消費能力強的可以多消費點資訊。

所以我們需要關閉訊息自動確認,並在消費者消費完後呼叫channel.basicAck(envelope.getDeliveryTag(),false);來手動確認訊息,這樣才會在消費完一條訊息後才會進行下一條訊息的消費。

fanout(廣播)

在廣播模式下,訊息傳送流程是這樣的:

  • 可以有多個消費者
  • 每個消費者有自己的queue(佇列)
  • 每個佇列都要繫結到Exchange(交換機)
  • 生產者傳送的訊息,只能傳送到交換機,交換機來決定要發給哪個佇列,生產者無法決定。
  • 交換機把訊息傳送給繫結過的所有佇列
  • 佇列的消費者都能拿到訊息。實現一條訊息被多個消費者消費

生產者

package com.example.demo.fanout;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 16:12
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //將通道宣告指定交換機 引數1:交換機名稱 引數2:交換機型別 fanout:廣播,一條訊息多個消費者同時消費
        channel.exchangeDeclare("logs","fanout");
        //釋出訊息
        channel.basicPublish("logs","",null,"hello".getBytes());
        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消費者1

package com.example.demo.fanout;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 16:22
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel=connection.createChannel();
        channel.exchangeDeclare("logs","fanout");
        //臨時佇列
        String queue = channel.queueDeclare().getQueue();
        //繫結交換級和佇列
        channel.queueBind(queue,"logs","");
        //消費訊息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1"+new String(body));
            }
        });

    }
}

消費者2和消費者3與消費者1類似

結果:

Direct(訂閱模型)

在Fanout模式中,一條訊息,會被所有訂閱的佇列都消費。但是,在某些場景下,我們希望不同的訊息被不同的佇列消費。這時就要用到Direct型別的Exchange。

在Direct模型下:

  • 佇列與交換機的繫結,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 訊息的傳送方在 向 Exchange傳送訊息時,也必須指定訊息的 RoutingKey
  • Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的Routingkey與訊息的 Routing key完全一致,才會接收到訊息

角色

  • P:生產者,向Exchange傳送訊息,傳送訊息時,會指定一個routing key。
  • X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列
  • C1:消費者,其所在佇列指定了需要routing key 為 error 的訊息
  • C2:消費者,其所在佇列指定了需要routing key 為 info、error、warning 的訊息

生產者:

package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 17:13
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");
        String routingKey="error";
		//通過引數2指定routingKey
        channel.basicPublish("logs_direct",routingKey,null,("這是directm模型釋出的基於route key:["+routingKey+"] 傳送的訊息").getBytes());

        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消費者1:

package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 17:22
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName="logs_direct";
        channel.exchangeDeclare(exchangeName,"direct");
        //獲取臨時佇列
        String queue = channel.queueDeclare().getQueue();
        //該佇列接收routingKey為"info","error","warning"其中任何一個的訊息
        channel.queueBind(queue,"logs_direct","info");
        channel.queueBind(queue,"logs_direct","error");
        channel.queueBind(queue,"logs_direct","warning");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

消費者2:

  
package com.example.demo.direct;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 17:28
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");
        String queue = channel.queueDeclare().getQueue();
		//基於route_key幫定佇列和交換機,第三個引數為routingKey,則該佇列只接收routingKey為"error"的訊息
        channel.queueBind(queue,exchangeName,"error");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });
    }
}

測試生產者傳送Route key為error的訊息時

測試生產者傳送Route key為info的訊息時

Topic 模式 萬用字元訂閱

Topic型別的ExchangeDirect相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在繫結Routing key 的時候使用萬用字元!這種模型Routingkey 一般都是由一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert

# 統配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1個詞
		# (hash) can substitute for zero or more words.  匹配一個或多個詞
# 如:
		audit.#    匹配audit.irs.corporate或者 audit.irs 等
   		audit.*   只能匹配 audit.irs

生產者

package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 18:11
 */
public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //指定交換機及路由模式
        channel.exchangeDeclare("topic","topic");
        //動態路由key
        String routekey = "user.save.fjie";
        //釋出訊息
        channel.basicPublish("topic",routekey,null,("這是路由中的動態訂閱模型,route key: ["+routekey+"]").getBytes());

        RabbitMqUtils.closeConnectionAndChanel(channel,connection);
    }
}

消費者1:

package com.example.demo.topic;
import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @author zdl
 * @create 2020/8/6 18:14
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //繫結交換機
        channel.exchangeDeclare("topic","topic");
	//臨時佇列
        String queue = channel.queueDeclare().getQueue();
        //使用萬用字元繫結routingKey
        channel.queueBind(queue,"topic","user.*");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });

    }
}

消費者2:

package com.example.demo.topic;

import com.example.demo.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zdl
 * @create 2020/8/6 18:21
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
		//繫結交換機
        channel.exchangeDeclare("topic","topic");
		//臨時佇列
        String queue = channel.queueDeclare().getQueue();
		//使用萬用字元繫結routingKey
        channel.queueBind(queue,"topic","user.#");
		//消費
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });

    }
}

結果: