1. 程式人生 > >RabbitMQ的訂閱和釋出步驟詳解

RabbitMQ的訂閱和釋出步驟詳解

一、關於RabbitMQ搭建和基本概念這裡不做介紹,下面給出實用的參考部落格

RabbitMQ基礎概念及詳細介紹參考文件:http://blog.csdn.net/whycold/article/details/41119807

RabbitMQ入門及環境的搭建:http://m.blog.csdn.net/article/details?id=50487028

RabbitMQ網頁控制檯開啟方式:http://blog.csdn.net/spyiu/article/details/24697221

二、RabbitMQ的釋出

(1)釋出端的連線方法

釋出端的連線只需要建立一個ConnectionFactory然後建立一個連線,然後建立一個頻道,

宣告一個路由器,指定名稱、模式、及其是否durable

public int BaseConnection(){
        /**
         * 建立連線連線到RabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //設定MabbitMQ所在主機ip或者主機名
        factory.setHost(IP);
        factory.setPort(5672);
        factory.setUsername(user);
        factory.setPassword(password);

        //建立一個連線
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            System.out.println("[x] 請確認輸入的IP地址、使用者名稱、密碼是否準確!");
            return  -1;
        } catch (Exception e) {
            System.out.println("[x] 連線RabbitMQ超時,請重試!");
            return  -1;
        }

        //建立一個頻道
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            System.out.println("[x] 建立頻道出錯,請重試!");
            return -1;
        }

        //宣告一個路由器,指定名稱、模式、及其是否durable
        try {
            channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
        } catch (IOException e) {
            System.out.println("[x] 路由器宣告失敗,請重試!");
            return -1;
        }

        System.out.println("[x] 釋出訊息者成功連線至RabbitMQ !");
        return 0;
    }

(2)釋出端釋出訊息的方法

釋出端釋出訊息往指定的exchange(路由)傳送的時候需要指定一個routingKey,topic型別時當接收端的bindingKey和routingKey相匹配的時候才能接收到訂閱的訊息

/**
     * 釋出訊息
     * @param routingKey routingKey
     * @param msg 訊息
     * @return
     */
    public boolean Publish(String routingKey,byte[] msg){
            try {
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg);
                System.out.println("[x] Sender Sent " + routingKey + " : " + msg);
                return true;
            } catch (IOException e) {
                System.out.println("[y] Sender failed to basic publish.");
                return false;
            }
    }
三、RabbitMQ的訂閱

(1)訂閱端的連線方法

訂閱端的連線在釋出端連線的基礎上還需要給該exchange繫結

/**
     * 建立連線連線到RabbitMQ
     * @return   0 means success
     *           -1 means failure
     */
    public int BaseConnection(){
        ConnectionFactory factory = new ConnectionFactory();
        //設定MabbitMQ所在主機ip或者主機名
        factory.setHost(IP);
        factory.setPort(5672);
        factory.setUsername(user);
        factory.setPassword(password);

        //建立一個連線
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            System.out.println("[y] 請確認輸入的IP地址、使用者名稱、密碼是否準確!");
            return  -1;
        } catch (Exception e) {
            System.out.println("[y] 連線RabbitMQ超時,請重試!");
            return  -1;
        }

        //建立一個頻道
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            System.out.println("[y] 建立頻道出錯,請重試!");
            return -1;
        }

        //宣告一個路由器,指定名稱、模式、及其是否durable
        try {
            channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
        } catch (IOException e) {
            System.out.println("[y] 路由器宣告失敗,請重試!");
            return -1;
        }

        //指定一個佇列,隨機佇列名,non-durable,exclusive,not auto-delete
//        QUEUE_NAME = MqConfig.QUEUE_NAME;

        Map<String,Object> args = new HashMap();
        args.put("x-message-ttl",message_ttl);
        try {
            channel.queueDeclare(QUEUE_NAME,QUEUE_DURABLE,QUEUE_EXCLUSIVE,QUEUE_AUTO_DELETE,args);
        } catch (IOException e) {
            System.out.println("[y] 佇列宣告失敗,請重試!");
        }
        
        try {
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,bindingKey);
        } catch (IOException e) {
            System.out.println("[y] 佇列繫結路由關鍵字 " + bindingKey + " 時出錯!");
        }

        System.out.println("[y] 訂閱訊息者成功連線至RabbitMQ !");
        return 0;
    }

(2)訂閱端的訂閱方法

/**
     * 訂閱訊息函式
     */
    public boolean Consume(){
        System.out.println("[y] Receiver " + QUEUE_NAME + " Waiting for messages.To exit press CTRL+C.");
        //訊息訂閱
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                team.iOceanPlus.PB.Config.PBConfig rec=team.iOceanPlus.PB.Config.PBConfig.parseFrom(body);
//                rec.getConfigTargetDistribution();
//                rec.getConfigTargetDistribution().getConfigType();
                System.out.println(rec.getConfigTargetDistribution(0));
            }
        };

        //訊息反饋
        try {
            channel.basicConsume(QUEUE_NAME,true,consumer);
            return true;
        } catch (IOException e) {
            System.out.println("[y] Receiver "+ QUEUE_NAME + " failed to basic consume." );
            return false;
        }
    }