1. 程式人生 > >RabbitMq之Routing,Topics

RabbitMq之Routing,Topics

Direct Exchange – 處理路由鍵。需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。這是一個完整的匹配。

publish端:

        //由exchanges+routingkey匹配才廣播訊息
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String[] messages = new String[]{"111 message.","211 message..","311 message...","411 message....","511 message.....","611 message.....","711 message.....","811 message.....","911 message....."};
		for(String msg : messages){
			//在direct型別中需要指定路由routingkey
		        channel.basicPublish(EXCHANGE_NAME, "routingkey", null, msg.getBytes());
			System.out.println(" [x] Sent '" + msg + "'");
		}
subscrbe:
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
	    //伺服器建立佇列
	    String queueName = channel.queueDeclare().getQueue();
	    //將佇列繫結到exchange,且指定一個路由key
	    channel.queueBind(queueName, EXCHANGE_NAME, "routingkey");


Fanout Exchange – 不處理路由鍵。你只需要簡單的將佇列繫結到交換機上。一個傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上。

publish:

        //建立exchange
        //fanout將訊息廣播到全部繫結的佇列中
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String[] messages = new String[]{"111 message.","211 message..","311 message...","411 message....","511 message.....","611 message.....","711 message.....","811 message.....","911 message....."};
		for(String msg : messages){
		    channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
			System.out.println(" [x] Sent '" + msg + "'");
		}
subscribe:
            //建立exchange
            //fanout將訊息廣播到佇列中
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    //伺服器建立佇列
	    String queueName = channel.queueDeclare().getQueue();
	    //將佇列繫結到exchange
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

Topic Exchange – 將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。

publish:

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        
        String[] messages = new String[]{"111 message.","211 message..","311 message...","411 message....","511 message.....","611 message.....","711 message.....","811 message.....","911 message....."};
		for(String msg : messages){
			channel.basicPublish(EXCHANGE_NAME, "routingkey.yzr", null, msg.getBytes());
			System.out.println(" [x] Sent '" + msg + "'");
		}

subscribe:
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
	    //伺服器建立佇列
	    String queueName = channel.queueDeclare().getQueue();
	    //將佇列繫結到exchange
	    channel.queueBind(queueName, EXCHANGE_NAME, "routingkey.*");

headers :一個map集合屬性的key來匹配.

publish:

        //建立exchange
        //fanout將訊息廣播到佇列中
        //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //由exchanges+routingkey匹配才有廣播訊息
        //channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.exchangeDeclare(EXCHANGE_NAME, "headers");
        
        String[] messages = new String[]{"111 message.","211 message..","311 message...","411 message....","511 message.....","611 message.....","711 message.....","811 message.....","911 message....."};
		for(String msg : messages){
			//在direct型別中需要指定佇列名,即路由routingkey
		    //channel.basicPublish(EXCHANGE_NAME, "routingkey", null, msg.getBytes());
		    //channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
			//channel.basicPublish(EXCHANGE_NAME, "routingkey.yzr", null, msg.getBytes());
			
			BasicProperties properties=new BasicProperties();
			Map<String,Object> headers=new HashMap<String,Object>();
			headers.put("key1", "12345");
			properties.builder().headers(headers);
		    channel.basicPublish(EXCHANGE_NAME, "routingkey.yzr", properties, msg.getBytes());
			System.out.println(" [x] Sent '" + msg + "'");
		}
subscribe:
            //建立exchange
            //fanout將訊息廣播到佇列中
	    //channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    //channel.exchangeDeclare(EXCHANGE_NAME, "direct");
	    //channel.exchangeDeclare(EXCHANGE_NAME, "topic");
	    channel.exchangeDeclare(EXCHANGE_NAME, "headers");
	    //伺服器建立佇列
	    String queueName = channel.queueDeclare().getQueue();
	    //將佇列繫結到exchange
	    //channel.queueBind(queueName, EXCHANGE_NAME, "routingkey");
	    //channel.queueBind(queueName, EXCHANGE_NAME, "");
	    //channel.queueBind(queueName, EXCHANGE_NAME, "routingkey.*");
	    Map<String,Object> spec=new HashMap<String,Object>();
	    spec.put("key1", "12345");
	    channel.queueBind(queueName, EXCHANGE_NAME, "",spec);

