网络编程
位置:首页>> 网络编程>> Go语言>> 基于Golang实现Redis协议解析器

基于Golang实现Redis协议解析器

作者:CSGOPHER  发布时间:2024-04-27 15:37:41 

标签:Golang,Redis,协议,解析

本文实现Redis的协议层,协议层负责解析指令,然后将指令交给核心database执行

echo database用来测试协议层的代码

https://github.com/csgopher/go-redis

RESP协议

RESP是客户端与服务端通信的协议,格式有五种:

正常回复:以“+”开头,以“\r\n”结尾的字符串形式

错误回复:以“-”开头,以“\r\n”结尾的字符串形式

整数:以“:”开头,以“\r\n”结尾的字符串形式

多行字符串:以“$”开头,后跟实际发送的字节数和“\r\n”,接着是字符串内容本身,最后再以“\r\n”结尾

$3\r\nabc\r\n

数组:以“*”开头,后跟成员个数

SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。

当我们输入*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n这样一串命令,服务端接收到的是如下的命令:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n

interface/resp/conn.go

// Connection abstracts a single client connection as seen by the protocol layer.
type Connection interface {
  // Write sends raw bytes to the client.
  Write([]byte) error
  // GetDBIndex reports the index of the database selected on this connection.
  GetDBIndex() int
  // SelectDB changes the database selected on this connection.
  SelectDB(int)
}

interface/resp/reply.go
// Reply is any message that can be serialized into its wire representation.
type Reply interface {
// ToBytes returns the serialized bytes of the reply.
ToBytes() []byte
}
  • Connection接口:Redis客户端的一个连接

  • Write:给客户端回复消息

  • GetDBIndex:返回当前连接选中的DB编号(Redis默认有16个DB)

  • Reply接口:响应接口

resp/reply/consts.go

// PongReply is the fixed "+PONG" status reply.
type PongReply struct{}

var (
	// pongBytes is the shared wire encoding returned by every PongReply.
	pongBytes = []byte("+PONG\r\n")
	// thePongReply is the single instance handed out by MakePongReply.
	thePongReply = &PongReply{}
)

// MakePongReply returns the shared PongReply instance.
func MakePongReply() *PongReply { return thePongReply }

// ToBytes returns the wire encoding of the reply.
func (r *PongReply) ToBytes() []byte { return pongBytes }

// OkReply is the fixed "+OK" status reply.
type OkReply struct{}

var (
	// okBytes is the shared wire encoding returned by every OkReply.
	okBytes = []byte("+OK\r\n")
	// theOkReply is the single instance handed out by MakeOkReply.
	theOkReply = &OkReply{}
)

// MakeOkReply returns the shared OkReply instance.
func MakeOkReply() *OkReply { return theOkReply }

// ToBytes returns the wire encoding of the reply.
func (r *OkReply) ToBytes() []byte { return okBytes }

// NullBulkReply is the null bulk string reply ("$-1").
type NullBulkReply struct{}

// nullBulkBytes is the shared wire encoding of a null bulk string.
var nullBulkBytes = []byte("$-1\r\n")

// MakeNullBulkReply returns a new NullBulkReply.
func MakeNullBulkReply() *NullBulkReply { return new(NullBulkReply) }

// ToBytes returns the wire encoding of the reply.
func (r *NullBulkReply) ToBytes() []byte { return nullBulkBytes }

// EmptyMultiBulkReply is the reply for an array with zero elements ("*0").
type EmptyMultiBulkReply struct{}

// emptyMultiBulkBytes is the shared wire encoding of an empty array.
var emptyMultiBulkBytes = []byte("*0\r\n")

// ToBytes returns the wire encoding of the reply.
func (r *EmptyMultiBulkReply) ToBytes() []byte { return emptyMultiBulkBytes }

// NoReply is a reply that serializes to zero bytes.
type NoReply struct{}

// noBytes is the shared, empty (but non-nil) encoding returned by NoReply.
var noBytes = []byte{}

