springboot简单集成t-io框架实现即时通讯
springboot简单集成t-io框架实现即时通讯
话不多说,直接上代码吧
先奉上官网地址文档:https://www.tiocloud.com/doc/tio/125?pageNumber=1
开源源码地址:https://gitee.com/tywo45/t-io
1、pom.xml添加相关依赖
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-spring-boot-starter</artifactId>
<version>3.6.0.v20200315-RELEASE</version>
</dependency>
版本我用的是3.6.0.v20200315-RELEASE这个,目前(2024年12月26日09:40:40)最高的版本(3.8.6.v20240801-RELEASE)好像需要jdk17以上了,不想升级jdk。
2、配置文件添加配置(application.yml)
tio:
websocket:
server:
port: 9876
heartbeat-timeout: 60000
#是否支持集群,集群开启需要redis
cluster:
enabled: false
redis:
ip: 127.0.0.1
port: 6379
password: 123456
# all: true
# group: true
# user: true
# ip: true
# channel: true
t-io是可以支持集群的,但是他很强大,一般中小型的项目单体就够了,喜欢集群就开启集群,配置redis,启动多个实例,做负载均衡就可以了。这里主要介绍单例模式。好了继续。。。
3、启动类添加注解
@EnableTioWebSocketServer
就是我们springboot的启动类,给你们看一个完整的吧
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;
@SpringBootApplication
@EnableTioWebSocketServer
public class WebsocketApplication {
public static void main(String[] args) {
SpringApplication.run(WebsocketApplication.class, args);
}
}
到这里springboot简单集成t-io就已经完成了,启动项目就可以用了,简单吧。
但是怎么用呢?接下来就看我们的业务如何去使用
4、添加消息接收处理类,包括连接握手,消息发送都在这里面
import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.common.starter.annotation.TioServerMsgHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.handler.IWsMsgHandler;
import java.util.Map;
import java.util.Objects;
/**
* @author tanyaowu
* 2017年6月28日 下午5:32:38
*/
@TioServerMsgHandler
@Component
public class ShowcaseWsMsgHandler implements IWsMsgHandler {
private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class);
/**
* 握手时走这个方法,业务可以在这里获取cookie,request参数等
*/
@Override
public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
String clientip = request.getClientIp();
String groupId = request.getParam("groupId");
String userId = request.getParam("userId");
Tio.bindUser(channelContext, userId);
log.info("收到来自{}的ws握手包\r\n{}", clientip, request.toString());
// 这里是用户发起连接的时候首先触发的方法,其实这里也可以去绑定用户,绑定组之类的业务
// 但是这里只是初始连接,不一定连接得上,所有我们在下面这个连接之后的方法去做绑定业务,具体
// 看自己的需求
return httpResponse;
}
/**
* @param httpRequest
* @param httpResponse
* @param channelContext
* @throws Exception
* @author tanyaowu
*/
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
Map<String, Object[]> params = httpRequest.getParams();
String groupId = httpRequest.getParam("groupId");
String userId = httpRequest.getParam("userId");
//绑定到群组,后面会有群发
Tio.bindGroup(channelContext, groupId);
// 这里是连接成功之后触发的方法,在这里可以去绑定用户id,绑定群id等等
// 它是支持一人绑定多个群的,这里可以查询个人的所有群进行绑定,后续可以发送消息
log.info(msg);
}
/**
* 字节消息(binaryType = arraybuffer)过来后会走这个方法
*/
@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
return null;
}
/**
* 当客户端发close flag时,会走这个方法
*/
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 这里是将断开的人移除
Tio.remove(channelContext, "receive close flag");
String userId = channelContext.userid;
// 这里是有人断开连接时触发的方法,可以做相关的业务处理
return null;
}
/*
* 字符消息(binaryType = blob)过来后会走这个方法
*/
@Override
public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
HttpRequest httpRequest = wsSessionContext.getHandshakeRequest();//获取websocket握手包
if (log.isDebugEnabled()) {
log.debug("握手包:{}", httpRequest);
}
//log.info("收到ws消息:{}", text);
String userId = channelContext.userid;
// 这里收到消息后去做业务处理
return null;
}
}
到这里,集成才算是真正的完成,很简单,就是添加依赖,添加配置,添加业务处理类,做自己的业务处理。没了,简单的集成就完成了。
但是有些同学可能需要更深入的业务处理。
比如说:比如说我消息处理完之后我要做一些后续的业务,例如:聊天记录的处理,日志等等。
或者考虑到安全问题,有恶意连接的ip怎么处理?是否可以加入很名单等等。
如果有需要就加下面这个两个处理类。
5、实现WsServerAioListener类,自定义各类消息的前置或后续处理
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.WsServerAioListener;
/**
* @author tanyaowu
* 用户根据情况来完成该类的实现
*/
@Component
public class ShowcaseTioServerListener extends WsServerAioListener {
private static Logger log = LoggerFactory.getLogger(ShowcaseTioServerListener.class);
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
super.onAfterConnected(channelContext, isConnected, isReconnect);
//if (log.isInfoEnabled()) {
//log.info("onAfterConnected\r\n{}", channelContext);
//}
}
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
super.onAfterSent(channelContext, packet, isSentSuccess);
//if (log.isInfoEnabled()) {
//log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
//}
}
@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
super.onBeforeClose(channelContext, throwable, remark, isRemove);
if (log.isInfoEnabled()) {
log.info("onBeforeClose\r\n{}", channelContext);
}
WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();
}
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
super.onAfterDecoded(channelContext, packet, packetSize);
//if (log.isInfoEnabled()) {
//log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
//}
}
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
super.onAfterReceivedBytes(channelContext, receivedBytes);
//if (log.isInfoEnabled()) {
//log.info("onAfterReceivedBytes\r\n{}", channelContext);
//}
}
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
super.onAfterHandled(channelContext, packet, cost);
//if (log.isInfoEnabled()) {
//log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
//}
}
}
6、继承IpStatListener类,实现ip监控,实现相关业务
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.IpStat;
import org.tio.core.stat.IpStatListener;
/**
*
* @author tanyaowu
*
*/
@Component
public class ShowcaseIpStatListener implements IpStatListener {
@SuppressWarnings("unused")
private static Logger log = LoggerFactory.getLogger(ShowcaseIpStatListener.class);
public static final ShowcaseIpStatListener me = new ShowcaseIpStatListener();
/**
*
*/
private ShowcaseIpStatListener() {
}
@Override
public void onExpired(TioConfig tioConfig, IpStat ipStat) {
//在这里把统计数据入库中或日志
//if (log.isInfoEnabled()) {
//log.info("可以把统计数据入库\r\n{}", Json.toFormatedJson(ipStat));
//}
}
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterConnected\r\n{}", Json.toFormatedJson(ipStat));
//}
}
@Override
public void onDecodeError(ChannelContext channelContext, IpStat ipStat) {
//if (log.isInfoEnabled()) {
//log.info("onDecodeError\r\n{}", Json.toFormatedJson(ipStat));
//}
}
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//}
}
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//}
}
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes, IpStat ipStat) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterReceivedBytes\r\n{}", Json.toFormatedJson(ipStat));
//}
}
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, IpStat ipStat, long cost) throws Exception {
//if (log.isInfoEnabled()) {
//log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//}
}
}
好了,到这里算是真真正正的完成了,简单的基础就这样。当然还有很多的东西,可以自己去官网了解。这里就不做更多介绍。
接下来看一下如何使用吧。
7、启动项目并进行连接
看到下面这个,说明启动成功。
接下来我们找一个在线测试工具试一下:
推荐两个:
http://www.websocket-test.com/
http://wstool.js.org/
这两个各有优劣
第一个不能定时发心跳,第二个可以发心跳
但是第二个好像只能连接本地的127.0.0.1,连192.168.. 就连不了
我们用第二个看一下吧
这个就是连接成功的
我的连接地址是:
ws://127.0.0.1:9876?userId=1&groupId=178979558244139008
后面的参数可以自己带,就跟get 请求一样,拼接在url后面就可以了。我还定时3秒钟发了一个心跳。
8、客户端如何发送消息
我相信很多同学都有我一样的问题,因为我们集成这个不一定是做聊天工具,一个发,一个收。
很多时候我们都是通过服务端主动给某个人发的,没有所谓的发送,或者有些是通过http接口去发送的消息。那我们怎么发送呢?
接下来我们写一个专门的消息收发的业务处理类
@Component
@Slf4j
public class MsgHandel {
// 注入bean 这个非常重要,后面所有的消息发送,包括单发给个人,还是群发都需要这个
@Autowired
private TioWebSocketServerBootstrap tioWebSocketServerBootstrap;
// 发送消息,单发给某个人,这里的userId 就是我们之前在用户连接的时候绑定的用户Id
// 陷入回忆:Tio.bindUser(channelContext, userId);
public void sendObject(String userId, Object object) {
WsResponse wsResponse = WsResponse.fromText(JSONObject.toJSONString(object), CHARSET);
//单发
Tio.sendToUser(tioWebSocketServerBootstrap.getServerTioConfig(), userId, wsResponse);
}
//群发,这个的groupId 也就是我们刚刚绑定的群Id
// 再次陷入回忆:Tio.bindGroup(channelContext, groupId);
// 当然前提是你前端连接的时候带了群Id或者自己定义了群Id
public void sendOnlineNum(String groupId) {
// 包装消息体
JSONObject bodyJsonObject = new JSONObject();
bodyJsonObject.put("data", "所有人,在吗?在的请举手");
// 群发最新在线人
WsResponse wsResponse = WsResponse.fromText(bodyJsonObject.toJSONString(), CHARSET);
//群发
Tio.sendToGroup(tioWebSocketServerBootstrap.getServerTioConfig(), groupId, wsResponse);
}
}
9、最后啰嗦,问题探讨
到这里就结束了,最后啰嗦一点吧,这个也是我当前没有完善的部分,有好的解决办法的可以评论区支支招。
现在很多项目都是微服务,我们刚刚这种是直接调的本服务的ip+端口的方式。因为t-io 底层是使用netty写的,大家都知道,netty是单独监听一个端口的,我们这个也是一样,那么我们在启动项目的时候,不知道大家有没有发现,我们springboot项目也监听了一个端口,相当于我们的一个项目启动了两个实例,而且微服务注册到nacos注册中心的是springboot监听的端口。
这样我们做服务访问,或者网关路由转发的时候就不能转发到我们的 t-io 监听的端口上面来。
我想了几个方法,但是都不太如意,最后还是使用单体了,体量不大也能够用。
1、手动注册一个服务到nacos
package ren.chemi3.tio;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.WsServerStarter;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
/**
* websocket 配置类
*/
@Configuration
public class WebSocketConfig {
@Resource
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Value("${tio.websocket.server.port}")
private Integer port;
/**
* 将服务注册进Nacos
*
*/
@PostConstruct
public void registerNamingService() {
try {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
NamingService namingService = NamingFactory.createNamingService(properties);
InetAddress address = InetAddress.getLocalHost();
namingService.registerInstance("chemi3-websocket-im", address.getHostAddress(), port);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
上面我手动注册了一个名为chemi3-websocket-im 的服务名称到注册中心,嗯,注册成功,没有问题
我还启动了两个
看似没有问题。接下来就是没在网关了
routes:
- id: chemi3-websocket-im
# uri: ws://127.0.0.1:9876
uri: lb:ws://chemi3-websocket-im
predicates:
- Path=/ws/**
filters:
- StripPrefix=1
这样应该没问题吧,按道理,我现在访问 ws://ip:网关端口/ws 应该可以连接才对,但是那就是不能连接,我也不知道什么问题。
那就试试第二种方法,第二种方法好像行,但是好像又不太行。
2、直接通过网关配置真实地址做负载,不走服务名
项目启动不变,修改一下网关配置
routes:
- id: chemi3-websocket-im
uri: ws://127.0.0.1:9876
predicates:
- Path=/ws/**
- Weight=group1,50
filters:
- StripPrefix=1
- id: chemi3-websocket-im1
uri: ws://127.0.0.1:9875
predicates:
- Path=/ws/**
- Weight=group1,50
filters:
- StripPrefix=1
这样就是通过网关来做负载均衡,好像是可以的,但是感觉不太稳妥。而且这样如果再启动更多的服务就只能在这里加配置,ip和端口写死。我没有具体去做线上的实际使用,你们可以试一下。
结束
原文地址:https://blog.csdn.net/qq_38227017/article/details/144734302
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!