1. 程式人生 > >java+rabbitMQ實現一對一聊天

java+rabbitMQ實現一對一聊天

原始碼地址: https://download.csdn.net/download/weixin_40461281/10321780

上一篇文章講了RabbitMQ的安裝

接下來介紹一下具體的應用

使用java + rabbitMQ實現聊天功能的demo , 非常有助於理解和上手rabbitMQ , 該demo僅限於用來學習rabbitMQ , 實際工作中實現聊天功能不推薦使用rabbitMQ

首先建立一個maven專案,然後在pom.xml檔案中匯入RabbitMQ的jar包

地址如下:

<dependencies>
    <dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>3.6.0</version>
    </dependency>
</dependencies>

工作模式採用-工作佇列 接下來具體講解一下程式碼實現

首先建立一個類A 並建立連線工廠和建立一個新的連線

//建立連線工廠
		ConnectionFactory factory = new ConnectionFactory();
		//設定RabbitMQ地址
		factory.setHost("localhost");//連線地址
		factory.setUsername("guest");//使用者名稱
		factory.setPassword("guest");//密碼
		factory.setPort(5672);//埠號
		//建立一個新的連線
		final Connection connection = factory.newConnection();

然後建立執行緒T1用來發送訊息:

//傳送訊息執行緒
Thread t1 = new Thread(new Runnable() {
    public void run() {
	//建立一個頻道
	Channel channel = null;
	try {
	    channel = connection.createChannel();
	    //宣告要關注的頻道
	    channel.exchangeDeclare("logs", "fanout");
	    //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	} catch (Exception e) {
	    e.printStackTrace();
	}
	while(true) {
	    Scanner scan = new Scanner(System.in);
	    System.out.println("請輸入訊息");
	    String message = scan.nextLine();
	    //傳送訊息到佇列中
	    try {
		channel.basicPublish("logs", QUEUE_NAME, null, message.getBytes());
		//channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
	    } catch (Exception e) {
		e.printStackTrace();
	    }
	System.out.println("B傳送訊息:" + message);
	}
    }
});

建立T2用來監聽接收訊息:

//接收訊息執行緒
Thread t2 = new Thread(new Runnable() {
    public void run() {
	Channel channel = null;
	try {
		channel = connection.createChannel();
		//宣告要關注的頻道
		channel.exchangeDeclare("logs", "fanout");
		//channel.queueDeclare(QUEUE_NAME,false,false,false,null);
		channel.queueBind(QUEUE_NAME, "logs", "");
		//建立消費者 ---- 得到訊息後會自動觸發
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(
				String consumerTag, Envelope envelope, 
				AMQP.BasicProperties properties, byte[] body
				) throws IOException {
				//body為訊息體
				String message = new String(body, "UTF-8");
				System.out.println("B接收訊息:" + message);
			}
		};
		//訊息消費完成確認
		channel.basicConsume(QUEUE_NAME, true, consumer);
	} catch (Exception e) {
		e.printStackTrace();
	}
    }
});

最後別忘了啟動兩個執行緒:

t1.start();
t2.start();

然後我們在建立一個一模一樣的類B

public class B {
	private final static String QUEUE_NAME = "test";
	public static void main(String[] args) throws Exception{
		//建立連線工廠
		ConnectionFactory factory = new ConnectionFactory();
		//設定RabbitMQ地址
		factory.setHost("localhost");//連線地址
		factory.setUsername("guest");//使用者名稱
		factory.setPassword("guest");//密碼
		factory.setPort(5672);//埠號
		//建立一個新的連線
		final Connection connection = factory.newConnection();
		//傳送訊息執行緒
		Thread t1 = new Thread(new Runnable() {
			public void run() {
				//建立一個頻道
				Channel channel = null;
				try {
					channel = connection.createChannel();
					//宣告要關注的頻道
					channel.exchangeDeclare("logs", "fanout");
					//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
				} catch (Exception e) {
					e.printStackTrace();
				}
				while(true) {
					Scanner scan = new Scanner(System.in);
					System.out.println("請輸入訊息");
					String message = scan.nextLine();
					//傳送訊息到佇列中
					try {
						channel.basicPublish("logs", QUEUE_NAME, null, message.getBytes());
						//channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
					} catch (Exception e) {
						e.printStackTrace();
					}
					System.out.println("B傳送訊息:" + message);
				}
			}
		});
		//接收訊息執行緒
		Thread t2 = new Thread(new Runnable() {
			public void run() {
				Channel channel = null;
				try {
					channel = connection.createChannel();
					//宣告要關注的頻道
					channel.exchangeDeclare("logs", "fanout");
					//channel.queueDeclare(QUEUE_NAME,false,false,false,null);
					channel.queueBind(QUEUE_NAME, "logs", "");
					//建立消費者 ---- 得到訊息後會自動觸發
					Consumer consumer = new DefaultConsumer(channel) {
						@Override
						public void handleDelivery(
								String consumerTag, Envelope envelope, 
								AMQP.BasicProperties properties, byte[] body
							) throws IOException {
							//body為訊息體
							String message = new String(body, "UTF-8");
							System.out.println("B接收訊息:" + message);
						}
					};
					//訊息消費完成確認
					channel.basicConsume(QUEUE_NAME, true, consumer);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		t1.start();
		t2.start();
	}
}

然後分別執行兩個類:

最後由於本文的demo需要持續監聽 , 所以未做關閉連線

如果大家在實際中使用一定記得要關閉連線,不然小心你的記憶體

}finally {
    try {
	if (channel != null) {
		channel.close();
	}
	if (connection != null) {
		connection.close();
	}
    }catch (Exception e) {
	e.printStackTrace();
    }
}

好了 , 這樣一個簡單的一對一聊天功能就完成了

我們也可以用 -- 釋出訂閱模式 實現多人線上聊天 , 在這裡我就不演示了,有興趣的小夥伴可以自己做一下

好了,本篇文章就到這了