自学内容网 自学内容网

Go: IM系统接入ws进行消息发送以及群聊功能 (5)

概述

  • 在即时通讯(IM)系统中,实现多媒体消息(如文本、表情包、拍照、图片、音频、视频)的实时传输是一项核心功能
  • 随着HTML5和WebSocket技术的发展,现代Web应用能够支持更高效、更实时的通信方式
  • 本文将详细探讨如何使用Go语言结合WebSocket技术,在IM系统中实现多媒体消息的发送和接收

基于MVC的目录设计

im-project
├── go.mod
├── main.go          主程序
├── ctrl             控制器层
│     └── chat.go
├── views            模板层
│     └── chat
│           ├── foot.html
│           └── x.html

主程序

main.go 核心代码

package main

import (
"net/http"
"im-project/ctrl"
)

func main() {
// 1. 绑定请求和处理函数
http.HandleFunc("/chat", ctrl.Chat)
http.HandleFunc("/attach/upload", ctrl.Upload)
// 2. 指定目录的静态文件
http.Handle("/asset/",http.FileServer(http.Dir(".")))
http.Handle("/mnt/",http.FileServer(http.Dir(".")))
// 3. 启动
http.ListenAndServe(":80",nil)
}

控制器

ctrl/attach.go

package ctrl

import (
"net/http"
"im-project/util"
"os"
"strings"
"fmt"
"time"
"math/rand"
"io"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
)

func init(){
os.MkdirAll("./mnt",os.ModePerm)
}

func Upload(w http.ResponseWriter, r *http.Request){
//UploadLocal(w,r)
UploadOss(w,r)
}

// 1.存储位置 ./mnt,需要确保已经创建好
// 2.url格式 /mnt/xxxx.png  需要确保网络能访问/mnt/
func UploadLocal(writer http.ResponseWriter, request * http.Request) {
// 获得上传的源文件
    srcfile,head,err:=request.FormFile("file")
    if err!=nil{
    util.RespFail(writer,err.Error())
}
// 创建一个新文件
suffix := ".png"
// 如果前端文件名称包含后缀 xx.xx.png
ofilename := head.Filename
tmp := strings.Split(ofilename,".")
if len(tmp)>1 {
suffix = "."+tmp[len(tmp)-1]
}
// 如果前端指定filetype
// formdata.append("filetype",".png")
filetype := request.FormValue("filetype")
if len(filetype)>0 {
suffix = filetype
}
// time.Now().Unix()
    filename := fmt.Sprintf("%d%04d%s", time.Now().Unix(), rand.Int31(), suffix)
    dstfile,err:= os.Create("./mnt/"+filename)
    if err!=nil {
    util.RespFail(writer,err.Error())
    return
}
// todo 将源文件内容copy到新文件
_,err = io.Copy(dstfile,srcfile)
if err!=nil{
util.RespFail(writer,err.Error())
return
}
// 将新文件路径转换成url地址
url := "/mnt/"+filename
// 响应到前端
util.RespOk(writer,url,"")
}

// 即将删掉,定期更新
const (
AccessKeyId="5p2RZ******nMuQw9" // 填入自己的 key
AccessKeySecret="bsNmjU8Au08*****S5XIFAkK" // 填入自己的secret
EndPoint="oss-cn-shenzhen.aliyuncs.com"
Bucket="winliondev"
)

// 权限设置为公共读状态
// 需要安装
func UploadOss(writer http.ResponseWriter, request * http.Request) {
// 获得上传的文件
srcfile,head,err := request.FormFile("file")
if err!=nil {
util.RespFail(writer,err.Error())
return
}
// 获得文件后缀.png/.mp3
suffix := ".png"
//如果前端文件名称包含后缀 xx.xx.png
ofilename := head.Filename
tmp := strings.Split(ofilename,".")
if len(tmp)>1 {
suffix = "."+tmp[len(tmp)-1]
}
// 如果前端指定filetype
// formdata.append("filetype",".png")
filetype := request.FormValue("filetype")
if len(filetype)>0{
suffix = filetype
}
// 初始化ossclient
client,err:=oss.New(EndPoint,AccessKeyId,AccessKeySecret)
if err!=nil{
util.RespFail(writer,err.Error())
return
}
// todo 获得bucket
bucket,err := client.Bucket(Bucket)
if err!=nil{
util.RespFail(writer,err.Error())
return
}
// 设置文件名称
// time.Now().Unix()
filename := fmt.Sprintf("mnt/%d%04d%s", time.Now().Unix(), rand.Int31(), suffix)
// 通过bucket上传
err = bucket.PutObject(filename, srcfile)
if err!=nil {
util.RespFail(writer,err.Error())
return
}
// 获得url地址
url := "http://"+Bucket+"."+EndPoint+"/"+filename
// 响应到前端
util.RespOk(writer,url,"")
}

