1. 程式人生 > 其它 >Mac環境下RabbitMq安裝與測試教程 Mac環境下RabbitMq安裝與測試教程

Mac環境下RabbitMq安裝與測試教程 Mac環境下RabbitMq安裝與測試教程

https://www.cnblogs.com/yihuihui/p/9095130.html

Mac環境下RabbitMq安裝與測試教程

 

RabbitMq安裝與測試教程

Installing on Mac

I. 安裝

1
2
3
4
5
6
7
8
9
brew install rabbitmq

## 進入安裝目錄
cd /usr/local/Cellar/rabbitmq/3.7.5

# 啟動
brew services start rabbitmq
# 當前視窗啟動
rabbitmq-server

啟動控制檯之前需要先開啟外掛

1
./rabbitmq-plugins enable rabbitmq_management

進入控制檯: http://localhost:15672/

使用者名稱和密碼:guest,guest

II. 配置與測試

1. 新增賬號

首先是得啟動mq

1
2
3
4
5
6
## 新增賬號
./rabbitmqctl add_user admin admin
## 新增訪問許可權
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設定超級許可權
./rabbitmqctl set_user_tags admin administrator

2. 編碼實測

pom引入依賴

1
2
3
4
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>

開始寫程式碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class RabbitMqTest {

//訊息佇列名稱
private final static String QUEUE_NAME = "hello";

@Test
public void send() throws java.io.IOException, TimeoutException {

//建立連線工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//建立連線
Connection connection = factory.newConnection();

//建立訊息通道
Channel channel = connection.createChannel();

//生成一個訊息佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);


for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;

//釋出訊息,第一個引數表示路由(Exchange名稱),未""則表示使用預設訊息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");
}


//關閉訊息通道和連線
channel.close();
connection.close();

}


@Test
public void consumer() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {

//建立連線工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

//建立連線
Connection connection = factory.newConnection();

//建立訊息通道
Channel channel = connection.createChannel();

//訊息佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");

AtomicInteger count = new AtomicInteger(0);

//消費者用於獲取訊息通道繫結的訊息佇列中的資訊
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");

try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);

Thread.sleep(1000 * 60);
}
}

需要注意的一點是:

  • 生產訊息: channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 消費訊息: channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 生產和消費都宣告channel,要求兩者的配置引數一致,否則無法消費資料

3. 輸出說明

首先執行塞入資料,執行完畢之後,可以到控制檯進行檢視:

可以看到多出了一個Queue,對列名為hello,總共有10條資料


接下來就是消費資料了,執行consumer方法,輸出日誌

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[*] Waiting for message. To exist press CTRL+C
[x] Received 'Hello World RabbitMQ count: 0
[x] Done
[x] Received 'Hello World RabbitMQ count: 1
[x] Done
[x] Received 'Hello World RabbitMQ count: 2
[x] Done
[x] Received 'Hello World RabbitMQ count: 3
[x] Done
[x] Received 'Hello World RabbitMQ count: 4
[x] Done
[x] Received 'Hello World RabbitMQ count: 5
[x] Done
[x] Received 'Hello World RabbitMQ count: 6
[x] Done
[x] Received 'Hello World RabbitMQ count: 7
[x] Done
[x] Received 'Hello World RabbitMQ count: 8
[x] Done
[x] Received 'Hello World RabbitMQ count: 9
[x] Done

回頭去檢視queue,發現總得資料量為0了

4. ACK問題

對於ack的問題,如果在消費資料的時候,出現異常,而我不希望資料丟失,這個時候就需要考慮手動ack的機制來保證了

首先需要設定手動ack

1
2
// 設定autoAck為false
channel.basicConsume(QUEUE_NAME, false, consumer);

其次在消費資料完畢之後,主動ack/nack

1
2
3
4
5
if (success) {
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
channel.basicNack(envelope.getDeliveryTag(), false, false);
}

RabbitMq安裝與測試教程

Installing on Mac

I. 安裝

1
2
3
4
5
6
7
8
9
brew install rabbitmq

## 進入安裝目錄
cd /usr/local/Cellar/rabbitmq/3.7.5

# 啟動
brew services start rabbitmq
# 當前視窗啟動
rabbitmq-server

啟動控制檯之前需要先開啟外掛

1
./rabbitmq-plugins enable rabbitmq_management

進入控制檯: http://localhost:15672/

使用者名稱和密碼:guest,guest

II. 配置與測試

1. 新增賬號

首先是得啟動mq

1
2
3
4
5
6
## 新增賬號
./rabbitmqctl add_user admin admin
## 新增訪問許可權
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設定超級許可權
./rabbitmqctl set_user_tags admin administrator

2. 編碼實測

pom引入依賴

1
2
3
4
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>

開始寫程式碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class RabbitMqTest {

//訊息佇列名稱
private final static String QUEUE_NAME = "hello";

@Test
public void send() throws java.io.IOException, TimeoutException {

//建立連線工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//建立連線
Connection connection = factory.newConnection();

//建立訊息通道
Channel channel = connection.createChannel();

//生成一個訊息佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);


for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;

//釋出訊息,第一個引數表示路由(Exchange名稱),未""則表示使用預設訊息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");
}


//關閉訊息通道和連線
channel.close();
connection.close();

}


@Test
public void consumer() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {

//建立連線工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

//建立連線
Connection connection = factory.newConnection();

//建立訊息通道
Channel channel = connection.createChannel();

//訊息佇列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");

AtomicInteger count = new AtomicInteger(0);

//消費者用於獲取訊息通道繫結的訊息佇列中的資訊
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");

try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);

Thread.sleep(1000 * 60);
}
}

需要注意的一點是:

  • 生產訊息: channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 消費訊息: channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 生產和消費都宣告channel,要求兩者的配置引數一致,否則無法消費資料

3. 輸出說明

首先執行塞入資料,執行完畢之後,可以到控制檯進行檢視:

可以看到多出了一個Queue,對列名為hello,總共有10條資料


接下來就是消費資料了,執行consumer方法,輸出日誌

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[*] Waiting for message. To exist press CTRL+C
[x] Received 'Hello World RabbitMQ count: 0
[x] Done
[x] Received 'Hello World RabbitMQ count: 1
[x] Done
[x] Received 'Hello World RabbitMQ count: 2
[x] Done
[x] Received 'Hello World RabbitMQ count: 3
[x] Done
[x] Received 'Hello World RabbitMQ count: 4
[x] Done
[x] Received 'Hello World RabbitMQ count: 5
[x] Done
[x] Received 'Hello World RabbitMQ count: 6
[x] Done
[x] Received 'Hello World RabbitMQ count: 7
[x] Done
[x] Received 'Hello World RabbitMQ count: 8
[x] Done
[x] Received 'Hello World RabbitMQ count: 9
[x] Done

回頭去檢視queue,發現總得資料量為0了

4. ACK問題

對於ack的問題,如果在消費資料的時候,出現異常,而我不希望資料丟失,這個時候就需要考慮手動ack的機制來保證了

首先需要設定手動ack

1
2
// 設定autoAck為false
channel.basicConsume(QUEUE_NAME, false, consumer);

其次在消費資料完畢之後,主動ack/nack

1
2
3
4
5
if (success) {
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
channel.basicNack(envelope.getDeliveryTag(), false, false);
}