1. 程式人生 > >mina保持android端\服務端的長連線

mina保持android端\服務端的長連線

更多幹貨

一.mina簡介

Apache Mina是一個能夠幫助使用者開發高效能和高伸縮性網路應用程式的框架。與Netty出自同一人之手,都是一個介於應用程式與網路之間的NIO框架,通過Java nio技術基於TCP/IP和UDP/IP協議提供了抽象的、事件驅動的、非同步的API,使程式設計師從繁瑣的網路操作中解脫出來,花更多的時間在業務處理上。 mina分為三層,如下圖: 1、IOService層:處理IO操作 2、IOFilter層:過濾器鏈,日誌處理、位元組變換、物件轉換等操作 3、IOHandler層:真正的處理業務邏輯的地方這裡寫圖片描述

mina核心類

IoService

IoService用來管理各種IO服務,在mina中,這些服務可以包括session、filter、handler等這裡寫圖片描述

上面的圖簡單介紹了IoService的職責,以及其具體實現類AbstractIoService中的職責。在比較大的框架中,都是採用了大量的抽象類之間繼承,採用層級實現細節這樣的方式來組織程式碼。所以在mina中看到Abstract開頭的類,並不僅僅只是一個抽象,其實裡面也包含很多的實現了。

服務端IoAcceptor及相關類

IOAcceptor相當於是對ServerSocketChannel的封裝,最重要的兩個操作是繫結與接受連線。這裡寫圖片描述 Acceptor執行緒專門負責接受連線,在其上有一個selector,輪詢是否有連線建立上來,當有連線建立上來,呼叫ServerSocketChannel.accept方法來接受連線,這個方法返回一個session物件,然後將這個session物件加入processor中,由processor遍歷每個session來完成真正的IO操作。processor上也有一個selector與一個Processor執行緒,selector用於輪詢session,Processor執行緒處理每個session的IO操作。

客戶端IOConnector及相關類

這裡寫圖片描述 IOConnector的設計與IOAcceptor幾乎完全一樣,唯一不同的是與Acceptor執行緒對應的是Connector執行緒,在完成連線操作後也是扔了一個session物件到Processor中。

過濾器(Filter)

下面是官網提供的過濾器這裡寫圖片描述 可以通過繼承IoFilterAdapter來實現自己的過濾器,但一般不需要這麼做,以下是一些常用的過濾器:

  • LoggingFilter 記錄mina所有日誌
  • ProtocolCodecFilter 協議編碼解碼過濾器
  • CompressionFilter 資料壓縮過濾器
  • SSLFilter 資料加密過濾器

IoSession

Mina每建立一個連線同時會建立一個session物件,用於儲存這次讀寫需要用到的所有資訊。從抽象類AbstractIoSession中可以看出session具有如下功能: 1、從attributes成員可以看出session可以存放使用者關心的鍵值對 2、注意到WriteRequestQueue,這是一個寫請求佇列,processor中呼叫flush或者flushNow方法時會將使用者寫入的資料包裝成一個writeRequest物件,並加入這個佇列中。 3、提供了大量的統計功能,比如接收到了多少訊息、最後讀取時間等 在程式碼中設定session:

// 建立伺服器監聽  
IoAcceptor acceptor = new NioSocketAcceptor();  
// 設定buffer的長度  
acceptor.getSessionConfig().setReadBufferSize(2048);  
// 設定連線超時時間  
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  

連線到來建立一個session,初始化好之後加入到processor負責的一個佇列中。processor執行緒會把佇列中的session對應的通道都註冊到它自己的selector上,然後這個selector輪詢這些通道是否準備就緒,一旦準備就緒就呼叫對應方法進行處理(read or flushNow)。 Mina中的session具有狀態,且狀態之間是可以相互轉化的這裡寫圖片描述 IoFilter與IoHandler就是在這些狀態上面加以干預,下面重點看一下IDLE狀態,它分三種: Idle for read:在規定時間內沒有資料可讀 Idle for write:在規定時間內沒有資料可寫 Idle for both:在規定時間內沒有資料可讀和可寫 這三種狀態分別對應IdleStatus類的三個常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE 前面session的用法中有如下設定:

acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 

二.保持長連線

服務端

引入相關jar包 (1)mina-core-2.0.16.jar (2)slf4j-api-1.7.21.jar及相關jar包

  • MainService.java
public class MinaService {
    public static void main(String[] args) {
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        acceptor.setHandler(new DemoServiceHandler());
        acceptor.getSessionConfig().setMaxReadBufferSize(2048);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); //10秒沒有讀寫就進入空閒狀態
        try {
            acceptor.bind(new InetSocketAddress(9123));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

   private static class DemoServiceHandler extends IoHandlerAdapter{

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            super.sessionOpened(session);
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
        }

        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            super.exceptionCaught(session, cause);
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            super.messageReceived(session, message);
            String msg = message.toString();
            session.write(new Date());
            System.out.println("接收到的資料:"+msg);

        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            super.messageSent(session, message);
        }
    }
}

客戶端

相關jar包 (1)mina-core-2.0.16.jar (2)slf4j-android-1.6.1-RC1.jar

  • ConnectionManager.java
public class ConnectionManager {
    private static final String BROADCAST_ACTION="com.commonlibrary.mina";
    private static final String MESSAGE="message";
    private ConnectionConfig mConfig;
    private WeakReference<Context> mContext;
    private NioSocketConnector mConnection;
    private IoSession mSession;
    private InetSocketAddress mAddress;

    public ConnectionManager(ConnectionConfig config) {
        this.mConfig = config;
        this.mContext = new WeakReference<Context>(config.getContext());
        init();
    }

    private void init() {
        mAddress = new InetSocketAddress(mConfig.getIp(), mConfig.getPort());
        mConnection = new NioSocketConnector();
        mConnection.getSessionConfig().setReadBufferSize(mConfig.getReadBufferSize());
        mConnection.getFilterChain().addLast("logger", new LoggingFilter());
        mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        mConnection.setHandler(new DefaultHandler(mContext.get()));
        mConnection.setDefaultRemoteAddress(mAddress);
    }

    public boolean connect() {
        try {
            ConnectFuture future = mConnection.connect();
            future.awaitUninterruptibly();
            mSession = future.getSession();
            SessionManager.getInstance().setSeesion(mSession);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return  mSession != null ? true:false;

    }
    public void disConnection()
    {
        mConnection.dispose();
        mConnection = null;
        mSession = null;
        mAddress = null;
        mContext = null;
    }

    private static class DefaultHandler extends IoHandlerAdapter {
        private final Context mContext;


        public DefaultHandler(Context context) {
            this.mContext = context;
        }

        @Override
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
        }

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            super.sessionOpened(session);
            //將我們的session儲存到我們的session manager類中, 從而可以傳送訊息到伺服器
        }

        @Override
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
        }

        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            super.messageReceived(session, message);
            if (mContext != null)
            {
                Intent intent = new Intent(BROADCAST_ACTION);
                intent.putExtra(MESSAGE, message.toString());
                LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
            }
        }

        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
            super.messageSent(session, message);

        }
    }
}
  • ConnectionConfig.java
public class ConnectionConfig {
    private Context context;
    private String ip;
    private int port;
    private int readBufferSize;
    private long connectionTimeout;

    public Context getContext() {
        return context;
    }

