Java使用RabbitMQ生產和消費訊息
阿新 • • 發佈:2021-01-01
技術標籤:RabbitMQ# Java應用我の原創rabbitmq
RabbitMQ 是採用 Erlang 語言實現 AMQP(Advanced Message Queuing Protocol,高階訊息佇列協議)的訊息中介軟體,它最初起源於金融系統,用於在分散式系統中儲存轉發訊息。RabbitMQ 憑藉其高可靠、易擴充套件、高可用及豐富的功能特性收到越來越多企業的青睞。下面介紹Java中如何使用RabbitMQ生產和消費訊息。
使用Maven新增依賴檔案
在pom.xml配置資訊檔案中,新增 RabbitMQ 客戶端依賴:
<!-- RabbitMQ客戶端 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
1、生產者客戶端程式碼
首先生產者傳送一條訊息至 RabbitMQ 中,之後由消費者消費。下面是生產者客戶端程式碼:
package com.pjb.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生產者客戶端 * @author pan_junbiao **/ public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "127.0.0.1"; private static final int PORT = 5672; //RabbitMQ 服務端預設埠號為 5672 private static final String USER_NAME = "guest"; private static final String PASSWORD = "guest"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); //建立連線 Channel channel = connection.createChannel(); //建立通道 //建立一個 type = DIRECT、持久化的、非自動刪除的交換器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); //建立一個持久化、非排他的、非自動刪除的佇列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //將交換器與佇列通過路由鍵繫結 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); //傳送一條持久化的資訊 String message = "您好,歡迎訪問 pan_junbiao的部落格"; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); //關閉資源 channel.close(); connection.close(); } }
2、消費者客戶端程式碼
這裡採用繼承DefaultConsumer 的方式來實現消費者。下面是消費者客戶端程式碼:
package com.pjb.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * 消費者客戶端 * @author pan_junbiao **/ public class RabbitConsumer { private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "127.0.0.1"; private static final int PORT = 5672; //RabbitMQ 服務端預設埠號為 5672 private static final String USER_NAME = "guest"; private static final String PASSWORD = "guest"; public static void main(String[] args) throws IOException, TimeoutException,InterruptedException { Address[] addresses = new Address[]{new Address(IP_ADDRESS, PORT)}; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(addresses); //建立連線 final Channel channel = connection.createChannel(); //建立通道 channel.basicQos(64); //設定客戶端最多接收未被ack的訊息的個數 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收資訊:" + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ie) { ie.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); //等待回撥函式執行完畢之後,關閉資源 TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } }
執行結果: