1. 程式人生 > >Rabbitmq一些基本使用時配置的記錄

Rabbitmq一些基本使用時配置的記錄

  1. 埠的使用 5672為程式中連結rabbitmq,15672為網頁端訪問rabbitmq控制檯的埠,預設不允許guest登入;
  2. 各模式下的一些配置情況:
  • 在釋出/訂閱(Pub/Sub)模式下: 交換機型別為:fanout
  • 在Routing(路由)模式下: 交換機型別為:direct 此模式下可通過routingKey進行訊息的不同消費者的接收
  • 在topic模式下: 交換機型別為topic #:匹配一個或多個 *:匹配一個

在springboot中,如果是使用實體類進行資料的生產及消費則需要保證序列化後的實體類在消費者和生產者中的路徑需保持一致,因為其使用的是JDK進行的序列化。 以下是springboot中交換機和佇列,及其繫結的相關程式碼:

@Configuration
public class RabbitConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(這裡是交換機名,自己定義,但需要和消費者保持一致);
    }

    @Bean
    public Queue queue() {
        /**
         *   durable="true" 持久化 rabbitmq重啟的時候不需要建立新的佇列
         *   exclusive  表示該訊息佇列是否只在當前connection生效,預設是false
         *   auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false
         */

        return new Queue(這裡是佇列名, true, false, false);
    }

	// 繫結交換機和佇列
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with(RabbitConstant.ROUTIN_KEY);
    }

}

以下是生產者傳送訊息的程式碼:

@Component
public class Producer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override

        /**
         * CorrelationData 訊息的附加資訊,即自定義id
         * isack 代表訊息是否被broker(MQ)接收 true 代表接收 false代表拒收。
         * cause 如果拒收cause則說明拒收的原因,幫助我們進行後續處理
         */
        public void confirm(CorrelationData correlationData, boolean isAck, String cause) {
            System.out.println("correlationData-----------" + correlationData);
            System.out.println("ack-----------" + isAck);
            if (isAck == false) {
                System.err.println(cause);
            }

        }
    };

    RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int code, String text, String exchange, String routingKey) {
            System.err.println("Code:" + code + ",text:" + text);
            System.err.println("Exchange:" + exchange + ",routingKey:" + routingKey);
        }
    };

    public void sendMsg(Employee employee) {
        //CorrelationData物件的作用是作為訊息的附加資訊傳遞,通常我們用它來儲存訊息的自定義id
        CorrelationData data = new CorrelationData(employee.getEmpno() + new Date().getTime());
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_EMP, RabbitConstant.ROUTIN_KEY, new Gson().toJson(employee), data);

    }

}

以下是消費者從佇列中獲取訊息,並對訊息進行處理的方法:

 //@RabbitListener註解用於宣告式定義訊息接受的佇列與exhcange繫結的資訊
    //在SpringBoot中,消費者這端使用註解獲取訊息
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitConstant.QUEUE_EMP, durable = "true"),
                    exchange = @Exchange(value = RabbitConstant.EXCHANGE_EMP, durable = "true", type = "topic"),
                    key = "hr.#"
            )
    )
    //用於接收訊息的方法
    @RabbitHandler
    //通知SpringBoot下面的方法用於接收訊息。
    // 這個方法執行後將處於等待的狀態,有新的訊息進來就會自動觸發下面的方法處理訊息
    //@Payload 代表執行時將訊息反序列化後注入到後面的引數中
    public void handleMessage(@Payload String employee, Channel channel,
                              @Headers Map<String, Object> headers) {
        System.out.println("------------------");
        System.out.println("接收到:" + employee);
        Employee emp = new Gson().fromJson(employee, Employee.class);

        System.err.println(emp.toString());

        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

        try {
            channel.basicAck(tag, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

rabbitmq使用中還需要進行叢集配置,主要是通過Haproxy進行MQ叢集的負載均衡,以下是Mirror映象叢集的命令:

#修改hostname
vim /etc/hostname   
m1
m2
#修改hosts叢集裝置
vim /etc/hosts
192.168.132.137 m1
192.168.132.139 m2

#開放防火牆 4369/5672/15672/25672埠
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
#過載防火牆
firewall-cmd --reload

#重啟伺服器
reboot

#同步.erlang.coolie
find / -name *.cookie
scp /var/lib/rabbitmq/.erlang.cookie 192.168.132.134:/var/lib/rabbitmq/

#兩個電腦啟動MQ服務
rabbit-server


# 停止應用 通過rabbitmqctl status 可以檢視當前rabbitmactl預設操作的節點資訊 
rabbitmqctl stop_app  

# 將當前節點加入到一個叢集中 預設磁碟節點被加入的節點只要是叢集中的一員,其他節點都能夠馬上感受到叢集節點的變化  
rabbitmqctl join_cluster [email protected] 

# 重新啟動當前節點  
rabbitmqctl start_app 

#檢視叢集資訊  
rabbitmqctl cluster_status

以下是haproxy.cfg配置檔案的具體內容:

#---------------------------------------------------------------------
# Example configuration for a possible web application.  See the
# full configuration options online.
#
#   http://haproxy.1wt.eu/download/1.4/doc/configuration.txt
#
#---------------------------------------------------------------------

#---------------------------------------------------------------------
# Global settings
#---------------------------------------------------------------------
global
    # to have these messages end up in /var/log/haproxy.log you will
    # need to:
    #
    # 1) configure syslog to accept network log events.  This is done
    #    by adding the '-r' option to the SYSLOGD_OPTIONS in
    #    /etc/sysconfig/syslog
    #
    # 2) configure local2 events to go to the /var/log/haproxy.log
    #   file. A line like the following can be added to
    #   /etc/sysconfig/syslog
    #
    #    local2.*                       /var/log/haproxy.log
    #
    log         127.0.0.1 local2

    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    maxconn     4000
    user        haproxy
    group       haproxy
    daemon

    # turn on stats unix socket
    stats socket /var/lib/haproxy/stats

#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
    mode                    http
    log                     global
    option                  httplog
    option                  dontlognull
    option http-server-close
    option forwardfor       except 127.0.0.0/8
    option                  redispatch
    retries                 3
    timeout http-request    10s
    timeout queue           1m
    timeout connect         10s
    timeout client          1m
    timeout server          1m
    timeout http-keep-alive 10s
    timeout check           10s
    maxconn                 3000

#對MQ叢集進行監聽
listen rabbitmq_cluster
	bind 0.0.0.0:5672
	option tcplog
	mode tcp 
	option  clitcpka
	timeout connect 1s 
	timeout client  10s
	timeout server  10s
	balance roundrobin
	server node1 192.168.132.137:5672 check inter 5s rise 2 fall 3  
    server node2 192.168.132.139:5672 check inter 5s rise 2 fall 3

#開啟haproxy監控服務	
listen http_front
	bind 0.0.0.0:1080
	stats refresh 30s
	stats uri /haproxy?stats
	stats auth admin:admin