1. 程式人生 > >DDPush開源推送框架原始碼分析之Client到DDPush(UDP模式)

DDPush開源推送框架原始碼分析之Client到DDPush(UDP模式)

在前一篇文章中我們主要分析了AppServer是如何連線到DDPush,並向DDPush推送訊息,還沒有看過的朋友請移步DDPush開源推送框架原始碼分析之APPServer到DDPush

本篇文章主要講解Client(客戶端)如何連線到DDPush,並向DDPush傳送訊息(主要是心跳包和確認資訊),和如何接收APPServer推送給DDPush的訊息,本篇文章分析官方推薦的UDP工作模式。

UDP模式主要涉及到以下幾個重要的類:

1、UdpConnector監聽埠的UDP資料包

2、Receiver接受終端訊息

3、Sender 向終端傳送訊息

4、Messenger 從Receiver的訊息佇列中取出訊息, 再從記憶體中查詢對應的狀態機以及需要傳送給終端的訊息, 最後加入到Sender的訊息佇列並由Sender發出去


一、UdpConnector

關鍵方法start()

public void start() throws Exception{
		if(antenna != null){
			throw new Exception("antenna is not null, may have run before");
		}
		antenna = DatagramChannel.open();
		antenna.socket().bind(new InetSocketAddress(port));
		System.out.println("udp connector port:"+port);
		//non-blocking
		antenna.configureBlocking(false);
		antenna.socket().setReceiveBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_RECEIVE"));
		antenna.socket().setSendBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_SEND"));
		System.out.println("udp connector recv buffer size:"+antenna.socket().getReceiveBufferSize());
		System.out.println("udp connector send buffer size:"+antenna.socket().getSendBufferSize());
		
		
		this.receiver = new Receiver(antenna);
		this.receiver.init();
		this.sender = new Sender(antenna);
		this.sender.init();
		
		this.senderThread = new Thread(sender,"AsynUdpConnector-sender");
		this.receiverThread = new Thread(receiver,"AsynUdpConnector-receiver");
		this.receiverThread.start();
		this.senderThread.start();
	}
第6行:繫結一個埠埠,監聽此埠的資料包

第16行:receiver是一個runnable物件,用來接受Client發來的訊息

第19行:sender是一個runnable物件,用來向Client推送訊息

第21-24行:分別啟動receiver和sender

二、Receiver

第一步:run()方法

public void run(){
		while(!this.stoped){
			try{
				//synchronized(enQueSignal){
					processMessage();
				//	if(mq.isEmpty() == true){
				//		enQueSignal.wait();
				//	}
				//}
			}catch(Exception e){
				e.printStackTrace();
			}catch(Throwable t){
				t.printStackTrace();
			}
		}
	}
這是一個while迴圈,用來不斷的接收訊息,主要看第5行的processMessage()方法

第二步:processMessage()方法
protected void processMessage() throws Exception{
		address = null;
		buffer.clear();
		try{
			address = this.channel.receive(buffer);
		}catch(SocketTimeoutException timeout){
			
		}
		if(address == null){
			try{
				Thread.sleep(1);
			}catch(Exception e){
				
			}
			return;
		}
		
		buffer.flip();
		byte[] swap = new byte[buffer.limit() - buffer.position()];
		System.arraycopy(buffer.array(), buffer.position(), swap, 0, swap.length);

		ClientMessage m = new ClientMessage(address,swap);
		
		enqueue(m);
		//System.out.println(DateTimeUtil.getCurDateTime()+" r:"+StringUtil.convert(m.getData())+" from:"+m.getSocketAddress().toString());

	}
第5行:接收資料填充到buffer,因為DatagramChannel設定成非阻塞了,所以此方法不管有無資料都會立即返回,所以第9行會做一個判斷

第19-20:將buffer緩衝區的資料拷貝到swap資料

第22行:將byte[]陣列和Client的address資訊封裝成ClientMessage物件

第24行:將ClientMessage物件加入到訊息佇列,等待處理(Messenger會從此佇列取訊息)


三、Sender

第一步:run()方法

public void run(){
		while(!this.stoped){
			try{
				synchronized(enQueSignal){
					while(mq.isEmpty() == true && stoped == false){
						try{
							enQueSignal.wait(1);
						}catch(InterruptedException e){
							
						}
						//System.out.println("sender wake up");
					}
					processMessage();
					
				}
			}catch(Exception e){
				e.printStackTrace();
			}catch(Throwable t){
				t.printStackTrace();
			}
		}
	}
這是一個while迴圈,用來不斷的傳送訊息,主要看第13行的processMessage()方法

第二步:processMessage()方法

