DDPush開源推送框架原始碼分析之Client到DDPush(UDP模式)
本篇文章主要講解Client(客戶端)如何連線到DDPush,並向DDPush傳送訊息(主要是心跳包和確認資訊),和如何接收APPServer推送給DDPush的訊息,本篇文章分析官方推薦的UDP工作模式。
UDP模式主要涉及到以下幾個重要的類:
1、UdpConnector監聽埠的UDP資料包
2、Receiver接受終端訊息
3、Sender 向終端傳送訊息
4、Messenger 從Receiver的訊息佇列中取出訊息, 再從記憶體中查詢對應的狀態機以及需要傳送給終端的訊息, 最後加入到Sender的訊息佇列並由Sender發出去
一、UdpConnector
關鍵方法start()
第6行:繫結一個埠埠,監聽此埠的資料包public void start() throws Exception{ if(antenna != null){ throw new Exception("antenna is not null, may have run before"); } antenna = DatagramChannel.open(); antenna.socket().bind(new InetSocketAddress(port)); System.out.println("udp connector port:"+port); //non-blocking antenna.configureBlocking(false); antenna.socket().setReceiveBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_RECEIVE")); antenna.socket().setSendBufferSize(1024*1024*PropertyUtil.getPropertyInt("CLIENT_UDP_BUFFER_SEND")); System.out.println("udp connector recv buffer size:"+antenna.socket().getReceiveBufferSize()); System.out.println("udp connector send buffer size:"+antenna.socket().getSendBufferSize()); this.receiver = new Receiver(antenna); this.receiver.init(); this.sender = new Sender(antenna); this.sender.init(); this.senderThread = new Thread(sender,"AsynUdpConnector-sender"); this.receiverThread = new Thread(receiver,"AsynUdpConnector-receiver"); this.receiverThread.start(); this.senderThread.start(); }
第16行:receiver是一個runnable物件,用來接受Client發來的訊息
第19行:sender是一個runnable物件,用來向Client推送訊息
第21-24行:分別啟動receiver和sender
二、Receiver
第一步:run()方法
這是一個while迴圈,用來不斷的接收訊息,主要看第5行的processMessage()方法public void run(){ while(!this.stoped){ try{ //synchronized(enQueSignal){ processMessage(); // if(mq.isEmpty() == true){ // enQueSignal.wait(); // } //} }catch(Exception e){ e.printStackTrace(); }catch(Throwable t){ t.printStackTrace(); } } }
第二步:processMessage()方法
protected void processMessage() throws Exception{
address = null;
buffer.clear();
try{
address = this.channel.receive(buffer);
}catch(SocketTimeoutException timeout){
}
if(address == null){
try{
Thread.sleep(1);
}catch(Exception e){
}
return;
}
buffer.flip();
byte[] swap = new byte[buffer.limit() - buffer.position()];
System.arraycopy(buffer.array(), buffer.position(), swap, 0, swap.length);
ClientMessage m = new ClientMessage(address,swap);
enqueue(m);
//System.out.println(DateTimeUtil.getCurDateTime()+" r:"+StringUtil.convert(m.getData())+" from:"+m.getSocketAddress().toString());
}
第5行:接收資料填充到buffer,因為DatagramChannel設定成非阻塞了,所以此方法不管有無資料都會立即返回,所以第9行會做一個判斷
第19-20:將buffer緩衝區的資料拷貝到swap資料
第22行:將byte[]陣列和Client的address資訊封裝成ClientMessage物件
第24行:將ClientMessage物件加入到訊息佇列,等待處理(Messenger會從此佇列取訊息)
三、Sender
第一步:run()方法
public void run(){
while(!this.stoped){
try{
synchronized(enQueSignal){
while(mq.isEmpty() == true && stoped == false){
try{
enQueSignal.wait(1);
}catch(InterruptedException e){
}
//System.out.println("sender wake up");
}
processMessage();
}
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
這是一個while迴圈,用來不斷的傳送訊息,主要看第13行的processMessage()方法第二步:processMessage()方法
protected void processMessage() throws Exception{
buffer.clear();
ServerMessage pendingMessage = dequeue();
if(pendingMessage == null){
//Thread.yield();
return;
}
buffer.put(pendingMessage.getData());
buffer.flip();
channel.send(buffer, pendingMessage.getSocketAddress());
//System.out.println(DateTimeUtil.getCurDateTime()+" s:"+StringUtil.convert(pendingMessage.getData())+" to :"+pendingMessage.getSocketAddress().toString());
}
第3行:從訊息傳送佇列取出一條訊息,ServerMessage物件主要包括訊息內容和Client地址資訊兩個屬性
第8行:將訊息內容放到緩衝區
第10行:將訊息內容傳送給Client
那麼問題來了,Receiver中加入佇列的ClientMessage怎麼轉為ServerMessage並加入到Sender的訊息傳送佇列的呢?這就涉及另外一個重要的角色Messenger
四、Messenger
這也是一個實現Runnable介面的物件,所以首先找到run()方法
第一步:run()方法
@Override
public void run() {
this.started = true;
while(stoped == false){
try{
procMessage();
}catch(Exception e){
e.printStackTrace();
}catch(Throwable t){
t.printStackTrace();
}
}
}
這是一個while迴圈,用來不斷的傳送訊息,主要看第7行的procMessage()方法
第二步:procMessage()方法
private void procMessage() throws Exception{
ClientMessage m = this.obtainMessage();
if(m == null){
try{
Thread.sleep(5);
}catch(Exception e){
;
}
return;
}
// 對終端釋出訊息
this.deliverMessage(m);
}
第2行:從佇列取出一條Client發過來的訊息,ClientMessage是不是很眼熟?我們看下this.obtainMessage()這個方法
private ClientMessage obtainMessage() throws Exception{return connector.receive();
}
它又呼叫了connector的receiver()方法,connector就是UdpConnector物件,那麼我們看下UdpConnector的receiver()方法
public ClientMessage receive() throws Exception {
return receiver.receive();
}
什麼?它又呼叫了receiver的receiver()方法,好吧,繞了半天,原來最終是從receiver的訊息佇列中取得訊息的。
第12行:處理Client發來的訊息,那麼我們具體看下deliverMessage(m)這個方法
第三步、deliverMessage(ClientMessage m)方法
private void deliverMessage(ClientMessage m) throws Exception{
//System.out.println(this.hostThread.getName()+" receive:"+StringUtil.convert(m.getData()));
//System.out.println(m.getSocketAddress().getClass().getName());
String uuid = m.getUuidHexString();
//ClientStatMachine csm = NodeStatus.getInstance().getClientStat(uuid);
ClientStatMachine csm = nodeStat.getClientStat(uuid); // 查詢記憶體中的狀態機
if(csm == null){//
csm = ClientStatMachine.newByClientTick(m); // 建立狀態機
if(csm == null){
return;
}
nodeStat.putClientStat(uuid, csm);
}
// 查詢是否有訊息傳送給終端
ArrayList<ServerMessage> smList = csm.onClientMessage(m);
if(smList == null){
return;
}
for(int i = 0; i < smList.size(); i++){
ServerMessage sm = smList.get(i);
if(sm.getSocketAddress() == null)continue;
this.connector.send(sm);
}
}
第4行:是一個標準的UUID,用來唯一標識每一個使用者
第6行:根據UUID去記憶體中查詢有沒有狀態機csm,可以理解為一個線上使用者物件
第7-13行:記憶體中沒有該使用者則建立一個並儲存
第15行:根據Client傳送過來的訊息,判斷需不需要回復、是否有離線的訊息需要發給此使用者,訊息封裝為ServerMessage並加入到集合,因為可能會有多條訊息需要下發
第22行:將ServerMessage推送給客戶端,看到這裡,我們不禁有疑問,Sender不是用來給Client推送訊息的嗎?我們看下this.connector.send(sm);這行程式碼執行了什麼,它是呼叫UdpConnector物件的send(ServerMessage message)方法,讓我們看下這個方法
public boolean send(ServerMessage message) throws Exception {
return sender.send(message);
}
它又呼叫了Sender物件的send(ServerMessage message)方法,繼續跟蹤下去
public boolean send(ServerMessage message){
return enqueue(message);
}
看到這裡,我們應該能明白了,這個方法的作用是將需要推送的ServerMessage物件加入到Sender的訊息佇列,Sender的run()方法會從佇列去取訊息再推送給客戶端。
到此,我們應該明白了UDP模式Client與DDPush的互動流程。好了,本篇文章的講解也就到此結束了,如有問題,歡迎大家指正!