即时通讯增加Redis渠道
情况说明
在本地和服务器分别启动im服务,当本地发送消息时,会发现服务器上并没有收到消息
初版im只支持单机部署,不支持分布式场景。本次针对该问题对项目进行优化。文中贴出的代码并非完整代码,完整代码可查看参考资料[2]
代码结构调整
本次调整需要增加一个redis的渠道,为了方便后续再进行渠道的增加,对现有代码结构进行调整
- IBaseSendExecutor
渠道扩充接口,后续再增加渠道都可以实现该接口
package com.example.im.infra.executor.send;
/**
* Contract for a message-sending channel; additional channels are added by implementing it.
*
* @author PC
*/
public interface IBaseSendExecutor {
/**
* Returns the communication type served by this channel. The preset types are
* the default one and redis.
*
* @return communication type
*/
String getCommunicationType();
/**
* Sends a message to specific users.
*
* @param sendUserName name of the sender
* @param message message to send
*/
void sendToUser(String sendUserName, String message);
/**
* Sends a message to everyone.
*
* @param sendUserName name of the sender
* @param message message to send
*/
void sendToAll(String sendUserName, String message);
}
- AbstractBaseSendExecutor
通信处理抽象类,将一些预定义的渠道所需要的公有方法提取出来
package com.example.im.infra.executor.send;
import com.example.im.config.WebSocketProperties;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Base class for the predefined sending channels; it holds the helpers they share
 * (receiver parsing and outgoing-message shaping).
 *
 * @author PC
 */
public abstract class AbstractBaseSendExecutor implements IBaseSendExecutor {
    protected WebSocketProperties webSocketProperties;

    @Autowired
    public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
        this.webSocketProperties = webSocketProperties;
    }

    /**
     * Extracts the receiver names from a message of the form
     * {@code body<sep>receiver1<sep>receiver2...}, where {@code <sep>} is the configured
     * receiver separator.
     *
     * @param sendUserName name of the sender; removed from the result when the
     *                     "receiver excludes himself" flag is enabled
     * @param message      raw message
     * @return receiver names, or an empty list when the message contains no separator
     */
    protected List<String> getReceiverName(String sendUserName, String message) {
        if (!StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {
            return new ArrayList<>();
        }
        // Take everything after the first separator as the receiver section. Splitting the whole
        // message and skipping the first token loses the first receiver when the body is empty
        // (e.g. "@bob"), because StringUtils.split drops leading empty tokens.
        String receiverSection = StringUtils.substringAfter(message, webSocketProperties.getReceiverSeparator());
        String[] names = StringUtils.split(receiverSection, webSocketProperties.getReceiverSeparator());
        // Null-safe read: an unset flag counts as "false" instead of failing on auto-unboxing.
        boolean excludeSender = BooleanUtils.isTrue(webSocketProperties.getReceiverExcludesHimselfFlag());
        return Stream.of(names)
                .filter(receiver -> !(excludeSender && StringUtils.equals(sendUserName, receiver)))
                .collect(Collectors.toList());
    }

    /**
     * Shapes the outgoing message according to configuration.
     *
     * @param message raw message
     * @return the text before the first receiver separator when the "exclude receiver info"
     * flag is enabled, otherwise the raw message unchanged
     */
    protected String generatorMessage(String message) {
        return BooleanUtils.isTrue(webSocketProperties.getExcludeReceiverInfoFlag())
                ? StringUtils.substringBefore(message, webSocketProperties.getReceiverSeparator())
                : message;
    }
}
- DefaultSendExecutor
原有消息发送逻辑
package com.example.im.infra.executor.send;
import com.example.im.endpoint.WebSocketEndpoint;
import com.example.im.infra.constant.ImConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
/**
 * Default channel: delivers messages directly to the WebSocket sessions registered in
 * {@code WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP} of the current process.
 *
 * @author PC
 */
@Component
public class DefaultSendExecutor extends AbstractBaseSendExecutor {
    private final static Logger logger = LoggerFactory.getLogger(DefaultSendExecutor.class);
    private TaskExecutor taskExecutor;

    @Autowired
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public String getCommunicationType() {
        return ImConstants.CommunicationType.DEFAULT;
    }

