java+rabbitMQ實現一對一聊天
阿新 • • 發佈:2019-01-03
原始碼地址: https://download.csdn.net/download/weixin_40461281/10321780
上一篇文章講了RabbitMQ的安裝
接下來介紹一下具體的應用
使用java + rabbitMQ實現聊天功能的demo , 非常有助於理解和上手rabbitMQ , 該demo僅限於用來學習rabbitMQ , 實際工作中實現聊天功能不推薦使用rabbitMQ
首先建立一個maven專案,然後在pom.xml檔案中匯入RabbitMQ的jar包
地址如下:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency> </dependencies>
工作模式採用-工作佇列 接下來具體講解一下程式碼實現
首先建立一個類A 並建立連線工廠和建立一個新的連線
//建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定RabbitMQ地址 factory.setHost("localhost");//連線地址 factory.setUsername("guest");//使用者名稱 factory.setPassword("guest");//密碼 factory.setPort(5672);//埠號 //建立一個新的連線 final Connection connection = factory.newConnection();
然後建立執行緒T1用來發送訊息:
//傳送訊息執行緒 Thread t1 = new Thread(new Runnable() { public void run() { //建立一個頻道 Channel channel = null; try { channel = connection.createChannel(); //宣告要關注的頻道 channel.exchangeDeclare("logs", "fanout"); //channel.queueDeclare(QUEUE_NAME, false, false, false, null); } catch (Exception e) { e.printStackTrace(); } while(true) { Scanner scan = new Scanner(System.in); System.out.println("請輸入訊息"); String message = scan.nextLine(); //傳送訊息到佇列中 try { channel.basicPublish("logs", QUEUE_NAME, null, message.getBytes()); //channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); } catch (Exception e) { e.printStackTrace(); } System.out.println("B傳送訊息:" + message); } } });
建立T2用來監聽接收訊息:
//接收訊息執行緒
Thread t2 = new Thread(new Runnable() {
public void run() {
Channel channel = null;
try {
channel = connection.createChannel();
//宣告要關注的頻道
channel.exchangeDeclare("logs", "fanout");
//channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME, "logs", "");
//建立消費者 ---- 得到訊息後會自動觸發
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body
) throws IOException {
//body為訊息體
String message = new String(body, "UTF-8");
System.out.println("B接收訊息:" + message);
}
};
//訊息消費完成確認
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
});
最後別忘了啟動兩個執行緒:
t1.start();
t2.start();
然後我們在建立一個一模一樣的類B
public class B {
private final static String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception{
//建立連線工廠
ConnectionFactory factory = new ConnectionFactory();
//設定RabbitMQ地址
factory.setHost("localhost");//連線地址
factory.setUsername("guest");//使用者名稱
factory.setPassword("guest");//密碼
factory.setPort(5672);//埠號
//建立一個新的連線
final Connection connection = factory.newConnection();
//傳送訊息執行緒
Thread t1 = new Thread(new Runnable() {
public void run() {
//建立一個頻道
Channel channel = null;
try {
channel = connection.createChannel();
//宣告要關注的頻道
channel.exchangeDeclare("logs", "fanout");
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
} catch (Exception e) {
e.printStackTrace();
}
while(true) {
Scanner scan = new Scanner(System.in);
System.out.println("請輸入訊息");
String message = scan.nextLine();
//傳送訊息到佇列中
try {
channel.basicPublish("logs", QUEUE_NAME, null, message.getBytes());
//channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("B傳送訊息:" + message);
}
}
});
//接收訊息執行緒
Thread t2 = new Thread(new Runnable() {
public void run() {
Channel channel = null;
try {
channel = connection.createChannel();
//宣告要關注的頻道
channel.exchangeDeclare("logs", "fanout");
//channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME, "logs", "");
//建立消費者 ---- 得到訊息後會自動觸發
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body
) throws IOException {
//body為訊息體
String message = new String(body, "UTF-8");
System.out.println("B接收訊息:" + message);
}
};
//訊息消費完成確認
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
然後分別執行兩個類:
最後由於本文的demo需要持續監聽 , 所以未做關閉連線
如果大家在實際中使用一定記得要關閉連線,不然小心你的記憶體
}finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}catch (Exception e) {
e.printStackTrace();
}
}
好了 , 這樣一個簡單的一對一聊天功能就完成了
我們也可以用 -- 釋出訂閱模式 實現多人線上聊天 , 在這裡我就不演示了,有興趣的小夥伴可以自己做一下
好了,本篇文章就到這了