// ToBytes returns an empty byte slice.
func (r *NoReply) ToBytes() []byte { return noBytes }

定义五种回复:回复pong,ok,null,空数组,空

resp/reply/reply.go

// ErrorReply is a reply that also describes an error: it exposes both the
// error text and the serialized form.
type ErrorReply interface {
  // Error returns the error message.
  Error() string
  // ToBytes returns the serialized bytes of the reply.
  ToBytes() []byte
}

ErrorReply:定义错误接口

resp/reply/errors.go

// UnknownErrReply is the error reply used when no more specific error applies.
type UnknownErrReply struct{}

// unknownErrBytes is the shared wire encoding of UnknownErrReply.
var unknownErrBytes = []byte("-Err unknown\r\n")

// Error returns the error text without protocol framing.
func (r *UnknownErrReply) Error() string { return "Err unknown" }

// ToBytes returns the wire encoding of the reply.
func (r *UnknownErrReply) ToBytes() []byte { return unknownErrBytes }

// ArgNumErrReply reports that a command was called with the wrong number of
// arguments.
type ArgNumErrReply struct {
	// Cmd is the command name quoted in the error message.
	Cmd string
}

// MakeArgNumErrReply builds an ArgNumErrReply for the given command name.
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
	r := new(ArgNumErrReply)
	r.Cmd = cmd
	return r
}

