1. 程式人生 > >rabbitMQ學習筆記(三) 訊息確認與公平排程消費者

rabbitMQ學習筆記(三) 訊息確認與公平排程消費者

從本節開始稱Sender為生產者 , Recv為消費者

一、訊息確認

為了確保訊息一定被消費者處理,rabbitMQ提供了訊息確認功能,就是在消費者處理完任務之後,就給伺服器一個回饋,伺服器就會將該訊息刪除,如果消費者超時不回饋,那麼伺服器將就將該訊息重新發送給其他消費者

預設是開啟的,在消費者端通過下面的方式開啟訊息確認,  首先將autoAck自動確認關閉,等我們的任務執行完成之後,手動的去確認,類似JDBC的autocommit一樣

QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck =
false; channel.basicConsume("hello", autoAck, consumer);

在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到訊息後,就會自動反饋一個訊息給伺服器。

下面這個例子來測試訊息確認的功能。

Sender03.java

package com.zf.rabbitmq03;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 傳送訊息
 * @author zhoufeng
 *
 */
public class Sender03 {
	
	public static void main(String[] args) throws IOException {
		
		
		ConnectionFactory connFac = new ConnectionFactory() ;
		
		//RabbitMQ-Server安裝在本機,所以直接用127.0.0.1
		connFac.setHost("127.0.0.1");
		
		//建立一個連線
		Connection conn = connFac.newConnection() ;
		
		//建立一個渠道
		Channel channel = conn.createChannel() ;
		
		//定義Queue名稱
		String queueName = "queue01" ;
		
		//為channel定義queue的屬性,queueName為Queue名稱
		channel.queueDeclare( queueName , false, false, false, null) ;
		
		String msg = "Hello World!";
		
		//傳送訊息
		channel.basicPublish("", queueName , null , msg.getBytes());
		
		System.out.println("send message[" + msg + "] to "+ queueName +" success!");
		
		channel.close(); 
		conn.close(); 
		
	}

}


與Sender01.java一樣,沒有什麼區別。

Recv03.java

package com.zf.rabbitmq03;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * 接收訊息
 * @author zhoufeng
 *
 */
public class Recv03 {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		
		ConnectionFactory connFac = new ConnectionFactory() ;
		
		connFac.setHost("127.0.0.1");
		
		Connection conn = connFac.newConnection() ;
		
		Channel channel = conn.createChannel() ;
		
		String channelName = "channel01";
		
		channel.queueDeclare(channelName, false, false, false, null) ;
		
		
		//配置好獲取訊息的方式
		QueueingConsumer consumer = new QueueingConsumer(channel) ;
		

		//取消 autoAck
		boolean autoAck = false ;
		
		channel.basicConsume(channelName, autoAck, consumer) ;
		
		//迴圈獲取訊息
		while(true){
			
			//獲取訊息,如果沒有訊息,這一步將會一直阻塞
			Delivery delivery = consumer.nextDelivery() ;
			
			String msg = new String(delivery.getBody()) ;  
			
			//確認訊息,已經收到
			channel.basicAck(delivery.getEnvelope().getDeliveryTag()
					, false);
			
			System.out.println("received message[" + msg + "] from " + channelName);
		}
		
	}
	
}
注意:一旦將autoAck關閉之後,一定要記得處理完訊息之後,向伺服器確認訊息。否則伺服器將會一直轉發該訊息

如果將上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);註釋掉, Sender03.java只需要執行一次 , Recv03.java每次執行將都會收到HelloWorld訊息

注意:

但是這樣還是不夠的,如果rabbitMQ-Server突然掛掉了,那麼還沒有被讀取的訊息還是會丟失 ,所以我們可以讓訊息持久化。 只需要在定義Queue時,設定持久化訊息就可以了,方法如下:

booleandurable=true;channel.queueDeclare(channelName,durable,false,false,null);
這樣設定之後,伺服器收到訊息後就會立刻將訊息寫入到硬碟,就可以防止突然伺服器掛掉,而引起的資料丟失了。  但是伺服器如果剛收到訊息,還沒來得及寫入到硬碟,就掛掉了,這樣還是無法避免訊息的丟失。

二、公平排程

上一個例子能夠實現傳送一個Message與接收一個Message

從上一個Recv01中可以看出,必須處理完一個訊息,才會去接收下一個訊息。如果生產者眾多,那麼一個消費者肯定是忙不過來的。此時就可以用多個消費者來對同一個Channel的訊息進行處理,並且要公平的分配任務給多個消費者。不能部分很忙  部分總是空閒

實現公平排程的方式就是讓每個消費者在同一時刻會分配一個任務。 通過channel.basicQos(1);可以設定

相關推薦

rabbitMQ學習筆記() 訊息確認公平排程消費者

從本節開始稱Sender為生產者 , Recv為消費者 一、訊息確認 為了確保訊息一定被消費者處理,rabbitMQ提供了訊息確認功能,就是在消費者處理完任務之後,就給伺服器一個回饋,伺服器就會將該訊息刪除,如果消費者超時不回饋,那麼伺服器將就將該訊息重新發送給其他消費者

Spring Cloud學習筆記()——服務發現消費之使用Ribbon

服務消費一般使用ribbon和feign兩種方式。而feign實際上也是以ribbon為基礎的。有多個服務提供者例項的情況下ribbon可以實現負載均衡。 1.pom檔案:這裡與服務提供者不同的是需要引入ribbon包。 <dependency>

Pytorch學習筆記()線性迴歸邏輯迴歸

在瞭解了Pytorch的一些機制後,當然要進行一些例項的學習,畢竟實踐出真知嘛。 對於所有的機器學習愛好者來說,第一個要學的模型無疑是線性迴歸 所謂線性迴歸,指的就是用對輸入資料的每個維度進行線性組合擬合Label-y。最簡單的線性迴歸即是二維平面內的直線擬

