自学内容网 自学内容网

springboot简单集成t-io框架实现即时通讯


话不多说,直接上代码吧

先奉上官网地址文档:https://www.tiocloud.com/doc/tio/125?pageNumber=1
开源源码地址:https://gitee.com/tywo45/t-io

1、pom.xml添加相关依赖

<dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-websocket-spring-boot-starter</artifactId>
    <version>3.6.0.v20200315-RELEASE</version>
</dependency>

版本我用的是3.6.0.v20200315-RELEASE这个,目前(2024年12月26日09:40:40)最高的版本(3.8.6.v20240801-RELEASE)好像需要jdk17以上了,不想升级jdk。

2、配置文件添加配置(application.yml)

tio:
  websocket:
    server:
      port: 9876
      heartbeat-timeout: 60000
      #是否支持集群,集群开启需要redis
    cluster:
      enabled: false
      redis:
        ip: 127.0.0.1
        port: 6379
        password: 123456
      # all: true
      # group: true
      # user: true
      # ip: true
      # channel: true

t-io是可以支持集群的,但是他很强大,一般中小型的项目单体就够了,喜欢集群就开启集群,配置redis,启动多个实例,做负载均衡就可以了。这里主要介绍单例模式。好了继续。。。

3、启动类添加注解

@EnableTioWebSocketServer

就是我们springboot的启动类,给你们看一个完整的吧

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;

@SpringBootApplication
@EnableTioWebSocketServer
public class WebsocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebsocketApplication.class, args);
    }
}

到这里springboot简单集成t-io就已经完成了,启动项目就可以用了,简单吧。
但是怎么用呢?接下来就看我们的业务如何去使用

4、添加消息接收处理类,包括连接握手,消息发送都在这里面

import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.common.starter.annotation.TioServerMsgHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.handler.IWsMsgHandler;

import java.util.Map;
import java.util.Objects;

/**
 * @author tanyaowu
 * 2017年6月28日 下午5:32:38
 */
@TioServerMsgHandler
@Component
public class ShowcaseWsMsgHandler implements IWsMsgHandler {
private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class);

/**
 * 握手时走这个方法,业务可以在这里获取cookie,request参数等
 */
@Override
public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
String clientip = request.getClientIp();
String groupId = request.getParam("groupId");
String userId = request.getParam("userId");
Tio.bindUser(channelContext, userId);
log.info("收到来自{}的ws握手包\r\n{}", clientip, request.toString());
// 这里是用户发起连接的时候首先触发的方法,其实这里也可以去绑定用户,绑定组之类的业务
// 但是这里只是初始连接,不一定连接得上,所有我们在下面这个连接之后的方法去做绑定业务,具体
// 看自己的需求
return httpResponse;
}

/**
 * @param httpRequest
 * @param httpResponse
 * @param channelContext
 * @throws Exception
 * @author tanyaowu
 */
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
Map<String, Object[]> params = httpRequest.getParams();
String groupId = httpRequest.getParam("groupId");
String userId = httpRequest.getParam("userId");
//绑定到群组,后面会有群发
Tio.bindGroup(channelContext, groupId);

// 这里是连接成功之后触发的方法,在这里可以去绑定用户id,绑定群id等等
// 它是支持一人绑定多个群的,这里可以查询个人的所有群进行绑定,后续可以发送消息
log.info(msg);

}

/**
 * 字节消息(binaryType = arraybuffer)过来后会走这个方法
 */
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
return null;
}

/**
 * 当客户端发close flag时,会走这个方法
 */
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 这里是将断开的人移除
Tio.remove(channelContext, "receive close flag");
String userId = channelContext.userid;
// 这里是有人断开连接时触发的方法,可以做相关的业务处理
return null;
}

/*
 * 字符消息(binaryType = blob)过来后会走这个方法
 */
@Override
public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
HttpRequest httpRequest = wsSessionContext.getHandshakeRequest();//获取websocket握手包
if (log.isDebugEnabled()) {
log.debug("握手包:{}", httpRequest);
}

//log.info("收到ws消息:{}", text);

String userId = channelContext.userid;
// 这里收到消息后去做业务处理
return null;
}

}

