叢集與負載均衡系列(4)——訊息佇列之Rabbitmq的搭建
前面的三篇文章介紹了共享session,從這篇文章開始介紹訊息佇列,這裡用的是Rabbitmq。對於Rabbitmq的一些基本概念,不打算在這裡總結了。因為網上有大把總結的不錯的文章,比如點選開啟連結
這篇文章介紹Rabbitmq的安裝。
- 下載安裝erlang
由於rabbitmq是基於erlang的,因此先下載安裝erlang。我這裡下載的版本為erlang-18.1-1.el6.x86_64.rpm
rpm -ihv erlang-18.1-1.el6.x86_64.rpm
- 下載安裝Rabbitmq
一定要下載和erlang對應版本的Rabbitmq,對於erlang-18.1-1.el6.x86_64的對應Rabbitmq為rabbitmq-server-3.5.6-1.noarch
rpm -ihv rabbitmq-server-3.5.6-1.noarch.rpm
- 啟動服務
/sbin/service rabbitmq-server start
- 新增使用者
./rabbitmqctl add_user admin nmamtf
- 授權
./rabbitmqctl set_user_tags admin administrator
- 開啟瀏覽器管理
./rabbitmq-plugins enable
./rabbitmq-plugins enable rabbitmq_management
- 在瀏覽器中管理
地址為: ip:15672/#/
- 使用者管理
許可權是安裝virtual hosts為粒度進行管理的
到此為止,RabbitMq已經搭建完成
- 測試程式碼
接下來,我們寫點簡單的程式碼來測試一下
生產端傳送資訊
1、建立rabbitmq服務連線
2、建立連線佇列的channel
3、建立佇列
4、傳送訊息
5、關閉連線和channel
package com.wlf.demo;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class SendTest {
private static String QUEUE_NAME="test";
public static void main(String[] argv) throws Exception{
//create connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.58.144");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("nmamtf");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
//create channel
Channel channel = connection.createChannel();
//create queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//finally
channel.close();
connection.close();
}
}
這個時候,我們能看到test佇列裡有一條待處理的訊息
消費端處理訊息
1、建立rabbitmq服務連線
2、建立到佇列的channel
3、處理訊息的回撥
4、處理訊息
package com.wlf.demo;
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
public class RecvTest {
private static String QUEUE_NAME="test";
public static void main(String[] argv) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.58.140");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("nmamtf");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages.");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
同樣,我們可以看到訊息被處理了