Dubbo原始碼解析(五)註冊中心——multicast
註冊中心——multicast
目標:解釋以為multicast實現的註冊中心原理,理解單播、廣播、多播區別,解讀duubo-registry-multicast的原始碼
這是dubbo實現註冊中心的第二種方式,也是dubbo的demo模組中用的註冊中心實現方式。multicast其實是用到了MulticastSocket來實現的。
我這邊稍微補充一點關於多點廣播,也就是MulticastSocket的介紹。MulticastSocket類是繼承了DatagramSocket類,DatagramSocket只允許把資料報傳送給一個指定的目標地址,而MulticastSocket可以將資料報以廣播的形式傳送給多個客戶端。它的思想是MulticastSocket會把一個資料包傳送給一個特定的多點廣播地址,這個多點廣播地址是一組特殊的網路地址,當客戶端需要傳送或者接收廣播資訊時,只要加入該組就好。IP協議為多點廣播提供了一批特殊的IP地址,地址範圍是224.0.0.0至239.255.255.255。MulticastSocket類既可以將資料報傳送到多點廣播地址,也可以接收其他主機的廣播資訊。
以上是對multicast背景的簡略介紹,接下來讓我們具體的來看dubbo怎麼把MulticastSocket運用到註冊中心的實現中。
我們先來看看包下面有哪些類:
可以看到跟預設的註冊中心的包結構非常類似。接下來我們就來解讀一下這兩個類。
(一)MulticastRegistry
該類繼承了FailbackRegistry類,該類就是針對註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表進行展開,利用廣播的方式去實現。
1.屬性
// logging output
// 日誌記錄輸出
private static final Logger logger = LoggerFactory.getLogger(MulticastRegistry.class);
// 預設的多點廣播埠
private static final int DEFAULT_MULTICAST_PORT = 1234;
// 多點廣播的地址
private final InetAddress mutilcastAddress;
// 多點廣播
private final MulticastSocket mutilcastSocket;
// 多點廣播埠
private final int mutilcastPort;
//收到的URL
private final ConcurrentMap<URL,Set<URL>> received = new ConcurrentHashMap<URL,Set<URL>>();
// 任務排程器
private final ScheduledExecutorService cleanExecutor = Executors.newScheduledThreadPool(1,new NamedThreadFactory("DubboMulticastRegistryCleanTimer",true));
// 清理計時器,一定時間清理過期的url
private final ScheduledFuture<?> cleanFuture;
// 清理的間隔時間
private final int cleanPeriod;
// 管理員許可權
private volatile boolean admin = false;
複製程式碼
看上面的屬性,需要關注以下幾個點:
- mutilcastSocket,該類是muticast註冊中心實現的關鍵,這裡補充一下單播、廣播、以及多播的區別,因為下面會涉及到。單播是每次只有兩個實體相互通訊,傳送端和接收端都是唯一確定的;廣播目的地址為網路中的全體目標,而多播的目的地址是一組目標,加入該組的成員均是資料包的目的地。
- 關注任務排程器和清理計時器,該類封裝了定時清理過期的服務的策略。
2.構造方法
public MulticastRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
if (!isMulticastAddress(url.getHost())) {
throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ",scope: 224.0.0.0 - 239.255.255.255");
}
try {
mutilcastAddress = InetAddress.getByName(url.getHost());
// 如果url攜帶的配置中沒有埠號,則使用預設埠號
mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
// 禁用多播資料報的本地環回
mutilcastSocket.setLoopbackMode(false);
// 加入同一組廣播
mutilcastSocket.joinGroup(mutilcastAddress);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[2048];
// 例項化資料報
DatagramPacket recv = new DatagramPacket(buf,buf.length);
while (!mutilcastSocket.isClosed()) {
try {
// 接收資料包
mutilcastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
msg = msg.substring(0,i).trim();
}
// 接收訊息請求,根據訊息並相應操作,比如註冊,訂閱等
MulticastRegistry.this.receive(msg,(InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf,(byte) 0);
} catch (Throwable e) {
if (!mutilcastSocket.isClosed()) {
logger.error(e.getMessage(),e);
}
}
}
}
},"DubboMulticastRegistryReceiver");
// 設定為守護程式
thread.setDaemon(true);
// 開啟執行緒
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(),e);
}
// 優先從url中獲取清理延遲配置,若沒有,則預設為60s
this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY,Constants.DEFAULT_SESSION_TIMEOUT);
// 如果配置了需要清理
if (url.getParameter("clean",true)) {
// 開啟計時器
this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// 清理過期的服務
clean(); // Remove the expired
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at clean expired provider,cause: " + t.getMessage(),t);
}
}
},cleanPeriod,TimeUnit.MILLISECONDS);
} else {
this.cleanFuture = null;
}
}
複製程式碼
這個構造器最關鍵的就是一個執行緒和一個定時清理任務。
- 執行緒中做的工作是根據接收到的訊息來判定是什麼請求,作出對應的操作,只要mutilcastSocket沒有斷開,就一直接收訊息,內部的實現體現在receive方法中,下文會展開講述。
- 定時清理任務是清理過期的註冊的服務。通過兩次socket的嘗試來判定是否過期。clean方法下文會展開講述
3.isMulticastAddress
private static boolean isMulticastAddress(String ip) {
int i = ip.indexOf('.');
if (i > 0) {
String prefix = ip.substring(0,i);
if (StringUtils.isInteger(prefix)) {
int p = Integer.parseInt(prefix);
return p >= 224 && p <= 239;
}
}
return false;
}
複製程式碼
該方法很簡單,為也沒寫註釋,就是判斷是否為多點廣播地址,地址範圍是224.0.0.0至239.255.255.255。
4.clean
private void clean() {
// 當url中攜帶的服務介面配置為是*時候,才可以執行清理
if (admin) {
for (Set<URL> providers : new HashSet<Set<URL>>(received.values())) {
for (URL url : new HashSet<URL>(providers)) {
// 判斷是否過期
if (isExpired(url)) {
if (logger.isWarnEnabled()) {
logger.warn("Clean expired provider " + url);
}
//取消註冊
doUnregister(url);
}
}
}
}
}
複製程式碼
該方法也比較簡單,關機的是如何判斷過期以及做的取消註冊的操作。下面會展開講解這幾個方法。
5.isExpired
private boolean isExpired(URL url) {
// 如果為非動態管理模式或者協議是consumer、route或者override,則沒有過期
if (!url.getParameter(Constants.DYNAMIC_KEY,true)
|| url.getPort() <= 0
|| Constants.CONSUMER_PROTOCOL.equals(url.getProtocol())
|| Constants.ROUTE_PROTOCOL.equals(url.getProtocol())
|| Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
return false;
}
Socket socket = null;
try {
// 利用url攜帶的主機地址和埠號例項化socket
socket = new Socket(url.getHost(),url.getPort());
} catch (Throwable e) {
// 如果例項化失敗,等待100ms重試第二次,如果還失敗,則判定已過期
try {
// 等待100ms
Thread.sleep(100);
} catch (Throwable e2) {
}
Socket socket2 = null;
try {
socket2 = new Socket(url.getHost(),url.getPort());
} catch (Throwable e2) {
return true;
} finally {
if (socket2 != null) {
try {
socket2.close();
} catch (Throwable e2) {
}
}
}
} finally {
if (socket != null) {
try {
socket.close();
} catch (Throwable e) {
}
}
}
return false;
}
複製程式碼
這個方法就是判斷服務是否過期,有兩次嘗試socket的操作,如果嘗試失敗,則判斷為過期。
6.receive
private void receive(String msg,InetSocketAddress remoteAddress) {
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
}
// 如果這個訊息是以register、unregister、subscribe開頭的,則進行相應的操作
if (msg.startsWith(Constants.REGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
// 註冊服務
registered(url);
} else if (msg.startsWith(Constants.UNREGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
// 取消註冊服務
unregistered(url);
} else if (msg.startsWith(Constants.SUBSCRIBE)) {
URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
// 獲得以及註冊的url集合
Set<URL> urls = getRegistered();
if (urls != null && !urls.isEmpty()) {
for (URL u : urls) {
// 判斷是否合法
if (UrlUtils.isMatch(url,u)) {
String host = remoteAddress != null && remoteAddress.getAddress() != null
? remoteAddress.getAddress().getHostAddress() : url.getIp();
// 建議服務提供者和服務消費者在不同機器上執行,如果在同一機器上,需設定unicast=false
// 同一臺機器中的多個程式不能單播單播,或者只有一個程式接收資訊,發給消費者的單播訊息可能被提供者搶佔,兩個消費者在同一臺機器也一樣,
// 只有multicast註冊中心有此問題
if (url.getParameter("unicast",true) // Whether the consumer's machine has only one process
&& !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
unicast(Constants.REGISTER + " " + u.toFullString(),host);
} else {
broadcast(Constants.REGISTER + " " + u.toFullString());
}
}
}
}
}/* else if (msg.startsWith(UNSUBSCRIBE)) {
}*/
}
複製程式碼
可以很清楚的看到,根據接收到的訊息開頭的資料來判斷需要做什麼型別的操作,重點在於訂閱,可以選擇單播訂閱還是廣播訂閱,這個取決於url攜帶的配置是什麼。
7.broadcast
private void broadcast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
// 例項化資料報,重點是目的地址是mutilcastAddress
DatagramPacket hi = new DatagramPacket(data,data.length,mutilcastAddress,mutilcastPort);
// 傳送資料報
mutilcastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(),e);
}
}
複製程式碼
這是廣播的實現方法,重點是資料報的目的地址是mutilcastAddress。代表著一組地址
8.unicast
private void unicast(String msg,String host) {
if (logger.isInfoEnabled()) {
logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
// 例項化資料報,重點是目的地址是隻是單個地址
DatagramPacket hi = new DatagramPacket(data,InetAddress.getByName(host),e);
}
}
複製程式碼
這是單播的實現,跟廣播的區別就只是目的地址不一樣,單播的目的地址就只是一個地址,而廣播的是一組地址。
9.doRegister && doUnregister && doSubscribe && doUnsubscribe
@Override
protected void doRegister(URL url) {
broadcast(Constants.REGISTER + " " + url.toFullString());
}
@Override
protected void doUnregister(URL url) {
broadcast(Constants.UNREGISTER + " " + url.toFullString());
}
@Override
protected void doSubscribe(URL url,NotifyListener listener) {
// 當url中攜帶的服務介面配置為是*時候,才可以執行清理,類似管理員許可權
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
admin = true;
}
broadcast(Constants.SUBSCRIBE + " " + url.toFullString());
// 對監聽器進行同步鎖
synchronized (listener) {
try {
listener.wait(url.getParameter(Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT));
} catch (InterruptedException e) {
}
}
}
@Override
protected void doUnsubscribe(URL url,NotifyListener listener) {
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY,true)) {
unregister(url);
}
broadcast(Constants.UNSUBSCRIBE + " " + url.toFullString());
}
複製程式碼
這幾個方法就是實現了父類FailbackRegistry的抽象方法。都是呼叫了broadcast方法。
10.destroy
@Override
public void destroy() {
super.destroy();
try {
// 取消清理任務
if (cleanFuture != null) {
cleanFuture.cancel(true);
}
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
try {
// 把該地址從組內移除
mutilcastSocket.leaveGroup(mutilcastAddress);
// 關閉mutilcastSocket
mutilcastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(),t);
}
// 關閉執行緒池
ExecutorUtil.gracefulShutdown(cleanExecutor,cleanPeriod);
}
複製程式碼
該方法的邏輯跟dubbo註冊中心的destroy方法類似,就多了把該地址從組內移除的操作。gracefulShutdown方法我在《dubbo原始碼解析(四)註冊中心——dubbo》中已經講到。
11.register
@Override
public void register(URL url) {
super.register(url);
registered(url);
}
複製程式碼
protected void registered(URL url) {
// 遍歷訂閱的監聽器集合
for (Map.Entry<URL,Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
// 判斷是否合法
if (UrlUtils.isMatch(key,url)) {
// 通過消費者url獲得接收到的服務url集合
Set<URL> urls = received.get(key);
if (urls == null) {
received.putIfAbsent(key,new ConcurrentHashSet<URL>());
urls = received.get(key);
}
// 加入服務url
urls.add(url);
List<URL> list = toList(urls);
for (NotifyListener listener : entry.getValue()) {
// 把服務url的變化通知監聽器
notify(key,listener,list);
synchronized (listener) {
listener.notify();
}
}
}
}
}
複製程式碼
可以看到該類重寫了父類的register方法,不過邏輯沒有過多的變化,就是把需要註冊的url放入快取中,如果通知監聽器url的變化。
12.unregister
@Override
public void unregister(URL url) {
super.unregister(url);
unregistered(url);
}
複製程式碼
protected void unregistered(URL url) {
// 遍歷訂閱的監聽器集合
for (Map.Entry<URL,Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key,url)) {
Set<URL> urls = received.get(key);
// 快取中移除
if (urls != null) {
urls.remove(url);
}
if (urls == null || urls.isEmpty()){
if (urls == null){
urls = new ConcurrentHashSet<URL>();
}
// 設定攜帶empty協議的url
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
urls.add(empty);
}
List<URL> list = toList(urls);
// 通知監聽器 服務url變化
for (NotifyListener listener : entry.getValue()) {
notify(key,list);
}
}
}
}
複製程式碼
這個邏輯也比較清晰,把需要取消註冊的服務url從快取中移除,然後如果沒有接收的服務url了,就加入一個攜帶empty協議的url,然後通知監聽器服務變化。
13.lookup
@Override
public List<URL> lookup(URL url) {
List<URL> urls = new ArrayList<URL>();
// 通過消費者url獲得訂閱的服務的監聽器
Map<String,List<URL>> notifiedUrls = getNotified().get(url);
// 獲得註冊的服務url集合
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> values : notifiedUrls.values()) {
urls.addAll(values);
}
}
// 如果為空,則從記憶體快取properties獲得相關value,並且返回為註冊的服務
if (urls.isEmpty()) {
List<URL> cacheUrls = getCacheUrls(url);
if (cacheUrls != null && !cacheUrls.isEmpty()) {
urls.addAll(cacheUrls);
}
}
// 如果還是為空則從快取registered中獲得已註冊 服務URL 集合
if (urls.isEmpty()) {
for (URL u : getRegistered()) {
if (UrlUtils.isMatch(url,u)) {
urls.add(u);
}
}
}
// 如果url攜帶的配置服務介面為*,也就是所有服務,則從快取subscribed獲得已註冊 服務URL 集合
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
for (URL u : getSubscribed().keySet()) {
if (UrlUtils.isMatch(url,u)) {
urls.add(u);
}
}
}
return urls;
}
複製程式碼
該方法是返回註冊的服務url列表,可以看到有很多種獲得的方法這些快取都儲存在AbstractRegistry類中,相關的介紹可以檢視《dubbo原始碼解析(三)註冊中心——開篇》。
14.subscribe && unsubscribe
@Override
public void subscribe(URL url,NotifyListener listener) {
super.subscribe(url,listener);
subscribed(url,listener);
}
@Override
public void unsubscribe(URL url,NotifyListener listener) {
super.unsubscribe(url,listener);
received.remove(url);
}
複製程式碼
protected void subscribed(URL url,NotifyListener listener) {
// 查詢註冊列表
List<URL> urls = lookup(url);
// 通知url
notify(url,urls);
}
複製程式碼
這兩個重寫了父類的方法,分別是訂閱和取消訂閱。邏輯很簡單。
(二)MulticastRegistryFactory
該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原始碼:
public class MulticastRegistryFactory extends AbstractRegistryFactory {
@Override
public Registry createRegistry(URL url) {
return new MulticastRegistry(url);
}
}
複製程式碼
可以看到就是例項化了MulticastRegistry而已,所有這裡就不解釋了。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了dubbo利用multicast來實現註冊中心,其中關鍵的是需要弄明白MulticastSocket以及單播、廣播、多播的概念,其他的邏輯並不複雜。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見。