RabbitMQ for Java【入門教程 1】
阿新 • • 發佈:2018-12-17
RabbitMQ是訊息代理。從本質上說,它接受來自生產者的資訊,並將它們傳遞給消費者。在兩者之間,它可以根據你給它的路由,緩衝規則進行傳遞訊息。 如果你的工作中需要用到RabbitMQ,那麼我建議你先在電腦上安裝好RabbitMQ伺服器,然後開啟eclipse,跟這我的教程一步步的學習RabbitMQ,這樣你會對RabbitMQ有一個全面的認識,而且能打好一個很好的基礎。
準備工作:安裝erlang,RabbitMQ等工具 配置相應的環境變數 這裡就不多說!
一:helloWorld的實現
P消費者將訊息推送到queue佇列中,佇列將訊息推送給消費者或者快取到本地快取(取決於消費者的狀態) C消費者向queue取相關的資訊或者queue推送給消費者
注意:
生產者,消費者和佇列(RabbitMQ)不必部署在同一臺機器上。實際在生產環境的大多數應用中,他們都是分開部署的
程式碼實現:
首先加入依賴包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.3</version> </dependency>
接下來建立連線rabbitMq伺服器的工具類
package wxtest.rabbitMq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 設定MabbitMQ, 主機ip或者主機名 factory.setHost("localhost"); // 建立一個連線 Connection connection = factory.newConnection(); return connection; } }
生產者:
package wxtest.rabbitMq; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 訊息傳送方 **/ public class rabbitMqSend { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 建立一個通道 Channel channel = ConnectionUtil.getConnection().createChannel(); // 指定一個佇列(給佇列明命名 那麼接受者也應該是這個對列名) channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 傳送訊息 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // 關閉頻道和連線 channel.close(); ConnectionUtil.getConnection().close(); } }
消費者:
package wxtest.rabbitMq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 訊息接受方 **/ public class rabbitMqRev { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 建立一個通道 Channel channel = ConnectionUtil.getConnection().createChannel(); // 指定一個佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
輸出結果如下
[x] Received 'Hello World!' [x] Received 'Hello World!' [x] Received 'Hello World!'