1. 程式人生 > >基於執行緒池和NIO技術構建高效的多協議Android通訊框架

基於執行緒池和NIO技術構建高效的多協議Android通訊框架

基於執行緒池和NIO技術構建高效的多協議Android通訊框架

作者孫東風 2011-1-20轉載請註明出處

引言

在多數涉及網路通訊的手機應用中,由於GPRS網路的速度在目前的情況下還不算理想,所以,如何能夠高效的請求得到網路資料就成為大多數應用所面臨的瓶頸問題。同時,在一些應用程式中可能會使用多種協議,比如IM通訊、視訊流型別的應用會犧牲資料的完整性來更高效的獲取資料,在這種型別的應用中,可能需要同時支援TCPUDP以及HTTP協議。本文就嘗試基於Android的多執行緒技術ThreadPoolExecutor以及NIO非阻塞式程式設計構建這樣一個框架,以高效的獲取網路資料並很好的支援多種協議的併發請求。

基本設計思路

既然是基於ThreadPoolExecutor執行緒池來管理多個NIO執行緒的請求的,那麼首先應該有個全域性的ThreadPoolExecutor變數,使用單例模式來實現:

public static synchronized ThreadPoolExecutor setThreadPoolNum(int aThreadPoolMinNum,

int aThreadPoolMaxNum,long keepAliveTime)

{

if(threadPool == null)

{

threadPool =

new ThreadPoolExecutor(aThreadPoolMinNum,aThreadPoolMaxNum,

keepAliveTime,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),

new ThreadPoolExecutor.DiscardOldestPolicy());

}

return threadPool;

}

每個NIO請求被設計為單獨的執行緒,而每個網路通訊的連線都可以附屬在NIO執行緒上,基本的設計思路如圖1所示:

 

圖1

NIO通訊執行緒的設計

NIO通訊執行緒被設計為管理多個不同型別的網路連線,並負責維護每個連線所對應的網路資料處理介面,這些網路資料處理介面包括資料傳送介面processWrite()

、資料接收介面processRead()以及錯誤處理介面processError()等。

在每個NIO通訊執行緒被執行緒池物件啟動之後,首先檢查NIO埠是否有連線註冊的資料到來,如果有資料到來,則提交給相應的連線進行處理,程式碼如下:

try {

if(selector != null)

n = selector.select(3000);

// 如果要shutdown,關閉selector退出

if (shutdown) {

selector.close();

break;

}

} catch (IOException e) {

dispatchErrorToAll(e);

}

// 如果select返回大於0,處理事件

if(n > 0) {

for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {

// 得到下一個Key

SelectionKey sk = i.next();

i.remove();

// 檢查其是否還有效

if(!sk.isValid())

continue;

// 處理

INIOHandler handler = (INIOHandler)sk.attachment();

try {

if(sk.isConnectable())

{

handler.processConnect(sk);

}

else if (sk.isReadable())

{

handler.processRead(sk);

}

} catch (IOException e) {

handler.processError(e);

} catch (RuntimeException e) {

}

}

n = 0;

}

checkNewConnection();

notifySend();

上面的程式碼首先使用Selector類的selectedKyes()方法獲取到所有有事件發生的連線通道,然後遍歷這些通道,並使用attachment()方法獲取到註冊到通道上的處理物件NIOHandler,之後呼叫每個連線通道上註冊的NIOHandler物件的方法進行網路資料的處理。

網路連線的設計

NIO通訊執行緒的設計中知道,每個網路連線都有相應的連線通道以及連線通道的資料處理器,那麼就可以抽象出來這些連線通道以及資料處理器的介面:

public interface IConnection {

/**

* 新增一個包到傳送佇列

*

* @param out

* OutPacket子類

*/

public void add(OutPacket out);

public void clearSendQueue();

public void start();

public String getId();

public void dispose();

public InetSocketAddress getRemoteAddress();

public SelectableChannel channel();

public INIOHandler getNIOHandler();

public boolean isEmpty();

public void receive() throws IOException;

public void send() throws IOException;

public void send(ByteBuffer buffer);

public boolean isConnected();

}