    /**
     * Sends the message to every receiver named in it, one task per receiver, and waits
     * until all tasks have finished. Receivers without a registered endpoint are collected
     * and reported in the log.
     *
     * @param sendUserName name of the sender
     * @param message      raw message including the receiver section
     */
    @Override
    public void sendToUser(String sendUserName, String message) {
        List<String> receiverNameList = getReceiverName(sendUserName, message);
        CountDownLatch countDownLatch = new CountDownLatch(receiverNameList.size());
        Set<String> notOnlineReceiverSet = ConcurrentHashMap.newKeySet();
        receiverNameList.forEach(receiverName -> taskExecutor.execute(() -> {
            try {
                // Single lookup: a containsKey/get pair can observe the user disconnecting in
                // between and dereference null.
                WebSocketEndpoint endpoint = WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.get(receiverName);
                if (endpoint != null) {
                    endpoint.getSession().getBasicRemote().sendText(generatorMessage(message));
                } else {
                    notOnlineReceiverSet.add(receiverName);
                }
            } catch (IOException ioException) {
                // Pass the exception as an argument so the stack trace is kept.
                logger.error("send error:", ioException);
            } finally {
                countDownLatch.countDown();
            }
        }));
        try {
            countDownLatch.await();
        } catch (InterruptedException interruptedException) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            logger.error("error.countDownLatch.await");
        }
        Set<String> offlineReceivers = notOnlineReceiverSet.stream()
                .filter(StringUtils::isNotEmpty)
                .collect(Collectors.toSet());
        if (CollectionUtils.isNotEmpty(offlineReceivers)) {
            logger.info("not online number is {}", offlineReceivers.size());
            logger.info("The user : {} is not online", String.join(",", offlineReceivers));
        }
    }

    /**
     * Sends the message to every registered endpoint, one task per endpoint. The sender is
     * skipped when the "receiver excludes himself" flag is enabled.
     *
     * @param sendUserName name of the sender
     * @param message      raw message
     */
    @Override
    public void sendToAll(String sendUserName, String message) {
        // Null-safe read: an unset flag counts as "false" instead of failing on auto-unboxing.
        boolean excludeSender = Boolean.TRUE.equals(webSocketProperties.getReceiverExcludesHimselfFlag());
        for (Map.Entry<String, WebSocketEndpoint> webSocketEndpointEntry : WebSocketEndpoint.WEB_SOCKET_ENDPOINT_MAP.entrySet()) {
            if (excludeSender && StringUtils.equals(sendUserName, webSocketEndpointEntry.getKey())) {
                // No need to schedule a task that would return immediately.
                continue;
            }
            taskExecutor.execute(() -> {
                try {
                    webSocketEndpointEntry.getValue().getSession().getBasicRemote()
                            .sendText(generatorMessage(message));
                } catch (IOException ioException) {
                    logger.error("send error:", ioException);
                }
            });
        }
    }
}
- SendExecutorFactory
发送渠道工厂
package com.example.im.infra.executor.send;
import com.example.im.config.WebSocketProperties;
import com.example.im.infra.executor.config.ExecutorConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
 * Picks the sending channel configured through {@code WebSocketProperties#getCommunicationType()}
 * and routes incoming messages to it.
 *
 * @author PC
 */
@Component
public class SendExecutorFactory {
    private final WebSocketProperties webSocketProperties;
    private ExecutorConfiguration executorConfiguration;

    @Autowired
    public SendExecutorFactory(WebSocketProperties webSocketProperties) {
        this.webSocketProperties = webSocketProperties;
    }

    @Autowired
    public void setExecutorConfiguration(ExecutorConfiguration executorConfiguration) {
        this.executorConfiguration = executorConfiguration;
    }

    /**
     * Dispatches a message: when it contains the receiver separator it goes to the named
     * users, otherwise to everyone.
     *
     * @param sendUserName name of the sender
     * @param message      raw message
     * @throws IllegalStateException when neither the configured channel nor the default
     *                               channel is registered
     */
    public void onMessage(String sendUserName, String message) {
        // orElseGet: resolve the fallback only when the configured channel is missing.
        IBaseSendExecutor iBaseSendExecutor = Optional.ofNullable(executorConfiguration.getBaseSendExecutorMap()
                .get(webSocketProperties.getCommunicationType())).orElseGet(this::resolveDefaultExecutor);
        // A message containing the separator is addressed to specific users; anything else is a broadcast.
        if (StringUtils.contains(message, webSocketProperties.getReceiverSeparator())) {
            iBaseSendExecutor.sendToUser(sendUserName, message);
        } else {
            iBaseSendExecutor.sendToAll(sendUserName, message);
        }
    }

    /**
     * Returns the container-managed default executor from the registry. A hand-made
     * {@code new DefaultSendExecutor()} must not be used as fallback: its dependencies are
     * never injected, so every send on it would fail with a NullPointerException.
     */
    private IBaseSendExecutor resolveDefaultExecutor() {
        return executorConfiguration.getBaseSendExecutorMap().values().stream()
                .filter(DefaultSendExecutor.class::isInstance)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No send executor registered for communication type '"
                                + webSocketProperties.getCommunicationType()
                                + "' and no DefaultSendExecutor available as fallback"));
    }
}
- ExecutorConfiguration
启动时加载所有IBaseSendExecutor实现类,并按通信类型注册到Map中
package com.example.im.infra.executor.config;
import com.example.im.infra.executor.send.IBaseSendExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Registry of the available {@link IBaseSendExecutor} beans, keyed by communication type.
 * It is filled once, when the application context is handed to this bean.
 *
 * @author PC
 */