到这里,集成才算是真正的完成,很简单,就是添加依赖,添加配置,添加业务处理类,做自己的业务处理。没了,简单的集成就完成了。
但是有些同学可能需要更深入的业务处理。
比如说:比如说我消息处理完之后我要做一些后续的业务,例如:聊天记录的处理,日志等等。
或者考虑到安全问题,有恶意连接的ip怎么处理?是否可以加入很名单等等。
如果有需要就加下面这个两个处理类。

5、实现WsServerAioListener类,自定义各类消息的前置或后续处理


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.WsServerAioListener;

/**
 * @author tanyaowu
 * 用户根据情况来完成该类的实现
 */
@Component
public class ShowcaseTioServerListener extends WsServerAioListener {
private static Logger log = LoggerFactory.getLogger(ShowcaseTioServerListener.class);



@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
super.onAfterConnected(channelContext, isConnected, isReconnect);
//if (log.isInfoEnabled()) {
//log.info("onAfterConnected\r\n{}", channelContext);
//}

}

@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
super.onAfterSent(channelContext, packet, isSentSuccess);
//if (log.isInfoEnabled()) {
//log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
//}
}

@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
super.onBeforeClose(channelContext, throwable, remark, isRemove);
if (log.isInfoEnabled()) {
log.info("onBeforeClose\r\n{}", channelContext);
}

WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
}

@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
super.onAfterDecoded(channelContext, packet, packetSize);
//if (log.isInfoEnabled()) {
//log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
//}
}

@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
super.onAfterReceivedBytes(channelContext, receivedBytes);
//if (log.isInfoEnabled()) {
//log.info("onAfterReceivedBytes\r\n{}", channelContext);
//}
}

@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
super.onAfterHandled(channelContext, packet, cost);
//if (log.isInfoEnabled()) {
//log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
//}
}

}

6、继承IpStatListener类,实现ip监控,实现相关业务


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.IpStat;
import org.tio.core.stat.IpStatListener;

/**
 * 
 * @author tanyaowu
 *
 */
@Component
public class ShowcaseIpStatListener implements IpStatListener {
@SuppressWarnings("unused")
private static Logger log = LoggerFactory.getLogger(ShowcaseIpStatListener.class);

public static final ShowcaseIpStatListener me = new ShowcaseIpStatListener();

/**
 * 
 */
private ShowcaseIpStatListener() {
}

@Override
public void onExpired(TioConfig tioConfig, IpStat ipStat) {
//在这里把统计数据入库中或日志
//if (log.isInfoEnabled()) {
//log.info("可以把统计数据入库\r\n{}", Json.toFormatedJson(ipStat));
//}
}

@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterConnected\r\n{}", Json.toFormatedJson(ipStat));
//}
}

@Override
public void onDecodeError(ChannelContext channelContext, IpStat ipStat) {
//if (log.isInfoEnabled()) {
//log.info("onDecodeError\r\n{}", Json.toFormatedJson(ipStat));
//}
}

@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//}
}

@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//}
}

@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterReceivedBytes\r\n{}", Json.toFormatedJson(ipStat));
//}
}

@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, IpStat ipStat, long cost) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//}
}

}

好了,到这里算是真真正正的完成了,简单的基础就这样。当然还有很多的东西,可以自己去官网了解。这里就不做更多介绍。
接下来看一下如何使用吧。

7、启动项目并进行连接

看到下面这个,说明启动成功。

在这里插入图片描述

接下来我们找一个在线测试工具试一下:
推荐两个:
http://www.websocket-test.com/
http://wstool.js.org/
这两个各有优劣
第一个不能定时发心跳,第二个可以发心跳
但是第二个好像只能连接本地的127.0.0.1,连192.168.. 就连不了

我们用第二个看一下吧
在这里插入图片描述
这个就是连接成功的
我的连接地址是:
ws://127.0.0.1:9876?userId=1&groupId=178979558244139008
后面的参数可以自己带,就跟get 请求一样,拼接在url后面就可以了。我还定时3秒钟发了一个心跳。

8、客户端如何发送消息

我相信很多同学都有我一样的问题,因为我们集成这个不一定是做聊天工具,一个发,一个收。
很多时候我们都是通过服务端主动给某个人发的,没有所谓的发送,或者有些是通过http接口去发送的消息。那我们怎么发送呢?
接下来我们写一个专门的消息收发的业务处理类

@Component
@Slf4j
public class MsgHandel {
// 注入bean 这个非常重要,后面所有的消息发送,包括单发给个人,还是群发都需要这个
@Autowired
    private TioWebSocketServerBootstrap tioWebSocketServerBootstrap;


