一、rabbitMQ的安裝和java客戶端使用
阿新 • • 發佈:2019-01-07
1. RabbitMQ入門
1.1 RabbitMQ的安裝
這裡可以看一下Rabbit的文件,每個版本依賴的erlang環境也是不一樣的
1.安裝erlang:
# 安裝依賴,主要是ncurses-devel這個依賴
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget
# 解壓:
tar -xvf otp_src_18.3.tar.gz
#切換到解壓目錄
cd otp_src_18.3
# 配置環境
./configure --prefix=/usr/local/erlang
# 編譯安裝
make && make install
2.RabbitMQ安裝
#解壓
xz -d rabbitmq-server-generic-unix-3.6.1.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.1.tar
3.開放防火牆埠
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=5672/tcp
systemctl restart firewalld.service
4.配置環境變數
vi /etc/profile
# erlang environment
ERL_HOME=/usr/local/software/otp_src_20.3
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
# rabbit mq
export PATH=/usr/local/software/rabbitMQ/rabbitmq_server-3.7.4/sbin:$PATH
# 任意目錄下使用erl,可以出現以下結果
[root@iZwz91i5xmdmgfhamgl6dxZ sbin]# erl
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:1 :1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V9.3 (abort with ^G)
1>
rabbitMQ啟動
./rabbitmq-server -detached # 後端啟動
rabbitmqctl status
rabbitmqctl stop
rabbitMQ新增賬戶
rabbitmqctl add_user admin admin #新增使用者,後面兩個引數分別是使用者名稱和密碼,這裡兩者都用admin。
# set_permissions [-p <vhost>] <username> <conf> <write> <read>
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" #新增許可權
rabbitmqctl set_user_tags admin administrator #修改使用者角色
網頁圖形化介面的使用
mkdir /etc/rabbitmq
rabbitmq-plugins enable rabbitmq_management
瀏覽器中訪問:
http://ip:15672
1.2 java客戶端的使用
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
package com.itcloud.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Minimal RabbitMQ producer demo.
 *
 * <p>Connects to the broker, declares a direct exchange and a queue, binds
 * them with a routing key, publishes a single text message and then closes
 * the channel and the connection.
 */
public class RabbitProducer {
    /** Broker host; replace the placeholder with the broker's IP address. */
    private static final String HOST = "your ip";
    /** Broker port used by the client connection. */
    private static final int PORT = 5672;
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routing_key_demo";
    private static final String QUEUE_NAME = "queue_demo";

    /**
     * Publishes one "hello world" message to {@code exchange_demo} using
     * {@code routing_key_demo}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // try-with-resources closes the channel first and then the connection,
        // and only closes what was actually opened. The previous finally block
        // dereferenced null when the connection attempt itself failed, and
        // leaked the connection when closing the channel threw.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            String message = "hello world";
            // Encode with an explicit charset so the payload does not depend
            // on the platform default of the machine running the producer.
            byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, payload);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
package com.itcloud.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Minimal RabbitMQ consumer demo.
 *
 * <p>Connects to the broker, registers a consumer on {@code queue_demo},
 * prints and acknowledges every delivery, keeps consuming for five seconds
 * and then closes the channel and the connection.
 */
public class RabbitConsumer {
    /** Broker host; replace the placeholder with the broker's IP address. */
    private static final String HOST = "YOUR IP";
    /** Broker port used by the client connection. */
    private static final int PORT = 5672;
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    private static final String QUEUE_NAME = "queue_demo";

    /**
     * Consumes messages from {@code queue_demo} for five seconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Address[] addresses = new Address[]{new Address(HOST, PORT)};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // try-with-resources guarantees the channel and the connection are
        // closed even when consuming fails; previously they were only closed
        // on the success path and leaked on any exception.
        try (Connection connection = factory.newConnection(addresses);
             Channel channel = connection.createChannel()) {
            // Limit the number of unacknowledged deliveries to 64.
            channel.basicQos(64);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset instead of the platform default.
                    System.out.println("rev message"
                            + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    try {
                        // Simulated processing time for the demo.
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the calling thread can see it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    // Acknowledge this single delivery (multiple = false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, consumer);

            try {
                // Keep the main thread alive for a while so deliveries can arrive.
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
靜待後續。。。。。。。。。。。。。。。。。。