// Error returns the error text without protocol framing.
func (r *ArgNumErrReply) Error() string {
	return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

// ToBytes returns the error text framed as an error reply ("-...\r\n").
func (r *ArgNumErrReply) ToBytes() []byte {
	return []byte("-" + r.Error() + "\r\n")
}

// SyntaxErrReply is the error reply for a syntax error.
type SyntaxErrReply struct{}

var (
	// syntaxErrBytes is the shared wire encoding of SyntaxErrReply.
	syntaxErrBytes = []byte("-Err syntax error\r\n")
	// theSyntaxErrReply is the single instance handed out by MakeSyntaxErrReply.
	theSyntaxErrReply = new(SyntaxErrReply)
)

// MakeSyntaxErrReply returns the shared SyntaxErrReply instance.
func MakeSyntaxErrReply() *SyntaxErrReply { return theSyntaxErrReply }

// Error returns the error text without protocol framing.
func (r *SyntaxErrReply) Error() string { return "Err syntax error" }

// ToBytes returns the wire encoding of the reply.
func (r *SyntaxErrReply) ToBytes() []byte { return syntaxErrBytes }

// WrongTypeErrReply is the error reply for an operation applied to a key that
// holds the wrong kind of value.
type WrongTypeErrReply struct{}

// wrongTypeErrBytes is the shared wire encoding of WrongTypeErrReply.
var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

// Error returns the error text without protocol framing.
func (r *WrongTypeErrReply) Error() string {
	return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

// ToBytes returns the wire encoding of the reply.
func (r *WrongTypeErrReply) ToBytes() []byte { return wrongTypeErrBytes }

// ProtocolErrReply reports malformed input that violates the wire protocol.
type ProtocolErrReply struct {
	// Msg is the offending text quoted in the error message.
	Msg string
}

// ToBytes returns the error text framed as an error reply ("-...\r\n").
func (r *ProtocolErrReply) ToBytes() []byte {
	return []byte("-" + r.Error() + "\r\n")
}

// Error returns the error text without protocol framing. Msg is enclosed in a
// pair of single quotes, matching what ToBytes sends on the wire.
func (r *ProtocolErrReply) Error() string {
	return "ERR Protocol error: '" + r.Msg + "'"
}

errors定义5种错误:UnknownErrReply 未知错误,ArgNumErrReply 参数个数错误,SyntaxErrReply 语法错误,WrongTypeErrReply 数据类型错误,ProtocolErrReply 协议错误

resp/reply/reply.go

var (
	// nullBulkReplyBytes is the complete wire encoding of a null bulk string,
	// including the terminating CRLF.
	nullBulkReplyBytes = []byte("$-1\r\n")
	// CRLF is the line terminator of the protocol.
	CRLF = "\r\n"
)

// BulkReply is a reply carrying a single binary-safe string.
type BulkReply struct {
	// Arg is the payload; a nil Arg is sent as a null bulk string.
	Arg []byte
}

// MakeBulkReply builds a BulkReply around arg.
func MakeBulkReply(arg []byte) *BulkReply {
	return &BulkReply{
		Arg: arg,
	}
}

// ToBytes serializes the reply as "$<len>\r\n<payload>\r\n".
//
// A nil Arg yields the null bulk string "$-1\r\n". An empty but non-nil Arg
// yields the empty string "$0\r\n\r\n", the same nil-versus-empty distinction
// that MultiBulkReply makes for its elements.
func (r *BulkReply) ToBytes() []byte {
	if r.Arg == nil {
		return nullBulkReplyBytes
	}
	return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}

type MultiBulkReply struct {
  Args [][]byte
}

func (r *MultiBulkReply) ToBytes() []byte {
  argLen := len(r.Args)
  var buf bytes.Buffer
  buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
  for _, arg := range r.Args {
     if arg == nil {
        buf.WriteString("$-1" + CRLF)
     } else {
        buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
     }
  }
  return buf.Bytes()
}

func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
  return &MultiBulkReply{
     Args: args,
  }
}

type StatusReply struct {
  Status string
}

func MakeStatusReply(status string) *StatusReply {
  return &StatusReply{
     Status: status,
  }
}

func (r *StatusReply) ToBytes() []byte {
  return []byte("+" + r.Status + CRLF)
}

type IntReply struct {
  Code int64
}

func MakeIntReply(code int64) *IntReply {
  return &IntReply{
     Code: code,
  }
}

func (r *IntReply) ToBytes() []byte {
  return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}

type StandardErrReply struct {
  Status string
}

func (r *StandardErrReply) ToBytes() []byte {
  return []byte("-" + r.Status + CRLF)
}

func (r *StandardErrReply) Error() string {
  return r.Status
}

func MakeErrReply(status string) *StandardErrReply {
  return &StandardErrReply{
     Status: status,
  }
}

func IsErrorReply(reply resp.Reply) bool {
  return reply.ToBytes()[0] == '-'
}
  • BulkReply:回复一个字符串

  • MultiBulkReply:回复字符串数组

  • StatusReply:状态回复

  • IntReply:数字回复

  • StandardErrReply:标准错误回复

  • IsErrorReply:判断是否为错误回复

  • ToBytes:将字符串转成RESP协议规定的格式

resp/parser/parser.go

// Payload is one item delivered on the channel returned by ParseStream:
// either a parsed message or an error.
type Payload struct {
  // Data is the parsed message.
  Data resp.Reply
  // Err is the read or protocol error met while parsing.
  Err  error
}

// readState is the parser's progress through the message currently being read.
type readState struct {
	// readingMultiLine is set once a bulk or multi-bulk header has been parsed
	// and further lines of the same message are expected.
	readingMultiLine bool
	// expectedArgsCount is the number of arguments the message should contain.
	expectedArgsCount int
	// msgType is the first byte of the header line ('*' or '$').
	msgType byte
	// args collects the arguments read so far.
	args [][]byte
	// bulkLen is the byte length of the next bulk payload; zero means the next
	// read is a line terminated by '\n'.
	bulkLen int64
}

// finished reports whether all expected arguments have been collected.
func (s *readState) finished() bool {
	if s.expectedArgsCount <= 0 {
		return false
	}
	return len(s.args) == s.expectedArgsCount
}

func ParseStream(reader io.Reader) <-chan *Payload {
  ch := make(chan *Payload)
  go parse0(reader, ch)
  return ch
}

// parse0 is the parsing loop that ParseStream runs in its own goroutine.
// The body is elided at this point of the article.
func parse0(reader io.Reader, ch chan<- *Payload) {
......
}

Payload结构体:客户端给我们发的数据

Reply:客户端与服务端互相发的数据都称为Reply

readState结构体:

  • readingMultiLine:解析单行还是多行数据

  • expectedArgsCount:应该读取的参数个数

  • msgType:消息类型

  • args:消息内容

  • bulkLen:数据长度

finished方法:判断解析是否完成

ParseStream方法:异步解析数据后放入管道,返回管道数据

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
  var msg []byte
  var err error
  if state.bulkLen == 0 {
     msg, err = bufReader.ReadBytes('\n')
     if err != nil {
        return nil, true, err
     }
     if len(msg) == 0 || msg[len(msg)-2] != '\r' {
        return nil, false, errors.New("protocol error: " + string(msg))
     }
  } else {
     msg = make([]byte, state.bulkLen+2)
     _, err = io.ReadFull(bufReader, msg)
     if err != nil {
        return nil, true, err
     }
     if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
        return nil, false, errors.New("protocol error: " + string(msg))
     }
     state.bulkLen = 0
  }
  return msg, false, nil
}

