android socket連結 NIO非阻塞方式
阿新 • • 發佈:2018-12-30
最近在研究android的推送,一開始準備自己搭建推送伺服器,並在android機上建立一個socket長連線,由於之前一直是用c++,在網上搜索一些資料後,臨時寫了一個基於NIO模式的客戶端socket連結類,測試後能使用,當然還有很多問題,沒有去修改了,因為最後發現,現在國內已經有現成的第三方推送平臺,基於專案的開發時間有限,準備使用第三方推送平臺,廢話不多說,直接貼程式碼,說明都在程式碼中.
package cn.kidstone.cartoon.net; import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; /** * 通過NIO方式進行socket連結 * @author He Zhongqiu * */ public class TCPClient { /** 通道選擇器 */ private Selector mSelector; /** 伺服器通訊的通道 */ private SocketChannel mChannel; /** 遠端伺服器ip地址 */ private String mRemoteIp; /** 遠端伺服器埠 */ private int mPort; /** 是否載入過的標識 */ private boolean mIsInit = false; /** 單鍵例項 */ private static TCPClient gTcp; private TCPClientEventListener mEventListener; /** 預設連結超時時間 */ public static final int TIME_OUT = 10000; /** 讀取buff的大小 */ public static final int READ_BUFF_SIZE = 1024; /** 訊息流的格式 */ public static final String BUFF_FORMAT = "utf-8"; public static synchronized TCPClient instance() { if ( gTcp == null ) { gTcp = new TCPClient(); } return gTcp; } private TCPClient() { } /** * 連結遠端地址 * @param remoteIp * @param port * @param TCPClientEventListener * @return */ public void connect( String remoteIp, int port, TCPClientEventListener tcel ) { mRemoteIp = remoteIp; mPort = port; mEventListener = tcel; connect(); } /** * 連結遠端地址 * @param remoteIp * @param port * @return */ public void connect( String remoteIp, int port ) { connect(remoteIp,port,null); } private void connect() { //需要在子執行緒下進行連結 MyConnectRunnable connect = new MyConnectRunnable(); new Thread(connect).start(); } /** * 傳送字元 * @param msg * @return */ public boolean sendMsg(String msg) { boolean bRes = false; try { bRes = sendMsg(msg.getBytes(BUFF_FORMAT)); } catch ( Exception e ) { e.printStackTrace(); } return bRes; } /** * 傳送資料,此函式需要在獨立的子執行緒中完成,可以考慮做一個傳送佇列 * 自己開一個子執行緒對該佇列進行處理,就好像connect一樣 * @param bt * @return */ public boolean sendMsg( byte[] bt ) { boolean bRes = false; if ( !mIsInit ) { return bRes; } try { ByteBuffer buf = ByteBuffer.wrap(bt); int nCount = mChannel.write(buf); if ( nCount > 0 ) { bRes = true; } } catch ( Exception e ) { e.printStackTrace(); } return bRes; } public Selector getSelector() { return mSelector; } /** * 是否連結著 * @return */ public boolean isConnect() { if ( !mIsInit ) { return false; } return mChannel.isConnected(); } /** * 關閉連結 */ public void close() { mIsInit = false; mRemoteIp = null; mPort = 0; try { if ( mSelector != null ) { mSelector.close(); } if ( mChannel != null ) { mChannel.close(); } } catch ( Exception e ) { e.printStackTrace(); } } /** * 重連 * @return */ public void reConnect() { close(); connect(); } /** * 傳送一個測試資料到伺服器,檢測伺服器是否關閉 * @return */ public boolean canConnectServer() { boolean bRes = false; if ( !isConnect() ) { return bRes; } try { mChannel.socket().sendUrgentData(0xff); } catch ( Exception e ) { e.printStackTrace(); } return bRes; } /** * 每次讀完資料後,需要重新註冊selector讀取資料 * @return */ private synchronized boolean repareRead() { boolean bRes = false; try { //開啟並註冊選擇器到通道 mSelector = Selector.open(); if ( mSelector != null ) { mChannel.register(mSelector, SelectionKey.OP_READ); bRes = true; } } catch ( Exception e ) { e.printStackTrace(); } return bRes; } public void revMsg() { if ( mSelector == null ) { return; } boolean bres = true; while ( mIsInit ) { if ( !isConnect() ) { bres = false; } if ( !bres ) { try { Thread.sleep(100); } catch ( Exception e ) { e.printStackTrace(); } continue; } try { //有資料就一直接收 while (mIsInit && mSelector.select() > 0) { for ( SelectionKey sk : mSelector.selectedKeys() ) { //如果有可讀資料 if ( sk.isReadable() ) { //使用NIO讀取channel中的資料 SocketChannel sc = (SocketChannel)sk.channel(); //讀取快取 ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFF_SIZE); //實際的讀取流 ByteArrayOutputStream read = new ByteArrayOutputStream(); int nRead = 0; int nLen = 0; //單個讀取流 byte[] bytes; //讀完為止 while ( (nRead = sc.read(readBuffer) ) > 0 ) { //整理 readBuffer.flip(); bytes = new byte[nRead]; nLen += nRead; //將讀取的資料拷貝到位元組流中 readBuffer.get(bytes); //將位元組流新增到實際讀取流中 read.write(bytes); ///////////////////////////////////// //@ 需要增加一個解析器,對資料流進行解析 ///////////////////////////////////// readBuffer.clear(); } if ( nLen > 0 ) { if ( mEventListener != null ) { mEventListener.recvMsg(read); } else { String info = new String(read.toString(BUFF_FORMAT)); System.out.println("rev:"+info); } } //為下一次讀取做準備 sk.interestOps(SelectionKey.OP_READ); } //刪除此SelectionKey mSelector.selectedKeys().remove(sk); } } } catch ( Exception e ) { e.printStackTrace(); } } } public interface TCPClientEventListener { /** * 多執行緒下接收到資料 * @param read * @return */ void recvMsg(ByteArrayOutputStream read); } /** * 連結執行緒 * @author HeZhongqiu * */ private class MyConnectRunnable implements Runnable { @Override public void run() { // TODO Auto-generated method stub try { //開啟監聽通道,並設定為非阻塞模式 SocketAddress ad = new InetSocketAddress(mRemoteIp, mPort); mChannel = SocketChannel.open( ad ); if ( mChannel != null ) { mChannel.socket().setTcpNoDelay(false); mChannel.socket().setKeepAlive(true); //設定超時時間 mChannel.socket().setSoTimeout(TIME_OUT); mChannel.configureBlocking(false); mIsInit = repareRead(); //建立讀執行緒 RevMsgRunnable rev = new RevMsgRunnable(); new Thread(rev).start(); } } catch ( Exception e ) { e.printStackTrace(); } finally { if ( !mIsInit ) { close(); } } } } private class RevMsgRunnable implements Runnable { @Override public void run() { // TODO Auto-generated method stub revMsg(); } } }
其中,還需要完善的是:
1.傳送訊息,需要自己再建立一個訊息佇列,在另外的一個執行緒處理,如果在主執行緒下,會丟擲異常,
2.自定義訊息解析器,解析器的基類我就沒定義了,可根據實際需求進行新增,根據自己的訊息結構體,對訊息資料流進行解析
3.連結斷開後,收訊息的輪詢處理需要退出