1. 程式人生 > 資訊 >貼身即發熱:雪中飛秋衣套裝 29.9 元(不定時結束)

貼身即發熱:雪中飛秋衣套裝 29.9 元(不定時結束)

RabbitMQ簡介

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現

核心概念

Message

訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成, 這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可 能需要永續性儲存)等。

Publisher

訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。

Exchange型別

交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。

Exchange有4種類型:direct(預設),fanout,topic,和headers,不同型別的Exchange轉發訊息的策略有所區別

Direct Exchange

訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果一個佇列繫結到交換機要求路由鍵為“dog”,則只轉發 routingkey 標記為“dog”的訊息,不會轉發“dog.puppy”,也不會轉發“dog.guard” 等等。它是完全匹配、單播的模式。

Fanout Exchange

每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。

Topic Exchange

topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。 它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號“#”和符號 。匹配0個或多個單詞,匹配一個單詞。

Queue

訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直 在佇列裡面,等待消費者連線到這個佇列將其取走。

Binding

繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交 換器理解成一個由繫結構成的路由表。

Exchange和Queue的繫結可以是多對多的關係。

Connection

網路連線,比如一個TCP連線。

Channel

通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP命令都是通過通道 發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬TCP都 是非常昂貴的開銷,所以引入了通道的概念,以複用一條TCP連線。

Consumer

訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。

Virtual Host

虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加 密環境的獨立伺服器域。每個 vhost 本質上就是一個mini版的RabbitMQ 伺服器,擁 有自己的佇列、交換器、繫結和許可權機制。vhost是AMQP概念的基礎,必須在連線時 指定,RabbitMQ 預設的vhost是/。

Broker

表示訊息佇列伺服器實體

Docker安裝RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p  25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

訪問15672埠

https://www.rabbitmq.com/networking.html

SpringCloud整合RabbitMQ

引入RabbitMQ包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

引入RabbitMQ,RabbitAutoConfiguration就會自動生效

給容器中自動配置了RabbitTemplate、AmqpAdmin等等

配置檔案

spring.rabbitmq.host=192.168.195.100
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

測試類

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void sendMessageTest() {
        //因為存到rabbit中是經過序列化的,所以加上配置轉成json發出去
        OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
        orderReturnReasonEntity.setId(1L);
        orderReturnReasonEntity.setCreateTime(new Date());
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
        log.info("訊息傳送成功");
    }

    @Test
    void createExchange() {
        //建立了一個Direct型別的交換機  是否持久化 是否自動刪除
        DirectExchange directExchange=new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange建立成功");

    }

    @Test
    void createQueue() {
        Queue queue=new Queue("hello-java-Queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue建立成功");
    }

    @Test
    void createBinding() {
        //將exchange指定的交換機和Directnation目的地進行繫結,使用routingkey作為路由鍵
        Binding binding=new Binding("hello-java-Queue",
                Binding.DestinationType.QUEUE,
                "hello-java-exchange",
                "hello-java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding繫結成功");
    }
}

先建立交換機,然後建立對佇列,繫結路由鍵,利用rabbitTemplate傳送訊息

@RabbitListenter&@RabbitHandler接收訊息

@RabbitListenter監聽訊息

    @RabbitListener(queues = {"hello-java-Queue"})
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel)
    {
        System.out.println("接收到訊息內容:"+message+"內容==》"+content);
    }

如果有多個客戶端,只有一個會收到訊息,並且只有當一個訊息處理完才會收到下一個訊息

如果需要監聽一個佇列裡的多個訊息,訊息的型別都不一樣利用@RabbitHandler

監聽hello-java-Queue佇列裡不同的訊息

@RabbitListener(queues = {"hello-java-Queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    @RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content)
    {
        System.out.println("接收到訊息內容:"+message+"內容==》"+content);
    }

    @RabbitHandler
    public  void  recieveMessage2(OrderEntity orderEntity)
    {
        System.out.println("接收到訊息內容:"+orderEntity);
    }

}

控制器

@Slf4j
@Controller
public class RabbitController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendMessageTest() {
        for (int i = 0; i < 10; i++) {
            if(i%2==0) {
                //因為存到rabbit中是經過序列化的,所以加上配置轉成json發出去
                OrderReturnReasonEntity orderReturnReasonEntity=new OrderReturnReasonEntity();
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setCreateTime(new Date());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnReasonEntity);
                log.info("訊息傳送成功");
            }
            else {
                OrderEntity orderEntity=new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderEntity);
                log.info("訊息傳送成功");
            }
        }
        return  "";

    }
}

