自学内容网 自学内容网

Go: IM系统分布式架构方案 (6)

分布式部署可能遇到的问题


常见 nginx 反向代理方案

  • 假设按照上述架构方案来
  • A用户接入后connA(ws客户端) 由节点1来维护
  • B用户接入后connA(ws客户端) 由节点2来维护
  • 流程: A->B 发信息: A -> connA -> 分析处理 -> connB -> B
  • 实际上,上述流程是没有办法通信的,因为 A找不到B在哪里
  • 核心问题:系统如何将消息投递到 connB?

常用解决方案

1 ) 使用消息总线

  • 优点:简单
  • 去电:各个节点不知道彼此节点状态
  • 例子:redis/kafka/MQ

2 ) 局域网通信协议

  • 节点间通过通信协议来通信
  • 优点:简单,成本低
  • 缺点:不知道节点状态
  • 例子:UDP

3 ) 实现调度应用

  • 自己实现调度应用,保存各个客户端的状态
  • 优点:可靠
  • 缺点:复杂

基于局域网通信UDP协议解决

  • 以上三种方案,我们选择第二种来解决
  • 首先,回顾单体应用
    • 开启ws接收协程recvproc/ws发送协程sendproc
    • websocket收到消息->dispatch发送给dstid
  • 基于UDP的分布式应用
    • 开启ws接收协程recvproc/ws发送协程sendproc
    • 开启udp接收协程udprecvproc/udp发送协程udpsendproc
    • websocket收到消息->broadMsg广播到局域网
    • udp接收到收到消息->dispatch发送给dstid
    • 自己是局域网一份子,所以也能接收到消息

代码实现解决,ctrl/chat.go

package ctrl

import (
"net/http"
"github.com/gorilla/websocket"
"gopkg.in/fatih/set.v0"
"sync"
"strconv"
"log"
"encoding/json"
"net"
)

const (
CMD_SINGLE_MSG = 10
CMD_ROOM_MSG   = 11
CMD_HEART      = 0
)

type Message struct {
Id      int64  `json:"id,omitempty" form:"id"` //消息ID
Userid  int64  `json:"userid,omitempty" form:"userid"` //谁发的
Cmd     int    `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
Dstid   int64  `json:"dstid,omitempty" form:"dstid"`//对端用户ID/群ID
Media   int    `json:"media,omitempty" form:"media"` //消息按照什么样式展示
Content string `json:"content,omitempty" form:"content"` //消息的内容
Pic     string `json:"pic,omitempty" form:"pic"` //预览图片
Url     string `json:"url,omitempty" form:"url"` //服务的URL
Memo    string `json:"memo,omitempty" form:"memo"` //简单描述
Amount  int    `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
/**
消息发送结构体
1、MEDIA_TYPE_TEXT
{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
2、MEDIA_TYPE_News
{id:1,userid:2,dstid:3,cmd:10,media:2,content:"标题",pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/dsturl","memo":"这是描述"}
3、MEDIA_TYPE_VOICE,amount单位秒
{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
4、MEDIA_TYPE_IMG
{id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
5、MEDIA_TYPE_REDPACKAGR //红包amount 单位分
{id:1,userid:2,dstid:3,cmd:10,media:5,url:"http://www.baidu.com/a/b/c/redpackageaddress?id=100000","amount":300,"memo":"恭喜发财"}
6、MEDIA_TYPE_EMOJ 6
{id:1,userid:2,dstid:3,cmd:10,media:6,"content":"cry"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

8、MEDIA_TYPE_VIDEO 8
{id:1,userid:2,dstid:3,cmd:10,media:8,pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/a.mp4"}

9、MEDIA_TYPE_CONTACT 9
{id:1,userid:2,dstid:3,cmd:10,media:9,"content":"10086","pic":"http://www.baidu.com/a/avatar,jpg","memo":"胡大力"}
*/

//本核心在于形成userid和Node的映射关系
type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
//映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node,0)
//读写锁
var rwlocker sync.RWMutex

// ws://127.0.0.1/chat?id=1&token=xxxx
func Chat(writer http.ResponseWriter,
request *http.Request) {
//fmt.Printf("%+v",request.Header)
// 检验接入是否合法
    //checkToken(userId int64,token string)
    query := request.URL.Query()
    id := query.Get("id")
    token := query.Get("token")
    userId ,_ := strconv.ParseInt(id,10,64)
isvalida := checkToken(userId,token)
//如果isvalida=true
//isvalida=false

conn,err :=(&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return isvalida
},
}).Upgrade(writer,request,nil)
if err!=nil{
log.Println(err.Error())
return
}
// 获得conn
node := &Node{
Conn:conn,
DataQueue:make(chan []byte,50),
GroupSets:set.New(set.ThreadSafe),
}
// 获取用户全部群Id
comIds := contactService.SearchComunityIds(userId)
for _,v:=range comIds{
node.GroupSets.Add(v)
}
// userid和node形成绑定关系
rwlocker.Lock()
clientMap[userId]=node
rwlocker.Unlock()
//todo 完成发送逻辑,con
go sendproc(node)
//todo 完成接收逻辑
go recvproc(node)
    log.Printf("<-%d\n",userId)
sendMsg(userId,[]byte("hello,world!"))
}