使用fanout簡單演示的完整程式碼:

package yzr.main;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

	private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //建立exchange
        //fanout將訊息廣播到佇列中
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //由exchanges+routingkey匹配才有廣播訊息
        //channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //channel.exchangeDeclare(EXCHANGE_NAME, "headers");
        
        String[] messages = new String[]{"111 message.","211 message..","311 message...","411 message....","511 message.....","611 message.....","711 message.....","811 message.....","911 message....."};
		for(String msg : messages){
			//在direct型別中需要指定佇列名,即路由routingkey
		    //channel.basicPublish(EXCHANGE_NAME, "routingkey", null, msg.getBytes());
		    channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
			//channel.basicPublish(EXCHANGE_NAME, "routingkey.yzr", null, msg.getBytes());
			
			//BasicProperties properties=new BasicProperties();
			//Map<String,Object> headers=new HashMap<String,Object>();
			//headers.put("key1", "12345");
			//properties.builder().headers(headers);
		    //channel.basicPublish(EXCHANGE_NAME, "routingkey.yzr", properties, msg.getBytes());
			System.out.println(" [x] Sent '" + msg + "'");
		}
		channel.close();
		connection.close();
    }

}

package yzr.main;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogs {

	private static final String EXCHANGE_NAME = "logs";

	  public static void main(String[] argv) throws Exception {
	    ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    //建立exchange
        //fanout將訊息廣播到佇列中
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    //channel.exchangeDeclare(EXCHANGE_NAME, "direct");
	    //channel.exchangeDeclare(EXCHANGE_NAME, "topic");
	    //channel.exchangeDeclare(EXCHANGE_NAME, "headers");
	    //伺服器建立佇列
	    String queueName = channel.queueDeclare().getQueue();
	    //將佇列繫結到exchange
	    //channel.queueBind(queueName, EXCHANGE_NAME, "routingkey");
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
	    //channel.queueBind(queueName, EXCHANGE_NAME, "routingkey.*");
	    //Map<String,Object> spec=new HashMap<String,Object>();
	    //spec.put("key1", "12345");
	    //channel.queueBind(queueName, EXCHANGE_NAME, "",spec);
	    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

	    Consumer consumer = new DefaultConsumer(channel) {
	      @Override
	      public void handleDelivery(String consumerTag, Envelope envelope,
	                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
	        String message = new String(body, "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	      }
	    };
	    channel.basicConsume(queueName, true, consumer);
	  }

}


相關推薦

RabbitMqRouting,Topics

Direct Exchange – 處理路由鍵。需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。這是一個完整的匹配。 publish端: //由exchanges+routingkey匹配才廣播訊息 channel.ex

RabbitMQTopics(萬用字元模式)

說明:此模式實在路由key模式的基礎上,使用了萬用字元來管理消費者接收訊息。生產者P傳送訊息到交換機X,type=topic,交換機根據繫結佇列的routing key的值進行萬用字元匹配; 符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor符號*:只能匹配一個詞l

RabbitMQ學習筆記五:RabbitMQ優先級消息隊列

-c virtual 調用 itl 3.5 rri color images 執行順序 RabbitMQ優先級隊列註意點: 1、只有當消費者不足,不能及時進行消費的情況下,優先級隊列才會生效 2、RabbitMQ3.5以後才支持優先級隊列 代碼在博客:RabbitMQ學習筆

RabbitMQ消費者Demo(隊列參數詳細說明)

per don create pac col div icp rod 忽略 1 package com.jiefupay; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 i

Java使用RabbitMQ公平分發

env 接受 col java catch conn ech exceptio don 發送消息: 1 package org.study.workfair; 2 3 import com.rabbitmq.client.Channel; 4 import co

Java使用RabbitMQ整合Spring(生產者)

獲取 消息 org str 依賴 chang import name 5.1 依賴包 <!--RabbitMQ集成spring--> <!-- https://mvnrepository.com/artifact/org

SpringBoot整合RabbitMQ發送接收消息實戰

container 會同 prope spring 註解 流行 pin public lin 實戰前言 前幾篇文章中,我們介紹了SpringBoot整合RabbitMQ的配置以及實戰了Spring的事件驅動模型,這兩篇文章對於我們後續實戰RabbitMQ其他知識要點將起到奠

SpringBoot整合RabbitMQ典型應用場景實戰二

factor aid 分享圖片 actor esp rem 排隊 stc tps 實戰前言RabbitMQ 作為目前應用相當廣泛的消息中間件,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模塊中具有重要的作用,比如業務服務模塊解耦、異步通信、

SpringBoot整合RabbitMQ典型應用場景實戰三

分布 boot 自動刪除 blog jce 地址 這樣的 實施 微服務 實戰前言RabbitMQ 作為目前應用相當廣泛的消息中間件,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模塊中具有重要的作用,比如業務服務模塊解耦、異步通信、高並發限流

RabbitMQwin下安裝

1、安裝前準備 RabbitMQ 是建立在強大的Erlang OTP平臺上,因此安裝RabbitMQ之前要先安裝Erlang。 erlang:http://www.erlang.org/download.html rabbitmq:http://www.rabbitmq.com/down

SpringBoot整合RabbitMQ 典型應用場景實戰二

實戰前言 RabbitMQ 作為目前應用相當廣泛的訊息中介軟體,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模組中具有重要的作用,比如業務服務模組解耦、非同步通訊、高併發限流、超時業務、資料延遲處理等。上一篇博文我分享了RabbitMQ在業務服務模組解耦,非

SpringBoot整合RabbitMQ 典型應用場景實戰一

實戰前言 RabbitMQ 作為目前應用相當廣泛的訊息中介軟體,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模組中具有重要的作用,比如業務服務模組解耦、非同步通訊、高併發限流、超時業務、資料延遲處理等。 RabbitMQ 官網拜讀 首先,讓我們先拜讀

SpringBoot整合RabbitMQSpring事件驅動模型

實戰背景:在進入RabbitMQ各大技術知識點之前,我們先來談談跟事件驅動息息相關的ApplicationEvent、ApplicationListener以及ApplicationEventPublisher這三大元件,點選進去看其原始碼可以發現裡面使用的CachingConnectionFa

SpringBoot整合RabbitMQ整合配置篇

實戰背景:RabbitMQ實戰第一階段-RabbitMQ的官網拜讀已經結束了,相信諸位童鞋或多或少都能入了個門,如果還是覺得迷迷糊糊似懂非懂的,那我建議諸位可以親自去拜讀拜讀官網的技術手冊或者看多幾篇我的視訊跟原始碼!因為接下來我們將進入第二階段,即應用實戰階段。其中,第一階段的內容因為屬於入門

RabbitMQ路由模式RoutingKey模式

生產者傳送訊息到交換機並指定一個路由key,消費者佇列繫結到交換機時要制定路由key(key匹配就能接受訊息,key不匹配就不能接受訊息) 例如:我們可以把路由key設定為insert ,那麼消費者佇列key指定包含insert才可以接收訊息,消費者佇列key定義為update或者delete就不能接收

SpringBoot整合RabbitMQ基礎例項2

此文承接SpringBoot整合RabbitMQ之基礎例項1的所有配置 配置交換機,佇列及繫結關係 package com.etoak.crazy.config.rabbitmq; import org.springframework.amqp.core.Binding; im

SpringBoot整合RabbitMQ基礎例項1

在pom檔案中匯入依賴 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</

springboot+RabbitMqtopic和fanout

1.建立一個springboot專案,匯入如下maven座標 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

RabbitMQ概念介紹(二)

簡介:   MQ全稱為Message Queue,訊息佇列是一種應用程式間的通訊方法。   其是消費者-生產者模型的一個典型代表,一端往訊息佇列中不斷寫入訊息,另一端不斷讀取/訂閱訊息。 簡單使用場景舉例:   將專案中無需即時返回且耗時的操作提取出來,進行非同步處理,大大降低了伺服器的請求響應時間。

rabbitmq學習(三):rabbitmq扇形交換機、主題交換機

 前言 上篇我們學習了rabbitmq的作用以及直連交換機的程式碼實現,這篇我們繼續看如何用程式碼實現扇形交換機和主題交換機 一、扇形交換機   1.生產者    /** * 生產者 */ public class LogProducer { //交換機名稱 pu