RabbitMQ訊息確認機制-可靠抵達

保證訊息不丟失,可靠抵達,可以使用事務訊息,效能下降250倍,為此引入確認機制

publisher confirmCallback 確認模式

publisher returnCallback 未投遞到 queue 退回模式

consumer ack機制

可靠抵達-ConfirmCallback

如果要使用confirmCallback ,需要配置

#開啟發送端確認
spring.rabbitmq.publisher-confirm-type=correlated
  1. 在建立 connectionFactory 的時候設定 PublisherConfirms(true) 選項,開啟 confirmcallback 。
  2. CorrelationData:用來表示當前訊息唯一性。
  3. 生產者只要把訊息傳送給Broker,訊息只要被 broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會呼叫 confirmCallback。
  4. 被 broker 接收到只能表示 message 已經到達伺服器,並不能保證訊息一定會被投遞到目標 queue 裡。所以需要用到接下來的 returnCallback 。
  @PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 當前訊息的唯一關聯資料(訊息的唯一id)
            * @param ack 訊息是否成功收到
            * @param cuase 失敗的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });
    }

可靠抵達-ReturnCallback

開啟發送訊息抵達佇列的確認

spring.rabbitmq.publisher-returns=true
#只要抵達佇列,以非同步發動有限回撥我們這個returnconfig
spring.rabbitmq.template.mandatory=true

只有當訊息沒有抵達佇列才會觸發方法

  @PostConstruct
    public  void initRabbitTemplate()
    {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /*
            * @param correlationData 當前訊息的唯一關聯資料(訊息的唯一id)
            * @param ack 訊息是否成功收到
            * @param cuase 失敗的原因
            * */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cuase) {
                System.out.println("confirm... cottrlationData"+correlationData+",[ack]"+ack+",cuase"+cuase);
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            //投遞失敗的詳細資訊 回覆的狀態碼 回覆的文字內容 當時這個訊息給給哪個交換機 當時訊息的路由鍵
            @Override
            public void returnedMessage(Message message, int replaycode, String replytext, String exchange, String routekey) {
                System.out.println("Fail...message"+message+",[replaycode]"+replaycode+",[replytext]"+replytext+",[exchange]"+exchange+",[routekey]"+routekey);
            }
        });
    }

可靠抵達-Ack訊息確認機制

在不開啟手動確認的時候,傳送訊息突然伺服器關機會導致訊息丟失,因此需要開啟手動模式保證訊息的可達性

#手動確認訊息達到
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費者手動確認模式下 只要沒有明確確認訊息,就一直是unached狀態,即使關機 訊息也不會丟失,會重新變為Ready

    @RabbitHandler
    public  void  recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
        System.out.println("接收到訊息內容:"+message+"內容==》"+content);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //簽收訊息
        channel.basicAck(deliveryTag,false);//true就是重新發回伺服器
        System.out.println("訊息簽收"+deliveryTag);
    }


    @RabbitHandler
    public  void  recieveMessage2(Message message,OrderEntity orderEntity,Channel channel) throws IOException {

        System.out.println("接收到訊息內容:"+orderEntity);
        long deliveryTag=message.getMessageProperties().getDeliveryTag();
        //簽收訊息
        channel.basicNack(deliveryTag,false,true);// 退貨  true就是重新發回伺服器
        System.out.println("沒有簽收"+deliveryTag);
    }

訊息處理成功,ack(),接受下一個訊息,此訊息broker就會移除

訊息處理失敗,nack()/reject(),重新發送給其他人進行處理,或者容錯處理後ack

訊息一直沒有呼叫ack/nack方法,broker認為此訊息正在被處理,不會投遞給別人,此時客戶端斷開,訊息不會被broker移除,會投遞給別人

如何簽收

channel.basicAck(deliveryTag,false) 簽收
channel.basicNack(deliveryTag,false,true); 拒籤