godis源码分析——TCP服务
前言
Godis 是一个用 Go 语言实现的 Redis 服务器。
地址:https://github.com/HDT3213/godis?tab=readme-ov-file
简单架构描述
godis是一个中心服务,是TCP服务。流程大概是:godis开启服务,客户端通过TCP建立连接。客户端发起命令,如’keys k’查询k是否存在,服务端接收到这个命令,从而做出业务逻辑返回结果给客户端。
所以第一步就来看看godis的TCP服务的源码实现
源码目录结构
.
├── LICENSE
├── README.md
├── README_CN.md
├── aof
├── appendonly.aof
├── build-all.sh
├── build-darwin.sh
├── build-linux.sh
├── cluster
├── commands.md
├── config
├── database
├── datastruct
├── go.mod
├── go.sum
├── interface
├── lib
├── logs
├── main.go
├── node1.conf
├── node2.conf
├── node3.conf
├── pubsub
├── redis
├── redis.conf
├── tcp
├── test.rdb
└── tmp
源码
主方法启动
func main() {
// 打印标志
print(banner)
// 日志配置
logger.Setup(&logger.Settings{
Path: "logs",
Name: "godis",
Ext: "log",
TimeFormat: "2006-01-02",
})
// 配置文件
configFilename := os.Getenv("CONFIG")
// 获取redis.conf,如果没有获取到,则用默认参数
if configFilename == "" {
if fileExists("redis.conf") {
config.SetupConfig("redis.conf")
} else {
config.Properties = defaultProperties
}
} else {
config.SetupConfig(configFilename)
}
// 主程序启动,开启TCP服务
err := tcp.ListenAndServeWithSignal(&tcp.Config{
Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),
}, RedisServer.MakeHandler())
if err != nil {
logger.Error(err)
}
}
TCP服务
// Config stores tcp server properties
type Config struct {
Address string `yaml:"address"`
MaxConnect uint32 `yaml:"max-connect"`
Timeout time.Duration `yaml:"timeout"`
}
// ClientCounter Record the number of clients in the current Godis server
var ClientCounter int32
// ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
// 创建两个通道
closeChan := make(chan struct{})
sigCh := make(chan os.Signal)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
// 异步执行一个内置方法
go func() {
sig := <-sigCh
switch sig {
case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
closeChan <- struct{}{}
}
}()
// 监听服务
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return err
}
//cfg.Address = listener.Addr().String()
logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
// 监听和服务
ListenAndServe(listener, handler, closeChan)
return nil
}
// ListenAndServe binds port and handle requests, blocking until close
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
// listen signal
errCh := make(chan error, 1)
defer close(errCh)
// 异步监听服务关闭信号
go func() {
// select 监听通道信息
select {
case <-closeChan:
logger.Info("get exit signal")
case er := <-errCh:
logger.Info(fmt.Sprintf("accept error: %s", er.Error()))
}
logger.Info("shutting down...")
// 关闭监听
_ = listener.Close() // listener.Accept() will return err immediately
// 关闭连接
_ = handler.Close() // close connections
}()
// 创建一个上下文
ctx := context.Background()
var waitDone sync.WaitGroup
for {
// 监听阻塞
conn, err := listener.Accept()
if err != nil {
// learn from net/http/serve.go#Serve()
if ne, ok := err.(net.Error); ok && ne.Timeout() {
logger.Infof("accept occurs temporary error: %v, retry in 5ms", err)
time.Sleep(5 * time.Millisecond)
continue
}
// 错误信息传递
errCh <- err
break
}
// handle
logger.Info("accept link")
// 统计客户端连接数
ClientCounter++
// 并发等待
waitDone.Add(1)
// 建立异步方法
go func() {
// 方法处理结束后,关闭
defer func() {
logger.Info("done...")
waitDone.Done()
// 建立连接后数量-1
atomic.AddInt32(&ClientCounter, -1)
}()
logger.Info("handle data")
// 服务端处理,上下文传递
handler.Handle(ctx, conn)
}()
}
waitDone.Wait()
}
TCP消息处理
package server
/*
* A tcp.Handler implements redis protocol
*/
import (
"context"
"io"
"net"
"strings"
"sync"
"github.com/hdt3213/godis/cluster"
"github.com/hdt3213/godis/config"
database2 "github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/sync/atomic"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
)
var (
unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)
// 处理结构体
// Handler implements tcp.Handler and serves as a redis server
type Handler struct {
// 活跃连接, 并发安全map
activeConn sync.Map // *client -> placeholder
// 存储引擎
db database.DB
closing atomic.Boolean // refusing new client and new request
}
// 创建一个处理实例
// MakeHandler creates a Handler instance
func MakeHandler() *Handler {
// redis的一个存储引擎
var db database.DB
// 创建是集群还是单例
if config.Properties.ClusterEnable {
db = cluster.MakeCluster()
} else {
db = database2.NewStandaloneServer()
}
return &Handler{
db: db,
}
}
// 客户端关闭连接处理
func (h *Handler) closeClient(client *connection.Connection) {
_ = client.Close()
h.db.AfterClientClose(client)
h.activeConn.Delete(client)
}
// 处理接收到客户端的命令
// Handle receives and executes redis commands
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// closing handler refuse new connection
_ = conn.Close()
return
}
client := connection.NewConn(conn)
// 存储一个客户端
h.activeConn.Store(client, struct{}{})
// 获取字符串
ch := parser.ParseStream(conn)
// 接收客户端数据
for payload := range ch {
// 遍历消息体
if payload.Err != nil {
// 错误信息处理,关闭连接
if payload.Err == io.EOF ||
payload.Err == io.ErrUnexpectedEOF ||
strings.Contains(payload.Err.Error(), "use of closed network connection") {
// connection closed
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr())
return
}
// 协议错误处理,关闭连接
// protocol err
errReply := protocol.MakeErrReply(payload.Err.Error())
_, err := client.Write(errReply.ToBytes())
if err != nil {
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr())
return
}
continue
}
// 信息不可为空
if payload.Data == nil {
logger.Error("empty payload")
continue
}
// 获取到客户端信息
r, ok := payload.Data.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}
// 执行结果
result := h.db.Exec(client, r.Args)
// 结果回复
if result != nil {
_, _ = client.Write(result.ToBytes())
} else {
_, _ = client.Write(unknownErrReplyBytes)
}
}
}
// Close stops handler
func (h *Handler) Close() error {
logger.Info("handler shutting down...")
h.closing.Set(true)
// TODO: concurrent wait
h.activeConn.Range(func(key interface{}, val interface{}) bool {
client := key.(*connection.Connection)
_ = client.Close()
return true
})
h.db.Close()
return nil
}
原文地址:https://blog.csdn.net/qq_43163694/article/details/140096990
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!