readLine:一行一行地读取。普通行以\n作为分隔符读取;如果上一行的头部已经给出了正文长度(state.bulkLen不为0),正文中可能包含\r\n,因此改为按长度读取,读取的字节数是state.bulkLen加上结尾的\r\n(state.bulkLen+2)

func parseMultiBulkHeader(msg []byte, state *readState) error {
  var err error
  var expectedLine uint64
  expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  if err != nil {
     return errors.New("protocol error: " + string(msg))
  }
  if expectedLine == 0 {
     state.expectedArgsCount = 0
     return nil
  } else if expectedLine > 0 {
     state.msgType = msg[0]
     state.readingMultiLine = true
     state.expectedArgsCount = int(expectedLine)
     state.args = make([][]byte, 0, expectedLine)
     return nil
  } else {
     return errors.New("protocol error: " + string(msg))
  }
}

func parseBulkHeader(msg []byte, state *readState) error {
  var err error
  state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  if err != nil {
     return errors.New("protocol error: " + string(msg))
  }
  if state.bulkLen == -1 { // null bulk
     return nil
  } else if state.bulkLen > 0 {
     state.msgType = msg[0]
     state.readingMultiLine = true
     state.expectedArgsCount = 1
     state.args = make([][]byte, 0, 1)
     return nil
  } else {
     return errors.New("protocol error: " + string(msg))
  }
}

parseMultiBulkHeader:解析数组的头部,设置期望的行数和相关参数。

parseBulkHeader:解析多行字符串的头部。

func parseSingleLineReply(msg []byte) (resp.Reply, error) {
  str := strings.TrimSuffix(string(msg), "\r\n")
  var result resp.Reply
  switch msg[0] {
  case '+': // status reply
     result = reply.MakeStatusReply(str[1:])
  case '-': // err reply
     result = reply.MakeErrReply(str[1:])
  case ':': // int reply
     val, err := strconv.ParseInt(str[1:], 10, 64)
     if err != nil {
        return nil, errors.New("protocol error: " + string(msg))
     }
     result = reply.MakeIntReply(val)
  }
  return result, nil
}

func readBody(msg []byte, state *readState) error {
  line := msg[0 : len(msg)-2]
  var err error
  if line[0] == '$' {
     // bulk reply
     state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
     if err != nil {
        return errors.New("protocol error: " + string(msg))
     }
     if state.bulkLen <= 0 { // null bulk in multi bulks
        state.args = append(state.args, []byte{})
        state.bulkLen = 0
     }
  } else {
     state.args = append(state.args, line)
  }
  return nil
}

parseSingleLineReply:解析单行命令

readBody:读取多行的命令,如果是$开头,设置bulkLen,读取下一行时根据这个+2,不是$开头则直接添加到args

