1. 程式人生 > 實用技巧 >rabbitmq 的hello world程式

rabbitmq 的hello world程式

一、引入rabbitmq依賴

首先在pom中引入rabbitmq的依賴,如下所示:

 <!--引入rabbitmq依賴-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

二、建立訊息的生產者和消費者

建立訊息的生產者,如下程式碼所示:

package com;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 訊息生產者
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException,TimeoutException {
        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //由連線工廠建立連線
        Connection conn = factory.newConnection();
        //通過連線建立通道
        Channel channel = conn.createChannel();
        //宣告交換器以及exchange型別為direct
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //設定路由鍵
        String routingKey = "jasonKey";
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //要傳送到queue的訊息
        String message = "jason";
        //釋出訊息
        channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

        System.out.println("生產者生產了訊息:" + message );

        //關閉通道和連線
        channel.close();
        conn.close();
    }
}

建立訊息的消費者,如下程式碼所示:

package com;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 訊息消費者
 *
 */
public class Recive {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        //建立到代理伺服器到連線
        final Connection conn = factory.newConnection();
        //獲得通道
        final Channel channel = conn.createChannel();
        //宣告交換器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        String routingKey = "jasonKey";
        //繫結佇列,通過鍵 hola 將佇列和交換器繫結起來
        channel.queueBind(QUEUE_NAME, exchangeName, routingKey);

        //消費訊息
        channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // properties 這是訊息的一些相關屬性,例如內容編碼、內容型別等
                String routingKey = envelope.getRoutingKey();
                System.out.println("消費的路由鍵:" + routingKey);
                long deliveryTag = envelope.getDeliveryTag();
                //確認訊息
                channel.basicAck(deliveryTag, false);
                String bodyStr = new String(body, "UTF-8");
                if("jason".equals(bodyStr)){
                    System.out.println("消費者從佇列中獲得了訊息,並且可以根據這個訊息進行一些業務操作:" + bodyStr);
                }

                try {
                    channel.close();
                    conn.close();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }

            }
        });

    }
}

三、執行結果

先執行生產者程式,後執行消費者程式,執行程式結果如下:

四、總結

消費者和生產者,關鍵在於訊息的傳遞以及消費。
生產者把訊息傳送到佇列,消費者從佇列中取出訊息並進行消費,這其中還需要一個監聽器,來實時監聽這個佇列,如果裡面有了訊息,消費者就進行消費。
生產者和queue之間通過exchange來進行關聯,消費者也是一樣的。
這就需要非常清楚 routing key、exchange、binding key、queue 的概念以及它們之間的關係。還需要清楚的知道訊息的分發策略,即exchange的四種類型(direct、fanout、topic、headers)以及它們的區別,各自有什麼不同。