@Component
public class ExecutorConfiguration implements ApplicationContextAware {
    private final static Logger logger = LoggerFactory.getLogger(ExecutorConfiguration.class);
    private Map<String, IBaseSendExecutor> baseSendExecutorMap = new HashMap<>(16);
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ExecutorConfiguration.applicationContext = applicationContext;
        // Register every IBaseSendExecutor implementation found in the context.
        this.initBaseSendExecutor(applicationContext);
    }

    /**
     * Loads all {@link IBaseSendExecutor} implementations and registers them by communication type.
     * <p>
     * If a service only ever used one fixed channel, {@code @Bean} combined with
     * {@code @ConditionalOnProperty} would be enough; all implementations are loaded here
     * because one service may need several channels later on.
     *
     * @param applicationContext context to read the beans from
     */
    private void initBaseSendExecutor(ApplicationContext applicationContext) {
        logger.info("Start loading IBaseSendExecutor");
        // Named differently from the field on purpose: the original local shadowed it.
        Map<String, IBaseSendExecutor> executorBeans = applicationContext.getBeansOfType(IBaseSendExecutor.class);
        for (IBaseSendExecutor executor : executorBeans.values()) {
            String communicationType = executor.getCommunicationType();
            IBaseSendExecutor previous = this.baseSendExecutorMap.put(communicationType, executor);
            if (previous != null && previous != executor) {
                // Two beans claim the same type; the later one wins, so make the collision visible.
                logger.warn("duplicate communicationType:{}, {} is replaced by {}", communicationType,
                        previous.getClass().getName(), executor.getClass().getName());
            }
            logger.info("initBaseSendExecutor>>>>>>>communicationType:{},className:{}", communicationType, executor.getClass().getName());
        }
        logger.info("IBaseSendExecutor loading is complete");
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public Map<String, IBaseSendExecutor> getBaseSendExecutorMap() {
        return baseSendExecutorMap;
    }

    public void setBaseSendExecutorMap(Map<String, IBaseSendExecutor> baseSendExecutorMap) {
        this.baseSendExecutorMap = baseSendExecutorMap;
    }
}
添加redis通信渠道
- pom.xml
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
- application.yml
server:
port: 18080
cus:
ws:
exclude-receiver-info-flag: true
receiver-excludes-himself-flag: true
communication-type: redis
spring:
redis:
host: 127.0.0.1
port: 6379
username: root
password: root
database: ${SPRING_REDIS_DATABASE:1}
# Redis connection timeout
connect-timeout: ${SPRING_REDIS_CONNECT_TIMEOUT:2000}
# Redis read timeout
timeout: ${SPRING_REDIS_READ_TIMEOUT:5000}
# NOTE(review): the pool settings below sit under `lettuce`, while the pom snippet in this
# document declares the jedis client - confirm which client is actually on the classpath.
lettuce:
pool:
# Maximum number of connections in the pool.
# Default is 8, -1 means unlimited; tune to the service's Redis concurrency and the server-side limit.
max-active: ${SPRING_REDIS_POOL_MAX_ACTIVE:50}
# Maximum number of idle connections allowed in the pool.
# Default is 8, -1 means unlimited; usually kept equal to max-active to avoid the cost of growing and shrinking the pool.
max-idle: ${SPRING_REDIS_POOL_MAX_IDLE:50}
# Maximum time (in milliseconds) a caller waits once the pool is exhausted.
# Default is -1, meaning wait forever; set to 5 seconds here.
max-wait: ${SPRING_REDIS_POOL_MAX_WAIT:5000}
- RedisSendExecutor
redis发送
package com.example.im.infra.executor.send.redis;
import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import com.example.im.infra.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
 * Redis channel: instead of writing to local WebSocket sessions, it publishes the message
 * to a Redis channel as JSON.
 *
 * @author PC
 */