func parse0(reader io.Reader, ch chan<- *Payload) {
   defer func() {
      if err := recover(); err != nil {
         logger.Error(string(debug.Stack()))
     }
  }()
   bufReader := bufio.NewReader(reader)
   var state readState
   var err error
   var msg []byte
   for {
      var ioErr bool
      msg, ioErr, err = readLine(bufReader, &state)
      if err != nil {
         if ioErr {
            ch <- &Payload{
               Err: err,
           }
            close(ch)
            return
        }
         ch <- &Payload{
            Err: err,
        }
         state = readState{}
         continue
     }

if !state.readingMultiLine {
         if msg[0] == '*' {
            // multi bulk reply
            err = parseMultiBulkHeader(msg, &state)
            if err != nil {
               ch <- &Payload{
                  Err: errors.New("protocol error: " + string(msg)),
              }
               state = readState{}
               continue
           }
            if state.expectedArgsCount == 0 {
               ch <- &Payload{
                  Data: &reply.EmptyMultiBulkReply{},
              }
               state = readState{}
               continue
           }
        } else if msg[0] == '$' { // bulk reply
            err = parseBulkHeader(msg, &state)
            if err != nil {
               ch <- &Payload{
                  Err: errors.New("protocol error: " + string(msg)),
              }
               state = readState{} // reset state
               continue
           }
            if state.bulkLen == -1 { // null bulk reply
               ch <- &Payload{
                  Data: &reply.NullBulkReply{},
              }
               state = readState{} // reset state
               continue
           }
        } else {
            // single line reply
            result, err := parseSingleLineReply(msg)
            ch <- &Payload{
               Data: result,
               Err:  err,
           }
            state = readState{} // reset state
            continue
        }
     } else {
         // read bulk reply
         err = readBody(msg, &state)
         if err != nil {
            ch <- &Payload{
               Err: errors.New("protocol error: " + string(msg)),
           }
            state = readState{} // reset state
            continue
        }
         // if sending finished
         if state.finished() {
            var result resp.Reply
            if state.msgType == '*' {
               result = reply.MakeMultiBulkReply(state.args)
           } else if state.msgType == '$' {
               result = reply.MakeBulkReply(state.args[0])
           }
            ch <- &Payload{
               Data: result,
               Err:  err,
           }
            state = readState{}
        }
     }
  }
}

parse0:解析指令,解析完成后通过channel发出去

resp/connection/conn.go

type Connection struct {
   conn net.Conn
   waitingReply wait.Wait
   mu sync.Mutex // 避免多个协程往客户端中写
   selectedDB int
}

func NewConn(conn net.Conn) *Connection {
   return &Connection{
       conn: conn,
   }
}

func (c *Connection) RemoteAddr() net.Addr {
   return c.conn.RemoteAddr()
}

func (c *Connection) Close() error {
   c.waitingReply.WaitWithTimeout(10 * time.Second)
   _ = c.conn.Close()
   return nil
}

func (c *Connection) Write(b []byte) error {
   if len(b) == 0 {
       return nil
   }
   c.mu.Lock()
   c.waitingReply.Add(1)
   defer func() {
       c.waitingReply.Done()
       c.mu.Unlock()
   }()

_, err := c.conn.Write(b)
   return err
}

func (c *Connection) GetDBIndex() int {
   return c.selectedDB
}

func (c *Connection) SelectDB(dbNum int) {
   c.selectedDB = dbNum
}

之前写的EchoHandler是用户传过来什么,我们传回去什么。现在要写一个RespHandler来代替EchoHandler,让解析器来解析。RespHandler中要有一个管理客户端连接的结构体Connection。

Connection:客户端连接,在协议层的handler中会用到

resp/handler/handler.go

var (
  // unknownErrReplyBytes is written to the client when the database returns a nil reply.
  unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)

// RespHandler serves client connections: it parses incoming commands and
// passes them to the database for execution.
type RespHandler struct {
  // activeConn holds the live client connections as keys.
  activeConn sync.Map
  // db executes the parsed commands.
  db         databaseface.Database
  // closing is set by Close; Handle checks it before serving a connection.
  closing    atomic.Boolean
}

