1. 程式人生 > 其它 >java操作RabbitMq

Java 操作 RabbitMQ

最近學習了一下 RabbitMQ,在此記錄一下,內容主要是用 Java 對 RabbitMQ 進行的一些基本操作;後續會再更新 Spring Boot 整合 RabbitMQ 的文章以及程式碼的 Git 地址。

一、扇形交換機

扇形交換機是最基本的交換機型別,它所能做的事情非常簡單——廣播訊息。扇形交換機會把接收到的訊息全部轉發給所有繫結在自己身上的佇列。因為廣播不需要“思考”(不必比對路由鍵),所以扇形交換機處理訊息的速度也是所有交換機型別裡面最快的。

1、RabbitMQ 連線工具類

package com.heyu.rabbitmq.demo.fanout;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Helper that opens connections to the RabbitMQ broker used by the demo classes.
 *
 * <p>Every connection setting (host, port, virtual host, user name, password) is
 * hard-coded inside {@link #getConn()}.
 *
 * @author: HEYU
 * @date: 2020/12/24 11:24
 */
public class RabbitConnUtil {
    /**
     * Builds a {@code ConnectionFactory} with the fixed demo settings and opens a new
     * connection from it. The caller owns the returned connection.
     *
     * @return a newly opened connection
     * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
     * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
     */
    public static Connection getConn() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Broker endpoint.
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // Virtual host and account the demos run under.
        factory.setVirtualHost("heyu");
        factory.setUsername("heyu");
        factory.setPassword("heyu123");
        return factory.newConnection();
    }
}

2、生產者

package com.heyu.rabbitmq.demo.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the fanout-exchange demo: declares the {@code peng_fanout} exchange,
 * publishes one UTF-8 text message to it and then releases the channel and connection.
 *
 * <p>The exchange does not store messages, it only forwards them.
 *
 * @author: HEYU
 * @date: 2020/12/24 14:15
 */
public class FanoutProducer {
    private static final String EXCHANGE_NAME = "peng_fanout";

    /**
     * Publishes a single message to the fanout exchange.
     *
     * <p>I/O and timeout failures are printed to standard error; they are not rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the channel first and the connection second, and it
        // does so even when declaring the exchange or publishing throws. The original
        // code only closed them on the success path.
        try (Connection conn = RabbitConnUtil.getConn();
             Channel channel = conn.createChannel()) {
            // Declare (create if absent) the fanout exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String msg = "交換機不做轉存 只做轉發";
            // The routing key is left empty; no message properties are set.
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

3、消費者

package com.heyu.rabbitmq.demo.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Email consumer for the fanout demo: binds {@code peng_email_queue} to the
 * {@code peng_fanout} exchange, prints every message it receives and acknowledges it
 * manually.
 *
 * @author: HEYU
 * @date: 2020/12/24 15:21
 */
public class EmailConsumer {
    private static final String EMAIL_QUEUE_NAME = "peng_email_queue";
    private static final String EXCHANGE_NAME = "peng_fanout";

    /**
     * Sets up the queue, binding and consumer. On success the connection is left open so
     * that deliveries keep arriving; on failure the error is printed and the connection
     * is aborted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection conn = null;
        try {
            conn = RabbitConnUtil.getConn();
            final Channel channel = conn.createChannel();
            // Declare the exchange with the same type the producer uses. Without this,
            // binding fails when the consumer is started before the producer has ever
            // created the exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra args.
            channel.queueDeclare(EMAIL_QUEUE_NAME, false, false, false, null);
            // Bind queue -> exchange with an empty routing key.
            channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("郵件消費者讀取訊息:" + msg);
                    // Manual ack of this single delivery (multiple=false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // autoAck=false: the handler above acknowledges explicitly.
            channel.basicConsume(EMAIL_QUEUE_NAME, false, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Do not leave an open connection behind when setup failed.
            if (conn != null) {
                conn.abort();
            }
        }
    }
}

package com.heyu.rabbitmq.demo.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * SMS consumer for the fanout demo: binds {@code peng_sms_queue} to the
 * {@code peng_fanout} exchange and prints every message it receives. Deliveries are
 * consumed with automatic acknowledgement.
 *
 * @author: HEYU
 * @date: 2020/12/24 14:16
 */
public class SmsConsumer {
    private static final String SMS_QUEUE_NAME = "peng_sms_queue";
    private static final String EXCHANGE_NAME = "peng_fanout";

