RabbitMQ 之Hello World 與 工作佇列 (一)
阿新 • • 發佈:2018-12-14
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.2</version>
</dependency>
Hello World
建立一對一簡單通訊
//生產者 public class Send { private final static String QUEUE_NAME = "hello world"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.56.128"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello world"; //第一個引數 為"" 表示使用預設的Exchange channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[x] Sent '" + message + "'"); channel.close(); connection.close(); } }
//消費者 public class Recv { private static final String QUEUE_NAME = "hello world"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.56.128"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.print(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
Work Queues
建立一對多通訊,和上述例子一致,只是開了多個消費者執行緒。
//生產者 public class NewTask { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.56.128"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); String[] messages = new String[]{"First message.", "Second message..", "Third message...", "Fourth message....", "Fifth message....."}; for(String message : messages){ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("[x] Sent " + message); } channel.close(); connection.close(); } }
//消費者
public class Worker {
private static final String QUEUE_NAME = "hello";
private static void doWork(String task) throws InterruptedException{
for(char ch : task.toCharArray()){
if(ch == '.'){
Thread.sleep(1000);
}
}
}
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.128");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body,"UTF-8");
System.out.println(" [X] received " +message);
try{
doWork(message);//模擬長時工作
}catch (InterruptedException e){
e.printStackTrace();
}finally {
System.out.println(" [x] done");
}
}
};
//公平分發,RabbitMQ同一時刻最多接收一條訊息,在一個worker完成之前不會在分訊息給它,會把訊息分給不那麼忙的worker
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//message acknowledged 訊息確認標識
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
開啟多個worker消費者時,生產者的訊息會迴圈分發
NewTask
[x] Sent First message. [x] Sent Second message.. [x] Sent Third message... [x] Sent Fourth message.... [x] Sent Fifth message.....
worker-1
[X] received First message. [x] done [X] received Third message... [x] done [X] received Fifth message..... [x] done
worker-2
[X] received Second message.. [x] done [X] received Fourth message.... [x] done