1. 程式人生 > >RabbitMQ學習筆記二:rabbitmq傳送接收訊息Helloworld(Java版)

RabbitMQ學習筆記二:rabbitmq傳送接收訊息Helloworld(Java版)

一 引入rabbitmq java client

前面我們已經在本地(windows下)安裝配置好了RabbitMQ server。現在我們引入rabbitmq Java client。
在eclipse中建立一個maven專案,在pom.xml檔案中加入依賴

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.0.1</version>
</dependency
>

Alternatively, if you’re using Gradle:

dependencies {
  compile 'com.rabbitmq:amqp-client:4.0.1'
}

二 rabbitmq傳送訊息

2.1 建立一個訊息傳送類

package com.gta.goldnock.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory; /** * * @ClassName: Send * @Description: TODO(The sender will connect to RabbitMQ, send a single message, then exit.) * @author yuhuan.gao * @date 2017年1月19日 下午1:37:19 * */ public class Send { /* * 定義一個佇列“hello” */ private final static String QUEUE_NAME = "hello"
; public static void main(String[] argv) throws IOException, TimeoutException{ //建立一個連線 ConnectionFactory factory = new ConnectionFactory(); //連線本地,如果需要指定到服務,需在這裡指定IP factory.setHost("localhost"); Connection connection = factory.newConnection(); //建立一個通道 Channel channel = connection.createChannel(); //申明通道傳送訊息的佇列,把訊息傳送至訊息佇列‘hello’ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //Declaring a queue is idempotent - 如果佇列不存在則會建立一個佇列 //訊息內容為byte array, so可以自己隨意編碼 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //訊息傳送完成後,關閉通道和連線 channel.close(); connection.close(); } }

2.2 執行main,成功,但是不確定訊息是否傳送成功了。檢視rabbit-server的日誌

=INFO REPORT==== 19-Jan-2017::14:27:46 ===
accepting AMQP connection <0.19748.19> ([::1]:60445 -> [::1]:5672)

=INFO REPORT==== 19-Jan-2017::14:27:46 ===
closing AMQP connection <0.19748.19> ([::1]:60445 -> [::1]:5672)

2.3 通過外掛檢視服務管理

這裡寫圖片描述
message從3變成4了,訊息是傳送成功了。

Sending doesn't work!

If this is your first time using RabbitMQ and you don't see the "Sent" message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 1Gb free) and is therefore refusing to accept messages. Check the broker logfile to confirm and reduce the limit if necessary. The configuration file documentation will show you how to set disk_free_limit.

三 rabbitmq訊息接收

不同於sender,reciever是從RabbitMQ中推出訊息。sender每次傳送一條單一的訊息,而reciever一直執行並監控著服務,一有訊息就接收。

3.1 建立訊息接收類

package com.gta.goldnock.mq;


import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 
* @ClassName: Recv
* @Description: TODO(接收訊息類)
* @author yuhuan.gao
* @date 2017年1月19日 下午2:33:27
*
 */
public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        //建立一個連線
        ConnectionFactory factory = new ConnectionFactory();
        //連線本地,如果需要指定到服務,需在這裡指定IP
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        //建立一個通道
        Channel channel = connection.createChannel();
        //申明接收訊息的佇列,與傳送訊息佇列"hello"對應
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //The extra DefaultConsumer is a class implementing the Consumer interface 
        //we'll use to buffer the messages pushed to us by the server.
        Consumer consumer = new DefaultConsumer(channel){
            //重寫DefaultConsumer中handleDelivery方法,在方法中獲取訊息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, 
                    AMQP.BasicProperties properties, byte[] body) throws IOException{
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true,consumer);
    }
}
We're about to tell the server to deliver us the messages from the queue. Since it will push us messages asynchronously, we provide a callback in the form of an object that will buffer the messages until we're ready to use them. That is what a DefaultConsumer subclass does.

3.2 執行

執行main,執行成功,並且控制檯列印如下

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'

上面結果正如預期的,當訂閱‘hello’佇列訊息後,收到了之前Sender發的x訊息。繼續執行Sender的main,Recv在執行狀態下會自動收到訊息。