RabbitMQ 幾種工作模式---(一)helloworld
阿新 • • 發佈:2021-01-20
生產者類:
package com..helloworld;
import com..utils.RabbitConstant;
import com..utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//TCP 物理連線
Connection conn= RabbitUtils.getConnection();
//建立通訊“通道”,相當於TCP中的虛擬連線
Channel channel = conn.createChannel();
//建立佇列,宣告並建立一個佇列,如果佇列已存在,則使用這個佇列
//第一個引數:佇列名稱ID
//第二個引數:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
//第三個引數:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用,其他消費者不讓訪問
//第四個:是否自動刪除,false代表連線停掉後不自動刪除掉這個佇列
//其他額外的引數, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
//四個引數
//exchange 交換機,暫時用不到,在後面進行釋出訂閱時才會用到
//佇列名稱
//額外的設定屬性
//最後一個引數是要傳遞的訊息位元組陣列
String message = "hello!";
channel.basicPublish("" , RabbitConstant.QUEUE_HELLOWORLD,null , message.getBytes());
channel.close();
conn.close();
System.out.println("傳送資料成功");
}
}
執行後臺列印:
頁面佇列資料顯示:
消費者類:
package com..helloworld; import com..utils.RabbitConstant; import com..utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throwsIOException, TimeoutException { Connection conn= RabbitUtils.getConnection(); //建立通道 Channel channel = conn.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null); //建立一個訊息消費者 //第二個引數代表是否自動確認收到訊息,false代表手動程式設計來確認訊息,這是MQ的推薦做法//第三個引數要傳入DefaultConsumer的實現類 channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel)); } } class Reciver extends DefaultConsumer{ private Channel channel; //重寫建構函式,Channel通道物件需要從外層傳入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { /*super.handleDelivery(consumerTag,envelope,properties,body);*/ String messageBody = new String(body); System.out.println("消費者接收到:" + messageBody); //簽收訊息,確認訊息 //envelope.getDeliveryTag() 獲取這個訊息的TagId //false只確認簽收當前的訊息,設定為true的時候則代表簽收該消費者所有未簽收的訊息 channel.basicAck(envelope.getDeliveryTag() , false); } }
執行後臺列印:
頁面佇列資料顯示: