1. 程式人生 > 其它 >golang原始碼分析:redcon基於redis協議的框架

golang原始碼分析:redcon基於redis協議的框架

https://github.com/tidwall/redcon 是一個Go實現 的 Redis 相容伺服器框架。它實現了redis協議,封裝了網路連線,我們可以基於這個庫快速實現一個基於redis協議的伺服器。簡單的redis伺服器https://github.com/redis-go/redis 就是基於這個包實現的。

package main


import (
"log"
"strings"
"sync"


"github.com/tidwall/redcon"
)


var addr = ":6380"


func main() {
var mu sync.RWMutex
var items = make(map[string][]byte)

var ps redcon.PubSub
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
case "ping":

conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.Lock()
items[string(cmd.Args[1])] = cmd.Args[2]

mu.Unlock()
conn.WriteString("OK")
case "get":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.RLock()
val, ok := items[string(cmd.Args[1])]
mu.RUnlock()
if !ok {
conn.WriteNull()
} else {
conn.WriteBulk(val)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.Lock()
_, ok := items[string(cmd.Args[1])]
delete(items, string(cmd.Args[1]))
mu.Unlock()
if !ok {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
case "publish":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
conn.WriteInt(ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])))
case "subscribe", "psubscribe":
if len(cmd.Args) < 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
command := strings.ToLower(string(cmd.Args[0]))
for i := 1; i < len(cmd.Args); i++ {
if command == "psubscribe" {
ps.Psubscribe(conn, string(cmd.Args[i]))
} else {
ps.Subscribe(conn, string(cmd.Args[i]))
}
}
}
},
func(conn redcon.Conn) bool {
// Use this function to accept or deny the connection.
// log.Printf("accept: %s", conn.RemoteAddr())
return true
},
func(conn redcon.Conn, err error) {
// This is called when the connection has been closed
// log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}

下面看下原始碼實現,原始碼很簡單,主要是兩個檔案:redcon/redcon.go,redcon/resp.go前者實現了網路連線的包裝,後者實現了redis協議。依賴了兩個網路包https://github.com/tidwall/btree,https://github.com/tidwall/match

我們還是從例子的入口函式ListenAndServe開始學習

func ListenAndServe(addr string,
handler func(conn Conn, cmd Command),
accept func(conn Conn) bool,
closed func(conn Conn, err error),
) error {
return ListenAndServeNetwork("tcp", addr, handler, accept, closed)
}

傳入了4個引數,地址、服務handler(服務核心邏輯實現的地方,處理請求並返回結果)、accept函式和close函式。核心邏輯只是對ListenAndServeNetwork的一個包裝,確定了網路協議是tcp協議

func ListenAndServeNetwork(
net, laddr string,
handler func(conn Conn, cmd Command),
accept func(conn Conn) bool,
closed func(conn Conn, err error),
) error {
return NewServerNetwork(net, laddr, handler, accept, closed).ListenAndServe()
}

NewServerNetwort函式初始化了server,最終呼叫的是server的ListenAndServe()函式。

func NewServerNetwork(
net, laddr string,
handler func(conn Conn, cmd Command),
accept func(conn Conn) bool,
closed func(conn Conn, err error),
) *Server {
if handler == nil {
panic("handler is nil")
}
s := &Server{
net: net,
laddr: laddr,
handler: handler,
accept: accept,
closed: closed,
conns: make(map[*conn]bool),
}
return s
}
func (s *Server) ListenAndServe() error {
return s.ListenServeAndSignal(nil)
}
func (s *Server) ListenServeAndSignal(signal chan error) error {
ln, err := net.Listen(s.net, s.laddr)
if err != nil {
if signal != nil {
signal <- err
}
return err
}
s.ln = ln
if signal != nil {
signal <- nil
}
return serve(s)
}

在這裡初始化了網路連線,偵聽網路埠,最後呼叫serve服務

funcserve(s*Server)error{
for{
lnconn, err := s.ln.Accept()
if s.accept != nil && !s.accept(c) {
go handle(s, c)
}
}
}

serve是整個服務的大迴圈,裡面不斷accept請求,對每個連線,啟用一個協程去處理請求內容。

funchandle(s*Server,c*conn){
for {
cmds, err := c.rd.readCommands(nil)
for len(c.cmds) > 0 {
cmd := c.cmds[0]
s.handler(c, cmd)
}
}

在handle函式內部呼叫server的handler去處理服務端請求的內容。至此整個服務端的框架基本介紹完畢。裡面還封裝了一套TLS的server邏輯,內容基本相似。

func NewServerTLS(addr string,
handler func(conn Conn, cmd Command),
accept func(conn Conn) bool,
closed func(conn Conn, err error),
config *tls.Config,
) *TLSServer {
return NewServerNetworkTLS("tcp", addr, handler, accept, closed, config)
}

下面重點介紹下handler函式,它是server結構體的一個屬性

type Server struct {
mu sync.Mutex
net string
laddr string
handler func(conn Conn, cmd Command)
accept func(conn Conn) bool
closed func(conn Conn, err error)
conns map[*conn]bool
ln net.Listener
done bool
idleClose time.Duration
// AcceptError is an optional function used to handle Accept errors.
AcceptError func(err error)
}

有兩個引數Conn 網路連線、Command請求引數

type Conn interface {}
type conn struct {
conn net.Conn
wr *Writer
rd *Reader
addr string
ctx interface{}
detached bool
closed bool
cmds []Command
idleClose time.Duration
}

包裹了網路連線和reader、writer

redis協議resp的定義如下

type RESP struct {
Type Type
Raw []byte
Data []byte
Count int
}

並且也實現了相關協議的解析函式

    func ReadNextRESP(b []byte) (n int, resp RESP) 
func ReadNextCommand(packet []byte, argsbuf [][]byte)
func readTelnetCommand(packet []byte, argsbuf [][]byte)
func AppendAny(b []byte, v interface{}) []byte

redcon只是一個server框架,基於這個框架,我們可以向開發httpserver一樣非常方便地開發出一個相容redis協議的服務端。