1. 程式人生 > >rabbitmq技術的一些感悟(一)

rabbitmq技術的一些感悟(一)

Rabbitmq

初識rabbitmq

RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQAMQP(高階訊息佇列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文件會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹

幾個概念說明:

Broker:簡單來說就是訊息佇列伺服器實體。Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。Binding:繫結,它的作用就是把exchangequeue按照路由規則繫結起來。Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。

vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。producer:訊息生產者,就是投遞訊息的程式。consumer:訊息消費者,就是接受訊息的程式。channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。

由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。

訊息佇列的使用過程大概如下:

1)客戶端連線到訊息佇列伺服器,開啟一個channel  (2)客戶端宣告一個exchange,並設定相關屬性。  (3)客戶端宣告一個queue,並設定相關屬性。

  (4)客戶端使用routing key,在exchangequeue之間建立好繫結關係。  (5)客戶端投遞訊息到exchange

exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。

exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key”abc”,那麼客戶端提交的訊息,只有設定了key”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”

”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。

RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:  (1exchange持久化,在宣告時指定durable => 1
  (2queue持久化,在宣告時指定durable => 1
  (3)訊息持久化,在投遞時指定delivery_mode=> 21是非持久化)

如果exchangequeue都是持久化的,那麼它們之間的binding也是持久化的。如果exchangequeue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。

安裝開發環境和庫

1.將目錄中的librabbitmq.so.1放到目錄 /usr/local/lib/librabbitmq.so.1

2.安裝rabbitm需要的環境和庫

yum install -y ncurses-devel

yum install gcc

yum install g++

yum install cmake

yum install make

yum install php

yum install mysql

yum install php-process

yum install php-devel

yum install mysql-server

#安裝php的amq支援擴充套件

wget http://pecl.php.net/get/amqp-1.0.3.tgz

tar zxvf amqp-1.0.3.tgz

cd amqp-1.0.3

/usr/bin/phpize

./configure--with-php-config=/usr/bin/php-config --with-amqp

make && make install

#php.ini 新增

vi /etc/php.ini

extension="amqp.so"

#安裝erlang支援

wgethttp://www.erlang.org/download/otp_src_R15B01.tar.gz

tar -zxvf otp_src_R15B01.tar.gz

cd otp_src_R15B01

./configure --prefix=/home/erlang--without-javac

make && make install

ln -s /home/erlang/bin/erl/usr/local/bin/erl

3. 安裝rabbitma

 解壓rabbitmq-server-generic-unix-3.3.4.tar

 進入sbin目錄:

    啟動rabbitmq服務,執行 nohup./rabbitmq-server start &

啟動rabbitmq伺服器以及命令

當第一次啟動服務,檢測資料庫是否未初始化或者被刪除,它會用下面的資源初始化一個新的資料庫:

一個命名為 / 的虛擬宿主一個名為guest密碼也為guest的使用者,他擁有/虛擬宿主的所有許可權如果你的中介軟體是公開訪問的,最好修改guest使用者的密碼。管理概觀rabbitmqctl 是RabbitMQ中介軟體的一個命令列管理工具。它通過連線一箇中間件節點執行所有的動作。本地節點預設被命名為”rabbit”。可以通過這個命令前使用”-n”標誌明確的指定節點名稱, 例如:# rabbitmqctl -n [email protected] add_user tonyg changeit

這個命令指示RabbitMQ中介軟體在[email protected] 節點建立一個tonyg/changeit的使用者。

在一個名為”server.example.com”的主機,RabbitMQ Erlang節點的名稱通常是[email protected](除非RABBITMQ_NODENAM在中介軟體啟動時候被設定)。hostnam -s 的輸出通常是”@”符號正確的字尾。rabbitmqctl 預設產生詳細輸出。通過”-q”標示可選擇安靜模式。rabbitmqctl -q status應用和叢集管理1.停止RabbitMQ應用,關閉節點

# rabbitmqctl stop

2.停止RabbitMQ應用

# rabbitmqctl stop_app

3.啟動RabbitMQ應用

# rabbitmqctl start_app

4.顯示RabbitMQ中介軟體各種資訊

# rabbitmqctl status

5.重置RabbitMQ節點

# rabbitmqctl reset

# rabbitmqctl force_reset

從它屬於的任何叢集中移除,從管理資料庫中移除所有資料,例如配置過的使用者和虛擬宿主, 刪除所有持久化的訊息。

force_reset命令和reset的區別是無條件重置節點,不管當前管理資料庫狀態以及叢集的配置。如果資料庫或者叢集配置發生錯誤才使用這個最後的手段。

注意:只有在停止RabbitMQ應用後,reset和force_reset才能成功。

6.迴圈日誌檔案

# rabbitmqctl rotate_logs[suffix]

7.叢集管理

# rabbitmqctl cluster clusternode…

使用者管理

1.新增使用者

# rabbitmqctl add_user username password

2.刪除使用者

# rabbitmqctl delete_user username

3.修改密碼

# rabbitmqctl change_password usernamenewpassword

4.列出所有使用者

# rabbitmqctl list_users

許可權控制1.建立虛擬主機

# rabbitmqctl add_vhost vhostpath

2.刪除虛擬主機

# rabbitmqctl delete_vhost vhostpath

3.列出所有虛擬主機

# rabbitmqctl list_vhosts

4.設定使用者許可權

# rabbitmqctl set_permissions [-pvhostpath] username regexp regexp regexp

5.清除使用者許可權

# rabbitmqctl clear_permissions [-pvhostpath] username

6.列出虛擬主機上的所有許可權

# rabbitmqctl list_permissions [-pvhostpath]

7.列出使用者許可權

# rabbitmqctl list_user_permissionsusername

例子:

新增  rabbitmqctl add_vhost az

rabbitmqctl set_permissions -p az guest".*" ".*" ".*"

介面描述

amqp_connection_state_tamqp_new_connection(void);

介面說明:宣告一個新的amqp connection

intamqp_open_socket(char const *hostname, int portnumber);

介面說明:獲取socket.

引數說明:hostname        RabbitMQ server所在主機

                 portnumber     RabbitMQ server監聽埠

voidamqp_set_sockfd(amqp_connection_state_t state,int sockfd);

介面說明:將amqp connectionsockfd進行繫結

amqp_rpc_reply_tamqp_login(amqp_connection_state_t state, char const *vhost,intchannel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method,...);

介面說明:用於登入RabbitMQ server,主要目的為了進行許可權管理;

引數說明:state    amqpconnection

                 vhost   rabbit-mq的虛機主機,是rabbit-mq進行許可權管理的最小單位

                 channel_max  最大連結數,此處設成0即可

                 frame_max  和客戶端通訊時所允許的最大的frame size.預設值為131072,增大這個值有助於提高吞吐,降低這個值有利於降低時延

                 heartbeat 含義未知,預設值填0

                 sasl_method  用於SSL鑑權,預設值參考後文demo

amqp_channel_open_ok_t*amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);

