RabbitMQ 的 Java 程式碼簡單使用範例
阿新 • • 發佈:2019-01-02
在 pom.xml 中引入以下依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.4</version>
</dependency>
傳送端程式碼:
package com; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class Producer { private static String queueName = "queue2"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, true, false, false, null); for (int i = 0; i < 20; i++) { //傳送的訊息 String message = "hello world!"+i; //往佇列中發出一條訊息 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // Thread.sleep(1000); } //關閉頻道和連線 channel.close(); connection.close(); } }
接收端程式碼:
package com; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer { private static String queueName = "queue2"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。 channel.queueDeclare(queueName, true, false, false, null); System.out.println(Consumer.class.hashCode() + " [*] Waiting for messages. To exit press CTRL+C"); // 建立佇列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 設定最大服務訊息接收數量 int prefetchCount = 1; channel.basicQos(prefetchCount); boolean ack = false; // 是否自動確認訊息被成功消費 channel.basicConsume(queueName, ack, consumer); // 指定消費佇列 while (true) { // nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); Thread.sleep(2000); } } }