Rabbitmq一些基本使用時配置的記錄
阿新 • • 發佈:2018-12-13
- 埠的使用 5672為程式中連結rabbitmq,15672為網頁端訪問rabbitmq控制檯的埠,預設不允許guest登入;
- 各模式下的一些配置情況:
- 在釋出/訂閱(Pub/Sub)模式下: 交換機型別為:fanout
- 在Routing(路由)模式下: 交換機型別為:direct 此模式下可通過routingKey進行訊息的不同消費者的接收
- 在topic模式下: 交換機型別為topic #:匹配一個或多個 *:匹配一個
在springboot中,如果是使用實體類進行資料的生產及消費則需要保證序列化後的實體類在消費者和生產者中的路徑需保持一致,因為其使用的是JDK進行的序列化。 以下是springboot中交換機和佇列,及其繫結的相關程式碼:
@Configuration public class RabbitConfig { @Bean public TopicExchange topicExchange() { return new TopicExchange(這裡是交換機名,自己定義,但需要和消費者保持一致); } @Bean public Queue queue() { /** * durable="true" 持久化 rabbitmq重啟的時候不需要建立新的佇列 * exclusive 表示該訊息佇列是否只在當前connection生效,預設是false * auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false */ return new Queue(這裡是佇列名, true, false, false); } // 繫結交換機和佇列 @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(topicExchange()).with(RabbitConstant.ROUTIN_KEY); } }
以下是生產者傳送訊息的程式碼:
@Component public class Producer { @Resource private RabbitTemplate rabbitTemplate; RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override /** * CorrelationData 訊息的附加資訊,即自定義id * isack 代表訊息是否被broker(MQ)接收 true 代表接收 false代表拒收。 * cause 如果拒收cause則說明拒收的原因,幫助我們進行後續處理 */ public void confirm(CorrelationData correlationData, boolean isAck, String cause) { System.out.println("correlationData-----------" + correlationData); System.out.println("ack-----------" + isAck); if (isAck == false) { System.err.println(cause); } } }; RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int code, String text, String exchange, String routingKey) { System.err.println("Code:" + code + ",text:" + text); System.err.println("Exchange:" + exchange + ",routingKey:" + routingKey); } }; public void sendMsg(Employee employee) { //CorrelationData物件的作用是作為訊息的附加資訊傳遞,通常我們用它來儲存訊息的自定義id CorrelationData data = new CorrelationData(employee.getEmpno() + new Date().getTime()); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_EMP, RabbitConstant.ROUTIN_KEY, new Gson().toJson(employee), data); } }
以下是消費者從佇列中獲取訊息,並對訊息進行處理的方法:
//@RabbitListener註解用於宣告式定義訊息接受的佇列與exhcange繫結的資訊
//在SpringBoot中,消費者這端使用註解獲取訊息
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = RabbitConstant.QUEUE_EMP, durable = "true"),
exchange = @Exchange(value = RabbitConstant.EXCHANGE_EMP, durable = "true", type = "topic"),
key = "hr.#"
)
)
//用於接收訊息的方法
@RabbitHandler
//通知SpringBoot下面的方法用於接收訊息。
// 這個方法執行後將處於等待的狀態,有新的訊息進來就會自動觸發下面的方法處理訊息
//@Payload 代表執行時將訊息反序列化後注入到後面的引數中
public void handleMessage(@Payload String employee, Channel channel,
@Headers Map<String, Object> headers) {
System.out.println("------------------");
System.out.println("接收到:" + employee);
Employee emp = new Gson().fromJson(employee, Employee.class);
System.err.println(emp.toString());
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
rabbitmq使用中還需要進行叢集配置,主要是通過Haproxy進行MQ叢集的負載均衡,以下是Mirror映象叢集的命令:
#修改hostname
vim /etc/hostname
m1
m2
#修改hosts叢集裝置
vim /etc/hosts
192.168.132.137 m1
192.168.132.139 m2
#開放防火牆 4369/5672/15672/25672埠
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
#過載防火牆
firewall-cmd --reload
#重啟伺服器
reboot
#同步.erlang.coolie
find / -name *.cookie
scp /var/lib/rabbitmq/.erlang.cookie 192.168.132.134:/var/lib/rabbitmq/
#兩個電腦啟動MQ服務
rabbit-server
# 停止應用 通過rabbitmqctl status 可以檢視當前rabbitmactl預設操作的節點資訊
rabbitmqctl stop_app
# 將當前節點加入到一個叢集中 預設磁碟節點被加入的節點只要是叢集中的一員,其他節點都能夠馬上感受到叢集節點的變化
rabbitmqctl join_cluster [email protected]
# 重新啟動當前節點
rabbitmqctl start_app
#檢視叢集資訊
rabbitmqctl cluster_status
以下是haproxy.cfg配置檔案的具體內容:
#---------------------------------------------------------------------
# Example configuration for a possible web application. See the
# full configuration options online.
#
# http://haproxy.1wt.eu/download/1.4/doc/configuration.txt
#
#---------------------------------------------------------------------
#---------------------------------------------------------------------
# Global settings
#---------------------------------------------------------------------
global
# to have these messages end up in /var/log/haproxy.log you will
# need to:
#
# 1) configure syslog to accept network log events. This is done
# by adding the '-r' option to the SYSLOGD_OPTIONS in
# /etc/sysconfig/syslog
#
# 2) configure local2 events to go to the /var/log/haproxy.log
# file. A line like the following can be added to
# /etc/sysconfig/syslog
#
# local2.* /var/log/haproxy.log
#
log 127.0.0.1 local2
chroot /var/lib/haproxy
pidfile /var/run/haproxy.pid
maxconn 4000
user haproxy
group haproxy
daemon
# turn on stats unix socket
stats socket /var/lib/haproxy/stats
#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
mode http
log global
option httplog
option dontlognull
option http-server-close
option forwardfor except 127.0.0.0/8
option redispatch
retries 3
timeout http-request 10s
timeout queue 1m
timeout connect 10s
timeout client 1m
timeout server 1m
timeout http-keep-alive 10s
timeout check 10s
maxconn 3000
#對MQ叢集進行監聽
listen rabbitmq_cluster
bind 0.0.0.0:5672
option tcplog
mode tcp
option clitcpka
timeout connect 1s
timeout client 10s
timeout server 10s
balance roundrobin
server node1 192.168.132.137:5672 check inter 5s rise 2 fall 3
server node2 192.168.132.139:5672 check inter 5s rise 2 fall 3
#開啟haproxy監控服務
listen http_front
bind 0.0.0.0:1080
stats refresh 30s
stats uri /haproxy?stats
stats auth admin:admin