Java使用RabbitMQ之公平分發
阿新 • • 發佈:2018-09-13
env 接受 col java catch conn ech exceptio don
發送消息:
1 package org.study.workfair; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study.utils.ConnectionUtils; 7 8 import java.io.IOException; 9 import java.util.concurrent.TimeoutException; 10 11 public class Sender {12 public static final String QUEUE_NAME = "test_simple_queue"; 13 14 @Test 15 public void send() throws IOException, TimeoutException, InterruptedException { 16 // 獲取連接 17 Connection conn = ConnectionUtils.getConnection(); 18 // 獲取通道 19 Channel channel = conn.createChannel();20 //創建隊列 21 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 22 //每個消費者發送確認消息前,只發送一條消息 23 channel.basicQos(1); 24 String msg = "hello rabbitmq!"; 25 26 for (int i = 0; i < 50; i++) { 27 String tempStr = i + " " + msg; 28 //發送消息 29 channel.basicPublish("", QUEUE_NAME, null, tempStr.getBytes()); 30 System.out.println("[send] msg " + i + ": " + msg); 31 Thread.sleep(100); 32 } 33 34 channel.close(); 35 conn.close(); 36 } 37 }
接受消息:
1 package org.study.workfair; 2 3 import com.rabbitmq.client.*; 4 import org.junit.Test; 5 import org.study.utils.ConnectionUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Recv { 11 public static final String QUEUE_NAME = "test_simple_queue"; 12 13 @Test 14 public void recv() throws IOException, TimeoutException, InterruptedException { 15 Connection conn = ConnectionUtils.getConnection(); 16 Channel channel = conn.createChannel(); 17 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 18 channel.basicQos(1); 19 20 //定義消費者 21 DefaultConsumer consumer = new DefaultConsumer(channel) { 22 //重寫獲取到達消息 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 // super.handleDelivery(consumerTag, envelope, properties, body); 26 String msg = new String(body, "utf-8"); 27 System.out.println("[1] recv: " + msg); 28 29 try { 30 Thread.sleep(1000); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 }finally { 34 System.out.println("[1] done!"); 35 channel.basicAck(envelope.getDeliveryTag(),false); 36 } 37 } 38 }; 39 40 while (true) { 41 //監聽隊列 42 channel.basicConsume(QUEUE_NAME, false, consumer); 43 Thread.sleep(100); 44 } 45 46 47 } 48 }
Java使用RabbitMQ之公平分發