
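Calling RabbitMQ from Java: a worked example

This article shows how to call RabbitMQ from Java, with code for a message producer and a message consumer.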

I. Add the Maven dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

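II. Message producer

The key steps are:

1. Create a ConnectionFactory and set the RabbitMQ server host and port (the default AMQP port is 5672), the username, the password, and the vhost (the default is /);

2. Obtain a Connection;

3. Obtain a Channel, declare the queue and the exchange on it, and bind the queue to the exchange with a routing key;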
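To make the binding in step 3 concrete, here is a minimal in-memory sketch of how a direct exchange chooses queues: a message goes to every queue whose binding key equals the message's routing key exactly. The class `DirectExchangeSketch` and its methods are hypothetical names made up for illustration; they are not part of amqp-client, and the real routing happens inside the RabbitMQ broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange (illustration only,
// not an amqp-client class).
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Plays the role of channel.queueBind(queueName, exchangeName, routingKey).
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this
    // routing key: only exact key matches count for a direct exchange.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.<String>emptyList()));
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queueOne", "queueOne");
        System.out.println(exchange.route("queueOne")); // prints [queueOne]
        System.out.println(exchange.route("other"));    // prints []
    }
}
```

In this model a message published with a routing key that no queue is bound to simply matches nothing, which is why the producer and the binding must agree on the key.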

Example code:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class RabbitmqProducerMain {

    public static void main(String[] args) throws Exception {
        // Connection settings for the RabbitMQ server
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.107");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("vhostOne");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "queueOne";
        String exchangeName = "exchangerOne";
        String routingKey = "queueOne";

        // Declare the exchange and the queue, then bind them with the routing key
        channel.exchangeDeclare(exchangeName, "direct");
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        int msgCnt = 15000;
        while (msgCnt-- > 0) {
            String msg = "msg" + Math.random() * 100;
            // Send the message, encoded as UTF-8 to match the consumer's decoding
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("produce msg :" + msg);
            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 500));
        }

        channel.close();
        connection.close();
    }
}

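The connection settings used by the producer (host, port, username, password, vhost) can also be written as a single AMQP URI, which `ConnectionFactory.setUri(String)` accepts. The sketch below only builds such a string; `AmqpUriSketch` and `build` are hypothetical helper names, and the `guest`/`localhost` values in the second call are illustrative assumptions rather than settings from this article. The point to note is that the default vhost `/` has to be percent-encoded in the URI path.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper (illustration only): assembles an AMQP URI string from
// the same five settings that the ConnectionFactory setters take.
class AmqpUriSketch {

    // Percent-encodes one URI component. URLEncoder does form encoding, so
    // "+" (its encoding of a space) is rewritten to "%20".
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    static String build(String user, String password, String host, int port, String vhost) {
        return "amqp://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + encode(vhost);
    }

    public static void main(String[] args) {
        // prints amqp://admin:123456@192.168.0.107:5672/vhostOne
        System.out.println(build("admin", "123456", "192.168.0.107", 5672, "vhostOne"));
        // prints amqp://guest:guest@localhost:5672/%2F
        System.out.println(build("guest", "guest", "localhost", 5672, "/"));
    }
}
```

With amqp-client on the classpath, the resulting string would be passed to `factory.setUri(...)` in place of the individual setter calls.
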
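III. Message consumer

The key steps are:

1. Create a ConnectionFactory and set the RabbitMQ server host and port (the default AMQP port is 5672), the username, the password, and the vhost (the default is /);

2. Obtain a Connection;

3. Obtain a Channel and declare the queue on it;

4. Define the Consumer that handles incoming messages;

5. Start consuming with channel.basicConsume(queueName,false,"queueOne",consumer); here false turns off automatic acknowledgement and "queueOne" is the consumer tag;

Example code: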

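import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class RabbitmqConsumerMain {

    public static void main(String[] args) throws Exception {
        // Connection settings for the RabbitMQ server
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.0.107");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("vhostOne");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "queueOne";
        channel.queueDeclare(queueName, false, false, false, null);
        // Prefetch limit: at most 5 unacknowledged messages are outstanding at a time
        channel.basicQos(5);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Consume the message
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("consume msg: " + msg);
                try {
                    // Simulate processing time
                    TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 500));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Acknowledge this message manually (multiple = false)
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Start consuming with manual acknowledgement; "queueOne" is the consumer tag
        channel.basicConsume(queueName, false, "queueOne", consumer);
    }
}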
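
The consumer above combines channel.basicQos(5) with manual acknowledgement. As a rough illustration of what that pairing means, the sketch below models the bookkeeping: delivery stops once 5 messages are unacknowledged, and each ack frees one slot. `PrefetchWindowSketch` and its methods are hypothetical names for this article only; the actual logic lives in the RabbitMQ broker, and this simplified model ignores delivery tags and multiple consumers.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a prefetch window (illustration only, not an
// amqp-client class).
class PrefetchWindowSketch {
    private final int prefetchCount;
    private final Deque<String> ready = new ArrayDeque<>();
    private int unacked = 0;

    PrefetchWindowSketch(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    // A message arrives in the queue and waits for delivery.
    void enqueue(String message) {
        ready.add(message);
    }

    // Returns the next message, or null when the queue is empty or the
    // consumer already holds prefetchCount unacknowledged messages.
    String deliver() {
        if (unacked >= prefetchCount || ready.isEmpty()) {
            return null;
        }
        unacked++;
        return ready.poll();
    }

    // Plays the role of basicAck for one message: frees one slot in the window.
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    int unackedCount() {
        return unacked;
    }

    public static void main(String[] args) {
        PrefetchWindowSketch window = new PrefetchWindowSketch(5);
        for (int i = 0; i < 7; i++) {
            window.enqueue("m" + i);
        }
        int delivered = 0;
        while (window.deliver() != null) {
            delivered++;
        }
        System.out.println("delivered before any ack: " + delivered); // prints 5
        window.ack();
        System.out.println("delivered after one ack: " + window.deliver()); // prints m5
    }
}
```

In this model, a consumer that never acknowledges would stop receiving after its fifth message, which is why the basicAck call in handleDelivery matters.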