rabbitmq 的hello world程式
阿新 • • 發佈:2020-07-14
一、引入rabbitmq依賴
首先在pom中引入rabbitmq的依賴,如下所示:
<!--引入rabbitmq依賴-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
二、建立訊息的生產者和消費者
建立訊息的生產者,如下程式碼所示:
package com; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 訊息生產者 */ public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException,TimeoutException { //建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //由連線工廠建立連線 Connection conn = factory.newConnection(); //通過連線建立通道 Channel channel = conn.createChannel(); //宣告交換器以及exchange型別為direct String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //設定路由鍵 String routingKey = "jasonKey"; //宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //要傳送到queue的訊息 String message = "jason"; //釋出訊息 channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生產者生產了訊息:" + message ); //關閉通道和連線 channel.close(); conn.close(); } }
建立訊息的消費者,如下程式碼所示:
package com; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 訊息消費者 * */ public class Recive { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //建立到代理伺服器到連線 final Connection conn = factory.newConnection(); //獲得通道 final Channel channel = conn.createChannel(); //宣告交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "jasonKey"; //繫結佇列,通過鍵 hola 將佇列和交換器繫結起來 channel.queueBind(QUEUE_NAME, exchangeName, routingKey); //消費訊息 channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // properties 這是訊息的一些相關屬性,例如內容編碼、內容型別等 String routingKey = envelope.getRoutingKey(); System.out.println("消費的路由鍵:" + routingKey); long deliveryTag = envelope.getDeliveryTag(); //確認訊息 channel.basicAck(deliveryTag, false); String bodyStr = new String(body, "UTF-8"); if("jason".equals(bodyStr)){ System.out.println("消費者從佇列中獲得了訊息,並且可以根據這個訊息進行一些業務操作:" + bodyStr); } try { channel.close(); conn.close(); } catch (TimeoutException e) { e.printStackTrace(); } } }); } }
三、執行結果
先執行生產者程式,後執行消費者程式,執行程式結果如下:
四、總結
消費者和生產者,關鍵在於訊息的傳遞以及消費。
生產者把訊息傳送到佇列,消費者從佇列中取出訊息並進行消費,這其中還需要一個監聽器,來實時監聽這個佇列,如果裡面有了訊息,消費者就進行消費。
生產者和queue之間通過exchange來進行關聯,消費者也是一樣的。
這就需要非常清楚 routing key、exchange、binding key、queue 的概念以及它們之間的關係。還需要清楚的知道訊息的分發策略,即exchange的四種類型(direct、fanout、topic、headers)以及它們的區別,各自有什麼不同。