1. 程式人生 > >springmvc+maven+netty-socketio服務端構建實時通訊

springmvc+maven+netty-socketio服務端構建實時通訊

Socket.IO:支援WebSocket協議、用於實時通訊和跨平臺的框架

WebSocket是HTML5的一種新通訊協議,它實現了瀏覽器與伺服器之間的雙向通訊。而Socket.IO是一個完全由JavaScript實現、基於Node.js、支援WebSocket的協議用於實時通訊、跨平臺的開源框架,它包括了客戶端的JavaScript和伺服器端的Node.js。Socket.IO除了支援WebSocket通訊協議外,還支援許多種輪詢(Polling)機制以及其它實時通訊方式,並封裝成了通用的介面,並且在服務端實現了這些實時機制的相應程式碼。Socket.IO實現的Polling通訊機制包括Adobe Flash Socket、AJAX長輪詢、AJAX multipart streaming、持久Iframe、JSONP輪詢等。Socket.IO能夠根據瀏覽器對通訊機制的支援情況自動地選擇最佳的方式來實現網路實時應用。當前,Socket.IO最新版本是於2015年1月19日釋出的
1.3.0版本
,該版本增強了穩定性和提高了效能,並修復了大量Bug。

Socket.IO設計的目標是構建能夠在不同瀏覽器和移動裝置上良好執行的實時應用,如實時分析系統、二進位制流資料處理應用、線上聊天室、線上客服系統、評論系統、WebIM等。目前,Socket.IO已經支援主流PC瀏覽器(如IE、Safari、Chrome、Firefox、Opera等)和移動平臺上的瀏覽器(iOS平臺下的Safari、Android平臺下的基於Webkit的瀏覽器等)。

Socket.IO實現了實時、雙向、基於事件的通訊機制,它解決了實時的通訊問題,並統一了服務端與客戶端的程式設計方式。啟動了Socket以後,就像建立了一條客戶端與服務端的管道,兩邊可以互通有無。它還能夠和Express.js提供的傳統請求方式很好的結合,即可以在同一個域名,同一個埠提供兩種連線方式:

request/response, websocket(flashsocket,ajax…).


netty-socketio   客戶端和服務端下載地址:https://github.com/mrniko/netty-socketio

1.maven引入依賴jar包

 <dependency>
          <groupId>com.corundumstudio.socketio</groupId>
          <artifactId>netty-socketio</artifactId>
          <version>1.7.7</version>
</dependency>

socketio服務端:SocketIo_Server.java

import java.util.Map;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;

public class SocketIo_Server {
    public static void main(String[] args) throws InterruptedException {
        Configuration config = new Configuration();
        //伺服器主機ip,這裡配置本機
        config.setHostname("localhost");
        //埠,任意
        config.setPort(9092);
        config.setMaxFramePayloadLength(1024 * 1024);
        config.setMaxHttpContentLength(1024 * 1024);
        SocketIOServer server = new SocketIOServer(config);
        //監聽廣告推送事件,advert_info為事件名稱,自定義
        server.addEventListener("advert_info", String.class, new DataListener<String>(){
            @Override
            public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {
                //客戶端推送advert_info事件時,onData接受資料,這裡是string型別的json資料,還可以為Byte[],object其他型別
                
                String sa = client.getRemoteAddress().toString();
                String clientIp = sa.substring(1,sa.indexOf(":"));//獲取客戶端連線的ip
                Map params = client.getHandshakeData().getUrlParams();//獲取客戶端url引數
                System.out.println(clientIp+":客戶端:************"+data);
            }
        });
        //監聽通知事件
        server.addEventListener("notice_info", String.class, new DataListener<String>() {
            @Override    
            public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
                //同上
            }
        });
        
        /**
         * 監聽其他事件
         */
        
        //新增客戶端連線事件
        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                // TODO Auto-generated method stub
                String sa = client.getRemoteAddress().toString();
                String clientIp = sa.substring(1,sa.indexOf(":"));//獲取裝置ip
                System.out.println(clientIp+"-------------------------"+"客戶端已連線");
                Map params = client.getHandshakeData().getUrlParams();
                
