基於TCP協議的項目架構之Socket流傳輸的實現
項目背景
某銀行的影像平臺由於使用時間長,服務器等配置原因,老影像系統滿足不了現在日益增長的數據量的需求,所以急需要升級改造。傳統的影像平臺使用的是Oracle數據庫和簡單的架構來存儲數據(視頻、圖片等),所以很難滿足現在的業務需求,其中最主要的就是存儲下載等速度的影響,綜合考慮之後,我們給出了升級改造方案,基於Http協議的數據傳輸和基於TCP協議的數據傳輸,按照行方的要求需要用TCP協議,我們最終采用的是Socket網絡傳輸的方案。
TCP協議介紹
TCP是一種面向連接的、可靠的、基於字節流的傳輸層通信協議,在簡化的計算機網絡OSI模型中,它完成第四層傳輸層所指定的功能,TCP層位於IP層之上,應用層之下的中間層,不同主機的應用層之間經常需要可靠的、像管道一樣的連接,但IP層不需要提供這樣的機制,而是提供不可靠的包交換。當應用層向TCP層發送用於網間傳輸的、用8位字節標示的數據流,TCP會把數據流分割為適當長度的報文段,之後TCP把數據包傳給IP層,由它來通過網絡將包傳輸給接收端的實體TCP層。TCP是因特網中的傳輸層協議,使用3次握手協議建立連接。
Socket
TCP通信是嚴格區分客戶端與服務端的,在通信時,必須先由客戶端去連接服務器才能實現通信,服務器端不可以主動連接客戶端,並且服務器要事先啟動,等待客戶端的連接。
在JDK中提供了兩個類用於實現TCP程序,一個是ServerSocket類,用於表示服務器,一個是Socket類,用於表示客戶端。通信時,首先創建代表服務器端的ServerSocket對象,該對象相當於開啟一個服務,並等待客戶端的連接,然後創建代表客戶端的Socket對象向服務器端發出連接請求,服務器端響應請求,兩者建立連接,開始通信。
下面是我們通過Socket建立的模型,是多線程的,首先看服務端代碼:
/** * socket服務端 * @author 我心自在 * */ public class Main { private static Log logger=null; //線程池 public static ExecutorService executor =null; /** * 靜態塊,初始化 */ static{ logger=LogFactory.getLog(SocketMain.class); //線程池 executor = Executors.newFixedThreadPool(Integer.parseInt(newPropertiesUtil(). getProperties("config.properties").getProperty("UserThreads"))); } /** * 主程序入口 * @param args */ public static void main(String[] args) { ServerSocket server = null; Socket socket = null; PropertiesUtil PropertiesUtil = new PropertiesUtil(); try { //啟動Socket服務端 server=newServerSocket(Integer.parseInt(PropertiesUtil.getProperties("config.properties") .getProperty("port"))); while (true) { //多線程接收客戶端請求 socket = server.accept(); if (socket != null) { executor.execute(new Controller(socket)); } } } catch (IOException e) { logger.error("Main IO異常:"+e.getMessage(),e); } finally { try { if (server != null) { server.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { logger.error("Main流關閉異常:"+e.getMessage(),e); } } } }
下面是業務處理類,支持多線程,主要用來處理業務
public class SocketController implements Runnable { private Socket socket; private static Log logger=LogFactory.getLog(SocketController.class); public SocketController(Socket socket) { super(); this.socket = socket; } public void run() { //讀取文件流 String requestMethod=null; Map<String,Object> getMethAndNumMap=null; Map<String,Object> jsonMap=null; BufferedInputStream bis =null; OutputStream ops = null; BufferedWriter bw = null; BufferedOutputStream bos = null; Document doc=null; String XMLString=null; try { bis= new BufferedInputStream (socket.getInputStream()); //獲取輸出流 ops = socket.getOutputStream(); bos = new BufferedOutputStream(ops); bw = new BufferedWriter(new OutputStreamWriter(ops)); byte[] fileinfo=new byte[256]; try { bis.read(fileinfo); } catch (IOException e1){ logger.error("流讀取異常...."+e1.getMessage(),e1); } if(fileinfo!=null){ fileInfoString=new String(fileinfo).trim(); } jsonMap=JSONUtil.jsonToMap(fileInfoString); requestMethod=(String) jsonMap.get("requestMethod"); if (!(requestMethod==null||"".equals(requestMethod))) { switch (requestMethod){ case "login": Login.login(XMLString,bw,doc);//登錄接口 break; case "XXXX": XXX.xxx(); break; default: //請求方法錯誤 } } } catch (UnknownHostException e) { logger.error("Socket未知端口:"+e.getMessage(),e); } catch (IOException e) { logger.error("Controller讀流流異常:"+e.getMessage(),e); }finally{ try { if (bw!=null) { bw.flush(); bw.close(); } if(ops!=null){ ops.close(); } if(bos!=null){ bos.close(); } bis.close(); socket.close(); } catch (IOException e) { logger.error("socket關閉異常:"+e.getMessage(),e); } } }
這裏只貼出了部分代碼,做個參考,基本思想就是,通過輸入流接到客戶端發送過來的報文,然後進行解析,為什麽統一用字節流接受呢,因為我們的文件上傳分兩部分,一部分是文件信息,一部分是文件流,所以為了方便,統一使用字節流接收,根據字節流中的請求接口方法,調用對應的接口方法,完成業務處理。因為客戶端的報文有兩種,一種是XML類型的報文,另外一種是json格式的報文,這裏只貼出了部分json格式的代碼。差的只是一個XML的解析,解析方式很多,就不再贅述。
下面是客戶端代碼,以登錄為例:
public class LoginTest { public static void main(String[] args) { String xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?>" + "<root>" + "<requestMethod>login</requestMethod>" + "<userinfo>" + "<loginname>test</loginname>" + "<passwd>dmd1dnZndXY=</passwd>" + "</userinfo>" + "</root>"; Socket socket=null; BufferedWriter bw=null; BufferedReader br=null; try { socket=new Socket(new ConfigUtil().getValue("ipAddress"), Integer.parseInt(new ConfigUtil().getValue("port"))); bw=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bw.write(xml); bw.newLine(); bw.flush(); br=new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("服務器返回報文:"+br.readLine()); bw.close(); br.close(); socket.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
客戶端代碼非常簡單,基本就是以流的形式發送報文,發送到客戶端,完成請求,收到服務器響應後,關閉連接,完成一次請求。
以上只是一個簡單的Socket通信模型,具體項目中涉及到很多內容,這裏無法一一列舉。大致思路我們就是通過Socket通信,獲取客戶端發送過來的報文,然後對報文進行解析,根據請求方法,調用不同的業務接口,處理不同的業務,我們的業務包括對影像資料的增刪改查,文件根據不同的大小存在大數據集群中,小文件存入Hbase,大文件存入HDFS,然後根據不同的場景去查詢刪除或者更新。這裏邊涉及到的存索引,用到了Elasticsearch5.4,用來做高級檢索查詢。
基於TCP協議的項目架構之Socket流傳輸的實現