    public void setContext(Context context) {
        this.context = context;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getReadBufferSize() {
        return readBufferSize;
    }

    public void setReadBufferSize(int readBufferSize) {
        this.readBufferSize = readBufferSize;
    }

    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(long connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    //構建者模式
    public static class Builder{
        private Context context;
        private String ip="10.90.24.139";
        private int port=9123;
        private int readBufferSize=10240;
        private long connectionTimeout=10000;

        public Builder(Context context) {
            this.context = context;
        }

        public Builder setIp(String ip) {
            this.ip = ip;
            return this;
        }

        public Builder setPort(int port) {
            this.port = port;
            return this;
        }

        public Builder setReadBufferSize(int readBufferSize) {
            this.readBufferSize = readBufferSize;
            return this;
        }

        public Builder setConnectionTimeout(long connectionTimeout) {
            this.connectionTimeout = connectionTimeout;
            return this;
        }

        private void applyConfig(ConnectionConfig config)
        {
            config.context = this.context;
            config.ip = this.ip;
            config.port = this.port;
            config.readBufferSize = readBufferSize;
            config.connectionTimeout = this.connectionTimeout;
        }
        public ConnectionConfig builder()
        {
            ConnectionConfig config = new ConnectionConfig();
            applyConfig(config);
            return config;
        }
    }

}
  • SessionManager.java
public class SessionManager {
    private static SessionManager mInstance = null;
    //最終與伺服器進行通訊的物件
    private IoSession mSession;
    public static SessionManager getInstance() {
        if (mInstance == null)
        {
            synchronized (SessionManager.class) {
                if (mInstance == null) {
                    mInstance = new SessionManager();
                }
            }
        }
        return mInstance;
    }

    public void setSeesion(IoSession session){
        this.mSession = session;
    }
    public SessionManager() {
    }

    public SessionManager(IoSession mSession) {
        this.mSession = mSession;
    }

    /**
     * 將物件寫到服務端
     * @param msg
     */
    public void writeToServer(Object msg)
    {
        if (mSession != null) {
            mSession.write(msg);
        }
    }

    public void closeSession()
    {
        if (mSession != null)
            mSession.closeOnFlush();
    }
    public void removeSession()
    {
        this.mSession = null;
    }
}
  • MinaActivity.java
public class MinaActivity extends Activity implements View.OnClickListener{
    private MessageBroadcastReceiver receiver = new MessageBroadcastReceiver();
    private Button btn1, btn2;
    private TextView message;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.mina_test);
        message = (TextView) findViewById(R.id.message);
        btn1 = (Button) findViewById(R.id.btn1);
        btn2 = (Button) findViewById(R.id.btn2);
        btn1.setOnClickListener(this);
        btn2.setOnClickListener(this);
        registerBroadcast();
    }

    private void registerBroadcast() {
        IntentFilter filter = new IntentFilter("com.commonlibrary.mina");
        LocalBroadcastManager.getInstance(this).registerReceiver(receiver, filter);
    }

    private void unregisterBroadcast()
    {
        LocalBroadcastManager.getInstance(this).unregisterReceiver(receiver);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        stopService(new Intent(this, MinaService.class));
        unregisterBroadcast();
    }

    @Override
    public void onClick(View v) {
        switch (v.getId())
        {
            case R.id.btn1:
                SessionManager.getInstance().writeToServer("123");
                break;
            case R.id.btn2:
                Intent intent = new Intent(this, MinaService.class);
                startService(intent);
                break;
        }
    }


    private class MessageBroadcastReceiver extends BroadcastReceiver
    {

        @Override
        public void onReceive(Context context, Intent intent) {
            message.setText(intent.getStringExtra("message"));
        }
    }
}

佈局檔案中就是兩個按鈕和一個文字控制元件,程式碼就不貼了。

  • MinaService.java
public class MinaService extends Service {

    private ConnectionHandlerThread thread;
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onCreate() {
        super.onCreate();
        thread = new ConnectionHandlerThread("mina", getApplicationContext());
        System.out.println("service create:");
        thread.start();
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        thread.disConnection();
    }

    /**
     * 負責呼叫ConnectionManager
     */
    class ConnectionHandlerThread extends HandlerThread {
        private Context context;
        boolean isConnection;
        ConnectionManager mManager;

        public ConnectionHandlerThread(String name, Context context) {
            super(name);
            this.context = context;
            ConnectionConfig config = new ConnectionConfig.Builder(context)
                    .setIp("10.90.24.139").setPort(9123)
                    .setReadBufferSize(10240).setReadBufferSize(10000).builder();
            System.out.println(config.getReadBufferSize());
            mManager = new ConnectionManager(config);
        }

        @Override
        protected void onLooperPrepared() {
            super.onLooperPrepared();
            while (true) {
                isConnection = mManager.connect(); //

                if (isConnection) {
                    break;
                }
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

        public void disConnection() {
            mManager.disConnection();
        }
    }

}

注意: (1)區域性廣播的使用(LocalBroadcastManager) (2)android中AlertDialog使用的構建者模式 (3)HandlerThread的使用