                //給客戶端傳送訊息
                client.sendEvent("advert_info",clientIp+"客戶端你好,我是服務端,有什麼能幫助你的?");
            }
        });
        //新增客戶端斷開連線事件
        server.addDisconnectListener(new DisconnectListener(){
            @Override
            public void onDisconnect(SocketIOClient client) {
                // TODO Auto-generated method stub
                String sa = client.getRemoteAddress().toString();
                String clientIp = sa.substring(1,sa.indexOf(":"));//獲取裝置ip
                System.out.println(clientIp+"-------------------------"+"客戶端已斷開連線");
                
                //給客戶端傳送訊息
                client.sendEvent("advert_info",clientIp+"客戶端你好,我是服務端,期待下次和你見面");
            }
        });
          server.start();
          
        Thread.sleep(Integer.MAX_VALUE);

        server.stop();
    }
}

socketio客戶端:SocketIo_Client.java

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;

public class SocketIo_Client {
    public static void main(String[] args) {
        try{
            IO.Options options = new IO.Options();    
            options.forceNew = true;
            options.reconnection = true;
            final Socket socket = IO.socket("http://localhost:9092?deviceId=ZYLPC", options);
           
            socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
                @Override
                public void call(Object... args) {
                    System.out.println("connect");
//                    socket.close();
                }
            }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {
                @Override
                public void call(Object... args) {
                    System.out.println("connect timeout");
                }
            }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
                @Override    
                public void call(Object... args) {
                    System.out.println("connect error");
                }
            }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
                @Override
                public void call(Object... args) {    
                    System.out.println("disconnect");
                }
            }).on("advert_info", new Emitter.Listener() {
                @Override
                public void call(Object... args) {
                    String data = (String)args[0];
                    System.out.println("服務端:************"+data.toString());
                    //給服務端傳送資訊
                    socket.emit("advert_info", "服務端你好,我是客戶端,我有問題想諮詢你!");
                }
            }).on("notice_info", new Emitter.Listener(){
                @Override
                public void call(Object... args){
                    String data = (String)args[0];
                }
            });
            socket.open();
        }catch(Exception e){
            
        }
    }
}

與spring整合:

服務層:SocketIoService.java

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;


@Service("socketIoService")
public class SocketIoService {
    static SocketIOServer server;
    static Map<String, SocketIOClient> clientsMap = new HashMap<String, SocketIOClient>();
    
    public void startServer() throws InterruptedException{
        Configuration config = new Configuration();
        //伺服器主機ip    
        config.setHostname("localhost");
        //埠
        config.setPort(9092);
        config.setMaxFramePayloadLength(1024 * 1024);
        config.setMaxHttpContentLength(1024 * 1024);
        server = new SocketIOServer(config);
        //監聽廣告推送事件,advert_info為事件名稱,自定義
        server.addEventListener("advert_info", String.class, new DataListener<String>(){
            @Override
            public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {
                //客戶端推送advert_info事件時,onData接受資料,這裡是string型別的json資料,還可以為Byte[],object其他型別
                
                String sa = client.getRemoteAddress().toString();
                String clientIp = sa.substring(1,sa.indexOf(":"));//獲取客戶端連線的ip
                Map params = client.getHandshakeData().getUrlParams();//獲取客戶端url引數
                
                System.out.println(clientIp+":客戶端:************"+data);
            }
        });
        //監聽通知事件
        server.addEventListener("notice_info", String.class, new DataListener<String>() {
            @Override    
            public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
                //同上
            }
        });
        
        /**
         * 監聽其他事件
         */
        
        //新增客戶端連線事件
        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                // TODO Auto-generated method stub
                String sa = client.getRemoteAddress().toString();
                String clientIp = sa.substring(1,sa.indexOf(":"));//獲取裝置ip
                System.out.println(clientIp+"-------------------------"+"客戶端已連線");
                Map params = client.getHandshakeData().getUrlParams();
                