RabbitMQ學習筆記:Exchange的學習(1)

一、概述     上一篇文章中講述了一個簡單的訊息傳遞模型,訊息從生產者傳送到消費者再發送到佇列,實際的工作中生產者不知道要把訊息傳送給哪個佇列,可能有多個消費者要生產者的訊息,也可能有的消費者不需要生產者的全部訊息,比如日誌系統,一個消費者需要info級別的資訊,另一個消

RabbitMQ學習筆記八:RabbitMQ訊息確認

來源: https://blog.csdn.net/chenxyt/article/details/79259838 一、概述     前文說到RabbitMQ的交換機、佇列、訊息的持久化並不能100%的保證訊息不會丟失。首先從生產者端,持久化的訊息在

rabbitMQ學習筆記(二) 簡單的傳送接收訊息 HelloWorld

首先要下載rabbitmq的javaClient庫,然後加入到專案中,下載地址為:http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-bin-3.1.5.zip

Linux學習筆記():系統執行級執行級的切換

查看 用戶操作 回車 water hat ntsysv tde 文件表 config 1.Linux系統與其它的操作系統不同,它設有執行級別。該執行級指定操作系統所處的狀態。Linux系統在不論什麽時候都執行於某個執行級上,且在不同的執行級上執行的程序和服務都不同,所要

RabbitMQ學習筆記(1)----RabbitMQ簡介安裝

·1. 什麼是RabbitMQ?   RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。   而AMQP協議則是指:即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協

RabbitMQ學習筆記)-----------------RabbitMQ不同的交換機進行路由

專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Exchange_RabbitMQ 專案結構: 流程圖 補充知識: Connection是RabbitMQ的sockert連

RabbitMQ學習筆記(二)-----------------RabbitMQ生產消費訊息

專案地址:https://github.com/gongxianshengjiadexiaohuihui/RabbitMQ/tree/master/Hello_RabbitMQ 專案結構 需要的jar包 專案流程圖 x 首先是生產者的類,我們需要與RabbitServ

opencv學習筆記十二:Haar特徵積分影象

一、 Haar特徵定義         Haar特徵是基於“塊”的特徵,也被稱為矩形特徵。Haar特徵(模板)分為三類:邊緣特徵、線性特徵、中心特徵和對角線特徵。特徵模板內有白色和黑色兩種矩形,並定義該模板的特徵值為白色矩形畫素和減去黑色矩形畫素和。Haar特徵值反映了影象

opencv學習筆記十六:AKAZE特徵點檢測匹配

KAZE是日語音譯過來的 , KAZE與SIFT、SURF最大的區別在於構造尺度空間,KAZE是利用非線性方式構造,得到的關鍵點也就更準確(尺度不變性 ); Hessian矩陣特徵點檢測 ,方向指定,基於一階微分影象(旋轉不變性 ) ; 描述子生成 ,歸一化處理(光照不變

RabbitMq學習筆記

RabbitMQ原生API三種交換模式 1. Hello World 在這裡沒有宣告交換機(exchange),也沒有宣告繫結(bind),RabbitMQ會使用預設的交換機(AMQP default)路由鍵就是佇列名稱 【生產者】 /** * 消費者

MongoDB學習筆記()——安裝配置MongoDB(Linux)

前一篇博文講解了如何安裝與配置MongoDB的windows版,本篇博文接著上一篇講解如何在Linux系統中安裝與配置MongoDB,為了演示,我問同事要了它的雲伺服器用於演示,當然我自己也有,但是已經安裝了,就不解除安裝重新裝了[笑臉]。 下載Linux版的

資料庫系統實現學習筆記(更新異常規範化設計)--by穆晨

前言         在前兩篇中,主要講了ER建模和關係建模。在具體分析如何用資料庫管理軟體RDBMS(Relational Database Management System)實現這些關係前,我想有必要思考下面這個問題:  

angularjs學習筆記——AngularJSMVC模式

本文主要解決一下幾個問題: 什麼是MVCMVC優點缺點AngularJS的MVC是怎樣的來個簡單的示例 一、什麼是MVC        MVC模式非常重要,無論你是B/S還是C/S的開發者(實際上個人認為B/S也是C/S的一種特殊形式),甚至你是純Client或Browse

RabbitMq學習筆記(四)—— 訊息路由(Routing)

//宣告直連交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 獲取匿名佇列名稱 String queueName = channel.queueDeclare().getQueue()

RabbitMQ學習筆記七:交換機、佇列、訊息的持久化

一、概述     在生產過程中,難免會發生伺服器宕機的事情,RabbitMQ也不例外,可能由於某種特殊情況下的異常而導致RabbitMQ宕機從而重啟,那麼這個時候對於訊息佇列裡的資料,包括交換機、佇列以及佇列中存在訊息恢復就顯得尤為重要了。RabbitMQ本身帶有持久化機制

Vue學習筆記:使用vuexlocalstorage管理登入許可權

使用vuex與localstorage管理登入許可權 此次採用vuex + localstorage 配合使用來管理使用者的登入狀態,只使用vuex的話在使用者進行重新整理時將會自動刪除,所以配合localstorage,這樣可以讓SPA應用中既可以同步

《R語言入門實踐》學習筆記

第三課任務: 完成專案玩撲克牌前1/2,並通過專案學會以下技能:    1)儲存新的資料型別,比如字串和邏輯值。    2)將資料集儲存為向量,矩陣,陣列,列表或者資料框型別。    3)用R載入或儲存下載的資料。 …………………………………………………………………… 可以