1. 程式人生 > >rabbitmq--工作佇列(單發多收)模型

rabbitmq--工作佇列(單發多收)模型

上一篇,我們介紹了rabbitmq中最簡單的一種模型——收發模型,收、發都只有一方,即一個應用,並且不涉及到exchange (實際上這種模型中也是有exchange的,exchange的型別為direct,且名稱為空字串"",只不過這裡的exchange是透明的,故常被我們所忽略了,好像收、發之間是直接連通的)。

 本篇我們介紹一下rabbitmq中的另一種模型——工作佇列模型,也叫單發多收模型,它是收發模型的延伸。它支援一方傳送訊息,由多方共同接收處理,並且支援給多個消費方設定不同的QOS 。 如圖:

例如,一個大系統中每天都與產生大量的任務(訊息),需要釋出給各個子系統去共同處理,這時候就可以採用工作佇列模型 。

示例程式碼:  

任務生產(釋出)者: 

package com.tingcream.rabbitmq.workQueues;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
 * (AMQP DEFAULT)  
 * @author jelly
 *
 */
public class NewTask {
	
  private final static String QUEUE_NAME = "direct_task_queue";
	
  public static void main(String[] args) throws Exception {
	  
		
		ConnectionFactory factory = new ConnectionFactory();
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
		
		
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
 
		boolean  durable=true ;
		channel.queueDeclare(QUEUE_NAME, durable, false,false, null);    
	    //String message = "Hello rabbitmq";
		//exchagne名稱預設為 (AMQP DEFAULT)  direct
		//channel.basicPublish(exchange, routingKey, props, body);
	    //channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		//System.out.println(" [x] Sent '" + message + "'");
		
		for(int i=0;i<20;i++) {
			String message="hello direct task message E "+i;
			//釋出訊息 
			channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		
		//關閉連線
		channel.close();
		connection.close();
	
  }
	
}

任務處理(消費)者A:

package com.tingcream.rabbitmq.workQueues;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class WorkA {
	
	private final static String QUEUE_NAME = "direct_task_queue";
	

    public static void main(String[] args) throws Exception {
    	
        ConnectionFactory factory = new ConnectionFactory();
          
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
		
		
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
      
    //  channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
		boolean  durable=true ;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//channel向伺服器宣告一個佇列,設定durable為true,則當rabbitmq 伺服器重啟時,佇列不會丟失
        System.out.println("WorkerA  Waiting for messages");
 
        //每次從佇列獲取的message的數量
         channel.basicQos(1);
     //   prefetchCount maximum number of messages that the server will deliver, 0 if unlimited

     
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WorkerA  Received :" + message );
                try {
                     
                    doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("WorkerA Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);//當訊息處理完畢後 在finally中 回覆一個ack  手動ack
                }
            }
        };
        boolean autoAck=false;
         
        //訊息消費完成確認   不自動 ack  非自動ack
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
    	try {
			Thread.sleep(5000);// 暫停2秒鐘
		} catch (InterruptedException e) {
			e.printStackTrace();
		} 
    }

}

任務處理(消費)者B:

package com.tingcream.rabbitmq.workQueues;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class WorkB {
	
	private final static String QUEUE_NAME = "direct_task_queue";
	

    public static void main(String[] args) throws Exception {
    	
        ConnectionFactory factory = new ConnectionFactory();
          
		//主機  埠  vhost 使用者名稱 密碼
		factory.setHost("192.168.9.102");
		factory.setUsername("rabbitmq");
		factory.setPassword("rabbitmq123");
		factory.setPort(AMQP.PROTOCOL.PORT);
		factory.setVirtualHost("/");
		
		
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
      
    //  channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
		boolean  durable=true ;//channel向伺服器宣告一個佇列,設定durable為true,則當rabbitmq 伺服器重啟時,佇列不會丟失
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println("WorkerB  Waiting for messages");
 
        //每次從佇列獲取的message數量
        channel.basicQos(2);
     //   prefetchCount maximum number of messages that the server will deliver, 0 if unlimited

     
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WorkerB  Received :" + message );
                try {
                     
                    doWork(message);
                }catch (Exception e){
                    channel.abort();
                }finally {
                    System.out.println("WorkerB Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck=false;
         
        //訊息消費完成確認   不自動 ack  非自動ack
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) {
    	try {
			Thread.sleep(3000);// 暫停2秒鐘
		} catch (InterruptedException e) {
			 
			e.printStackTrace();
		} 
    }

}

 

注意:

1 、任務消費者A和B 可以共同消費處理任務生產者(NewTask)釋出的訊息。消費者A和B的qos可以根據伺服器效能的不同設定不同的qos,例如本例中消費者A的qos設定為了1,而消費者B的qos設定為了2 。

2 、consumer中有個重要的動作就是設定autoAck為false (預設為true),表示需要消費者程式手動ack。注意一旦consumer將autoAck設定為false之後,一定要記得處理完訊息之後,向伺服器傳送確認訊息,否則伺服器將會一直轉發該訊息。當設定了autoAck為false,若消費者A意外宕機,處理的訊息還沒有ack回去,則rabbitmq伺服器會將訊息轉給消費者B。

3、生產者釋出訊息時,設定了一個引數durable為true。這樣做的好處是,當這樣設定之後,伺服器收到訊息後就會立刻將訊息寫入到硬碟,就可以防止突然伺服器掛掉,而引起的資料丟失了。  但是伺服器如果剛收到訊息,還沒來得及寫入到硬碟,就掛掉了,這樣還是無法避免訊息的丟失。

4、 在本例(工作佇列模型)中,我們還是未看到exchange的身影,這一點同上一種模型——收發模型是一樣的。  實際上,在本例(工作佇列模型)中,是存在exchange的,exchange的名稱為空字串"",型別為direct ,同樣對我們是透明的,也被我們所忽略了。

實際上,我們介紹rabbitmq的收發模型和工作佇列模型所採用的exchange就是圖中縮圈出的這種。

 

關於exchange的更多內容會在我們後面的篇章中講到。