1. 程式人生 > >RabbitMQ原理及實現

RabbitMQ原理及實現

RabbitMQ的官方網站https://www.rabbitmq.com/

AMQP(Advanced Message Queuing Protocol),是一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。RabbitMQ就是這個協議的實現。

RabbitMQ在實現中包含下面幾個元件:

1.Server(broker): 接受客戶端連線,實現AMQP訊息佇列和路由功能的程序。

2.Virtual Host:其實是一個虛擬概念,類似於許可權控制組,可以通過命令分配給使用者Virtual Host的許可權,預設的guest使用者是管理員許可權,初始空間有/,一個Virtual Host裡面可以有若干個Exchange和Queue,但是許可權控制的最小粒度是Virtual Host

3.Exchange:接受生產者傳送的訊息,並根據Binding規則將訊息路由給伺服器中的佇列。ExchangeType決定了Exchange路由訊息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同型別的Exchange路由的行為是不一樣的。

4.Message Queue:訊息佇列,用於儲存還未被消費者消費的訊息。

5.Message: 由Header和Body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先順序是多少等。而Body是真正需要傳輸的APP資料。

6.Binding:Binding聯絡了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中儲存著Message Queue所需訊息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer傳送Message時指定,兩者的匹配方式由Exchange Type決定。

7.Connection:連線,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連線。

8.Channel:通道,僅僅建立了客戶端到Broker之間的連線後,客戶端還是不能傳送訊息的。需要為每一個Connection建立Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連線的建立和釋放都是十分昂貴的,如果一個客戶端每一個執行緒都需要與Broker互動,如果每一個執行緒都建立一個TCP連線,暫且不考慮TCP連線是否浪費,就算作業系統也無法承受每秒建立如此多的TCP連線。RabbitMQ建議客戶端執行緒之間不要共用Channel,至少要保證共用Channel的執行緒傳送訊息必須是序列的,但是建議儘量共用Connection

9.Command:AMQP的命令,客戶端通過Command完成與AMQP伺服器的互動來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令傳送訊息,txSelect開啟一個事務,txCommit提交一個事務。

AMQP的三層協議

1.Module Layer,位於協議最高層,主要定義了一些供客戶端呼叫的命令,客戶端可以利用這些命令實現自己的業務邏輯,例如,客戶端可以通過queue.declare宣告一個佇列,利用consume命令獲取一個佇列中的訊息。

2.Session Layer,主要負責將客戶端的命令傳送給伺服器,在將伺服器端的應答返回給客戶端,主要為客戶端與伺服器之間通訊提供可靠性、同步機制和錯誤處理。

3.Transport Layer,主要傳輸二進位制資料流,提供幀的處理、通道複用、錯誤檢測和資料表示。

RabbitMQ使用場景

學習RabbitMQ的使用場景,來自官方教程:https://www.rabbitmq.com/getstarted.html

場景1:單傳送單接收

使用場景:簡單的傳送與接收,沒有特別的處理。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {
    
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
                
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    channel.close();
    connection.close();
  }
}
複製程式碼

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    
     private final static String QUEUE_NAME = "hello";


    public static void main(String[] argv) throws Exception {


    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();


//    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    

//    channel.basicConsume(QUEUE_NAME, true, consumer);
    
    while (true) {
    Consumer consumer = new DefaultConsumer(channel) {  
                @Override  
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  
                        throws IOException {  
                    String message = new String(body, "UTF-8");  
                    System.out.println(" Consumer have received '" + message + "'");  
                }  
            };  
            channel.basicConsume(QUEUE_NAME, true, consumer);  
    }
  }
}
複製程式碼

注1:queueDeclare第一個引數表示佇列名稱、第二個引數為是否持久化(true表示是,佇列將在伺服器重啟時生存)、第三個引數為是否是獨佔佇列(建立者可以使用的私有佇列,斷開後自動刪除)、第四個引數為當所有消費者客戶端長時間連線斷開時是否自動刪除佇列、第五個引數為佇列的其他引數

注2:basicPublish第一個引數為交換機名稱、第二個引數為佇列對映的路由key、第三個引數為訊息的其他屬性、第四個引數為傳送資訊的主體

場景2:單傳送多接收

使用場景:一個傳送端,多個接收端,如分散式的任務派發。為了保證訊息傳送的可靠性,不丟失訊息,使訊息持久化了。同時為了防止接收端在處理訊息時down掉,只有在訊息處理完成後才傳送ack訊息。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
  private static final String TASK_QUEUE_NAME="task_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //分發資訊
        for (int i=0;i<10;i++){
            String message="Hello RabbitMQ"+i;
            channel.basicPublish("",TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("NewTask send '"+message+"'");
        }
        channel.close();
        connection.close();
    }
}
複製程式碼

傳送端和場景1不同點:

1、使用“task_queue”聲明瞭另一個Queue,因為RabbitMQ不容許宣告2個相同名稱、配置不同的Queue

2、使"task_queue"的Queue的durable的屬性為true,即使訊息佇列durable