介面說明:用於關聯connchannel

amqp_exchange_declare_ok_t*amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive,amqp_boolean_t durable, amqp_table_t arguments); 

介面說明:宣告declare

引數說明:state

                 channel

                 exchange

                 type     "fanout" "direct" "topic"三選一

                 passive

                 curable

                 arguments

amqp_queue_declare_ok_t*amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable,amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_targuments); 

介面說明:宣告queue

引數說明:state   amqp connection

                 channel 

                 queue  queue name

                 passive 

                 durable  佇列是否持久化

                 exclusive  當前連線不在時,佇列是否自動刪除

                 aoto_delete 沒有consumer時,佇列是否自動刪除

                 arguments 用於拓展引數,比如x-ha-policy用於mirrored queue

amqp_queue_bind_ok_t*amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);

介面說明:宣告binding    

amqp_basic_qos_ok_t*amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_tprefetch_size, uint16_t prefetch_count, amqp_boolean_t global);

介面說明:qos quality of service,我們這裡使用主要用於控制預取訊息數,避免訊息按條數均勻分配,需要和no_ack配合使用

引數說明:state

                  channel

                  prefetch_size bytes為單位,0unlimited

                  prefetch_count 預取的訊息條數

                  global

amqp_basic_consume_ok_t*amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local,amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments); 

介面說明:開始一個queue consumer

引數說明:state

                  channel

                  queue

                  consumer_tag

                  no_local

                  no_ack    是否需要確認訊息後再從佇列中刪除訊息

                  exclusive

                  arguments

int amqp_basic_ack(amqp_connection_state_tstate,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);

intamqp_basic_publish(amqp_connection_state_t state,amqp_channel_tchannel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_tmandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const*properties,amqp_bytes_t body);

介面說明:釋出訊息

引數說明:state 

                 channel

                 exchange  

                 routing_key  exchange為預設“”時,此處填寫queue_name,當exchangedirect,此處為binding_key

                 mandatory 參見參考文獻2

                 immediate 同上

                 properties 更多屬性,如何設定訊息持久化,參見文後demo

                 body 訊息體

amqp_rpc_reply_tamqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,intcode);

amqp_rpc_reply_tamqp_connection_close(amqp_connection_state_t state,int code);

intamqp_destroy_connection(amqp_connection_state_t state);