rabbitmq 傳送端未關閉channel 造成消費端接收不到
阿新 • • 發佈:2019-02-01
生產端
package com.cao.rabbitmq.action; import java.io.IOException; import com.cao.util.rabbitmq.RabbitMqUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class Send { /** * 功能 入門測試 *@date 2018年2月7日下午4:59:07 *@author caoheshan *@param *@returnType void *@return */ public void testSendHelloWorld(){ final String QUEUE_NAME = "helloworld"; try { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" Send message :"+message); channel.close(); connection.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 功能 佇列傳送 *@date 2018年2月7日下午5:08:14 *@author caoheshan *@param *@returnType void *@return */ public void sendQueque(){ final String QUEUE_NAME = "workqueue-durable"; try { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); for (int i = 0; i < 10; i++) { String message = " hello :"+i; channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("傳送的資訊:"+ message); } channel.close(); connection.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 功能 釋出訂閱 *@date 2018年2月8日下午3:58:40 *@author caoheshan *@param *@returnType void *@return */ public void sendPublish(){ try { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test_fanout", "fanout"); for (int i = 0; i < 10; i++) { String message = " hello :"+i; channel.basicPublish("test_fanout", "", null, message.getBytes()); System.out.println("傳送的資訊:"+ message); } // channel.close(); // connection.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package com.cao.rabbitmq.action; import java.io.IOException; import com.cao.util.rabbitmq.RabbitMqUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class Recieve { /** * 功能 釋出訂閱的消費 *@date 2018年2月8日下午4:07:10 *@author caoheshan *@param *@returnType void *@return */ public void recievePublish(String queueName){ try { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test_fanout", "fanout"); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, "test_fanout", ""); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(queueName+"接受的廣播訊息 "+message); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
生產者測試
package com.cao.rabbitmq.action;
import org.junit.Test;
public class TestSend {
@Test
public void testSendQueue(){
new Send().sendQueque();
}
@Test
public void testSendPublish(){
new Send().sendPublish();
}
}
消費者測試
package com.cao.rabbitmq.action; import org.junit.Test; public class TestRecive { /** * 功能 *@date 2018年2月8日下午3:51:17 *@author caoheshan *@param *@returnType void *@return */ @Test public void testrecieveQueue(){ new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub new Recieve().recieveQueue(); } }).start(); new Recieve().recieveQueue(); } @Test public void testPublish(){ new Thread(new Runnable() { @Override public void run() { new Recieve().recievePublish(" publish_recieve 1"); } }).start(); new Recieve().recievePublish(" publish_recive 2"); } }
如果不關閉 生產者的channel和connection ,則publish 方式 消費者無法接收到訊息