3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使訊息durable

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
  
public class Worker {

    //設定和send 相同,
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //開啟connection 和 channel
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        /**
         * 宣告要消費的queue。可能消費都先被執行,在消費訊息之前要確保queue存在。
         * RabbitmqConfigure.DURABLE 設定訊息持久化,設定了持久化也不能保證持久化
         *
         */
        channel.queueDeclare("task_queue", true, false, false, null);
        //公平排程:告訴RabbitMQ,再同一時刻,不要傳送超過1條訊息給一個工作者(worker),直到它已經處理了上一條訊息並且作出了響應。
        channel.basicQos(1);
        System.out.println(" [*] Waiting for messages.");
        /**
         * consumer 接收訊息回撥方法,DefaultConsumer提供一個方法可以快取傳送的訊息,直到訊息被消費。
         */
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received '" + message + "'");
                try {
                    doWork(message);
                }catch(Exception e){
                    e.printStackTrace();
                } finally {
                    System.out.println("basicAck  Done");
                    //手動確認
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //設定自動確認為false
        channel.basicConsume("task_queue",  false, consumer);
    }
    private static void doWork(String task) throws InterruptedException {
        for (char ch: task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}
複製程式碼

接收端和場景1不同點:

1、使用“task_queue”宣告訊息佇列,並使訊息佇列durable

2、在使用channel.basicConsume接收訊息時使autoAck為false,即不自動會發ack,由channel.basicAck()在訊息處理完成後傳送訊息。

3、使用了channel.basicQos(1)保證在接收端一個訊息沒有處理完時不會接收另一個訊息,即接收端傳送了ack後才會接收下一個訊息。在這種情況下發送端會嘗試把訊息傳送給下一個not busy的接收端。

注意點:

1)It's a common mistake to miss the basicAck. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

2)Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction.

3)Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

4)RabbitMQ allows you to set Time To Live for both messages and queues. https://www.rabbitmq.com/ttl.html

場景3:Publish/Subscribe

使用場景:釋出、訂閱模式,傳送端傳送廣播訊息,多個接收端接收。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getMessage(String[] strings){
    if (strings.length < 1)
            return "info: Hello World!";
    return joinStrings(strings, " ");
  }
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
複製程式碼

傳送端:

傳送訊息到一個名為“logs”的exchange上,使用“fanout”方式傳送,即廣播訊息,不需要使用queue,傳送端不需要關心誰接收。

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
    }
  }
}
複製程式碼

接收端:

1、宣告名為“logs”的exchange的,方式為"fanout",和傳送端一樣。

2、channel.queueDeclare().getQueue();該語句得到一個隨機名稱的Queue,該queue的型別為non-durable、exclusive、auto-delete的,將該queue繫結到上面的exchange上接收訊息。

3、注意binding queue的時候,channel.queueBind()的第三個引數Routing key為空,即所有的訊息都接收。如果這個值不為空,在exchange type為“fanout”方式下該值被忽略!

場景4:Routing (按路線傳送接收)

使用場景:傳送端按routing key傳送訊息,不同的接收端按不同的routing key接收訊息。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    String severity = getSeverity(argv);
    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

    channel.close();
    connection.close();
  }
  
  private static String getSeverity(String[] strings){
    if (strings.length < 1)
            return "info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
複製程式碼

傳送端和場景3的區別:

1、exchange的type為direct

2、傳送訊息的時候加入了routing key

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();
    
    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }
    
    for(String severity : argv){    
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      String routingKey = delivery.getEnvelope().getRoutingKey();

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
    }
  }
}
複製程式碼

接收端和場景3的區別:

在繫結queue和exchange的時候使用了routing key,即從該exchange上只接收routing key指定的訊息。

場景5:Topics (按topic傳送接收)

使用場景:傳送端不只按固定的routing key傳送訊息,而是按字串“匹配”傳送,接收端同樣如此。

Producer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      String routingKey = getRouting(argv);
      String message = getMessage(argv);

      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
  
  private static String getRouting(String[] strings){
    if (strings.length < 1)
            return "anonymous.info";
    return strings[0];
  }

  private static String getMessage(String[] strings){ 
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }
  
  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}
複製程式碼

傳送端和場景4的區別:

1、exchange的type為topic

2、傳送訊息的routing key不是固定的單詞,而是匹配字串,如"*.lu.#",*匹配一個單詞,#匹配0個或多個單詞。

Consumer:

複製程式碼
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
      String queueName = channel.queueDeclare().getQueue();
 
      if (argv.length < 1){
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
      }
    
      for(String bindingKey : argv){    
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
      }
    
      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(queueName, true, consumer);

      while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        String routingKey = delivery.getEnvelope().getRoutingKey();

        System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");   
      }
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}
複製程式碼

接收端和場景4的區別:

1、exchange的type為topic

2、接收訊息的routing key不是固定的單詞,而是匹配字串。


轉載自http://www.cnblogs.com/luxiaoxun/p/3918054.html<