protected void processMessage() throws Exception{
		buffer.clear();
		ServerMessage pendingMessage = dequeue();
		if(pendingMessage == null){
			//Thread.yield();
			return;
		}
		buffer.put(pendingMessage.getData());
		buffer.flip();
		channel.send(buffer, pendingMessage.getSocketAddress());
		//System.out.println(DateTimeUtil.getCurDateTime()+" s:"+StringUtil.convert(pendingMessage.getData())+" to  :"+pendingMessage.getSocketAddress().toString());
	}
第3行:從訊息傳送佇列取出一條訊息,ServerMessage物件主要包括訊息內容和Client地址資訊兩個屬性

第8行:將訊息內容放到緩衝區

第10行:將訊息內容傳送給Client
那麼問題來了,Receiver中加入佇列的ClientMessage怎麼轉為ServerMessage並加入到Sender的訊息傳送佇列的呢?這就涉及另外一個重要的角色Messenger

四、Messenger

這也是一個實現Runnable介面的物件,所以首先找到run()方法

第一步:run()方法

@Override
	public void run() {
		this.started = true;
		
		while(stoped == false){
			try{
				procMessage();
			}catch(Exception e){
				e.printStackTrace();
			}catch(Throwable t){
				t.printStackTrace();
			}
		}

	}
這是一個while迴圈,用來不斷的傳送訊息,主要看第7行的procMessage()方法

第二步:procMessage()方法

private void procMessage() throws Exception{
		ClientMessage m = this.obtainMessage();
		if(m == null){
			try{
				Thread.sleep(5);
			}catch(Exception e){
				;
			}
			return;
		}
		// 對終端釋出訊息
		this.deliverMessage(m);
		
	}
第2行:從佇列取出一條Client發過來的訊息,ClientMessage是不是很眼熟?我們看下this.obtainMessage()這個方法 private ClientMessage obtainMessage() throws Exception{
  return connector.receive();
 }
它又呼叫了connector的receiver()方法,connector就是UdpConnector物件,那麼我們看下UdpConnector的receiver()方法
public ClientMessage receive() throws Exception {
  return receiver.receive();
 }

什麼?它又呼叫了receiver的receiver()方法,好吧,繞了半天,原來最終是從receiver的訊息佇列中取得訊息的。

第12行:處理Client發來的訊息,那麼我們具體看下deliverMessage(m)這個方法

第三步、deliverMessage(ClientMessage m)方法

private void deliverMessage(ClientMessage m) throws Exception{
		//System.out.println(this.hostThread.getName()+" receive:"+StringUtil.convert(m.getData()));
		//System.out.println(m.getSocketAddress().getClass().getName());
		String uuid = m.getUuidHexString();
		//ClientStatMachine csm = NodeStatus.getInstance().getClientStat(uuid);
		ClientStatMachine csm = nodeStat.getClientStat(uuid); // 查詢記憶體中的狀態機
		if(csm == null){//
			csm = ClientStatMachine.newByClientTick(m); // 建立狀態機
			if(csm == null){
				return;
			}
			nodeStat.putClientStat(uuid, csm);
		}
		// 查詢是否有訊息傳送給終端
		ArrayList<ServerMessage> smList = csm.onClientMessage(m);
		if(smList == null){
			return;
		}
		for(int i = 0; i < smList.size(); i++){
			ServerMessage sm = smList.get(i);
			if(sm.getSocketAddress() == null)continue;
			this.connector.send(sm);
		}
		
	}
第4行:是一個標準的UUID,用來唯一標識每一個使用者

第6行:根據UUID去記憶體中查詢有沒有狀態機csm,可以理解為一個線上使用者物件

第7-13行:記憶體中沒有該使用者則建立一個並儲存

第15行:根據Client傳送過來的訊息,判斷需不需要回復、是否有離線的訊息需要發給此使用者,訊息封裝為ServerMessage並加入到集合,因為可能會有多條訊息需要下發

第22行:將ServerMessage推送給客戶端,看到這裡,我們不禁有疑問,Sender不是用來給Client推送訊息的嗎?我們看下this.connector.send(sm);這行程式碼執行了什麼,它是呼叫UdpConnector物件的send(ServerMessage message)方法,讓我們看下這個方法

public boolean send(ServerMessage message) throws Exception {
		return sender.send(message);
		
	}
它又呼叫了Sender物件的send(ServerMessage message)方法,繼續跟蹤下去
public boolean send(ServerMessage message){
		return enqueue(message);
	}
看到這裡,我們應該能明白了,這個方法的作用是將需要推送的ServerMessage物件加入到Sender的訊息佇列,Sender的run()方法會從佇列去取訊息再推送給客戶端。

到此,我們應該明白了UDP模式Client與DDPush的互動流程。好了,本篇文章的講解也就到此結束了,如有問題,歡迎大家指正!