1. 程式人生 > 其它 >Java使用RabbitMQ生產和消費訊息

Java使用RabbitMQ生產和消費訊息

技術標籤:RabbitMQ# Java應用我の原創rabbitmq

RabbitMQ 是採用 Erlang 語言實現 AMQP(Advanced Message Queuing Protocol,高階訊息佇列協議)的訊息中介軟體,它最初起源於金融系統,用於在分散式系統中儲存轉發訊息。RabbitMQ 憑藉其高可靠、易擴充套件、高可用及豐富的功能特性收到越來越多企業的青睞。下面介紹Java中如何使用RabbitMQ生產和消費訊息。

使用Maven新增依賴檔案

在pom.xml配置資訊檔案中,新增 RabbitMQ 客戶端依賴:

<!-- RabbitMQ客戶端 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

1、生產者客戶端程式碼

首先生產者傳送一條訊息至 RabbitMQ 中,之後由消費者消費。下面是生產者客戶端程式碼:

package com.pjb.demo;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生產者客戶端
 * @author pan_junbiao
 **/
public class RabbitProducer
{
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 5672; //RabbitMQ 服務端預設埠號為 5672
    private static final String USER_NAME = "guest";
    private static final String PASSWORD = "guest";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection(); //建立連線
        Channel channel = connection.createChannel(); //建立通道
        //建立一個 type = DIRECT、持久化的、非自動刪除的交換器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
        //建立一個持久化、非排他的、非自動刪除的佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //將交換器與佇列通過路由鍵繫結
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //傳送一條持久化的資訊
        String message = "您好,歡迎訪問 pan_junbiao的部落格";
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
        //關閉資源
        channel.close();
        connection.close();
    }
}

2、消費者客戶端程式碼

這裡採用繼承DefaultConsumer 的方式來實現消費者。下面是消費者客戶端程式碼:

package com.pjb.demo;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 消費者客戶端
 * @author pan_junbiao
 **/
public class RabbitConsumer
{
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 5672; //RabbitMQ 服務端預設埠號為 5672
    private static final String USER_NAME = "guest";
    private static final String PASSWORD = "guest";

    public static void main(String[] args) throws IOException, TimeoutException,InterruptedException
    {
        Address[] addresses = new Address[]{new Address(IP_ADDRESS, PORT)};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection(addresses); //建立連線
        final Channel channel = connection.createChannel(); //建立通道
        channel.basicQos(64); //設定客戶端最多接收未被ack的訊息的個數
        Consumer consumer = new DefaultConsumer(channel)
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println("接收資訊:" + new String(body));
                try
                {
                    TimeUnit.SECONDS.sleep(1);
                }
                catch (InterruptedException ie)
                {
                    ie.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回撥函式執行完畢之後,關閉資源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

執行結果: