1. 程式人生 > >rabbitmq 傳送端未關閉channel 造成消費端接收不到

RabbitMQ 傳送端未關閉 channel,造成消費端接收不到訊息

生產端

package com.cao.rabbitmq.action;

import java.io.IOException;

import com.cao.util.rabbitmq.RabbitMqUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

/**
 * Example RabbitMQ producers: a hello-world send, a durable queue send and a fanout publish.
 *
 * <p>Every method closes its channel and connection in a {@code finally} block. The article this
 * code accompanies reports that consumers of the fanout exchange received nothing while the
 * producer left them open, and closing only on the success path leaks both when a call fails.
 */
public class Send {

	/** Charset for message bodies, so the encoded bytes do not depend on the platform default. */
	private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Getting-started example: declares the non-durable queue {@code helloworld} and publishes
	 * one {@code "hello world"} message to it through the exchange named {@code ""}.
	 *
	 * <p>An {@link IOException} is printed to stderr, not rethrown.
	 */
	public void testSendHelloWorld(){
		final String QUEUE_NAME = "helloworld";
		Connection connection = null;
		Channel channel = null;
		try {
			connection = RabbitMqUtil.getConnection();
			channel = connection.createChannel();
			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			String message  = "hello world";
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes(BODY_CHARSET));
			System.out.println(" Send message :"+message);
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			closeResources(channel, connection);
		}
	}

	/**
	 * Queue example: declares the durable queue {@code workqueue-durable} and publishes ten
	 * messages to it with {@link MessageProperties#PERSISTENT_TEXT_PLAIN}.
	 *
	 * <p>An {@link IOException} is printed to stderr, not rethrown.
	 */
	public void sendQueque(){
		final String QUEUE_NAME = "workqueue-durable";
		Connection connection = null;
		Channel channel = null;
		try {
			connection = RabbitMqUtil.getConnection();
			channel = connection.createChannel();
			boolean durable = true;

			channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

			for (int i = 0; i < 10; i++) {
				String message = " hello :"+i;
				channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(BODY_CHARSET));
				System.out.println("傳送的資訊:"+ message);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			closeResources(channel, connection);
		}
	}

	/**
	 * Publish/subscribe example: declares the fanout exchange {@code test_fanout} and publishes
	 * ten messages to it with an empty routing key.
	 *
	 * <p>The channel and connection are closed once publishing is done; the original version
	 * had the close calls commented out.
	 *
	 * <p>An {@link IOException} is printed to stderr, not rethrown.
	 */
	public void sendPublish(){
		Connection connection = null;
		Channel channel = null;
		try {
			connection = RabbitMqUtil.getConnection();
			channel = connection.createChannel();
			channel.exchangeDeclare("test_fanout", "fanout");
			for (int i = 0; i < 10; i++) {
				String message = " hello :"+i;
				channel.basicPublish("test_fanout", "", null, message.getBytes(BODY_CHARSET));
				System.out.println("傳送的資訊:"+ message);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			closeResources(channel, connection);
		}
	}

	/**
	 * Closes the channel and then the connection, skipping any that is {@code null} or no longer open.
	 *
	 * <p>A failure while closing is printed and does not prevent the other resource from being
	 * closed. {@code Exception} is caught because the checked exceptions declared by
	 * {@code close()} differ between client library versions.
	 *
	 * @param channel    channel to close, may be {@code null}
	 * @param connection connection to close, may be {@code null}
	 */
	private static void closeResources(Channel channel, Connection connection) {
		if (channel != null && channel.isOpen()) {
			try {
				channel.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		if (connection != null && connection.isOpen()) {
			try {
				connection.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

package com.cao.rabbitmq.action;
import java.io.IOException;

import com.cao.util.rabbitmq.RabbitMqUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Example RabbitMQ consumer for the publish/subscribe (fanout) scenario.
 */
public class Recieve {

	/** Charset for decoding message bodies, so the result does not depend on the platform default. */
	private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Consumes broadcast messages from the fanout exchange {@code test_fanout}.
	 *
	 * <p>Declares the exchange and the non-durable queue {@code queueName}, binds the queue to the
	 * exchange, then prints every delivery in an endless loop with automatic acknowledgement.
	 * The method only returns once an exception ends the loop. The exception is printed to
	 * stderr and the connection is closed before returning.
	 *
	 * @param queueName name of the queue to declare, bind and consume from
	 */
	public void recievePublish(String queueName){
		Connection connection = null;
		try {
			connection = RabbitMqUtil.getConnection();
			Channel channel = connection.createChannel();
			channel.exchangeDeclare("test_fanout", "fanout");
			channel.queueDeclare(queueName, false, false, false, null);
			channel.queueBind(queueName, "test_fanout", "");
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true,consumer);

			while(true){
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String message = new String(delivery.getBody(), BODY_CHARSET);
				System.out.println(queueName+"接受的廣播訊息 "+message);
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the owner of this thread can still observe it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			closeConnection(connection);
		}
	}

	/**
	 * Closes the connection if it is non-null and still open; a failure while closing is printed.
	 *
	 * @param connection connection to close, may be {@code null}
	 */
	private static void closeConnection(Connection connection) {
		if (connection != null && connection.isOpen()) {
			try {
				connection.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

生產者測試

package com.cao.rabbitmq.action;

import org.junit.Test;

/**
 * JUnit entry points that run the producer examples in {@code Send}.
 */
public class TestSend {

	/** Runs {@code Send#sendQueque()} on a fresh {@code Send} instance. */
	@Test
	public void testSendQueue(){
		Send producer = new Send();
		producer.sendQueque();
	}

	/** Runs {@code Send#sendPublish()} on a fresh {@code Send} instance. */
	@Test
	public void testSendPublish(){
		Send producer = new Send();
		producer.sendPublish();
	}
}

消費者測試

package com.cao.rabbitmq.action;

import org.junit.Test;

/**
 * JUnit entry points that run the consumer examples in {@code Recieve}, each with two
 * consumers: one on a background thread and one on the calling thread.
 */
public class TestRecive {

	/**
	 * Starts a background thread that calls {@code Recieve#recieveQueue()}, then makes the
	 * same call on the current thread.
	 */
	@Test
	public void testrecieveQueue(){
		Runnable backgroundConsumer = new Runnable() {
			@Override
			public void run() {
				Recieve receiver = new Recieve();
				receiver.recieveQueue();
			}
		};
		Thread worker = new Thread(backgroundConsumer);
		worker.start();

		Recieve foregroundReceiver = new Recieve();
		foregroundReceiver.recieveQueue();
	}

	/**
	 * Starts a background thread that calls {@code Recieve#recievePublish(String)} with one
	 * queue name, then makes the same call on the current thread with a second queue name.
	 */
	@Test
	public void testPublish(){
		Runnable backgroundConsumer = new Runnable() {
			@Override
			public void run() {
				Recieve receiver = new Recieve();
				receiver.recievePublish(" publish_recieve 1");
			}
		};
		Thread worker = new Thread(backgroundConsumer);
		worker.start();

		Recieve foregroundReceiver = new Recieve();
		foregroundReceiver.recievePublish(" publish_recive 2");
	}
}

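結論:如果生產者傳送完成後不關閉 channel 和 connection,則在 publish(fanout 釋出訂閱)方式下,消費者接收不到訊息。推測原因是 basicPublish 不會等待 broker 確認,生產者程序(此處為 JUnit 測試)若未正常關閉連線就結束,尚未處理完的訊息可能被丟棄;因此傳送完成後應呼叫 channel.close() 和 connection.close()。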