ctrl/chat.go

package ctrl

import (
"net/http"
"github.com/gorilla/websocket"
"gopkg.in/fatih/set.v0"
"sync"
"strconv"
"log"
"fmt"
"encoding/json"
)

const (
CMD_SINGLE_MSG = 10
CMD_ROOM_MSG   = 11
CMD_HEART      = 0
)

type Message struct {
Id      int64  `json:"id,omitempty" form:"id"` //消息ID
Userid  int64  `json:"userid,omitempty" form:"userid"` //谁发的
Cmd     int    `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
Dstid   int64  `json:"dstid,omitempty" form:"dstid"`//对端用户ID/群ID
Media   int    `json:"media,omitempty" form:"media"` //消息按照什么样式展示
Content string `json:"content,omitempty" form:"content"` //消息的内容
Pic     string `json:"pic,omitempty" form:"pic"` //预览图片
Url     string `json:"url,omitempty" form:"url"` //服务的URL
Memo    string `json:"memo,omitempty" form:"memo"` //简单描述
Amount  int    `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
/**
消息发送结构体
1、MEDIA_TYPE_TEXT
{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
2、MEDIA_TYPE_News
{id:1,userid:2,dstid:3,cmd:10,media:2,content:"标题",pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/dsturl","memo":"这是描述"}
3、MEDIA_TYPE_VOICE,amount单位秒
{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
4、MEDIA_TYPE_IMG
{id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
5、MEDIA_TYPE_REDPACKAGR //红包amount 单位分
{id:1,userid:2,dstid:3,cmd:10,media:5,url:"http://www.baidu.com/a/b/c/redpackageaddress?id=100000","amount":300,"memo":"恭喜发财"}
6、MEDIA_TYPE_EMOJ 6
{id:1,userid:2,dstid:3,cmd:10,media:6,"content":"cry"}
7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

7、MEDIA_TYPE_Link 6
{id:1,userid:2,dstid:3,cmd:10,media:7,"url":"http://www.a,com/dsturl.html"}

8、MEDIA_TYPE_VIDEO 8
{id:1,userid:2,dstid:3,cmd:10,media:8,pic:"http://www.baidu.com/a/log,jpg",url:"http://www.a,com/a.mp4"}

9、MEDIA_TYPE_CONTACT 9
{id:1,userid:2,dstid:3,cmd:10,media:9,"content":"10086","pic":"http://www.baidu.com/a/avatar,jpg","memo":"胡大力"}

*/

// 本核心在于形成userid和Node的映射关系
type Node struct {
Conn *websocket.Conn
//并行转串行,
DataQueue chan []byte
GroupSets set.Interface
}
// 映射关系表
var clientMap map[int64]*Node = make(map[int64]*Node,0)
// 读写锁
var rwlocker sync.RWMutex

// ws://127.0.0.1/chat?id=1&token=xxxx
func Chat(writer http.ResponseWriter, request *http.Request) {

// 检验接入是否合法
    // checkToken(userId int64, token string)
    query := request.URL.Query()
    id := query.Get("id")
    token := query.Get("token")
    userId ,_ := strconv.ParseInt(id,10,64)
isvalida := checkToken(userId,token)
// 如果 isvalida = true
// isvalida = false
conn,err :=(&websocket.Upgrader {
CheckOrigin: func(r *http.Request) bool {
return isvalida
},
}).Upgrade(writer,request,nil)

if err!=nil {
log.Println(err.Error())
return
}
//  获得conn
node := &Node {
Conn:conn,
DataQueue:make(chan []byte,50),
GroupSets:set.New(set.ThreadSafe),
}
// 获取用户全部群Id
comIds := contactService.SearchComunityIds(userId)
for _,v:=range comIds {
node.GroupSets.Add(v)
}
// userid和node形成绑定关系
rwlocker.Lock()
clientMap[userId] = node
rwlocker.Unlock()
// 完成发送逻辑, con
go sendproc(node)
// 完成接收逻辑
go recvproc(node)
sendMsg(userId,[]byte("hello,world!"))
}

//发送协程
func sendproc(node *Node) {
for {
select {
case data:= <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage,data)
    if err!=nil{
    log.Println(err.Error())
    return
}
}
}
}
// 添加新的群ID到用户的groupset中
func AddGroupId(userId,gid int64) {
// 取得node
rwlocker.Lock()
node,ok := clientMap[userId]
if ok {
node.GroupSets.Add(gid)
}
// clientMap[userId] = node
rwlocker.Unlock()
// 添加gid到set
}
// 接收协程
func recvproc(node *Node) {
for {
_,data,err := node.Conn.ReadMessage()
if err!=nil {
log.Println(err.Error())
return
}
// 对data进一步处理
dispatch(data)
fmt.Printf("recv<=%s",data)
}
}
//后端调度逻辑处理
func dispatch(data[]byte) {
// 解析data为message
msg := Message{}
err := json.Unmarshal(data, &msg)
if err!=nil {
log.Println(err.Error())
return
}
// 根据cmd对逻辑进行处理
switch msg.Cmd {
case CMD_SINGLE_MSG:
sendMsg(msg.Dstid, data)
case CMD_ROOM_MSG:
// 群聊转发逻辑
for _,v:= range clientMap {
if v.GroupSets.Has(msg.Dstid){
v.DataQueue <- data
}
}
case CMD_HEART:
// 一般啥都不做
}
}

