EchoServer設計到實現(六)
阿新 • • 發佈:2019-01-28
問題
專案在經過思考後,重構了。主要重構原因有主要以下幾點
1.包的設計粒度多大,導致依賴導致
2.程式碼gorouting操作不當,浪費cpu使用
3.網路模型使用不當,同步模型可以滿足設計需求
4.select關鍵字使用不錯,導致gorouting停止等待
重構後
修改協議設計,新增緩衝區,將網路和協議耦合降低。設計協議類,儘量以結構體為單位去做操作方法。
//思考:是否支援非同步解析,現在支援同步解析
import (
"bytes"
"encoding/binary"
)
const (
//MsgLength 字串長度
MsgLength = 4
)
//Protocol 協議
type Protocol struct {
msgBuf []byte
//ReaderChannel chan []byte
}
//New 返回新物件
func New() *Protocol {
pro := &Protocol{msgBuf: make([]byte, 0)} //readerChannel:make(chan []byte,1024)}
//go pro.Depack()
return pro
}
//PushStream 新增緩衝區節流
func (pro *Protocol) PushStream(buf []byte ) {
pro.msgBuf = append(pro.msgBuf, buf...)
}
//Enpack 壓縮
func (pro *Protocol) Enpack(message []byte) []byte {
Header := IntToBytes(len(message))
return append(Header, message...)
}
//Depack 解壓
func (pro *Protocol) Depack() []string {
Msg := make([]string, 0)
buffer := pro.msgBuf
BufLen := len (pro.msgBuf)
i := 0
for i < BufLen {
if BufLen < i+MsgLength {
break
}
msgLen := BytesToInt(buffer[i : i+MsgLength])
if msgLen > 0 && msgLen < 2<<32 {
if BufLen < i+MsgLength+msgLen {
break
}
data := buffer[i+MsgLength : i+MsgLength+msgLen]
Msg = append(Msg, string(data))
i += MsgLength + msgLen
}
}
if i == BufLen {
pro.msgBuf = make([]byte, 0)
} else {
pro.msgBuf = buffer[i:]
}
return Msg
}
//IntToBytes 整形轉換byte
func IntToBytes(n int) []byte {
x := int32(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, x)
return bytesBuffer.Bytes()
}
//BytesToInt byte轉換Int
func BytesToInt(b []byte) int {
bytesBuffer := bytes.NewBuffer(b)
var x int32
binary.Read(bytesBuffer, binary.BigEndian, &x)
return int(x)
}
重構了網路操作,取代主函式,所有操作歸併到網路類;更改接收和傳送非同步方式為同步操作。主要原因非同步存在傳送積壓,導致客戶端接收超時。
package server
import (
"EchoTCP/config"
"EchoTCP/logger"
"EchoTCP/protocol"
"net"
"time"
)
//Config 配置累
var Config = config.New(3)
//EchoServer 服務sock結構
type EchoServer struct {
Port string
IP string
Pro string
}
//Client 客戶端
type Client struct {
conn net.Conn
tmpBuf protocol.Protocol
}
//New 建立新的EchoServer
func New() *EchoServer {
logger.Log.Logger.Info("Create EchoServer Sucesss")
Config.Dyn.Monitor()
return &EchoServer{}
}
//SetPort 設定埠號
func (server *EchoServer) SetPort(port string) {
server.Port = port
}
//SetAddr 設定IP地址
func (server *EchoServer) SetAddr(addr string) {
server.IP = addr
}
//SetPro 設定連線協議
func (server *EchoServer) SetPro(pro string) {
server.Pro = pro
}
//Listen 啟動監聽服務 返回監聽物件 錯誤址
func (server *EchoServer) Listen() (listener net.Listener, err error) {
if server.IP == "" {
server.IP = Config.Sta.Sock.IP
}
if server.Port == "" {
server.Port = Config.Sta.Sock.Port
}
if "" == server.Pro {
server.Pro = Config.Sta.Sock.Protocol
}
addr := server.IP + ":" + server.Port
listener, err = net.Listen(server.Pro, addr)
logger.Log.Logger.Info(addr)
logger.Log.Logger.Info(server.Pro)
if err != nil {
logger.Log.Logger.Error(err.Error())
return nil, err
}
return listener, nil
}
//Accept 接受連線
func (server *EchoServer) Accept(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
logger.Log.Logger.Error(err.Error())
continue
}
logger.Log.Logger.Info("There is Conner")
client := Client{conn, *protocol.New()}
go server.RecvHandle(client)
}
}
//RecvHandle 接受客戶端傳送過的訊息
func (server *EchoServer) RecvHandle(client Client) {
buf := make([]byte, 1024)
defer client.conn.Close()
for {
client.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
n, err := client.conn.Read(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
logger.Log.Logger.Info("client timeout")
} else {
logger.Log.Logger.Error(err.Error())
}
break
}
server.SendHandle(client, buf[:n])
}
}
//SendHandle 讀寫訊息
func (server *EchoServer) SendHandle(client Client, buf []byte) {
client.tmpBuf.PushStream(buf)
recMsg := client.tmpBuf.Depack()
for _, msg := range recMsg {
logger.Log.Logger.Info(string("recv Msassge " + msg))
switch Config.Dyn.Opt.Select {
case 1:
msg = Reverse(msg)
default:
msg = Shuffle(msg)
}
_, err := server.SendSingleMsg(client.conn, msg)
if err != nil {
logger.Log.Logger.Error(err.Error())
}
logger.Log.Logger.Info(string("send msg : " + msg))
}
}
//SendSingleMsg 單條傳送
func (server *EchoServer) SendSingleMsg(conn net.Conn, sendMsg string) (n int, err error) {
n, err = conn.Write([]byte(sendMsg))
return
}
修改針對配置的設計,把動態配置和靜態配置合併一起,去掉冗餘程式碼,封裝一個配置管理類,通過傳入引數分類實力化動態配置還是靜態配置。
type opteration struct {
Select int
}
type dynConfig struct {
Opt opteration
Watcher *fsnotify.Watcher
}
type sock struct {
Port string
IP string
Protocol string
}
type def struct {
BufMax int
Timeout int
}
type staConfig struct {
Sock sock
Def def
}
//Config 全域性配置
type Config struct {
Sta *staConfig
Dyn *dynConfig
}
//New 生成新的物件
func New(configType int) (conf *Config) {
switch configType {
case 1:
conf = &Config{Sta: newStaticConfig()}
case 2:
conf = &Config{Dyn: newDynConfig()}
case 3:
conf = &Config{Sta: newStaticConfig(), Dyn: newDynConfig()}
}
return
}
func newStaticConfig() *staConfig {
var conf staConfig
if err := readFileConfig(&conf, staticFilePath); err != nil {
return nil
}
return &conf
}
func newDynConfig() *dynConfig {
var conf dynConfig
var err error
if err = readFileConfig(&conf, dynFilePath); err != nil {
return nil
}
conf.Watcher, err = fsnotify.NewWatcher()
if err != nil {
logger.Log.Logger.Error(err.Error())
return nil
}
err = conf.Watcher.Watch(dynFilePath)
if err != nil {
logger.Log.Logger.Error(err.Error())
return nil
}
return &conf
}
func readFileConfig(tlConfig interface{}, fileName string) (err error) {
if _, err = toml.DecodeFile(fileName, tlConfig); err != nil {
logger.Log.Logger.Error(err.Error())
return err
}
//fmt.Println(tlConfig, fileName)
return nil
}
func (dyn *dynConfig) Monitor() {
go func() {
for {
select {
case <-dyn.Watcher.Event:
temp := newDynConfig()
if temp.Opt.Select != dyn.Opt.Select {
dyn.Opt = temp.Opt
}
logger.Log.Logger.Info(string("Change : " + fmt.Sprintf("%d", temp.Opt.Select)))
continue
case err := <-dyn.Watcher.Error:
logger.Log.Logger.Error(err.Error())
return
default:
time.Sleep(1 * time.Second)
}
}
}()
}
結束語
go語言的博大精深,設計理念由於沒有具體的書籍很難學好。建議通過讀go語言的原始碼,拋析原始碼學習使用go語言。
ps:專案暫時不給出,有需要可以私信我。