RabbitMQ 學習開發筆記
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel,這三個都是RabbitMQ對外提供的API中最基本的對象。不管是服務器端還是客戶端都會首先創建這三類對象。
ConnectionFactory為Connection的制造工廠。
Connection是與RabbitMQ服務器的socket鏈接,它封裝了socket協議及身份驗證相關部分邏輯。
Channel是我們與RabbitMQ打交道的最重要的一個接口,大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
Queue
Queue(隊列)是RabbitMQ的內部對象,用於存儲消息
RabbitMQ中的消息都只能存儲在Queue中,生產者生產消息並最終投遞到Queue中,消費者可以從Queue中獲取消息並消費。
隊列是有Channel聲明的,而且這個操作是冪等的。同名的隊列多次聲明也只會創建一次。我們發送消息就是想這個聲明的隊列裏發送消息
工作隊列的主要思想是不用等待資源密集型的任務處理完成,
為了確保消息或者任務不會丟失,rabbitmq 支持消息確信 ACK。ACK機制是消費者端從rabbitmq收到消息並處理完成後,反饋給rabbitmq,rabbitmq收到反饋信息後將消息從隊列中刪除
如果rabbitmq向消費者改善消息時,消費者服務器掛了,消息也不會超時,即使一個消息需要非常長的時間處理,也不會導致消息超時,永遠不會從rabbitmq中刪除,
忘記通過basicAck返回確認信息是個嚴重的錯誤
rabbitmq不允許重新定義一個已有的隊列信息
QueueingConsumer
隊列消費者,用於監聽隊列中的消息。調用nextDelivery方法時,內部實現就是調用隊列的take方法。該方法的作用:獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。說白了就是如果沒有消息,就處於阻塞狀態。
RabbitMQ 筆記-Exchanges
Procuder Publish的Message進入了Exchange。接著通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue裏。queue也是通過這個routing keys來做的綁定。
有三種類型的Exchanges:direct, fanout,topic。 每個實現了不同的路由算法(routing algorithm)。
Direct exchange: 如果 routing key 匹配, 那麽Message就會被傳遞到相應的queue中。其實在queue創建時,它會自動的以queue的名字作為routing key來綁定那個exchange。
生產者:
// 聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 發送消息 for (String severity : routingKeys) { String message = "Send the message level: " + severity; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘"); } 消費者 // 聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 獲取匿名隊列名稱 String queueName = channel.queueDeclare().getQueue(); // 根據路由關鍵字進行多重綁定 for (String severity : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity); }
Fanout exchange: 會向響應的queue廣播。
生產者 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分發消息 for (int i=0; i<5; i++) { String message = "Hello World!" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); } 消費者 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");
Topic exchange: 對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。
生產者 // 聲明一個匹配模式的交換器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 待發送的消息 String routingKeys[] = new String[]{ "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit" }; // 發送消息 for (String severity : routingKeys) { String message = "From "+severity+" routingKey‘ s message!"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println("TopicSend [x] Sent ‘" + severity + "‘:‘" + message + "‘"); } 消費者 // 聲明一個匹配模式的交換器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); // 路由關鍵字 String routingKeys[] = new String[] { "*.orange.*" }; // 綁定路由關鍵字 for (String bindingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey); }
匿名: 直接發送到queue。
生產者 for (int i=0; i<5; i++) { String message = "hello world! " + i; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); } 消費者 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
RabbitMQ 筆記-RPC
RabbitMQ中實現RPC的機制是:
- 客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨著消息一起發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成後將通知我的消息發送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成後需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
- 服務器端收到消息並處理
- 服務器端處理完消息後,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
- 客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理
public class RPCClient { private static final String RPC_QUEUE_NAME = "rpc_queue"; private Connection connection; private Channel channel; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("192.168.65.136"); factory.setUsername("rabbitmq"); factory.setPassword("123456"); // 創建一個連接 connection = factory.newConnection(); // 創建一個頻道 channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //為每一個客戶端獲取一個隨機的回調隊列 replyQueueName = channel.queueDeclare().getQueue(); //為每一個客戶端創建一個消費者(用於監聽回調隊列,獲取結果) consumer = new QueueingConsumer(channel); //消費者與隊列關聯 channel.basicConsume(replyQueueName, true, consumer); } /** * 獲取斐波列其數列的值 * * @param message * @return * @throws Exception */ public String call(String message) throws Exception{ String response = null; String corrId = java.util.UUID.randomUUID().toString(); //設置replyTo和correlationId屬性值 BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); //發送消息到rpc_queue隊列 channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes()); while (true) { System.out.println("OK?"); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("OK"); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public static void main(String[] args) throws Exception { RPCClient fibonacciRpc = new RPCClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result); } }
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); // 設置MabbitMQ所在主機ip或者主機名 factory.setHost("192.168.65.136"); factory.setUsername("rabbitmq"); factory.setPassword("123456"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個頻道 Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //限制:每次最多給一個消費者發送1條消息 channel.basicQos(1); //為rpc_queue隊列創建消費者,用於處理請求 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //獲取請求中的correlationId屬性值,並將其設置到結果消息的correlationId屬性中 BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); //獲取回調隊列名字 String callQueueName = props.getReplyTo(); String message = new String(delivery.getBody(),"UTF-8"); System.out.println(" [.] fib(" + message + ")"); //獲取結果 String response = "" + fib(Integer.parseInt(message)); //先發送回調結果 channel.basicPublish("", callQueueName, replyProps,response.getBytes()); //後手動發送消息反饋 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } /** * 計算斐波列其數列的第n項 * * @param n * @return * @throws Exception */ private static int fib(int n) throws Exception { if (n < 0) throw new Exception("參數錯誤,n必須大於等於0"); if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } }
RabbitMQ 學習開發筆記