rabbitmq技術的一些感悟(一)
Rabbitmq
初識rabbitmq
RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文件會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹
幾個概念說明:
Broker:簡單來說就是訊息佇列伺服器實體。Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。
訊息佇列的使用過程大概如下:
(1)客戶端連線到訊息佇列伺服器,開啟一個channel。 (2)客戶端宣告一個exchange,並設定相關屬性。 (3)客戶端宣告一個queue,並設定相關屬性。
exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。
exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”
RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分: (1)exchange持久化,在宣告時指定durable => 1
(2)queue持久化,在宣告時指定durable => 1
(3)訊息持久化,在投遞時指定delivery_mode=> 2(1是非持久化)
如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。
安裝開發環境和庫
1.將目錄中的librabbitmq.so.1放到目錄 /usr/local/lib/librabbitmq.so.1
2.安裝rabbitm需要的環境和庫
yum install -y ncurses-devel
yum install gcc
yum install g++
yum install cmake
yum install make
yum install php
yum install mysql
yum install php-process
yum install php-devel
yum install mysql-server
#安裝php的amq支援擴充套件
wget http://pecl.php.net/get/amqp-1.0.3.tgz
tar zxvf amqp-1.0.3.tgz
cd amqp-1.0.3
/usr/bin/phpize
./configure--with-php-config=/usr/bin/php-config --with-amqp
make && make install
#php.ini 新增
vi /etc/php.ini
extension="amqp.so"
#安裝erlang支援
wgethttp://www.erlang.org/download/otp_src_R15B01.tar.gz
tar -zxvf otp_src_R15B01.tar.gz
cd otp_src_R15B01
./configure --prefix=/home/erlang--without-javac
make && make install
ln -s /home/erlang/bin/erl/usr/local/bin/erl
3. 安裝rabbitma
解壓rabbitmq-server-generic-unix-3.3.4.tar
進入sbin目錄:
啟動rabbitmq服務,執行 nohup./rabbitmq-server start &
啟動rabbitmq伺服器以及命令
當第一次啟動服務,檢測資料庫是否未初始化或者被刪除,它會用下面的資源初始化一個新的資料庫:
一個命名為 / 的虛擬宿主一個名為guest密碼也為guest的使用者,他擁有/虛擬宿主的所有許可權如果你的中介軟體是公開訪問的,最好修改guest使用者的密碼。管理概觀rabbitmqctl 是RabbitMQ中介軟體的一個命令列管理工具。它通過連線一箇中間件節點執行所有的動作。本地節點預設被命名為”rabbit”。可以通過這個命令前使用”-n”標誌明確的指定節點名稱, 例如:# rabbitmqctl -n [email protected] add_user tonyg changeit
這個命令指示RabbitMQ中介軟體在[email protected] 節點建立一個tonyg/changeit的使用者。
在一個名為”server.example.com”的主機,RabbitMQ Erlang節點的名稱通常是[email protected](除非RABBITMQ_NODENAM在中介軟體啟動時候被設定)。hostnam -s 的輸出通常是”@”符號正確的字尾。rabbitmqctl 預設產生詳細輸出。通過”-q”標示可選擇安靜模式。rabbitmqctl -q status應用和叢集管理1.停止RabbitMQ應用,關閉節點
# rabbitmqctl stop
2.停止RabbitMQ應用
# rabbitmqctl stop_app
3.啟動RabbitMQ應用
# rabbitmqctl start_app
4.顯示RabbitMQ中介軟體各種資訊
# rabbitmqctl status
5.重置RabbitMQ節點
# rabbitmqctl reset
# rabbitmqctl force_reset
從它屬於的任何叢集中移除,從管理資料庫中移除所有資料,例如配置過的使用者和虛擬宿主, 刪除所有持久化的訊息。
force_reset命令和reset的區別是無條件重置節點,不管當前管理資料庫狀態以及叢集的配置。如果資料庫或者叢集配置發生錯誤才使用這個最後的手段。
注意:只有在停止RabbitMQ應用後,reset和force_reset才能成功。
6.迴圈日誌檔案
# rabbitmqctl rotate_logs[suffix]
7.叢集管理
# rabbitmqctl cluster clusternode…
使用者管理
1.新增使用者
# rabbitmqctl add_user username password
2.刪除使用者
# rabbitmqctl delete_user username
3.修改密碼
# rabbitmqctl change_password usernamenewpassword
4.列出所有使用者
# rabbitmqctl list_users
許可權控制1.建立虛擬主機
# rabbitmqctl add_vhost vhostpath
2.刪除虛擬主機
# rabbitmqctl delete_vhost vhostpath
3.列出所有虛擬主機
# rabbitmqctl list_vhosts
4.設定使用者許可權
# rabbitmqctl set_permissions [-pvhostpath] username regexp regexp regexp
5.清除使用者許可權
# rabbitmqctl clear_permissions [-pvhostpath] username
6.列出虛擬主機上的所有許可權
# rabbitmqctl list_permissions [-pvhostpath]
7.列出使用者許可權
# rabbitmqctl list_user_permissionsusername
例子:
新增 rabbitmqctl add_vhost az
rabbitmqctl set_permissions -p az guest".*" ".*" ".*"
介面描述
amqp_connection_state_tamqp_new_connection(void);
介面說明:宣告一個新的amqp connection
intamqp_open_socket(char const *hostname, int portnumber);
介面說明:獲取socket.
引數說明:hostname RabbitMQ server所在主機
portnumber RabbitMQ server監聽埠
voidamqp_set_sockfd(amqp_connection_state_t state,int sockfd);
介面說明:將amqp connection和sockfd進行繫結
amqp_rpc_reply_tamqp_login(amqp_connection_state_t state, char const *vhost,intchannel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method,...);
介面說明:用於登入RabbitMQ server,主要目的為了進行許可權管理;
引數說明:state amqpconnection
vhost rabbit-mq的虛機主機,是rabbit-mq進行許可權管理的最小單位
channel_max 最大連結數,此處設成0即可
frame_max 和客戶端通訊時所允許的最大的frame size.預設值為131072,增大這個值有助於提高吞吐,降低這個值有利於降低時延
heartbeat 含義未知,預設值填0
sasl_method 用於SSL鑑權,預設值參考後文demo
amqp_channel_open_ok_t*amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);
介面說明:用於關聯conn和channel
amqp_exchange_declare_ok_t*amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive,amqp_boolean_t durable, amqp_table_t arguments);
介面說明:宣告declare
引數說明:state
channel
exchange
type "fanout" "direct" "topic"三選一
passive
curable
arguments
amqp_queue_declare_ok_t*amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable,amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_targuments);
介面說明:宣告queue
引數說明:state amqp connection
channel
queue queue name
passive
durable 佇列是否持久化
exclusive 當前連線不在時,佇列是否自動刪除
aoto_delete 沒有consumer時,佇列是否自動刪除
arguments 用於拓展引數,比如x-ha-policy用於mirrored queue
amqp_queue_bind_ok_t*amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
介面說明:宣告binding
amqp_basic_qos_ok_t*amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_tprefetch_size, uint16_t prefetch_count, amqp_boolean_t global);
介面說明:qos是 quality of service,我們這裡使用主要用於控制預取訊息數,避免訊息按條數均勻分配,需要和no_ack配合使用
引數說明:state
channel
prefetch_size 以bytes為單位,0為unlimited
prefetch_count 預取的訊息條數
global
amqp_basic_consume_ok_t*amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel,amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local,amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments);
介面說明:開始一個queue consumer
引數說明:state
channel
queue
consumer_tag
no_local
no_ack 是否需要確認訊息後再從佇列中刪除訊息
exclusive
arguments
int amqp_basic_ack(amqp_connection_state_tstate,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);
intamqp_basic_publish(amqp_connection_state_t state,amqp_channel_tchannel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_tmandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const*properties,amqp_bytes_t body);
介面說明:釋出訊息
引數說明:state
channel
exchange
routing_key 當exchange為預設“”時,此處填寫queue_name,當exchange為direct,此處為binding_key
mandatory 參見參考文獻2
immediate 同上
properties 更多屬性,如何設定訊息持久化,參見文後demo
body 訊息體
amqp_rpc_reply_tamqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,intcode);
amqp_rpc_reply_tamqp_connection_close(amqp_connection_state_t state,int code);
intamqp_destroy_connection(amqp_connection_state_t state);