在通道資料處理器中,每個連線通道都應該有基本的資料傳送、資料接收以及錯誤處理介面,可以抽象出資料處理器的基本介面如下:

public interface INIOHandler {

public void processConnect(SelectionKey sk) throws IOException;

public void processRead(SelectionKey sk) throws IOException;

public void processWrite() throws IOException;

public void processError(Exception e);

}

那麼每個網路連線都應該繼承上述兩個介面,比如TCP連線需要有SocketChannel通道以及相應的NIOHandler資料處理介面,UDP連線需要有DatagramChannel通道以及相應的NIOHandler資料處理介面,下面是TCPConnection的程式碼:

public class TCPConnection extends ConnectionImp{

/** 用於通訊的channel */

private final SocketChannel channel;

/**

* true表示遠端已經關閉了這個連線

*/

private boolean remoteClosed;

/**

* 構造一個連線到指定地址的TCPPort.

*

* @param address 連線到的地址.

* @throws IOException 埠開啟/埠配置/連線到地址出錯.

*/

public TCPConnection(String id, InetSocketAddress address) throws IOException {

super(id);

channel = SocketChannel.open();

channel.configureBlocking(false);

this.remoteAddress = address;

remoteClosed = false;

}

public void start() {

try {

channel.connect(remoteAddress);

} catch(UnknownHostException e) {

processError(new Exception("Unknown Host"));

} catch(UnresolvedAddressException e) {

processError(new Exception("Unable to resolve server address"));

} catch (IOException e) {

processError(e);

}

}

public SelectableChannel channel() {

return channel;

}

public void receive() throws IOException {

if(remoteClosed)

return;

//接收資料

int oldPos = receiveBuf.position();

for (int r = channel.read(receiveBuf); r > 0; r = channel.read(receiveBuf))

;

byte[] tempBuffer = new byte[1024];

receiveBuf.get(tempBuffer, 0, receiveBuf.position());

Log.e("receive", " = "+new String(tempBuffer,"UTF-8"));

// 得到當前位置

int pos = receiveBuf.position();

receiveBuf.flip();

// 檢查是否讀了0位元組,這種情況一般表示遠端已經關閉了這個連線

if(oldPos == pos) {

remoteClosed = true;

return;

}

InPacket packet = new InPacket(receiveBuf);

inQueue.add(packet);

adjustBuffer(pos);

}

private void adjustBuffer(int pos) {

// 如果0不等於當前pos,說明至少分析了一個包

if(receiveBuf.position() > 0) {

receiveBuf.compact();

receiveBuf.limit(receiveBuf.capacity());

} else {

receiveBuf.limit(receiveBuf.capacity());

receiveBuf.position(pos);

}

}

public void send() throws IOException {

while (!isEmpty()) {

sendBuf.clear();

OutPacket packet = remove();

channel.write(ByteBuffer.wrap(packet.getBody()));

// 新增到重發佇列

packet.setTimeout(System.currentTimeMillis() + EnginConst.QQ_TIMEOUT_SEND);

Log.e("debug","have sended packet - " + packet.toString());

}

}

public void send(OutPacket packet) {

try {

sendBuf.clear();

channel.write(ByteBuffer.wrap(packet.getBody()));

Log.d("debug","have sended packet - " + packet.toString());

} catch (Exception e) {

}

}

public void send(ByteBuffer buffer) {

try {

channel.write(buffer);

} catch (IOException e) {

}

}

public void dispose() {

try {

channel.close();

} catch(IOException e) {

}

}

public boolean isConnected() {

return channel != null && channel.isConnected();

}

public void processConnect(SelectionKey sk) throws IOException {

//完成SocketChannel的連線

channel.finishConnect();

while(!channel.isConnected()) {

try {

Thread.sleep(300);

} catch (InterruptedException e) {

}

channel.finishConnect();

}

sk.interestOps(SelectionKey.OP_READ);

Log.e("debug","hava connected to server");

}

public void processRead(SelectionKey sk) throws IOException {

receive();

}

public void processWrite() throws IOException {

if(isConnected())

send();

}

}