func MakeHandler() *RespHandler {
  var db databaseface.Database
  db = database.NewEchoDatabase()
  return &RespHandler{
     db: db,
  }
}

// closeClient closes one client, tells the database that the client is gone
// and removes the client from the set of active connections.
func (h *RespHandler) closeClient(client *connection.Connection) {
  // The error from Close is deliberately ignored.
  _ = client.Close()
  h.db.AfterClientClose(client)
  h.activeConn.Delete(client)
}

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  if h.closing.Get() {
     // closing handler refuse new connection
     _ = conn.Close()
  }

client := connection.NewConn(conn)
  h.activeConn.Store(client, 1)

ch := parser.ParseStream(conn)
  for payload := range ch {
     if payload.Err != nil {
        if payload.Err == io.EOF ||
           payload.Err == io.ErrUnexpectedEOF ||
           strings.Contains(payload.Err.Error(), "use of closed network connection") {
           // connection closed
           h.closeClient(client)
           logger.Info("connection closed: " + client.RemoteAddr().String())
           return
        }
        // protocol err
        errReply := reply.MakeErrReply(payload.Err.Error())
        err := client.Write(errReply.ToBytes())
        if err != nil {
           h.closeClient(client)
           logger.Info("connection closed: " + client.RemoteAddr().String())
           return
        }
        continue
     }
     if payload.Data == nil {
        logger.Error("empty payload")
        continue
     }
     r, ok := payload.Data.(*reply.MultiBulkReply)
     if !ok {
        logger.Error("require multi bulk reply")
        continue
     }
     result := h.db.Exec(client, r.Args)
     if result != nil {
        _ = client.Write(result.ToBytes())
     } else {
        _ = client.Write(unknownErrReplyBytes)
     }
  }
}

func (h *RespHandler) Close() error {
  logger.Info("handler shutting down...")
  h.closing.Set(true)
  // TODO: concurrent wait
  h.activeConn.Range(func(key interface{}, val interface{}) bool {
     client := key.(*connection.Connection)
     _ = client.Close()
     return true
  })
  h.db.Close()
  return nil
}

RespHandler:和之前的echo类似,加了核心层的db.exec执行解析的指令

interface/database/database.go

// CmdLine is an alias for a command line: the command name and its arguments
// as byte slices.
type CmdLine = [][]byte

// Database is the core layer that executes commands handed over by the
// protocol layer.
type Database interface {
// Exec executes the command given in args for client and returns the reply.
Exec(client resp.Connection, args [][]byte) resp.Reply
// AfterClientClose is called after the connection of client c has been closed.
AfterClientClose(c resp.Connection)
// Close shuts the database down.
Close()
}

// DataEntity wraps a stored value of any type.
type DataEntity struct {
Data interface{}
}

Exec:核心层的执行

AfterClientClose:关闭之后的善后方法

CmdLine:二维字节数组的指令别名

DataEntity:表示Redis的数据,包括string, list, set等等

database/echo_database.go

type EchoDatabase struct {
}

func NewEchoDatabase() *EchoDatabase {
  return &EchoDatabase{}
}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
  return reply.MakeMultiBulkReply(args)
}

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
  logger.Info("EchoDatabase AfterClientClose")
}

func (e EchoDatabase) Close() {
  logger.Info("EchoDatabase Close")
}

echo_database:测试协议层

Exec:指令解析后,再使用MakeMultiBulkReply包装一下返回去

main.go

// Start the TCP server on the configured bind address and port, using the
// RESP handler to serve connections.
err := tcp.ListenAndServeWithSignal(
  &tcp.Config{
     Address: fmt.Sprintf("%s:%d",
        config.Properties.Bind,
        config.Properties.Port),
  },
  handler.MakeHandler())
// Log the error returned by the server, if any.
if err != nil {
  logger.Error(err)
}

main改成刚才写的:handler.MakeHandler()

来源:https://www.cnblogs.com/csgopher/p/17249305.html

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com