redis订阅发布(精简)
订阅类A
package com.hdx.master.listener;
import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class AMessageListener extends MessageListenerAdapter {
@Autowired
RedisTemplate<String, Object> redisTemplate;
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) redisTemplate.getValueSerializer().deserialize(message.getBody());
}
}
订阅类B
package com.hdx.master.listener;
import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class BMessageListener extends MessageListenerAdapter {
@Autowired
RedisTemplate<String, Object> redisTemplate;
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) redisTemplate.getValueSerializer().deserialize(message.getBody());
}
}
订阅的主题的配置类
package com.hdx.master.socket.client;
import com.hdx.master.listener.AMessageListener;
import com.hdx.master.listener.BMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OldDataHandle implements InitializingBean {
@Autowired
private RedisMessageListenerContainer container;
@Autowired
private AMessageListener aMessageListener;
@Autowired
private BMessageListener bMessageListener;
@Override
public void afterPropertiesSet() throws Exception {
subscribe();
}
public void subscribe() {
container.addMessageListener(aMessageListener, new PatternTopic("aTOPIC"));
container.addMessageListener(bMessageListener, new PatternTopic("bTOPIC"));
}
}
redis配置类
package com.hdx.master.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableRedisRepositories
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
/**
* 线程池配置 给redsi订阅的时候使用,如果不使用这个线程池,redis订阅会一直创建线程
* @return
*/
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16); // 设置核心线程数
executor.setMaxPoolSize(500); // 设置最大线程数
executor.setQueueCapacity(10000); // 设置队列容量
executor.setThreadFactory(new CustomizableThreadFactory("RedisPubSub-exec-"));
executor.initialize(); // 初始化执行器
return executor;
}
/**
* redis 消息监听
* @return
*/
@Bean
public RedisMessageListenerContainer messageListenerContainer(Executor threadPoolTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.setTaskExecutor(threadPoolTaskExecutor);
return container;
}
@Bean
@Primary
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 自定义RedisTemplate时,必须 设置连接方式
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 系列化
redisTemplate.setKeySerializer(keySerializer());// 键系列化
redisTemplate.setHashKeySerializer(keySerializer());
redisTemplate.setValueSerializer(jacksonSerializer());// 值系列化
redisTemplate.setHashValueSerializer(jacksonSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
public RedisSerializer<String> keySerializer() {
return new StringRedisSerializer();
}
private Jackson2JsonRedisSerializer jacksonSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return jackson2JsonRedisSerializer;
}
}
测试使用mvc接口发送消息进行发布订阅
package com.hdx.master.controller;
import com.hdx.master.common.HttpResult;
import com.hdx.master.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("test")
public class PublishController {
/**
* redis工具类
*/
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 发布消息
*
* @return
*/
@PostMapping("/publish")
public HttpResult publish(String message) {
redisTemplate.convertAndSend("aTOPIC", message);
return HttpResult.successMsg("发布成功");
}
}
原文地址:https://blog.csdn.net/qq_19891197/article/details/144301502
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!