1. 程式人生 > >Mina實現Socket通訊完整過程

Mina實現Socket通訊完整過程

目錄

  • 服務端
  • 客戶端
  • 通訊
  • 自定義工廠編解碼
  • 解碼器
  • 編碼器
  • 總結
  • # 加入戰隊
    • 微信公眾號

title: Mina服務端客戶端通訊
date: 2018-09-30 09:00:30
tags:
- [mina]
- [tcp]
categories:
- [程式設計]
permalink: zxh
---

前兩章節已經完整的介紹了理論部分,今天我們就利用這些理論來實現tcp協議的c/s 通訊。首先我們簡單回顧下之前的介紹,

在mina中我們的客戶端和服務端簡直就是一模一樣,只是我們用不同介面卡。但是他的資料處理流程是一樣的。今天我們就重點看看如何建立服務端、客戶端
並且處理兩者之間的訊息通訊處理

服務端

服務端和客戶端不同的就是我們建立的監聽物件不同而已,客戶端傳送訊息到服務端,服務端需要經歷過濾器的處理才能到達訊息中心,但是在過濾器中我們就需要將訊息進行解碼,然後才會到訊息接收的地方處理我們的業務。正常情況下我們處理完訊息需要對客戶端進行迴應。迴應的時候也會經歷過濾器中的編碼邏輯,進行資料編碼然後傳送。資訊傳送到客戶端我們可以看成服務端的方向。也是需要進行編解碼的。下面看看服務端的建立程式碼


//建立監聽物件
IoAcceptor acceptor = new NioSocketAcceptor();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//新增過濾器
acceptor.getFilterChain().addLast("logger",new LoggingFilter());
acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        textLineCodecFactory
));
//設定時間處理的handler
acceptor.setHandler(new ServerMessageHandler());
//設定讀取資料快取區的大小
acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//設定多久沒有訊息就進入空閒狀態
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
//繫結埠
try {
    acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT));
} catch (IOException e) {
    logger.error(String.format("bind %s error",Constaint.REMOTE_PORT));
    e.printStackTrace();
}
logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));

客戶端

    
//建立監聽物件
IoConnector connector = new NioSocketConnector();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//新增過濾器
//日誌過濾器 。  sltf日誌設定
connector.getFilterChain().addLast("logger",new LoggingFilter());
//在這個過濾器中提供了編解碼,這裡的編碼是以資訊中已\r\n結尾算是一條資訊 
connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        new SocketFactory()
));
//設定時間處理的handler , 提供session生命週期的監聽函式,訊息接受,傳送的函式
connector.setHandler(new ClientMessageHandler());
//設定讀取資料快取區的大小
connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//設定多久沒有訊息就進入空閒狀態
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT));
//是非同步處理,這裡不會造成阻塞
future.addListener(new IoFutureListener<IoFuture>() {
    @Override
    public void operationComplete(IoFuture ioFuture) {
        logger.info("連線準備完成");
        IoSession session = ioFuture.getSession();

    }
});

通訊

  • 其實上面服務端,客戶端兩邊建立好就應經在通訊了,在上面建立的時候我們發現。建立的時候需要指定訊息處理器(IoHandlerAdapter) , 這個在IoService中會排在IoFilter之後執行。在過濾器執行之後我們就會呼叫我們的訊息處理器。

    private static Logger logger = LogManager.getLogger(ServerMessageHandler.class);
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        logger.info("sessionCreated");
    }

    public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        logger.info("sessionOpened");
    }

    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        logger.info("sessionClosed");
    }

    public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
        super.sessionIdle(session,idleStatus);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        //        logger.info("sessionIdle");
    }

    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {
        logger.info("exceptionCaught");
        throwable.printStackTrace();
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        String info = message.toString();
        Date date = new Date(System.currentTimeMillis());
        SimpleDateFormat sdf = new  SimpleDateFormat("yy-MM-dd HH:mm:ss");
        String time = sdf.format(date);
        session.write(time);
        System.out.println("接收到的訊息:"+info);
    }

    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
        logger.info("messageSent");
    }
  • 這裡訊息處理器,提供了幾個時刻可以控制,比如session建立、銷燬的時候執行的地方。訊息接收的地方,訊息傳送成功的地方。這些控制力度可以根據我們的需要進行適度的複寫。

自定義工廠編解碼

  • 工廠是提供編解碼的方法。這個工廠是載入在ProtocolCodecFilter這個過濾器中的。我們也可以自定義過濾器,在自定義的過濾器中我們也可以載入我們自定義的工廠,實現編解碼。我們在編解碼的地方,就可以加入我們的業務程式碼。比如解碼通過約定的協議方式讀取到內容後通過ProtocolDecoderOutput 將訊息寫出去就可以在我們的IoHandlerAdapter的messageReceived方法中獲取到訊息。然後業務書寫。這樣做到程式碼的解耦。

public class SocketFactory  implements ProtocolCodecFactory {
    private MessageDecoder decoder;
    private MessageEncoder encoder;

