Rabbitmq中的工作機制以及相關的springboot整合
阿新 • • 發佈:2018-11-09
Rabbitmq中的工作機制以及相關的springboot整合 1.安裝rabbitmq 獲取rabbitmq映象:docker pull rabbitmq:management 建立並執行容器: docker run -d --hostname my-rabbit --name rabbit -p 8080:15672 rabbitmq:management --hostname:指定容器主機名稱 --name:指定容器名稱 -p:將mq埠號對映到本地 或在執行時設定使用者和密碼 docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management 15672:控制檯埠號 5672:應用訪問埠號 使用http://127.0.0.1:15672訪問rabbit控制檯 2.rabbitmq中的四種工作機制: 左側 P 代表 生產者,也就是往 RabbitMQ 發訊息的程式。 中間即是 RabbitMQ,其中包括了 交換機 和 佇列。 右側 C 代表 消費者,也就是往 RabbitMQ 拿訊息的程式。 那麼,其中比較重要的概念有 4 個,分別為:虛擬主機,交換機,佇列,和繫結。 虛擬主機:一個虛擬主機持有一組交換機、佇列和繫結。為什麼需要多個虛擬主機呢?很簡單,RabbitMQ當中,使用者只能在虛擬主機的粒度進行許可權控制。 因此,如果需要禁止A組訪問B組的交換機/佇列/繫結,必須為A和B分別建立一個虛擬主機。每一個RabbitMQ伺服器都有一個預設的虛擬主機“/”。 交換機:Exchange 用於轉發訊息,但是它不會做儲存 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 傳送過來的訊息。 這裡有一個比較重要的概念:路由鍵 。訊息到交換機的時候,互動機會轉發到對應的佇列中,那麼究竟轉發到哪個佇列,就要根據該路由鍵。 繫結:也就是交換機需要和佇列相繫結,這其中如上圖所示,是多對多的關係。 交換機(Exchange) 交換機的功能主要是接收訊息並且轉發到繫結的佇列,交換機不儲存訊息,在啟用ack模式後,交換機找不到佇列會返回錯誤。交換機有四種類型:Direct, topic, Headers and Fanout Direct:direct 型別的行為是"先匹配, 再投送". 即在繫結時設定一個 routing_key, 訊息的routing_key 匹配時, 才會被交換器投送到繫結的佇列中去. Topic:按規則轉發訊息(最靈活) Headers:設定header attribute引數型別的交換機 Fanout:轉發訊息到所有繫結佇列 Direct Exchange Direct Exchange是RabbitMQ預設的交換機模式,也是最簡單的模式,根據key全文匹配去尋找佇列。 第一個 X - Q1 就有一個 binding key,名字為 orange; X - Q2 就有 2 個 binding key,名字為 black 和 green。當訊息中的 路由鍵 和 這個 binding key 對應上的時候,那麼就知道了該訊息去到哪一個佇列中。 Ps:為什麼 X 到 Q2 要有 black,green,2個 binding key呢,一個不就行了嗎? - 這個主要是因為可能又有 Q3,而Q3只接受 black 的資訊,而Q2不僅接受black 的資訊,還接受 green 的資訊。 Topic Exchange Topic Exchange 轉發訊息主要是根據萬用字元。 在這種交換機下,佇列和交換機的繫結會定義一種路由模式,那麼,萬用字元就要在這種路由模式和路由鍵之間匹配後交換機才能轉發訊息。 在這種交換機模式下: 路由鍵必須是一串字元,用句號(.) 隔開,比如說 agreements.us,或者 agreements.eu.stockholm 等。 路由模式必須包含一個 星號(*),主要用於匹配路由鍵指定位置的一個單詞,比如說,一個路由模式是這樣子:agreements..b.*,那麼就只能匹配路由鍵是這樣子的:第一個單詞是 agreements,第四個單詞是 b。 井號(#)就表示相當於一個或者多個單詞,例如一個匹配模式是agreements.eu.berlin.#,那麼,以agreements.eu.berlin開頭的路由鍵都是可以的。 具體程式碼傳送的時候還是一樣,第一個引數表示交換機,第二個引數表示routing key,第三個引數即訊息。如下: rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is RabbitMQ!"); topic 和 direct 類似, 只是匹配上支援了"模式", 在"點分"的 routing_key 形式中, 可以使用兩個萬用字元: *表示一個詞. #表示零個或多個詞. Headers Exchange headers 也是根據規則匹配, 相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的型別. 在佇列與交換器繫結時, 會設定一組鍵值對規則, 訊息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 訊息被投送到對應佇列. Fanout Exchange Fanout Exchange 訊息廣播的模式,不管路由鍵或者是路由模式,會把訊息發給繫結給它的全部佇列,如果配置了routing_key會被忽略。
3.springboot整合rabbitmq
在yml檔案中進行下列配置:
#rabbitmq spring.rabbitmq.host=10.110.3.62 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #\u6D88\u8D39\u8005\u6570\u91CF spring.rabbitmq.listener.simple.concurrency= 10 spring.rabbitmq.listener.simple.max-concurrency= 10 spring.rabbitmq.listener.simple.prefetch= 1 spring.rabbitmq.listener.simple.auto-startup=true spring.rabbitmq.listener.simple.default-requeue-rejected= true spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
建立rabbitmqConfig.java
@Configuration public class MQConfig { public static final String MIAOSHA_QUEUE = "miaosha.queue"; public static final String QUEUE = "queue"; public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; public static final String HEADER_QUEUE = "header.queue"; public static final String TOPIC_EXCHANGE = "topicExchage"; public static final String FANOUT_EXCHANGE = "fanoutxchage"; public static final String HEADERS_EXCHANGE = "headersExchage"; /** * Direct模式 交換機Exchange * */ @Bean public Queue queue() { return new Queue(QUEUE, true); } /** * Topic模式 交換機Exchange * */ @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2, true); } @Bean public TopicExchange topicExchage(){ return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#"); } /** * Fanout模式 交換機Exchange * */ @Bean public FanoutExchange fanoutExchage(){ return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding FanoutBinding1() { return BindingBuilder.bind(topicQueue1()).to(fanoutExchage()); } @Bean public Binding FanoutBinding2() { return BindingBuilder.bind(topicQueue2()).to(fanoutExchage()); } /** * Header模式 交換機Exchange * */ @Bean public HeadersExchange headersExchage(){ return new HeadersExchange(HEADERS_EXCHANGE); } @Bean public Queue headerQueue1() { return new Queue(HEADER_QUEUE, true); } @Bean public Binding headerBinding() { Map<String, Object> map = new HashMap<String, Object>(); map.put("header1", "value1"); map.put("header2", "value2"); return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match(); } }
建立傳送端 mqSend:
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
AmqpTemplate amqpTemplate ;
public void sendMiaoshaMessage(String mm) {
String msg = RedisService.beanToString(mm);
log.info("send message:"+msg);
amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
}
}
建立接收端
Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitListener(queues=MQConfig.MIAOSHA_QUEUE)
public void receive(String message) {
log.info("receive message:"+message);
}
執行檢視結果,看接收端是否收到並顯示資料