1. 程式人生 > >RabbitMQ快速入門

RabbitMQ快速入門

提高 The clas rod serializa java 一個隊列 有一個 中間件

  • 用處:
將一些無需即時返回且耗時的操作提取出來。進行了異步處理,而這種異步處理的方式大大的節省了server的請求響應時間,從而提高了系統的吞吐量。 如登錄系統調用短信系統和安全系統,可以通過消息中間件來調用短信和安全系統。 隊列服務, 會有三個概念: 發消息者、隊列、收消息者, RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發消息者和隊列就沒有直接聯系, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。
  • 重要的概念有 4 個,分別為:虛擬主機,交換機,隊列,和綁定。

虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。為什麽需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。 交換機:Exchange 用於轉發消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麽究竟轉發到哪個隊列,就要根據該路由鍵。 交換機有四種類型:Direct, topic, Headers and Fanout。 綁定:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。
  • 發送(單次發送):

<wiz_code_mirror>
        /**
         * 創建連接連接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名
        factory.setHost("127.0.0.1");
        //創建一個連接
        Connection connection = factory.newConnection();
        //創建一個頻道
        Channel channel = connection.createChannel();
        //指定一個隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //發送的消息
        String message = "hello world!";
        //往隊列中發出一條消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//channel.basicPublish("", QUEUE_NAME, null, SerializationUtils.serialize(messageInfo));實體使用序列化
        System.out.println("Sent ‘" + message + "‘");
        //關閉頻道和連接
        channel.close();
        connection.close();
  • 接收(實時接收、循環調用):
<wiz_code_mirror>
//打開連接和創建頻道,與發送端一樣
        ConnectionFactory factory = new ConnectionFactory();
        //設置MabbitMQ所在主機ip或者主機名
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
?
        //創建隊列消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消費隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true)
        {
            //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            //MessageInfo messageInfo = (MessageInfo)SerializationUtils.deserialize(body);實體使用反序列化
            System.out.println("Received ‘" + message + "‘");
        }
  • 配置文件:
默認端口5672. \RabbitMQ Server\rabbitmq_server-3.7.4\etc\rabbitmq.config.example <wiz_code_mirror>
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

RabbitMQ快速入門