@Component
public class RedisSendExecutor extends AbstractBaseSendExecutor {
    private final static Logger logger = LoggerFactory.getLogger(RedisSendExecutor.class);
    /** Redis channel used for messages addressed to specific users. */
    private static final String USER_CHANNEL = "redis-websocket-user";
    /** Redis channel used for messages addressed to everyone. */
    private static final String ALL_CHANNEL = "redis-websocket-all";
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public String getCommunicationType() {
        return ImConstants.CommunicationType.REDIS;
    }

    /**
     * Publishes a user-scoped message to the user channel.
     *
     * @param sendUserName name of the sender
     * @param message      raw message including the receiver section
     */
    @Override
    public void sendToUser(String sendUserName, String message) {
        // Log the channel that is really used; the previous text named a non-existent one.
        logger.debug("send to user redis websocket, channel is {}", USER_CHANNEL);
        publish(USER_CHANNEL, ScopeOfSendingEnum.USER, sendUserName, message);
    }

    /**
     * Publishes a broadcast message to the broadcast channel.
     *
     * @param sendUserName name of the sender
     * @param message      raw message
     */
    @Override
    public void sendToAll(String sendUserName, String message) {
        logger.debug("send to all redis websocket, channel is {}", ALL_CHANNEL);
        publish(ALL_CHANNEL, ScopeOfSendingEnum.ALL, sendUserName, message);
    }

    /** Wraps sender, text and scope into a MessageInfo and publishes it as JSON on the given channel. */
    private void publish(String channel, ScopeOfSendingEnum scope, String sendUserName, String message) {
        MessageInfo messageInfo = new MessageInfo();
        messageInfo.setSendUserName(sendUserName);
        messageInfo.setMessage(message);
        messageInfo.setScopeOfSending(scope);
        redisTemplate.convertAndSend(channel, JsonUtils.toJson(messageInfo));
    }
}
- RedisMessageListener
redis监听
package com.example.im.infra.executor.send.redis;
import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Redis listener: decodes a published message and hands it to the default executor
 * according to its sending scope.
 *
 * @author PC
 */
@Component
public class RedisMessageListener implements MessageListener {
    private final static Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);
    private DefaultSendExecutor defaultSendExecutor;

    @Autowired
    public void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {
        this.defaultSendExecutor = defaultSendExecutor;
    }

    /**
     * Handles one message received from Redis.
     *
     * @param message message whose body is read as UTF-8 JSON describing a MessageInfo
     * @param pattern pattern that matched the channel; not used
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Message body
        String messageJson = new String(message.getBody(), StandardCharsets.UTF_8);
        MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {
        });
        // Guard against an undecodable payload or a missing scope: switching on a null enum
        // throws NullPointerException inside the listener.
        if (messageInfo == null || messageInfo.getScopeOfSending() == null) {
            logger.warn("ignore redis message: payload is empty or has no sending scope");
            return;
        }
        switch (messageInfo.getScopeOfSending()) {
            case USER:
                defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());
                break;
            case ALL:
                defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());
                break;
            default:
                // Not expected unless ScopeOfSending has been extended; a hook for custom
                // sending scopes could be added here later.
                logger.warn("invalid sending range:{}", messageInfo.getScopeOfSending().getScopeCode());
                break;
        }
    }
}
测试
本地服务发送消息
服务器接收到了消息
常见问题
打包报错
执行mvn clean package打包时出现以下错误
[ERROR] contextLoads Time elapsed: 0.001 s <<< ERROR!
java.lang.IllegalStateException: Failed to load ApplicationContext
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'serverEndpoint' defined in class path resource [com/example/im/config/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available
Caused by: java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available
查看ServerContainer接口,发现其有两个接口实现类,其中有一个是test包的
将其排除后即可正常打包
jar包启动时no main manifest attribute问题
需将pom的plugin标签中的skip标签删除或设置为false
参考资料
[1].初版im文档
[2].im项目地址
原文地址:https://blog.csdn.net/weixin_43625238/article/details/142962332
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!