1. 程式人生 > 其它 >RabbitMQ優先順序佇列機制(八)

RabbitMQ優先順序佇列機制(八)

一、什麼是優先順序佇列

在服務級級別的測試中需要考慮被執行任務的優先順序機制,也就是通過執行緒優先順序來進行,設定優先順序的目的

是在資源非常緊張的情況下,讓優先順序高的任務優先執行,而優先順序低的任務排後執行,當然這樣的一種設定機制

只能是非同步的模式下執行,如果是設計在同步的模式下執行,那這個設計從系統上來說就缺少巨集觀維度的思考。在

RabbitMQ的機制中也是提供了佇列的優先順序機制,這樣設計的目的也是在在生產者生產過快,而消費者消費不過

來的情況下,也就是資源在緊張或者說是在有限的情況下,設定的佇列優先順序高的任務它的訊息優先進行消費,而

優先順序低的訊息排後消費。當然,如果是在資源不緊張的情況下,設定優先順序其實沒多大的意義,因為這個時候優

先過來的訊息先進行消費,也談不上排隊的機制和優先順序的機制。

二、優先順序的實現機制

針對優先順序的設定,在消費者端進行設定,引數具體是x-max-priority,涉及的程式碼具體如下:

            //設定優先順序
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);
            channel.queueDeclare(queueName,true,false,false,arguments);

這樣消費者的程式碼執行後,在RabbitMQ的WEB控制檯,就可以看到該訊息佇列顯示設定的優先順序,具體如下所

示:

如上,我們演示了配置一個佇列的最大優先順序,其實核心的是需要在生產者傳送訊息的時候設定當前傳送任務的優先順序

涉及程式碼如下:

               AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());

在如上中,我們設定的傳送任務的優先順序是8。

三、優先順序佇列實戰程式碼

3.1、生產者程式碼

package com.example.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ProducerPriority
{
    private  static  final  String exchangeName="test_priority_exchange";
    private  static  final  String routyKey="priority.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for(int i=0;i<3;i++)
        {
            Map<String,Object>  headers=new HashMap<String,Object>();
            headers.put("num",i);

            if (i==0)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else if (i==1)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(3)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(9)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }


        }

    }
}
在如上中,我們針對傳送的任務依據編號進行了優先順序的設定。

3.2、消費者程式碼

package com.example.rabbitmq.priority;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ConsumerPriority
{
    private static final String exchangeName = "test_priority_exchange";
    private  static  final String queueName="test_priority_queue";
    private  static  final  String routingKey="priority.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();

            //設定優先順序
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);

            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3.3、自定義消費者程式碼

package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    //開啟限流,所以需要設定channel
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------\n");
        System.err.println("the message received:"+new String(body));
        System.err.println("message priority:"+properties.getPriority());

    }
}

3.4、執行結果資訊

如上的程式碼執行後,當然當前的資源不存在緊張的情況,那麼就會按正常的順序消費,具體輸出結果如下:

如上,主要總結了訊息佇列優先順序這部分的總結和它的案例應用。感謝您的閱讀!

歡迎關注微信公眾號“Python自動化測試”