rabbitmq的java呼叫例項
阿新 • • 發佈:2019-01-07
本文將介紹rabbitmq在用java進行呼叫的程式碼實現
一、新增maven依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
二、訊息生產者
關鍵點如下:
1、建立ConnectionFactory,包括設定rabbitmq服務的地址與埠號(amqp預設埠是5672)、使用者名稱、密碼、vhost(預設是/) ;
2、獲取Connection ;
3、獲取Channel ,並在其上宣告queue、exchanger,並通過routingkey把queue在exchanger進行繫結 ;
程式碼例項如下:
public class RabbitmqProducerMain { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.107"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("vhostOne"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "queueOne"; String exchangeName = "exchangerOne"; String routingKey = "queueOne"; channel.exchangeDeclare(exchangeName,"direct"); channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,exchangeName,routingKey); int msgCnt =15000; while(msgCnt-->0){ String msg = "msg"+Math.random()*100; channel.basicPublish(exchangeName,routingKey,null,msg.getBytes()); //傳送訊息 System.out.println("produce msg :"+msg); TimeUnit.MILLISECONDS.sleep((long) (Math.random()*500)); } channel.close(); connection.close(); } }
三、訊息消費者
關鍵點如下:
1、建立ConnectionFactory,包括設定rabbitmq服務的地址與埠號(amqp預設埠是5672)、使用者名稱、密碼、vhost(預設是/) ;
2、獲取Connection ;
3、獲取Channel ,並在其上宣告queue ;
4、定義消費訊息Consumer;
5、消費訊息channel.basicConsume(queueName,false,"queueOne",consumer);
程式碼例項如下:
public class RabbitmqConsumerMain { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.107"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("vhostOne"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "queueOne"; channel.queueDeclare(queueName,false,false,false,null); channel.basicQos(5); //每次取5條訊息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //消費消費 String msg = new String(body,"utf-8"); System.out.println("consume msg: "+msg); try { TimeUnit.MILLISECONDS.sleep((long) (Math.random()*500)); } catch (InterruptedException e) { e.printStackTrace(); } //手動訊息確認 getChannel().basicAck(envelope.getDeliveryTag(),false); } }; //呼叫消費訊息 channel.basicConsume(queueName,false,"queueOne",consumer); } }