測試程式碼

public void onCreate(Bundle savedInstanceState) {

super.onCreate(savedInstanceState);

setContentView(R.layout.main);

AsyncRequest request = new AsyncRequest(new InetSocketAddress("10.0.2.2",8042));

try {

request.setConNumPerNIOThread(5);

AsyncRequest.setThreadPoolNum(2, 5, 2);

request.setUDP(false);

request.setBody("hello world".getBytes());

request.startAsyn(0);

if(!request.isCurrRequestFull())

{

request.setBody("My name is dongfengsun".getBytes());

request.startAsyn(1);

}

} catch (Exception e) {

Log.e("onCreate", " = "+e.getMessage());

e.printStackTrace();

}

}

測試程式碼中設定每個NIO通訊執行緒的連線上限為5,如果每個NIO通訊執行緒還有空餘的連線數,那麼可以繼續在當前的NIO通訊執行緒建立網路連線,如果滿了,則可以重新初始化一個NIO通訊執行緒並加入到執行緒池中,上面的程式碼在伺服器端控制檯列印的資料如圖2所示:

圖2

因為CSDN上無法上傳原始碼,所以需要原始碼的可以在回覆中留下信箱,有時間我會統一發送^_^

原始碼不再逐個傳送,已上傳到口袋網 http://www.pocketcn.com上,請到網站Android板塊進行下載

相關推薦

基於執行NIO技術構建高效協議Android通訊框架

基於執行緒池和NIO技術構建高效的多協議Android通訊框架 作者孫東風 2011-1-20轉載請註明出處 引言 在多數涉及網路通訊的手機應用中,由於GPRS網路的速度在目前的情況下還不算理想,所以,如何能夠高效的請求得到網路資料就成為大多數應用所面臨的瓶頸問題。同時,

JAVA執行(三) 執行鎖的深度化

 github演示程式碼地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/src/main/java/com/kawa/thread 1.執行緒池  1.1 執行緒池是什麼 Java中的執行緒

Java併發程式設計:4種執行緩衝佇列BlockingQueue

一. 執行緒池簡介 1. 執行緒池的概念:           執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一

初探佇列訊息:普通http同步請求、基於執行的非同步請求、基於訊息佇列的請求三者的比較

最近忙完了手頭的專案,終於有時間研究之前一直落下的訊息隊列了,順帶手又看了一下多執行緒非同步請求,加上最傳統的http同步請求,正好可以拉出來做個比較,廢話不多說,走起! 場景設計:三個使用者同時向系統傳送一個請求,要求系統進行處理; 通過這個場景設計,我們來看看不同請求方式的表現:

java執行資料庫連線[從學習到工作(二)]

背景:前段時間工作需要開發一個socket服務端,在接受到客戶端發過來的報文資訊後解析報文呼叫資料庫程式完成資料檔案的生成再拼湊結果報文反饋給客戶端。由於客戶數比較多,所以用執行緒池和資料庫連線池。        一.執行緒池   

執行程序

動態建立子程序(函式執行緒)實現併發伺服器的缺點 在前面的文章中我們是通過動態建立子程序(函式執行緒)來實現併發伺服器的,這樣做的缺點如下: 動態建立程序(或執行緒)是比較耗費時間的,這樣導致較慢的客戶響應。 動態建立的子程序(子執行緒)通常只用來為一個客戶服務,這將導致系統上產

【轉】執行Executor框架

  一 使用執行緒池的好處 二 Executor 框架 2.1 簡介 2.2 Executor框架結構(主要由三部分構成)  2.3 Executor框架使用說明示意圖 三 ThreadPoolExecutor詳解 3.1 Thread

執行lambda表示式

