1. 程式人生 > >C# socket 非同步接收訊息

C# socket 非同步接收訊息

近日根據官方提供的通訊例子自己寫了一個關於Unity(C#)和後臺通訊的類,拿出來和大家分享一下。

具體請參考:

1.java服務端用的apach.mina框架搭建。java服務端請參考:http://blog.9tech.cn/?c=site&m=article&id=548</a></p>

2.C#環境:.NET framework 2.0

3.C#幫組文件,及Socket註解:http://msdn.microsoft.com/zh-cn/library/system.net.sockets.socket(v=vs.85).aspx</a></p>

4.官方例子:http://msdn.microsoft.com/zh-cn/library/bew39x2a(v=VS.85).aspx#CommunityContent</a></p>

 

個人覺得,最難的地方在與以下幾個地方:

1.封裝資料,和後臺的編解碼格式保持一致

   封裝資料,其實就是一個前後臺約定好一個通訊格式。比如:獲得所以資料後並寫入位元組陣列後,在吧這個位元組陣列的長度讀出來(4個位元組的整形資料),再封裝進一個位元組陣列中。

    所以最終的資料位元組陣列內容是:4個位元組的資料長度+實際資料的位元組陣列。

當然資料的加密加壓可以在吧實際資料存入位元組陣列後就進行,那麼傳送的長度就是加密加壓後的資料長度了。

實現方法:

1

2

3

4

5

6

7

8

9

String message = "shit ";//5個位元組

byte[] bytes = Encoding.UTF8.GetBytes(message);//未加密加壓的實際資料

byte[] prefixBytes =BitConverter.GetBytes(bytes.Length);//資料長度

Array.Reverse(prefixBytes);

byte[] sendBytes = new byte[bytes.Length + 4];

//吧位元組數組合併到sendBytes 

System.Buffer.BlockCopy(prefixBytes ,0,sendBytes ,0,4);

System.Buffer.BlockCopy(bytes,0,sendBytes ,4,bytes.Length);

//合併完畢,就可以使用socket吧sendBytes傳送出去了,後臺也是有同樣的格式解碼就可以了。

2.非同步通訊的執行緒管理

    非同步通訊的執行緒安全一直是一個難點,它不像ActionScript那樣,建立通訊連線後註冊事件用來偵聽即可。但是在C#中就必須讓執行緒等待當前的非同步操作完成後才可以繼續向下執行。C#非同步通訊中可以使用ManualResetEvent類來處理這類問題。當需要暫停執行後續程式碼以完成非同步操作時。使用ManualResetEvent的WaitOne();方法來阻塞這個執行緒(類似與java的ReentrantLock類),但是必須在非同步操作完成後使執行緒恢復,否則就會出現執行緒被鎖死的情況。使用ManualResetEvent的Set()方法即可。

實現方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

private static ManualResetEvent connectDone = new ManualResetEvent(false);

 

public void startConnect(){

    Connect();

    connectDone.WaitOne();//阻塞執行緒,

     

     receive();

}

 

public void Connect(){

    // to do connection

    //socket.BeginConnect....

}

public void connectCallback(IAsyncResult ar){

 

    Socket socket = (Socket) ar.AsyncState;

    socket .EndConnect(ar);

    connectDone.Set();//恢復執行緒,繼續向下執行

}

public void receive(){

    // to do received message

}

ManualResetEvent類的幫組文件及例子:http://msdn.microsoft.com/zh-cn/library/system.threading.manualresetevent_members(v=vs.85).aspx</a></p>

 

3.關於資料的壓縮,和解壓.

    對於資料的壓縮和解壓可以採用ICSharpCode.SharpZipLib這個動態連結庫。下載地址:http://www.icsharpcode.net/opensource/sharpziplib/</a></p>

下載後再MonoDevelop裡倒入引用就可以了,位置:Project->Edit References..  

using ICSharpCode.SharpZipLib.Zip;

然後在程式碼裡就可以倒入類了

參考:http://blog.sina.com.cn/s/blog_62fda93c0101d51j.html</a></p>

4.接收和讀取資料的操作

    在接受服務端傳送的資料時,也根據同樣的格式進行解讀;先讀取4個位元組的資料長度,再跟進這個長度得到實際的資料,最後在解密和解壓就可以得到最終的資料了。

    但是在這個操作過程中,會出現一些意想不到的麻煩。

    大致流程是:

    採取分段讀取的方式,第一次只讀取4個位元組的長度資訊。

    取得長度後,根據設定的每次分段讀取資料長度來讀取,知道所得的資料和總長度相同;

    每次分段讀取的資料的長度不一定都是設定的長度,所以將每次讀取的資料寫入記憶體流MemoryStream類中。特別重要的每次操作MemoryStream類時注意設定它的Position位置,不然會出現你本來已經成功的存入了資料,但是由於Position的原因沒有找準需要取出或者存入資料的準確位置而讀取資料失敗。

 

詳細程式碼如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

using UnityEngine;

using System.Collections;

using System.Collections.Generic;

using System;

using System.IO;

using System.Threading;

using System.Text;

using System.Net;

using System.Net.Sockets;

using ICSharpCode.SharpZipLib.Zip;

using ICSharpCode.SharpZipLib.GZip;

using LitJson;

/

  連線物件

  @author duwei

  @version 1.0.0

  build time :2013.11.7

  /

public class BufferConnection{

    public Socket socket = null;

    public const int prefixSize = 4;

    public String ip = "192.168.1.105";

    public int port = 8005;

     

    private static ManualResetEvent connectDone = 

        new ManualResetEvent(false);

    private static ManualResetEvent sendDone = 

        new ManualResetEvent(false);

    private static ManualResetEvent receiveDone = 

        new ManualResetEvent(false);

     

    public BufferConnection(){

             

    }

    // State object for receiving data from remote device.

    public class StateObject {

        // Client socket.

        public Socket workSocket = null;

        // Size of receive buffer.

        public const int BufferSize = 1024;

        // Receive buffer.

        public byte[] buffer = new byte[BufferSize];

    }

    /開始建立socket連線/

    public void startConnect(){

        try{

            Debug.Log("starting connection...");

            IPAddress ipd = IPAddress.Parse(ip);

            EndPoint endPoint = new IPEndPoint(ipd,port);

             

            socket = new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);

            socket.BeginConnect(endPoint,new AsyncCallback(connectCallback),socket);

            connectDone.WaitOne();

             

            receive(socket);

              //receiveDone.WaitOne();

             

        }catch(Exception e){

            Console.WriteLine(e.ToString());

        }

    }

    public void connectCallback(IAsyncResult ar){

        try{

            Socket backSocket = (Socket)ar.AsyncState;

            backSocket.EndConnect(ar);

            connectDone.Set();

            Debug.Log("on connected");

        }catch(Exception e){

            Console.WriteLine(e.ToString());   

        }  

    }

    /**傳送資料,目前只支援 String 型別資料/

    public void send(Socket client,String msg){

        //封裝資料

        byte[] byteData = Encoding.UTF8.GetBytes(msg);

        byte[] sendData = new byte[byteData.Length + prefixSize];

        byte[] sizeData = BitConverter.GetBytes(byteData.Length);

        //反轉

       Array.Reverse(sizeData);

        //合併

        System.Buffer.BlockCopy(sizeData,0,sendData,0,prefixSize);

        System.Buffer.BlockCopy(byteData,0,sendData,prefixSize,byteData.Length);

        try{

            //socket.Send(sendData);

            client.BeginSend(sendData,0,sendData.Length,0,new AsyncCallback(sendCallback),client);

            Debug.Log("data send finished, data size:"+sendData.Length);

        }catch(Exception e){

            Console.WriteLine(e.ToString());

        }

    }

    public void send(String msg){

        if(socket != null){

            send(socket,msg);

            sendDone.WaitOne();

        }

    }

    public void sendCallback(IAsyncResult ar){

        try{

            Socket socket = (Socket)ar.AsyncState;

            if(ar.IsCompleted){

                int endPoint = socket.EndSend(ar);  

                Debug.Log("send data finished endpoint: "+endPoint);

                sendDone.Set();

            }

        }catch(Exception e){

            Console.WriteLine(e.ToString());

        }

    }

    public void receive(Socket socket){

        try{   

            StateObject so = new StateObject();

            so.workSocket = socket;

            //第一次讀取資料的總長度

            socket.BeginReceive(so.buffer,0,prefixSize,0,new AsyncCallback(receivedCallback),so);

            //測試用:資料在1024以內的資料,一次性讀取出來

            //socket.BeginReceive(so.buffer,0,StateObject.BufferSize,0,new AsyncCallback(simpleCallback),so);

        }catch(Exception e){

            Console.WriteLine(e.ToString());

        }

    }

    public void simpleCallback(IAsyncResult ar){

        StateObject so = (StateObject)ar.AsyncState;  

        Socket socket = so.workSocket;

        byte[] presixBytes = new byte[prefixSize];

        int presix = 0;

        Buffer.BlockCopy(so.buffer,0,presixBytes,0,prefixSize);

        Array.Reverse(presixBytes);

        presix = BitConverter.ToInt32(presixBytes,0);

        if(presix <= 0){

            return;

        }  

        byte [] datas = new byte[presix];

        Buffer.BlockCopy(so.buffer,prefixSize,datas,0,datas.Length);

        String str = Encoding.UTF8.GetString(datas);

        Debug.Log("received message :"+str);

    }

     

    public MemoryStream receiveData = new MemoryStream();

    private bool isPresix = true;

    public int curPrefix = 0;//需要讀取的資料總長度

    public void receivedCallback(IAsyncResult ar){

        try{

            StateObject so = (StateObject)ar.AsyncState;

            Socket client = so.workSocket;

            int readSize = client.EndReceive (ar);//結束讀取,返回已讀取的緩衝區裡的位元組陣列長度

            //將每次讀取的資料,寫入記憶體流裡

            receiveData.Write(so.buffer,0,readSize);

            receiveData.Position = 0;

            //讀取前置長度,只讀取一次

            if((int)receiveData.Length >= prefixSize && isPresix){

                byte[] presixBytes = new byte[prefixSize];

                receiveData.Read(presixBytes,0,prefixSize);

                Array.Reverse(presixBytes);

                curPrefix = BitConverter.ToInt32(presixBytes,0);

                isPresix = false;

            }

            if(receiveData.Length - (long)prefixSize < (long)curPrefix){

                //如果資料沒有讀取完畢,調整Position到最後,接著讀取。

                receiveData.Position = receiveData.Length;

            }else{

                //如果記憶體流中的實際數字總長度符合要求,則說明資料已經全部讀取完畢。

                //將position位置調整到第4個節點,開始準備讀取資料。

                receiveData.Position = prefixSize;

                //讀取資料

                byte[] datas = new byte[curPrefix];

                receiveData.Read(datas,0,datas.Length);

                //有壓縮的話需要先解壓,然後在操作。

                byte[] finallyBytes = decompress(datas);

                String str = Encoding.UTF8.GetString(finallyBytes);

                Debug.Log("the finally message is : "+str);

            }

            //重複讀取,每次讀取1024個位元組資料

            client.BeginReceive(so.buffer,0,StateObject.BufferSize,0,new AsyncCallback(receivedCallback), so);

        }catch(Exception e){

            Console.WriteLine(e.ToString());

        }

    }

    private byte [] temp = new byte[1024];

    //解壓

    public byte[] decompress(byte[] bytes){

        MemoryStream memory = new MemoryStream ();

        ICSharpCode.SharpZipLib.Zip.Compression.Inflater inf = new ICSharpCode.SharpZipLib.Zip.Compression.Inflater ();

        inf.SetInput (bytes);

        while (!inf.IsFinished) {

            int extracted = inf.Inflate (temp);

            if (extracted > 0) {

                memory.Write (temp, 0, extracted);

            else {

                break

            }

        }

        return memory.ToArray ();

    }

}