RabbitMQ學習——生產者與消費者入門例子
阿新 • • 發佈:2018-12-08
生產者
package com.learn.rabbitmqapi.message;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory. setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost
//2. 通過連線工廠建立連線
Connection connection = connectionFactory.newConnection();
//3. 通過connection建立一個Channel
Channel channel = connection.createChannel ();
//4. 通過Channel傳送資料
for (int i = 0; i < 10; i++) {
String message = "Hello" + i;
//exchange為"",則通過routingKey取尋找佇列
channel.basicPublish("","testQueue",null,message.getBytes());
}
//5. 關閉連線
channel.close();
connection.close();
}
}
消費者
package com.learn.rabbitmqapi.message;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;
public static final String QUEUE_NAME = "testQueue";
public static void main(String[] args) throws Exception {
//1. 建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost
//2. 通過連線工廠建立連線
Connection connection = connectionFactory.newConnection();
//3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
//4. 宣告(建立)一個佇列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//5. 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6. 設定Channel
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
int num = 0;
//7. 獲取訊息
while (true) {
//nextDelivery 會阻塞直到有訊息過來
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("收到:" + message);
num++;
if (num == 8) {
break;
}
}
channel.close();
connection.close();
}
}
先啟動消費者,消費者程式碼會新建一個佇列,再啟動生成者