自学内容网 自学内容网

Redis 发布/订阅(精简版)

订阅类A

package com.hdx.master.listener;


import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Redis Pub/Sub listener that expects each message body to be a serialized
 * {@code List<TtPointIndicator>}.
 *
 * <p>The body is decoded with the value serializer of the injected
 * {@link RedisTemplate}, so publishers must use the same serializer.
 */
@Slf4j
@Component
public class AMessageListener extends MessageListenerAdapter {

    /** Template whose value serializer is used to decode incoming message bodies. */
    @Autowired
    RedisTemplate<String, Object> redisTemplate;

    /**
     * Decodes the message body and handles it when it is a list payload.
     *
     * <p>Any publisher can write to the channel (for example a plain {@code String}),
     * so the decoded object is type-checked before the cast; an unexpected payload is
     * logged and ignored instead of failing with a {@link ClassCastException}.
     *
     * @param message the raw Redis message
     * @param pattern the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        Object payload = redisTemplate.getValueSerializer().deserialize(message.getBody());

        if (!(payload instanceof List)) {
            log.warn("Ignoring Redis message: expected a List payload but got {}",
                    payload == null ? "null" : payload.getClass().getName());
            return;
        }

        // Only the container type can be verified at runtime; element types are
        // assumed to be TtPointIndicator as produced by the publisher.
        @SuppressWarnings("unchecked")
        List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) payload;

        log.debug("Received {} point indicator(s)", ttPointIndicatorList.size());
        // Business handling of ttPointIndicatorList goes here.
    }
}

订阅类B

package com.hdx.master.listener;


import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Redis Pub/Sub listener that expects each message body to be a serialized
 * {@code List<TtPointIndicator>}.
 *
 * <p>The body is decoded with the value serializer of the injected
 * {@link RedisTemplate}, so publishers must use the same serializer.
 */
@Slf4j
@Component
public class BMessageListener extends MessageListenerAdapter {

    /** Template whose value serializer is used to decode incoming message bodies. */
    @Autowired
    RedisTemplate<String, Object> redisTemplate;

    /**
     * Decodes the message body and handles it when it is a list payload.
     *
     * <p>The decoded object is type-checked before the cast so that an unexpected
     * payload is logged and ignored instead of failing with a
     * {@link ClassCastException}.
     *
     * @param message the raw Redis message
     * @param pattern the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        Object payload = redisTemplate.getValueSerializer().deserialize(message.getBody());

        if (!(payload instanceof List)) {
            log.warn("Ignoring Redis message: expected a List payload but got {}",
                    payload == null ? "null" : payload.getClass().getName());
            return;
        }

        // Only the container type can be verified at runtime; element types are
        // assumed to be TtPointIndicator as produced by the publisher.
        @SuppressWarnings("unchecked")
        List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) payload;

        log.debug("Received {} point indicator(s)", ttPointIndicatorList.size());
        // Business handling of ttPointIndicatorList goes here.
    }
}

订阅主题的配置类

package com.hdx.master.socket.client;

import com.hdx.master.listener.AMessageListener;
import com.hdx.master.listener.BMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * Registers the Redis message listeners on their topics once this bean's
 * dependencies have been injected.
 */
@Slf4j
@Component
public class OldDataHandle implements InitializingBean {

    /** Topic the {@link AMessageListener} is subscribed to. */
    private static final String TOPIC_A = "aTOPIC";

    /** Topic the {@link BMessageListener} is subscribed to. */
    private static final String TOPIC_B = "bTOPIC";

    @Autowired
    private RedisMessageListenerContainer container;

    @Autowired
    private AMessageListener aMessageListener;

    @Autowired
    private BMessageListener bMessageListener;

    /**
     * Spring lifecycle callback; performs the subscriptions as soon as all
     * fields are populated.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.subscribe();
    }

    /**
     * Adds both listeners to the listener container, each on its own topic.
     */
    public void subscribe() {
        final PatternTopic topicA = new PatternTopic(TOPIC_A);
        final PatternTopic topicB = new PatternTopic(TOPIC_B);

        container.addMessageListener(aMessageListener, topicA);
        container.addMessageListener(bMessageListener, topicB);
    }
}

redis配置类

package com.hdx.master.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * Redis infrastructure beans: the Pub/Sub listener container, the executor it
 * dispatches on, and a JSON-serializing {@link RedisTemplate}.
 */
@Configuration
@EnableRedisRepositories
public class RedisConfig {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Thread pool used for Redis subscriptions. Without a dedicated pool the
     * listener container keeps creating new threads for message dispatch.
     *
     * @return the initialized executor
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);      // core thread count
        executor.setMaxPoolSize(500);      // maximum thread count
        executor.setQueueCapacity(10000);  // pending-task queue capacity
        executor.setThreadFactory(new CustomizableThreadFactory("RedisPubSub-exec-"));
        executor.initialize();
        return executor;
    }

    /**
     * Redis message listener container.
     *
     * @param threadPoolTaskExecutor executor on which listeners are invoked
     * @return the container bound to the application's connection factory
     */
    @Bean
    public RedisMessageListenerContainer messageListenerContainer(Executor threadPoolTaskExecutor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.setTaskExecutor(threadPoolTaskExecutor);
        return container;
    }

    /**
     * Primary {@link RedisTemplate}: String keys / hash keys, Jackson JSON
     * values / hash values.
     *
     * @return the configured template
     */
    @Bean
    @Primary
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();

        // A custom RedisTemplate must be given its connection factory explicitly.
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // Build each serializer once and share it between plain and hash slots.
        RedisSerializer<String> keySerializer = keySerializer();
        Jackson2JsonRedisSerializer<Object> valueSerializer = jacksonSerializer();

        redisTemplate.setKeySerializer(keySerializer);
        redisTemplate.setHashKeySerializer(keySerializer);
        redisTemplate.setValueSerializer(valueSerializer);
        redisTemplate.setHashValueSerializer(valueSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * @return a new serializer that stores keys as plain strings
     */
    public RedisSerializer<String> keySerializer() {
        return new StringRedisSerializer();
    }

    /**
     * Builds the JSON value serializer. All fields are visible to Jackson
     * regardless of access modifier, and type information is embedded for
     * non-final types so values can be read back as their concrete classes.
     *
     * @return the configured Jackson serializer
     */
    private Jackson2JsonRedisSerializer<Object> jacksonSerializer() {
        Jackson2JsonRedisSerializer<Object> serializer =
                new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases in
        // favour of activateDefaultTyping with a PolymorphicTypeValidator, and default
        // typing is unsafe on untrusted input — confirm the Jackson version and migrate.
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        serializer.setObjectMapper(objectMapper);
        return serializer;
    }

}

测试:通过 Spring MVC 接口发送消息,验证发布/订阅

package com.hdx.master.controller;

import com.hdx.master.common.HttpResult;
import com.hdx.master.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


/**
 * Test endpoint that publishes a message to a Redis channel.
 */
@RestController
@RequestMapping("test")
public class PublishController {

    /** Channel that {@link #publish(String)} sends to. */
    private static final String PUBLISH_CHANNEL = "aTOPIC";

    /** Redis template used to serialize and publish the message. */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publishes the given message to the Redis channel.
     *
     * @param message the request parameter to publish
     * @return the result of {@code HttpResult.successMsg} with the success text
     */
    @PostMapping("/publish")
    public HttpResult publish(String message) {
        this.redisTemplate.convertAndSend(PUBLISH_CHANNEL, message);
        final HttpResult result = HttpResult.successMsg("发布成功");
        return result;
    }

}



原文地址:https://blog.csdn.net/qq_19891197/article/details/144301502

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!