// 发送消息
func sendMsg(userId int64,msg []byte) {
rwlocker.RLock()
node,ok := clientMap[userId]
rwlocker.RUnlock()
if ok {
node.DataQueue<- msg
}
}

// 检测是否有效
func checkToken(userId int64,token string) bool {
// 从数据库里面查询并比对
user := userService.Find(userId)
return user.Token == token
}

视图层


这里,视图层要着重说明下

1 )发送文本

sendtxtmsg:function(txt) {
    //{id:1,userid:2,dstid:3,cmd:10,media:1,content:"hello"}
    var msg =this.createmsgcontext();
    msg.media=1;
    msg.content=txt;
    this.showmsg(userInfo(),msg);
    this.webSocket.send(JSON.stringify(msg))
}
  • 前端user1拼接好数据对象Message msg={id:1,userid:2,dstid:3,cmd:10,media:1,content:txt}
  • 转化成json字符串jsonstr : jsonstr = JSON.stringify(msg)
  • 通过websocket.send(jsonstr)发送, 后端S在recvproc中接收收数据data
  • 并做相应的逻辑处理dispatch(data)-转发给user2
  • user2通过websocket.onmessage收到消息后做解析并显示

2 )发送表情包

{
loaddoutures:function(){
      var res=[];
      var config = this.doutu.config;
      for(var i in config.pkgids){
          res[config.pkgids[i]]= (config.baseurl+"/"+config.pkgids[i]+"/info.json")
      }
      var that = this;
      for(var id in res){
          //console.log("res[i]",id,res[id])
          post(res[id],{},function(pkginfo){
              //console.log("post res[i]",id,res[id],pkginfo)
              var baseurl= config.baseurl+"/"+pkginfo.id+"/"
              for(var j in pkginfo.assets){
                  pkginfo.assets[j] = baseurl+pkginfo.assets[j];
              }
              pkginfo.icon = baseurl + pkginfo.icon;
              that.doutu.packages.push(pkginfo)
              if(that.doutu.choosed.pkgid==pkginfo.id){
                  that.doutu.choosed.assets=pkginfo.assets;
              }

          })
      }
  },
}
  • 表情包简单逻辑:弹出一个窗口, 选择图片获得一个连接地址
  • 调用sendpicmsg方法开始发送流程

3 ) 拍照

3.1 照片

<input accept=\"image/gif,image/jpeg,,image/png\" type=\"file\" οnchange=\"upload(this)\" class='upload' />

3.2 拍照

<input accept=\"image/*\" capture=\"camera\" type=\"file\" οnchange=\"upload(this)\" class='upload' />
function upload(dom){
    uploadfile("attach/upload",dom,function(res){
        if(res.code==0){
            app.sendpicmsg(res.data)
        }
    })
}

function uploadfile(uri,dom,fn){
    var xhr = new XMLHttpRequest();
    xhr.open("POST","//"+location.host+"/"+uri, true);
    // 添加http头,发送信息至服务器时内容编码类型
    xhr.onreadystatechange = function() {
        if (xhr.readyState == 4 && (xhr.status == 200 || xhr.status == 304)) {
            fn.call(this, JSON.parse(xhr.responseText));
        }
    };
    var _data=[];
    var formdata = new FormData();
    if(!! userId()){
        formdata.append("userid",userId());
    }
    formdata.append("file",dom.files[0])
    xhr.send(formdata);
}

// vue methods 中的发送图片方法
{
sendpicmsg:function(picurl){
// {id:1,userid:2,dstid:3,cmd:10,media:4,url:"http://www.baidu.com/a/log,jpg"}
        var msg =this.createmsgcontext();
        msg.media=4;
        msg.url=picurl;
        this.showmsg(userInfo(),msg)
        this.webSocket.send(JSON.stringify(msg))
    },
}
  • 发送图片/拍照, 弹出一个窗口,
  • 选择图片,上传到服务器, 获得一个链接地址
  • 调用sendpicmsg方法开始发送流程