    /**
     * Sets up the queue, binding and consumer. On success the connection is left open so
     * that deliveries keep arriving; on failure the error is printed and the connection
     * is aborted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection conn = null;
        try {
            conn = RabbitConnUtil.getConn();
            Channel channel = conn.createChannel();
            // Declare the exchange with the same type the producer uses. Without this,
            // binding fails when the consumer is started before the producer has ever
            // created the exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra args.
            channel.queueDeclare(SMS_QUEUE_NAME, false, false, false, null);
            // Bind queue -> exchange with an empty routing key.
            channel.queueBind(SMS_QUEUE_NAME, EXCHANGE_NAME, "");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("簡訊消費者讀取訊息:" + msg);
                }
            };
            // autoAck=true: no explicit basicAck is needed in the handler.
            channel.basicConsume(SMS_QUEUE_NAME, true, defaultConsumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Do not leave an open connection behind when setup failed.
            if (conn != null) {
                conn.abort();
            }
        }
    }
}

二、主題交換機

傳送到主題交換機的訊息需要攜帶 routing key;主題交換機會將訊息的 routing key 與各佇列繫結時指定的規則(可使用 * 與 # 萬用字元)進行比對,並把訊息轉發到所有符合規則的(一個或多個)佇列。
1、RabbitMQ 連線工具類

package com.heyu.rabbitmq.demo.topic;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Helper that opens connections to the RabbitMQ broker for the topic demo classes.
 *
 * <p>It lives in the {@code topic} package because the topic producer and consumers
 * call {@code RabbitConnUtil.getConn()} without importing it, so it must be in their
 * own package. Every connection setting is hard-coded inside {@link #getConn()}.
 *
 * @author: HEYU
 * @date: 2020/12/24 11:24
 */
public class RabbitConnUtil {
    /**
     * Builds a {@code ConnectionFactory} with the fixed demo settings and opens a new
     * connection from it. The caller owns the returned connection.
     *
     * @return a newly opened connection
     * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
     * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
     */
    public static Connection getConn() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("heyu");
        connectionFactory.setPassword("heyu123");
        connectionFactory.setVirtualHost("heyu");
        connectionFactory.setPort(5672);
        return connectionFactory.newConnection();
    }
}

2、生產者

package com.heyu.rabbitmq.demo.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the topic-exchange demo (publish/subscribe): declares the
 * {@code peng_topic_fanout} exchange as a topic exchange, publishes one UTF-8 text
 * message with routing key {@code log} and then releases the channel and connection.
 *
 * @author: HEYU
 * @date: 2020/12/24 14:15
 */
public class TopicProducer {
    private static final String EXCHANGE_NAME = "peng_topic_fanout";

    /**
     * Publishes a single message to the topic exchange.
     *
     * <p>I/O and timeout failures are printed to standard error; they are not rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the channel first and the connection second, and it
        // does so even when declaring the exchange or publishing throws.
        try (Connection conn = RabbitConnUtil.getConn();
             Channel channel = conn.createChannel()) {
            // The exchange must be of type "topic" for the routing key below to matter;
            // it was previously declared as "fanout", which ignores routing keys.
            // NOTE(review): if the broker still holds this exchange with the old
            // "fanout" type, the declaration is expected to be rejected until the old
            // exchange is deleted -- confirm against the broker in use.
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String msg = "routingKey 訊息轉發  測試";
            // Routing key "log" matches the binding key used by the topic consumers.
            channel.basicPublish(EXCHANGE_NAME, "log", null, msg.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

3、消費者

package com.heyu.rabbitmq.demo.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Email consumer for the topic demo: binds {@code topic_email_queue} to the
 * {@code peng_topic_fanout} exchange with binding key {@code log}, prints every message
 * it receives and acknowledges it manually.
 *
 * @author: Melody
 * @date: 2020/12/24 15:21
 */
public class EmailConsumer {
    private static final String EMAIL_QUEUE_NAME = "topic_email_queue";
    private static final String EXCHANGE_NAME = "peng_topic_fanout";

    /**
     * Declares the queue, binds it and starts consuming. The connection stays open
     * afterwards; I/O and timeout failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            final Channel mailChannel = RabbitConnUtil.getConn().createChannel();
            // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra args.
            mailChannel.queueDeclare(EMAIL_QUEUE_NAME, false, false, false, null);
            // Arguments: queue name, exchange name, routing (binding) key.
            mailChannel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "log");
            // autoAck=false: each delivery is acknowledged inside the handler.
            mailChannel.basicConsume(EMAIL_QUEUE_NAME, false, new DefaultConsumer(mailChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    System.out.println("郵件消費者讀取訊息:" + new String(body, "UTF-8"));
                    // Acknowledge only this delivery (multiple=false).
                    mailChannel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (IOException | TimeoutException failure) {
            failure.printStackTrace();
        }
    }
}

package com.heyu.rabbitmq.demo.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * SMS consumer for the topic demo: binds {@code topic_sms_queue} to the
 * {@code peng_topic_fanout} exchange with binding key {@code log}, prints every message
 * it receives and acknowledges it manually.
 *
 * @author: Melody
 * @date: 2020/12/24 14:16
 */
public class SmsConsumer {
    private static final String SMS_QUEUE_NAME = "topic_sms_queue";
    private static final String EXCHANGE_NAME = "peng_topic_fanout";

    /**
     * Declares the queue, binds it and starts consuming. The connection stays open
     * afterwards; I/O and timeout failures are printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            Connection brokerLink = RabbitConnUtil.getConn();
            Channel smsChannel = brokerLink.createChannel();
            // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra args.
            smsChannel.queueDeclare(SMS_QUEUE_NAME, false, false, false, null);
            smsChannel.queueBind(SMS_QUEUE_NAME, EXCHANGE_NAME, "log");
            // autoAck=false: the handler acknowledges each delivery itself.
            smsChannel.basicConsume(SMS_QUEUE_NAME, false, printingHandler(smsChannel));
        } catch (IOException | TimeoutException failure) {
            failure.printStackTrace();
        }
    }

    /** Builds the consumer that prints each message body and then acks that delivery. */
    private static DefaultConsumer printingHandler(final Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                System.out.println("簡訊消費者讀取訊息:" + text);
                long tag = envelope.getDeliveryTag();
                channel.basicAck(tag, false);
            }
        };
    }
}