mina保持android端\服務端的長連線
更多幹貨
一.mina簡介
Apache Mina是一個能夠幫助使用者開發高效能和高伸縮性網路應用程式的框架。與Netty出自同一人之手,都是一個介於應用程式與網路之間的NIO框架,通過Java nio技術基於TCP/IP和UDP/IP協議提供了抽象的、事件驅動的、非同步的API,使程式設計師從繁瑣的網路操作中解脫出來,花更多的時間在業務處理上。 mina分為三層,如下圖: 1、IOService層:處理IO操作 2、IOFilter層:過濾器鏈,日誌處理、位元組變換、物件轉換等操作 3、IOHandler層:真正的處理業務邏輯的地方
mina核心類
IoService
IoService用來管理各種IO服務,在mina中,這些服務可以包括session、filter、handler等
服務端IoAcceptor及相關類
IOAcceptor相當於是對ServerSocketChannel的封裝,最重要的兩個操作是繫結與接受連線。 Acceptor執行緒專門負責接受連線,在其上有一個selector,輪詢是否有連線建立上來,當有連線建立上來,呼叫ServerSocketChannel.accept方法來接受連線,這個方法返回一個session物件,然後將這個session物件加入processor中,由processor遍歷每個session來完成真正的IO操作。processor上也有一個selector與一個Processor執行緒,selector用於輪詢session,Processor執行緒處理每個session的IO操作。
客戶端IOConnector及相關類
IOConnector的設計與IOAcceptor幾乎完全一樣,唯一不同的是與Acceptor執行緒對應的是Connector執行緒,在完成連線操作後也是扔了一個session物件到Processor中。
過濾器(Filter)
下面是官網提供的過濾器 可以通過繼承IoFilterAdapter來實現自己的過濾器,但一般不需要這麼做,以下是一些常用的過濾器:
- LoggingFilter 記錄mina所有日誌
- ProtocolCodecFilter 協議編碼解碼過濾器
- CompressionFilter 資料壓縮過濾器
- SSLFilter 資料加密過濾器
IoSession
Mina每建立一個連線同時會建立一個session物件,用於儲存這次讀寫需要用到的所有資訊。從抽象類AbstractIoSession中可以看出session具有如下功能: 1、從attributes成員可以看出session可以存放使用者關心的鍵值對 2、注意到WriteRequestQueue,這是一個寫請求佇列,processor中呼叫flush或者flushNow方法時會將使用者寫入的資料包裝成一個writeRequest物件,並加入這個佇列中。 3、提供了大量的統計功能,比如接收到了多少訊息、最後讀取時間等 在程式碼中設定session:
// 建立伺服器監聽
IoAcceptor acceptor = new NioSocketAcceptor();
// 設定buffer的長度
acceptor.getSessionConfig().setReadBufferSize(2048);
// 設定連線超時時間
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
連線到來建立一個session,初始化好之後加入到processor負責的一個佇列中。processor執行緒會把佇列中的session對應的通道都註冊到它自己的selector上,然後這個selector輪詢這些通道是否準備就緒,一旦準備就緒就呼叫對應方法進行處理(read or flushNow)。 Mina中的session具有狀態,且狀態之間是可以相互轉化的 IoFilter與IoHandler就是在這些狀態上面加以干預,下面重點看一下IDLE狀態,它分三種: Idle for read:在規定時間內沒有資料可讀 Idle for write:在規定時間內沒有資料可寫 Idle for both:在規定時間內沒有資料可讀和可寫 這三種狀態分別對應IdleStatus類的三個常量:READER_IDLE、WRITER_IDLE、BOTH_IDLE 前面session的用法中有如下設定:
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
二.保持長連線
服務端
引入相關jar包 (1)mina-core-2.0.16.jar (2)slf4j-api-1.7.21.jar及相關jar包
- MainService.java
public class MinaService {
public static void main(String[] args) {
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
acceptor.setHandler(new DemoServiceHandler());
acceptor.getSessionConfig().setMaxReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); //10秒沒有讀寫就進入空閒狀態
try {
acceptor.bind(new InetSocketAddress(9123));
} catch (IOException e) {
e.printStackTrace();
}
}
private static class DemoServiceHandler extends IoHandlerAdapter{
@Override
public void sessionCreated(IoSession session) throws Exception {
super.sessionCreated(session);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
super.sessionOpened(session);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
super.sessionClosed(session);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
super.exceptionCaught(session, cause);
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
String msg = message.toString();
session.write(new Date());
System.out.println("接收到的資料:"+msg);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
}
}
}
客戶端
相關jar包 (1)mina-core-2.0.16.jar (2)slf4j-android-1.6.1-RC1.jar
- ConnectionManager.java
public class ConnectionManager {
private static final String BROADCAST_ACTION="com.commonlibrary.mina";
private static final String MESSAGE="message";
private ConnectionConfig mConfig;
private WeakReference<Context> mContext;
private NioSocketConnector mConnection;
private IoSession mSession;
private InetSocketAddress mAddress;
public ConnectionManager(ConnectionConfig config) {
this.mConfig = config;
this.mContext = new WeakReference<Context>(config.getContext());
init();
}
private void init() {
mAddress = new InetSocketAddress(mConfig.getIp(), mConfig.getPort());
mConnection = new NioSocketConnector();
mConnection.getSessionConfig().setReadBufferSize(mConfig.getReadBufferSize());
mConnection.getFilterChain().addLast("logger", new LoggingFilter());
mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
mConnection.setHandler(new DefaultHandler(mContext.get()));
mConnection.setDefaultRemoteAddress(mAddress);
}
public boolean connect() {
try {
ConnectFuture future = mConnection.connect();
future.awaitUninterruptibly();
mSession = future.getSession();
SessionManager.getInstance().setSeesion(mSession);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return mSession != null ? true:false;
}
public void disConnection()
{
mConnection.dispose();
mConnection = null;
mSession = null;
mAddress = null;
mContext = null;
}
private static class DefaultHandler extends IoHandlerAdapter {
private final Context mContext;
public DefaultHandler(Context context) {
this.mContext = context;
}
@Override
public void sessionCreated(IoSession session) throws Exception {
super.sessionCreated(session);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
super.sessionOpened(session);
//將我們的session儲存到我們的session manager類中, 從而可以傳送訊息到伺服器
}
@Override
public void sessionClosed(IoSession session) throws Exception {
super.sessionClosed(session);
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
if (mContext != null)
{
Intent intent = new Intent(BROADCAST_ACTION);
intent.putExtra(MESSAGE, message.toString());
LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
}
}
}
- ConnectionConfig.java
public class ConnectionConfig {
private Context context;
private String ip;
private int port;
private int readBufferSize;
private long connectionTimeout;
public Context getContext() {
return context;
}
public void setContext(Context context) {
this.context = context;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getReadBufferSize() {
return readBufferSize;
}
public void setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
}
public long getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(long connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
//構建者模式
public static class Builder{
private Context context;
private String ip="10.90.24.139";
private int port=9123;
private int readBufferSize=10240;
private long connectionTimeout=10000;
public Builder(Context context) {
this.context = context;
}
public Builder setIp(String ip) {
this.ip = ip;
return this;
}
public Builder setPort(int port) {
this.port = port;
return this;
}
public Builder setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
public Builder setConnectionTimeout(long connectionTimeout) {
this.connectionTimeout = connectionTimeout;
return this;
}
private void applyConfig(ConnectionConfig config)
{
config.context = this.context;
config.ip = this.ip;
config.port = this.port;
config.readBufferSize = readBufferSize;
config.connectionTimeout = this.connectionTimeout;
}
public ConnectionConfig builder()
{
ConnectionConfig config = new ConnectionConfig();
applyConfig(config);
return config;
}
}
}
- SessionManager.java
public class SessionManager {
private static SessionManager mInstance = null;
//最終與伺服器進行通訊的物件
private IoSession mSession;
public static SessionManager getInstance() {
if (mInstance == null)
{
synchronized (SessionManager.class) {
if (mInstance == null) {
mInstance = new SessionManager();
}
}
}
return mInstance;
}
public void setSeesion(IoSession session){
this.mSession = session;
}
public SessionManager() {
}
public SessionManager(IoSession mSession) {
this.mSession = mSession;
}
/**
* 將物件寫到服務端
* @param msg
*/
public void writeToServer(Object msg)
{
if (mSession != null) {
mSession.write(msg);
}
}
public void closeSession()
{
if (mSession != null)
mSession.closeOnFlush();
}
public void removeSession()
{
this.mSession = null;
}
}
- MinaActivity.java
public class MinaActivity extends Activity implements View.OnClickListener{
private MessageBroadcastReceiver receiver = new MessageBroadcastReceiver();
private Button btn1, btn2;
private TextView message;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.mina_test);
message = (TextView) findViewById(R.id.message);
btn1 = (Button) findViewById(R.id.btn1);
btn2 = (Button) findViewById(R.id.btn2);
btn1.setOnClickListener(this);
btn2.setOnClickListener(this);
registerBroadcast();
}
private void registerBroadcast() {
IntentFilter filter = new IntentFilter("com.commonlibrary.mina");
LocalBroadcastManager.getInstance(this).registerReceiver(receiver, filter);
}
private void unregisterBroadcast()
{
LocalBroadcastManager.getInstance(this).unregisterReceiver(receiver);
}
@Override
protected void onDestroy() {
super.onDestroy();
stopService(new Intent(this, MinaService.class));
unregisterBroadcast();
}
@Override
public void onClick(View v) {
switch (v.getId())
{
case R.id.btn1:
SessionManager.getInstance().writeToServer("123");
break;
case R.id.btn2:
Intent intent = new Intent(this, MinaService.class);
startService(intent);
break;
}
}
private class MessageBroadcastReceiver extends BroadcastReceiver
{
@Override
public void onReceive(Context context, Intent intent) {
message.setText(intent.getStringExtra("message"));
}
}
}
佈局檔案中就是兩個按鈕和一個文字控制元件,程式碼就不貼了。
- MinaService.java
public class MinaService extends Service {
private ConnectionHandlerThread thread;
@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onCreate() {
super.onCreate();
thread = new ConnectionHandlerThread("mina", getApplicationContext());
System.out.println("service create:");
thread.start();
}
@Override
public void onDestroy() {
super.onDestroy();
thread.disConnection();
}
/**
* 負責呼叫ConnectionManager
*/
class ConnectionHandlerThread extends HandlerThread {
private Context context;
boolean isConnection;
ConnectionManager mManager;
public ConnectionHandlerThread(String name, Context context) {
super(name);
this.context = context;
ConnectionConfig config = new ConnectionConfig.Builder(context)
.setIp("10.90.24.139").setPort(9123)
.setReadBufferSize(10240).setReadBufferSize(10000).builder();
System.out.println(config.getReadBufferSize());
mManager = new ConnectionManager(config);
}
@Override
protected void onLooperPrepared() {
super.onLooperPrepared();
while (true) {
isConnection = mManager.connect(); //
if (isConnection) {
break;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void disConnection() {
mManager.disConnection();
}
}
}
注意: (1)區域性廣播的使用(LocalBroadcastManager) (2)android中AlertDialog使用的構建者模式 (3)HandlerThread的使用