執行緒池1.什麼是執行緒池.一個用來建立和管理執行緒的容器;2.執行緒池的作用.提高執行緒的複用性,降低資源消耗提高執行緒的響應速度,提高執行緒的可管理性3.執行緒的核心思想;執行緒的複用 4.執行緒池的建立ExecutorService pools = Executors.newFixedThreadPoo

18.Java語言執行Lambda表示式

執行緒等待喚醒機制 1.執行緒間的通訊:        一個程式完成某個任務,需要多個執行緒協調,就需要執行緒之間存在“通訊”,比如生產者和消費者,只有生產了才能被消費。當生產者生產完成才能告知消費者可以消費,那麼告知的過程就是執行緒間的通訊。 2.等待與喚醒機制:

netty原始碼解解析(4.0)-6 執行模型-IO執行EventLoopGroupNIO實現(一)

介面定義 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 說明

OKHttp 3.10原始碼解析(一):執行任務佇列

OKhttp是Android端最火熱的網路請求框架之一,它以高效的優點贏得了廣大開發者的喜愛,下面是OKhttp的主要特點: 1.支援HTTPS/HTTP2/WebSocket 2.內部維護執行緒池佇列,提高併發訪問的效率 3.內部維護連線池,支援多路複用,減少連線建立開銷 4.

基於執行執行售票demo

廢話不多說,直接就開擼 import org.springframework.util.StopWatch; import java.util.concurrent.*; /** * 基於執行緒池實現的多執行緒賣票demo * joey li * 2018-4-12 *

netty原始碼解解析(4.0)-7 執行模型-IO執行EventLoopGroupNIO實現(二)

把NIO事件轉換成對channel unsafe的呼叫或NioTask的呼叫 processSelectedKeys()方法是處理NIO事件的入口: private void processSelectedKeys() { if (selectedKeys != null) {

[python] ThreadPoolExecutor執行ProcessPoolExecutor程序

引言 Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Py

執行執行相關類

執行緒池概述 系統啟用一個新執行緒的成本是比較高的,因為它涉及與作業系統互動。在這種情形下,使用執行緒池可以很好的提高效能。執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個Runnable物件或Callable物件傳給執行緒池,執行緒池會啟動一個執行緒來執行它們的run()或call方

初識Java中的四大執行ThreadPoolExecutor的使用(歡迎指正)

初識Java中的四大執行緒池和ThreadPoolExecutor的使用(轉載+自身心得) 為什麼用執行緒池? 1.建立/銷燬執行緒伴隨著系統開銷,過於頻繁的建立/銷燬執行緒,會很大程度上影響處-理效率; 2.執行緒併發數量過多,搶佔系統資源從而導致阻塞; 3.對執行緒進行一些簡單的

使用執行直接new 一個Thread執行對比

大家new Thread的方式會建立一個執行緒,在我們有大量的建立執行緒的時候這樣的方法還會可靠嗎?每一次new Thread都會重新建立一個執行緒,而執行緒的建立和銷燬都需要耗時的。在jdk1.5的concurrent包中有一個Executors,他能使我們建立的執行緒得

盤點java併發包提供的執行佇列

執行緒池 newCachedThreadPool() newFixedThreadPool(int nThreads) newSingleThreadPoolExecutor() newScheduledThreadPool(int corePoolSize

jdk單例執行sping執行使用

 java提供的原生執行緒池技術處理原理很清晰,故只要使用自己的原生執行緒池技術一般都能滿足專案的需求。java提供了很好的執行緒池實現,比我們自己的實現要更加健壯以及高效,同時功能也更加強大,不建議自己編寫。另外有同學可能用過spring的執行緒池,那麼spring執行緒

常見執行啟動定時器執行-筆記整理7

常見執行緒池和啟動定時器執行緒池 1.執行緒池的概念:執行緒池是一種多執行緒處理形式,處理過程中將任務新增到佇列,如果執行緒池中有空閒的執行緒,則由該執行緒去完成這些任務。 2.Excecutors類的應用   (1)建立固定大小的執行緒池:Executors.new