Go: IM系统分布式架构方案 (6)
分布式部署可能遇到的问题
常见 nginx 反向代理方案
- 假设按照上述架构方案来
- A用户接入后connA(ws客户端) 由节点1来维护
- B用户接入后connA(ws客户端) 由节点2来维护
- 流程: A->B 发信息: A -> connA -> 分析处理 -> connB -> B
- 实际上,上述流程是没有办法通信的,因为 A找不到B在哪里
- 核心问题:系统如何将消息投递到 connB?
常用解决方案
1 ) 使用消息总线
- 优点:简单
- 去电:各个节点不知道彼此节点状态
- 例子:redis/kafka/MQ
2 ) 局域网通信协议
- 节点间通过通信协议来通信
- 优点:简单,成本低
- 缺点:不知道节点状态
- 例子:UDP
3 ) 实现调度应用
- 自己实现调度应用,保存各个客户端的状态
- 优点:可靠
- 缺点:复杂
基于局域网通信UDP协议解决
- 以上三种方案,我们选择第二种来解决
- 首先,回顾单体应用
- 开启ws接收协程recvproc/ws发送协程sendproc
- websocket收到消息->dispatch发送给dstid
- 基于UDP的分布式应用
- 开启ws接收协程recvproc/ws发送协程sendproc
- 开启udp接收协程udprecvproc/udp发送协程udpsendproc
- websocket收到消息->broadMsg广播到局域网
- udp接收到收到消息->dispatch发送给dstid
- 自己是局域网一份子,所以也能接收到消息
代码实现解决,ctrl/chat.go
package ctrl
import (
"net/http"
"github.com/gorilla/websocket"
"gopkg.in/fatih/set.v0"
"sync"
"strconv"
"log"
"encoding/json"
"net"
)
const (
CMD_SINGLE_MSG = 10
CMD_ROOM_MSG = 11
CMD_HEART = 0
)
type Message struct {
Id int64 `json:"id,omitempty" form:"id"` //消息ID
Userid int64 `json:"userid,omitempty" form:"userid"` //谁发的
Cmd int `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
Dstid int64 `json:"dstid,omitempty" form:"dstid"`//对端用户ID/群ID
Media int `json:"media,omitempty" form:"media"` //消息按照什么样式展示
Content string `json:"content,omitempty" form:"content"` //消息的内容
Pic string `json:"pic,omitempty" form:"pic"` //预览图片
Url string `json:"url,omitempty" form:"url"` //服务的URL
Memo string `json:"memo,omitempty" form:"memo"` //简单描述
Amount int `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
/**
消息发送结构体
1、MEDIA_TYPE_TEXT
{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
2、MEDIA_TYPE_News
{id:1,userid:2,dstid:3,cmd:10,media:2,content:"标题",pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/dsturl","memo":"这是描述"}
3、MEDIA_TYPE_VOICE,amount单位秒
{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
4、MEDIA_TYPE_IMG
{id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
5、MEDIA_TYPE_REDPACKAGR //红包amount 单位分
{id:1,userid:2,dstid:3,cmd:10,media:5,url:"http://www.baidu.com/a/b/c/redpackageaddress?id=100000","amount":300,"memo":"恭喜发财"}
6、MEDIA_TYPE_EMOJ 6
{id:1,userid:2,dstid:3,cmd:10,media:6,"content":"cry"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}
8、MEDIA_TYPE_VIDEO 8
{id:1,userid:2,dstid:3,cmd:10,media:8,pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/a.mp4"}
9、MEDIA_TYPE_CONTACT 9
{id:1,userid:2,dstid:3,cmd:10,media:9,"content":"10086","pic":"http://www.baidu.com/a/avatar,jpg","memo":"胡大力"}
*/
//本核心在于形成userid和Node的映射关系
type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
//映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node,0)
//读写锁
var rwlocker sync.RWMutex
// ws://127.0.0.1/chat?id=1&token=xxxx
func Chat(writer http.ResponseWriter,
request *http.Request) {
//fmt.Printf("%+v",request.Header)
// 检验接入是否合法
//checkToken(userId int64,token string)
query := request.URL.Query()
id := query.Get("id")
token := query.Get("token")
userId ,_ := strconv.ParseInt(id,10,64)
isvalida := checkToken(userId,token)
//如果isvalida=true
//isvalida=false
conn,err :=(&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return isvalida
},
}).Upgrade(writer,request,nil)
if err!=nil{
log.Println(err.Error())
return
}
// 获得conn
node := &Node{
Conn:conn,
DataQueue:make(chan []byte,50),
GroupSets:set.New(set.ThreadSafe),
}
// 获取用户全部群Id
comIds := contactService.SearchComunityIds(userId)
for _,v:=range comIds{
node.GroupSets.Add(v)
}
// userid和node形成绑定关系
rwlocker.Lock()
clientMap[userId]=node
rwlocker.Unlock()
//todo 完成发送逻辑,con
go sendproc(node)
//todo 完成接收逻辑
go recvproc(node)
log.Printf("<-%d\n",userId)
sendMsg(userId,[]byte("hello,world!"))
}
// 添加新的群ID到用户的groupset中
func AddGroupId(userId,gid int64){
//取得node
rwlocker.Lock()
node,ok := clientMap[userId]
if ok{
node.GroupSets.Add(gid)
}
//clientMap[userId] = node
rwlocker.Unlock()
//添加gid到set
}
// ws发送协程
func sendproc(node *Node) {
for {
select {
case data:= <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage,data)
if err!=nil{
log.Println(err.Error())
return
}
}
}
}
// ws接收协程
func recvproc(node *Node) {
for{
_,data,err := node.Conn.ReadMessage()
if err!=nil{
log.Println(err.Error())
return
}
//dispatch(data)
//把消息广播到局域网
broadMsg(data)
log.Printf("[ws]<=%s\n",data)
}
}
func init(){
go udpsendproc()
go udprecvproc()
}
// 用来存放发送的要广播的数据
var udpsendchan chan []byte=make(chan []byte,1024)
// 将消息广播到局域网
func broadMsg(data []byte){
udpsendchan<-data
}
// 完成udp数据的发送协程
func udpsendproc(){
log.Println("start udpsendproc")
//todo 使用udp协议拨号
con,err:=net.DialUDP("udp",nil,
&net.UDPAddr{
IP:net.IPv4(192,168,0,255), // 这里代表网段,这个可以在部署的时候,抽离出去配置
Port:3000,
})
defer con.Close()
if err!=nil{
log.Println(err.Error())
return
}
// 通过的到的con发送消息
// con.Write()
for{
select {
case data := <- udpsendchan:
_,err=con.Write(data)
if err!=nil{
log.Println(err.Error())
return
}
}
}
}
// 完成upd接收并处理功能
func udprecvproc(){
log.Println("start udprecvproc")
//todo 监听udp广播端口
con,err:=net.ListenUDP("udp",&net.UDPAddr{
IP:net.IPv4zero,
Port:3000,
})
defer con.Close()
if err!=nil{log.Println(err.Error())}
// 处理端口发过来的数据
for{
var buf [512]byte
n,err:=con.Read(buf[0:])
if err!=nil{
log.Println(err.Error())
return
}
//直接数据处理
dispatch(buf[0:n])
}
log.Println("stop updrecvproc")
}
// 后端调度逻辑处理
func dispatch(data[]byte){
//todo 解析data为message
msg := Message{}
err := json.Unmarshal(data,&msg)
if err!=nil{
log.Println(err.Error())
return
}
// 根据cmd对逻辑进行处理
switch msg.Cmd {
case CMD_SINGLE_MSG:
sendMsg(msg.Dstid,data)
case CMD_ROOM_MSG:
//todo 群聊转发逻辑
for _,v:= range clientMap{
if v.GroupSets.Has(msg.Dstid){
v.DataQueue<-data
}
}
case CMD_HEART:
//todo 一般啥都不做
}
}
// 发送消息
func sendMsg(userId int64,msg []byte) {
rwlocker.RLock()
node,ok:=clientMap[userId]
rwlocker.RUnlock()
if ok{
node.DataQueue<- msg
}
}
// 检测是否有效
func checkToken(userId int64,token string)bool{
//从数据库里面查询并比对
user := userService.Find(userId)
return user.Token==token
}
nginx 反向代理
upstream wsbackend {
server 192.168.0.102:8080;
server 192.168.0.100:8080;
hash $request_uri;
}
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://wsbackend;
}
location ^~ /chat {
proxy_pass http://wsbackend;
proxy_connect_timeout 500s;
proxy_read_timeout 500s;
proxy_send_timeout 500s;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
}
}
注意,这里在 server 192.168.0.102; server 192.168.0.100; 这两台服务器上启动 chat.exe 程序
需要提前 go build 并进行相关部署,此处不在赘述,下面会有说明
打包部署
- 我们要打包应用,同时需要 asset 和 view 目录,这两个在 go build 中是不会被打包进去的
- 所以,我们要同时把两者和二进制程序一起进行部署
- 这里,我们写两个脚本文件,分别对应在 window 和 linux 平台的部署文件
- 这里说下,一般会借助 jenkins 来操作
1 )windows 下
rd /s/q release
md release
::go build -ldflags "-H windowsgui" -o chat.exe
go build -o chat.exe
COPY chat.exe release\
COPY favicon.ico release\favicon.ico
XCOPY asset\*.* release\asset\ /s /e
XCOPY view\*.* release\view\ /s /e
2 )Linux 下
#!/bin/sh
rm -rf ./release
mkdir release
go build -o chat
chmod +x ./chat
cp chat ./release/
cp favicon.ico ./release/
cp -arf ./asset ./release/
cp -arf ./view ./release/
- 注意,linux 下这里使用 nohup:
nohup ./chat >>./log.log 2>&1 &
3 ) 总结
- 上面两种是分别在不同平台手动部署的样例
- 实际上,我们如果借助jenkins只需要配置下linux下的相关命令即可
原文地址:https://blog.csdn.net/Tyro_java/article/details/140589919
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!