                //獲取客戶端連線的uuid引數
                Object object = params.get("uuid");
                String uuid = "";
                if(object != null){
                    uuid = ((List<String>)object).get(0);
                    //將uuid和連線客戶端物件進行繫結
                    clientsMap.put(uuid,client);
                }
                //給客戶端傳送訊息
                client.sendEvent("advert_info",clientIp+"客戶端你好,我是服務端,有什麼能幫助你的?");
            }
        });
        //新增客戶端斷開連線事件
        server.addDisconnectListener(new DisconnectListener(){
            @Override
            public void onDisconnect(SocketIOClient client) {
                // TODO Auto-generated method stub
                String sa = client.getRemoteAddress().toString();
                String clientIp = sa.substring(1,sa.indexOf(":"));//獲取裝置ip
                System.out.println(clientIp+"-------------------------"+"客戶端已斷開連線");
                
                //給客戶端傳送訊息
                client.sendEvent("advert_info",clientIp+"客戶端你好,我是服務端,期待下次和你見面");
            }
        });
          server.start();
          
        Thread.sleep(Integer.MAX_VALUE);

        server.stop();
    }
    public void stopServer(){
        if(server != null){
            server.stop();
            server = null;
        }
    }
    /**
     *  給所有連線客戶端推送訊息
     * @param eventType 推送的事件型別
     * @param message  推送的內容
     */
    public void sendMessageToAllClient(String eventType,String message){
        Collection<SocketIOClient> clients = server.getAllClients();
        for(SocketIOClient client: clients){
            client.sendEvent(eventType,message);
        }
    }
    /**
     * 給具體的客戶端推送訊息
     * @param deviceId 裝置型別
     * @param eventType推送事件型別
     * @param message 推送的訊息內容
     */
    public void sendMessageToOneClient(String uuid,String eventType,String message){
        try {
            if(uuid != null && !"".equals(uuid)){
                SocketIOClient client = (SocketIOClient)clientsMap.get(uuid);
                if(client != null){
                    client.sendEvent(eventType,message);
                }
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

   public static SocketIOServer getServer() {
        return server;
    }

}

控制層層:SocketIoController.java

import java.util.Map;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class SocketIoController {
    @Autowired
    private SocketIoService service;
    
    //啟動socket 服務
    @RequestMapping("startServer")
    public void startServer(HttpServletRequest request,HttpServletResponse response) throws Exception{
        Map params = ReflectUtil.transToMAP(request.getParameterMap());
        try {
            if(service.getServer() == null){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // TODO Auto-generated method stub
                        try {
                            service.startServer();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    //停止socket服務
    @RequestMapping("stopServer")
    public void stopServer(HttpServletRequest request,HttpServletResponse response) throws Exception{
        Map params = ReflectUtil.transToMAP(request.getParameterMap());
        try {
            if(service.getServer() == null){
                service.stopServer();
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //給指定的客戶端推送訊息
    @RequestMapping("sendAdvertInfoMsg")
    public void sendAdvertInfoMsg(HttpServletRequest request,HttpServletResponse response) throws Exception{
        Map params = ReflectUtil.transToMAP(request.getParameterMap());
        String uuid = ParamsUtil.nullDeal(params, "uuid", "");
        try {
            if(!"".equals(uuid) && service.getServer() != null){
                service.sendMessageToOneClient(uuid, "advert_info", "推送的內容");
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

如果想在spring容器啟動之後啟動sockerio,可以這樣做:

自定義一個類,用@component注入

@component (把普通pojo例項化到spring容器中,相當於配置檔案中的<bean id="" class=""/>
實現spring  ApplicationListener介面,這樣在spring載入成功之後就會呼叫onApplicationEvent方法啟動socketio

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import com.zkkj.backend.common.socketio.BinaryEventLauncher;
import com.zkkj.backend.service.biz.advert.IAdvertService;

/**
 * spring載入完畢後執行
 * @author ZYL_PC
 *
 */
@Component("BeanDefineConfigue")
public class BeanDefineConfigue  implements ApplicationListener<ContextRefreshedEvent>{

    @Autowired
    private SocketIoService service;

    //當前伺服器的ip
    private String serverIp = "";
    //當前伺服器裝置id
    private String deviceId = "";
    //執行時間,時間單位為毫秒,讀者可自行設定,不得小於等於0
    private static Long cacheTime = Long.MAX_VALUE;
    //延遲時間,時間單位為毫秒,讀者可自行設定,不得小於等於0
    private static Integer delay = 3000;
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // TODO Auto-generated method stub
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
          public void run() {
            //啟動socket監聽
              try{
                  if(service.getServer() == null){
                      new Thread(new Runnable() {
                          @Override
                          public void run() {
                              try {
                                  service.startServer();
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                          }
                      }).start();
                  }
              }catch(Exception e){
              }
          }
        }, delay,cacheTime);// 這裡設定將延時每天固定執行

    }

}