// 添加新的群ID到用户的groupset中
func AddGroupId(userId,gid int64){
//取得node
rwlocker.Lock()
node,ok := clientMap[userId]
if ok{
node.GroupSets.Add(gid)
}
//clientMap[userId] = node
rwlocker.Unlock()
//添加gid到set
}
// ws发送协程
func sendproc(node *Node) {
for {
select {
case data:= <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage,data)
if err!=nil{
log.Println(err.Error())
return
}
}
}
}
// ws接收协程
func recvproc(node *Node) {
for{
_,data,err := node.Conn.ReadMessage()
if err!=nil{
log.Println(err.Error())
return
}
//dispatch(data)
//把消息广播到局域网
broadMsg(data)
log.Printf("[ws]<=%s\n",data)
}
}

func init(){
go udpsendproc()
go udprecvproc()
}

// 用来存放发送的要广播的数据
var udpsendchan chan []byte=make(chan []byte,1024)
// 将消息广播到局域网
func broadMsg(data []byte){
udpsendchan<-data
}
// 完成udp数据的发送协程
func udpsendproc(){
log.Println("start udpsendproc")
//todo 使用udp协议拨号
con,err:=net.DialUDP("udp",nil,
&net.UDPAddr{
IP:net.IPv4(192,168,0,255), // 这里代表网段,这个可以在部署的时候,抽离出去配置
Port:3000,
})
defer con.Close()
if err!=nil{
log.Println(err.Error())
return
}
// 通过的到的con发送消息
// con.Write()
for{
select {
case data := <- udpsendchan:
_,err=con.Write(data)
if err!=nil{
log.Println(err.Error())
return
}
}
}
}
// 完成upd接收并处理功能
func udprecvproc(){
log.Println("start udprecvproc")
 //todo 监听udp广播端口
 con,err:=net.ListenUDP("udp",&net.UDPAddr{
 IP:net.IPv4zero,
 Port:3000,
 })
 defer con.Close()
 if err!=nil{log.Println(err.Error())}
// 处理端口发过来的数据
for{
var buf [512]byte
n,err:=con.Read(buf[0:])
if err!=nil{
log.Println(err.Error())
return
}
//直接数据处理
dispatch(buf[0:n])
}
log.Println("stop updrecvproc")
}

// 后端调度逻辑处理
func dispatch(data[]byte){
//todo 解析data为message
msg := Message{}
err := json.Unmarshal(data,&msg)
if err!=nil{
log.Println(err.Error())
return
}
// 根据cmd对逻辑进行处理
switch msg.Cmd {
case CMD_SINGLE_MSG:
sendMsg(msg.Dstid,data)
case CMD_ROOM_MSG:
//todo 群聊转发逻辑
for _,v:= range clientMap{
if v.GroupSets.Has(msg.Dstid){
v.DataQueue<-data
}
}
case CMD_HEART:
//todo 一般啥都不做
}
}

// 发送消息
func sendMsg(userId int64,msg []byte) {
rwlocker.RLock()
node,ok:=clientMap[userId]
rwlocker.RUnlock()
if ok{
node.DataQueue<- msg
}
}
// 检测是否有效
func checkToken(userId int64,token string)bool{
//从数据库里面查询并比对
user := userService.Find(userId)
return user.Token==token
}

nginx 反向代理

upstream wsbackend {
server 192.168.0.102:8080;
server 192.168.0.100:8080;
hash $request_uri;
}
map $http_upgrade $connection_upgrade {
      default upgrade;
      ''      close; 
}
    server {
  listen  80;
  server_name localhost;
  location / {
      proxy_pass http://wsbackend;
  }
  location ^~ /chat {
   proxy_pass http://wsbackend;
   proxy_connect_timeout 500s;
       proxy_read_timeout 500s;
   proxy_send_timeout 500s;
   proxy_set_header Upgrade $http_upgrade;
       proxy_set_header Connection "Upgrade";
  }
 }
}

注意,这里在 server 192.168.0.102; server 192.168.0.100; 这两台服务器上启动 chat.exe 程序
需要提前 go build 并进行相关部署,此处不在赘述,下面会有说明

打包部署

  • 我们要打包应用,同时需要 asset 和 view 目录,这两个在 go build 中是不会被打包进去的
  • 所以,我们要同时把两者和二进制程序一起进行部署
  • 这里,我们写两个脚本文件,分别对应在 window 和 linux 平台的部署文件
  • 这里说下,一般会借助 jenkins 来操作

1 )windows 下

rd /s/q release
md release
::go build -ldflags "-H windowsgui" -o chat.exe
go build -o chat.exe
COPY chat.exe release\
COPY favicon.ico release\favicon.ico
XCOPY asset\*.* release\asset\  /s /e
XCOPY view\*.* release\view\  /s /e

2 )Linux 下

#!/bin/sh
rm -rf ./release
mkdir  release
go build -o chat
chmod +x ./chat 
cp chat ./release/
cp favicon.ico ./release/ 
cp -arf ./asset ./release/
cp -arf ./view ./release/
  • 注意,linux 下这里使用 nohup: nohup ./chat >>./log.log 2>&1 &

3 ) 总结

  • 上面两种是分别在不同平台手动部署的样例
  • 实际上,我们如果借助jenkins只需要配置下linux下的相关命令即可

原文地址:https://blog.csdn.net/Tyro_java/article/details/140589919

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!