    public SocketFactory() {
        decoder = new MessageDecoder();
        encoder = new MessageEncoder();
    }

    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return this.decoder;
    }

    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return this.encoder;
    }
}

解碼器

  • 上面的工廠就是提供編解碼的。和我們生活中一樣工廠提供功能,但是實際並不是工廠做的,工廠可能只代理功能,僅僅是個加工廠而已。mina通訊也是如此。真正編解碼的並不是工廠執行的,本節將揭露解碼者CumulativeProtocolDecoder

  • 解碼器寫好之後只需要在上面自定義工廠中建立就好了。至於自定義編碼器只需要繼承CumulativeProtocolDecoder這個類就好了。而且複寫doDecode方法就好了。這個方法的返回值是boolean型別。返回值不同代表意義不一。這裡需要重點理清楚

+ true:  返回true表示你已經從CumulativeProtocolDecoder的訊息中消費了資訊,在編碼器中返回true之前也應該呼叫ProtocolDecoderOutput 的wirte將訊息釋出到IoHandAdaptor中進行業務處理。但是這裡會出現其他情況,應為我們服務端客戶端是長連線所以在我們訊息中訊息是不斷髮過來的,我們快取中的訊息可能是完整一條訊息,也可能不夠一整條訊息,也可能是一整條多了一點,
   1、如果不是一條完整(半包)的那麼我們返回falsed等待客戶端繼續傳送
   2、如果正好是一整條,那麼我們接受到之後返回true的時候我們快取中就沒有資料了,在CumulativeProtocolDecoder會停止對解碼中doDecode的呼叫了,這種情況不會出現意外
   3、資料比一條完整資訊(粘包)多,那麼我們處理到一條資訊後也需要返回true,但是CumulativeProtocolDecoder會將剩餘的快取繼續拼裝,剩餘訊息就相當於內部進行了第二次解碼。如果不過那麼相當於上面第一種情況

   記住三種情況  半包 、 正常 、 粘包
+ false: 返回false就是快取中的資料不夠我們一整條訊息,需要繼續等待客戶端的訊息。CumulativeProtocolDecoder中的快取機制會不斷的將客戶端發過來的資料拼接到快取中

public class MessageDecoder extends CumulativeProtocolDecoder {
    /**
     * 此方法return true : 表示父類中CumulativeProtocolDecoder會不斷的呼叫此方法進行訊息的消費
     *       return false: 表示訊息已經消費完全了,快取中就算有資料也不會再消費了。等待再次客戶端
     *       傳送訊息時會觸發訊息傳送介面,此時會將新舊訊息拼接再一起進行處理
     * @param ioSession
     * @param ioBuffer
     * @param protocolDecoderOutput
     * @return
     * @throws Exception
     */
    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        IoBuffer buffer = IoBuffer.allocate(10);
        while (ioBuffer.hasRemaining()) {
            if (ioBuffer.remaining()<3) {
                //繼續接受
                return false;
            }
            //獲取三個位元組
            int oldLimit = ioBuffer.limit();
            ioBuffer.limit(ioBuffer.position()+3);
            String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder());
            protocolDecoderOutput.write(text);
            ioBuffer.limit(oldLimit);
            if (ioBuffer.hasRemaining()) {
                return true;
            }
        }
        return false;
    }
}

編碼器

  • 編碼器相對解碼器簡單很多,編碼器就是加入我們的協議,正常情況就是我們業務程式碼中訊息是一個Java實體,我們需要做的是將Java實體按照協議轉換成IoBuffer進行傳送。但是我們mina中傳送訊息是通過IoSession中write方法傳送的。我們檢視原始碼發現在IoSession.write(Object o),傳送的如果是IoBuffer那麼就不經過我們的編碼器,否則會經過我們編碼器進行編碼最終將轉換後的IoBuffer傳送出去。


public class MessageEncoder extends ProtocolEncoderAdapter {
    @Override
    public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
        //TODO  根據協議編碼
        //組裝好之後  ioSession.write(IoBuffer)寫出
        System.out.println(o);
    }
}

總結

加入戰隊

# 加入戰隊

微信公眾號

相關推薦

Mina實現Socket通訊完整過程

目錄 服務端 客戶端 通訊 自定義工廠編解碼 解碼器 編碼器 總結 # 加入戰隊 微信公眾號

C#實現Socket通訊(同時監聽多客戶端)

//建立socket物件 //第一個引數:設定網路定址的協議、第二引數設定資料傳輸的方式、第三個引數設定通訊協議 Socket serverSocket = new Socket(AddressFamily.InterNetwork, SocketT

glib實現Socket通訊

server #include <glib.h> #include <gio/gio.h> #include <stdio.h> gchar *buffer; gboolean network_read(GIOChann

C++:實現socket通訊(TCP/IP)例項

       首先宣告,博主之前從來沒有寫過通訊方面的東西,這次之所以寫這個是因為專案需要,因此本文主要介紹一個使用C++語言及Socket來實現TCP/IP通訊的例項,希望可以幫助入門者。 一、什麼是TCP/IP?         TCP提供基於IP環境下的資料可靠性傳