 // 发送消息,单发给某个人,这里的userId 就是我们之前在用户连接的时候绑定的用户Id
 // 陷入回忆:Tio.bindUser(channelContext, userId);
    public void sendObject(String userId, Object object) {
        WsResponse wsResponse = WsResponse.fromText(JSONObject.toJSONString(object), CHARSET);
        //单发
        Tio.sendToUser(tioWebSocketServerBootstrap.getServerTioConfig(), userId, wsResponse);
    }

//群发,这个的groupId 也就是我们刚刚绑定的群Id
// 再次陷入回忆:Tio.bindGroup(channelContext, groupId);
// 当然前提是你前端连接的时候带了群Id或者自己定义了群Id
public void sendOnlineNum(String groupId) {
// 包装消息体
        JSONObject bodyJsonObject = new JSONObject();
        bodyJsonObject.put("data", "所有人,在吗?在的请举手");
        // 群发最新在线人
        WsResponse wsResponse = WsResponse.fromText(bodyJsonObject.toJSONString(), CHARSET);
        //群发
        Tio.sendToGroup(tioWebSocketServerBootstrap.getServerTioConfig(), groupId, wsResponse);
    }

}

9、最后啰嗦,问题探讨

到这里就结束了,最后啰嗦一点吧,这个也是我当前没有完善的部分,有好的解决办法的可以评论区支支招。
现在很多项目都是微服务,我们刚刚这种是直接调的本服务的ip+端口的方式。因为t-io 底层是使用netty写的,大家都知道,netty是单独监听一个端口的,我们这个也是一样,那么我们在启动项目的时候,不知道大家有没有发现,我们springboot项目也监听了一个端口,相当于我们的一个项目启动了两个实例,而且微服务注册到nacos注册中心的是springboot监听的端口。
这样我们做服务访问,或者网关路由转发的时候就不能转发到我们的 t-io 监听的端口上面来。
我想了几个方法,但是都不太如意,最后还是使用单体了,体量不大也能够用。

1、手动注册一个服务到nacos

package ren.chemi3.tio;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.WsServerStarter;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;

/**
 * websocket 配置类
 */
@Configuration
public class WebSocketConfig {

    @Resource
    private NacosDiscoveryProperties nacosDiscoveryProperties;
    @Value("${tio.websocket.server.port}")
    private Integer port;


    /**
     * 将服务注册进Nacos
     *
     */
    @PostConstruct
    public void registerNamingService() {
        try {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
            properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
            NamingService namingService = NamingFactory.createNamingService(properties);
            InetAddress address = InetAddress.getLocalHost();
            namingService.registerInstance("chemi3-websocket-im", address.getHostAddress(), port);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

上面我手动注册了一个名为chemi3-websocket-im 的服务名称到注册中心,嗯,注册成功,没有问题
在这里插入图片描述
我还启动了两个
在这里插入图片描述
看似没有问题。接下来就是没在网关了

routes:
 - id: chemi3-websocket-im
   # uri: ws://127.0.0.1:9876 
   uri: lb:ws://chemi3-websocket-im
   predicates: 
     - Path=/ws/**
   filters:
     - StripPrefix=1

这样应该没问题吧,按道理,我现在访问 ws://ip:网关端口/ws 应该可以连接才对,但是那就是不能连接,我也不知道什么问题。
那就试试第二种方法,第二种方法好像行,但是好像又不太行。

2、直接通过网关配置真实地址做负载,不走服务名

项目启动不变,修改一下网关配置

routes:
 - id: chemi3-websocket-im
   uri: ws://127.0.0.1:9876
   predicates: 
     - Path=/ws/**
      - Weight=group1,50
   filters:
     - StripPrefix=1

  - id: chemi3-websocket-im1
    uri: ws://127.0.0.1:9875
    predicates: 
     - Path=/ws/**
      - Weight=group1,50
    filters:
      - StripPrefix=1

这样就是通过网关来做负载均衡,好像是可以的,但是感觉不太稳妥。而且这样如果再启动更多的服务就只能在这里加配置,ip和端口写死。我没有具体去做线上的实际使用,你们可以试一下。


结束


原文地址:https://blog.csdn.net/qq_38227017/article/details/144734302

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!