4 )音视频

// 上传语音
function uploadblob(uri,blob,filetype,fn){
    var xhr = new XMLHttpRequest();
    xhr.open("POST","//"+location.host+"/"+uri, true);
    // 添加http头,发送信息至服务器时内容编码类型
    xhr.onreadystatechange = function() {
        if (xhr.readyState == 4 && (xhr.status == 200 || xhr.status == 304)) {
            fn.call(this, JSON.parse(xhr.responseText));
        }
    };
    var _data=[];
    var formdata = new FormData();
    formdata.append("filetype",filetype);
    formdata.append("file",blob)
    xhr.send(formdata);
}

// vue methods 中的方法
{
playaudio:function(url) {
      document.getElementById('audio4play').src = url;
      document.getElementById('audio4play').play();
    },
    startrecorder:function(){
      let audioTarget = document.getElementById('audio');
      var types = ["video/webm",
          "audio/webm",
          "video/webm\;codecs=vp8",
          "video/webm\;codecs=daala",
          "video/webm\;codecs=h264",
          "audio/webm\;codecs=opus",
          "video/mpeg"];
      var suporttype ="";
      for (var i in types) {
          if(MediaRecorder.isTypeSupported(types[i])){
              suporttype = types[i];
          }
      }
      if(!suporttype){
          mui.toast("编码不支持")
          return ;
      }
      this.duration = new Date().getTime();
      navigator.mediaDevices.getUserMedia({audio: true, video: false})
      .then(function(stream){
          this.showprocess = true
          this.recorder = new MediaRecorder(stream);
          audioTarget.srcObject = stream;

          this.recorder.ondataavailable = (event) => {
              console.log("ondataavailable");
              uploadblob("attach/upload",event.data,".mp3",res=>{
                  var duration = Math.ceil((new Date().getTime()-this.duration)/1000);
                  this.sendaudiomsg(res.data,duration);
              })
              stream.getTracks().forEach(function (track) {
                  track.stop();
              });
              this.showprocess = false
          }
          this.recorder.start();
      }.bind(this))
       .catch(function(err){
          console.log(err)
          mui.toast(err)
          this.showprocess = false
      }.bind(this));
  },
  stoprecorder :function() {
      if(typeof this.recorder.stop=="function"){
          this.recorder.stop();
      }
      this.showprocess = false
      console.log("stoprecorder")

  },
  sendaudiomsg:function(url,num) {
  //{id:1,userid:2,dstid:3,cmd:10,media:3,url:"http://www.a,com/dsturl.mp3",anount:40}
    var msg = this.createmsgcontext();
    msg.media = 3;
    msg.url = url;
    msg.amount = num;
    this.showmsg(userInfo(), msg)
    // console.log("sendaudiomsg",this.msglist);
    this.webSocket.send(JSON.stringify(msg))
  },
}

5 )单聊

{
singlemsg:function(user){
    //console.log(user)
    this.win = "single";
    this.title = "和"+user.nickname+"聊天中";
    this.msgcontext.dstid = parseInt(user.id);
    this.msgcontext.cmd = 10;
},
}

6 )群聊

// vue methods 中的方法
{
groupmsg: function(group){
        this.win = "group";
        this.title=group.name;
        this.msgcontext.dstid = parseInt(group.id);
        this.msgcontext.cmd = 11;
    },
}
  • 群聊原理: 分析群id,找到加了这个群的用户,把消息发送过去
  • 方案一:map<qunid1,qunid2,qunid3>
    • 优势是锁的频次低
    • 劣势是要轮训全部map
      type Node struct {
      Conn *websocket.Conn
      //并行转串行,
      DataQueue chan []byte
      GroupSets set.Interface
      }
      // 映射关系表
      var clientMap map[int64]*Node = make(map[int64]*Node,0)
      
  • 方案二: map<群id><userid1,userid2,userid3>
    • 优势是找用户ID非常快
    • 劣势是发送信息时需要根据userid获取node,锁的频次太高
      type Node struct {
      Conn *websocket.Conn
      //并行转串行,
      DataQueue chan []byte
      }
      // 映射关系表
      var clientMap map[int64]*Node = make(map[int64]*Node,0)
      var comMap map[int64]set.Interface= make(map[int64]set.Interface,0)
      
  • 需要处理的问题
    • 当用户接入的时候初始化groupset
    • 当用户加入群的时候刷新groupset
    • 完成信息分发

原文地址:https://blog.csdn.net/Tyro_java/article/details/140559275

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!