python實現socket通訊 TCP

 Server:  # server import socket address = ('127.0.0.1', 31500) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s = socket.socket(

Java Socket實戰之六 使用NIO包實現Socket通訊

前面幾篇文章介紹了使用java.io和java.net類庫實現的Socket通訊,下面介紹一下使用java.nio類庫實現的Socket。 java.nio包是Java在1.4之後增加的,用來提高I/O操作的效率。在nio包中主要包括以下幾個類或介面: * Bu

實現socket通訊的簡單demo

//伺服器類 public class Server {public static void main(String[] args) {         try {             //1.建立一個伺服器端Socket,即ServerSocket,指定繫結的埠,並

Android客戶端與PC伺服器實現Socket通訊(wifi)

本文介紹Android終端持續掃描AP資訊併發送給伺服器端的實現。首先基於TCP協議在Android終端和PC兩端之間形成網路虛擬鏈路。使用ServerSocket建立TCP伺服器端,然後在Android客戶端使用Socket的構造器來連線伺服器。其中Android終端通過WIFI連線和PC處於同一區域網

socket通訊過程

TCP方式下,伺服器端在accept處會阻塞,直到客戶端發起connect,建立連線後執行之後的語句 recv函式執行時如果對方已經有send,猜想send已經把內容發到本機的緩衝區,recv函式會取出資料。若對方沒有send(猜想即緩衝區裡沒東西可取),則阻塞,等待對方s

Android簡單實現Socket通訊,客戶端連線伺服器後,伺服器向客戶端傳送文字資料

案例實現的是簡單的Socket通訊,當客戶端(Android客戶端)連線到指定伺服器以後,伺服器向客戶端傳送一句話文字資訊(你可以拓展其它的了) 先看一下服務端程式的實現吧 Server.java import java.io.IOException;

AJAX的實現步驟(完整過程

AJAX全稱為“Asynchronous JavaScript and XML”(非同步JavaScript和XML),是一種建立互動式網頁應用的網頁開發技術。Ajax的工作原理相當於在使用者和伺服器之間加了—箇中間層(AJAX引擎),使使用者操作與伺服器響應非同步化。並不是

三、ESP8266LUA開發之建立TCP伺服器,實現socket通訊控制繼電器,串列埠,伺服器,客戶端收發資料小感悟

8266做伺服器,實現TCP通訊 注,實際燒錄的時候會因為註釋過多造成燒錄不進去的情況,這個時候需要刪除註釋! 先來測試8266建立伺服器,並分得IP 燒錄時,先燒wifi.lua,然後再燒init.lua。 init.lua

python實現socket通訊

server端 import socket import commands HOST='10.13.7.2' PORT=50007 s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind((

Netty框架的簡單使用,實現socket通訊

個人部落格:haichenyi.com。感謝關注   題外話,很多人都把JDK1.4提供的NIO稱之為非同步非阻塞I/O;其實,並不然,從嚴格意義上面講,它只能稱為非阻塞I/O。在JDK1.7提供的NIO 2.0,新增了非同步的套接字通道Channel,它才是

OpenStack建立例項完整過程原始碼詳細分析(14)----依據AMQP通訊架構實現訊息接收機制解析之一

感謝朋友支援本部落格,歡迎共同探討交流,由於能力和時間有限,錯誤之處在所難免,歡迎指正! 如果轉載,請保留作者資訊。 部落格地址:http://blog.csdn.net/gaoxingnengjisuan 郵箱地址:[email protected] 這篇博文

基於nginx+keepalived架構實現完整過程

安裝 nginx 負載均衡器 nginx+keepalived 監控Nginx 安裝1.安裝PCRE軟件包tar zxvf pcre-8.10.tar.gzcd pcre-8.10./configuremake && make install2.安裝Nginx軟件包tar zxvf nginx

java代碼實現socket接口通訊(堵塞I/O)

write ice 代碼 args fin true finall tar 輸出 傳統的java實現socket通訊比較簡單實現,不過它屬於堵塞式的I/O流存取,只能由一個線程完成當前任務才能起下個一個線程,無法解決高並發; 1、簡單的socketservice

實現socket的服務和客戶端通訊

對學習過程中自己敲的一些關於socket有關的程式碼做了個簡單總結,在這分享一下,給有需要的同學借鑑一下。 什麼是socket? 網路上的兩個程式通過一個雙向的通訊連線實現資料的交換,這個連線的一端稱為一個socket。 建立網路通訊連線至少要一對埠號(socket)。socke

TCP和UDP並實現socket的簡單通訊

http://www.cnblogs.com/IPrograming/archive/2012/10/15/CSharp_Socket_4.html http://www.cnblogs.com/dolphinX/p/3460545.html 一、TCP和UDP的區別和聯絡 1.五層協議

三、UDP實現簡單的socket通訊例項

1.伺服器端程式碼 package udp; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; /** * 類功能描述:伺服器端,先啟動伺服器端後啟動