自学内容网 自学内容网

Spring Boot中集成Redis与MySQL

1. 环境准备与依赖配置 

1.1 Maven 依赖管理

为了在 Spring Boot 项目中使用 Redis 和 MySQL,我们需要在 pom.xml 中添加必要的依赖。主要包括以下几个依赖:

  • Spring Data Redis:用于在 Spring Boot 中集成 Redis,提供 RedisTemplate 进行操作。
  • MySQL JDBC 驱动:用于连接 MySQL 数据库。
  • Spring Data JPA:用于简化与 MySQL 数据库的交互,提供面向对象的数据库操作支持。

具体依赖代码:

<!-- Spring Data Redis 依赖,用于集成 Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- MySQL JDBC 驱动,用于连接 MySQL 数据库 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- Spring Data JPA 依赖,用于简化数据库访问 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

添加完这些依赖后,Spring Boot 项目就可以利用 Spring Data Redis 来操作 Redis,并通过 Spring Data JPA 连接和操作 MySQL 数据库。

1.2. 配置文件:设置 application.properties 文件

Spring Boot 项目中通常使用 application.properties 文件来配置应用所需的参数。以下是 Redis 和 MySQL 的详细连接参数设置说明。

Redis 配置

# Redis 主机地址,默认情况下是 localhost,本地使用无需修改
spring.redis.host=localhost

# Redis 服务端口,默认 6379
spring.redis.port=6379

# Redis 密码,如果没有设置密码,可以留空
spring.redis.password=your_redis_password

配置说明:

  • spring.redis.host:Redis 服务器的主机地址,通常为 localhost(即本地)或 Redis 所在的服务器 IP。
  • spring.redis.port:Redis 服务器的端口号,默认是 6379,可以根据实际 Redis 配置进行调整。
  • spring.redis.password:Redis 的访问密码。如果 Redis 设置了密码保护(通常用于远程访问或安全性较高的场景),则在此处填写对应的密码。

MySQL 配置

# MySQL 数据库连接 URL,格式为:jdbc:mysql://[host]:[port]/[database_name]?参数
spring.datasource.url=jdbc:mysql://localhost:3306/your_database_name?useSSL=false&serverTimezone=UTC

# MySQL 数据库用户名
spring.datasource.username=your_mysql_username

# MySQL 数据库用户密码
spring.datasource.password=your_mysql_password

# JDBC 驱动类名称,Spring Boot 2.x 及以上版本中使用 com.mysql.cj.jdbc.Driver
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

配置说明:

  • spring.datasource.url:数据库连接 URL。其格式为:
    • jdbc:mysql://:指定使用 MySQL 数据库的 JDBC 驱动。
    • [host]:MySQL 服务器主机地址,本地为 localhost,如果是远程则填写 IP 地址。
    • [port]:MySQL 服务端口号,默认是 3306,可以根据实际情况调整。
    • [database_name]:具体使用的数据库名称,需要在 MySQL 中提前创建好。
    • ?useSSL=false:指定是否启用 SSL 连接(一般本地开发设为 false)。
    • &serverTimezone=UTC:设置服务器的时区为 UTC,防止可能的时区问题。
  • spring.datasource.username:MySQL 数据库用户名,用于连接数据库。
  • spring.datasource.password:对应的用户名密码,确保输入正确。
  • spring.datasource.driver-class-name:指定 JDBC 驱动类。MySQL 使用 com.mysql.cj.jdbc.Driver(这是 MySQL 8.0 及以上版本的驱动类)。

1.3. 本地与远程服务搭建

1.3.1. Redis 服务启动

(1)本地 Redis 启动

安装 Redis:首先,你需要在本地环境中安装 Redis。根据操作系统的不同,可以通过 apt-get(Linux)、brew(macOS)或直接下载 Redis 可执行文件来安装。

启动 Redis 服务

  • 安装完成后,可以通过命令行启动 Redis 服务:
redis-server
  • Redis 默认在 localhost6379 端口上监听请求。
  • 启动后,你可以通过 Redis 客户端(redis-cli)测试连接。在终端输入以下命令:
redis-cli
  • 然后执行 PING 命令。如果返回 PONG,表示 Redis 已经正常启动。

(2)远程 Redis 服务器配置

安全性设置:如果 Redis 部署在远程服务器上,建议为 Redis 设置密码。在 Redis 的配置文件(通常是 redis.conf)中设置 requirepass your_password,然后重新启动服务以生效。

防火墙配置:确保 Redis 端口(默认 6379)在服务器的防火墙中开放,以允许远程访问。

网络连接测试

  • 确保你的应用主机能够通过 IP 地址和端口访问远程 Redis 服务器,可以在命令行测试连接:
redis-cli -h <远程IP> -p 6379 -a <密码>
  • 如果连接成功,执行 PING 返回 PONG,则说明网络连接和认证都已正确配置。

(3)Redis 的远程访问限制

  • 为了安全起见,Redis 的配置文件中默认限制了公网访问。可以在 redis.conf 文件中设置 bind 0.0.0.0 以允许所有 IP 连接 Redis。
  • 也可以在同一文件中设置 protected-mode yes 以启用受保护模式,如果不打算开放公网访问,可以禁用远程访问。

1.3.2. MySQL 服务启动

(1)本地 MySQL 启动

安装 MySQL:根据操作系统不同,可以通过 apt-getyumbrew 或下载安装包来安装 MySQL。

启动 MySQL 服务

  • 安装后,启动 MySQL 服务。在不同系统上启动 MySQL 的命令可能不同,如在 Linux 中:
sudo service mysql start
  • 使用以下命令行测试 MySQL 是否正常启动并连接:
mysql -u root -p
  • 输入密码后,如果成功进入 MySQL 命令行界面,表示 MySQL 已正常运行。

(2)远程 MySQL 服务器配置

1.配置远程访问权限:默认情况下,MySQL 只允许本地连接。要开放远程访问权限,你需要修改 MySQL 的用户权限。在 MySQL 控制台执行以下命令:

CREATE USER 'your_user'@'%' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON your_database.* TO 'your_user'@'%';
FLUSH PRIVILEGES;

2.配置文件修改:修改 MySQL 配置文件(通常是 my.cnfmy.ini),找到 [mysqld] 配置块,设置 bind-address0.0.0.0 以允许所有 IP 地址访问:

[mysqld]
bind-address = 0.0.0.0

重新启动 MySQL 使配置生效。

3.防火墙配置:确保服务器上开放 MySQL 的默认端口(3306),允许远程访问。

4.测试远程连接

使用以下命令测试远程连接(在另一台主机上):

mysql -h <远程IP> -P 3306 -u your_user -p

如果成功连接,表示 MySQL 的远程访问配置正确。

1.3.3. 网络连通性和认证设置

网络连通性测试:从应用主机上测试到 Redis 和 MySQL 服务的连通性。可以使用 pingtelnet 命令检查 IP 和端口的连接状态:

# 测试 Redis 连接
telnet <redis_ip> 6379

# 测试 MySQL 连接
telnet <mysql_ip> 3306

如果连接成功说明网络连通性没有问题。

认证配置

  • 确保 Redis 和 MySQL 的认证信息(如用户名、密码)正确配置在 Spring Boot 项目中,并且可以成功访问。
  • 对于 Redis,使用密码认证的配置项是 spring.redis.password
  • 对于 MySQL,认证配置项包括 spring.datasource.usernamespring.datasource.password

2.Spring Boot 与 Redis 的集成

2.1. RedisTemplate 配置

RedisTemplate 是一个通用的模板类,适用于操作 Redis 中的多种数据结构,包括字符串、哈希、列表、集合等。我们可以通过 RedisTemplate 来方便地执行 Redis 的增删查改操作。为了确保数据的可读性和兼容性,我们通常需要自定义 RedisTemplate 的序列化配置。

2.1.1创建 RedisTemplate Bean

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // 创建 RedisTemplate 实例
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        
        // 设置连接工厂
        template.setConnectionFactory(redisConnectionFactory);

        // 配置 key 的序列化器
        template.setKeySerializer(new StringRedisSerializer());

        // 配置 value 的序列化器
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        // 配置 hash key 的序列化器
        template.setHashKeySerializer(new StringRedisSerializer());

        // 配置 hash value 的序列化器
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        // 初始化 RedisTemplate 配置
        template.afterPropertiesSet();
        
        return template;
    }
}

配置解析

1.创建 RedisTemplate Bean

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { ... }
  • 使用 @Bean 注解,将 RedisTemplate 注册为 Spring 容器的一个 Bean,使得应用的其他组件可以通过依赖注入 @Autowired 来直接使用它。
  • 泛型 <String, Object> 表示此 RedisTemplate 的键类型为 String,值类型为 Object。这种配置较为通用,支持存储字符串和对象等多种类型。

2.设置连接工厂

template.setConnectionFactory(redisConnectionFactory);
  • RedisConnectionFactory 是 Redis 连接的工厂接口,它负责管理 Redis 连接的创建和配置。
  • Spring Boot 通常会自动配置一个 RedisConnectionFactory(基于 Lettuce 或 Jedis 的客户端)供我们使用。这里将连接工厂传给 RedisTemplate,确保其能够与 Redis 服务器建立连接。
  • 没有配置连接工厂,RedisTemplate 将无法连接到 Redis 服务,导致操作失败。

3.配置键的序列化器

template.setKeySerializer(new StringRedisSerializer());
  • setKeySerializer 用于设置键(key)的序列化方式。
  • StringRedisSerializer 是一种将键转换为字符串格式的序列化器,它将键序列化为 UTF-8 字符串,并以二进制的形式存储在 Redis 中。
  • 采用字符串序列化器使键在 Redis 中存储时具有可读性,例如你可以在 Redis CLI 中直接查看键的名称,这对于调试和维护非常有帮助。

4.配置值的序列化器

template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  • setValueSerializer 用于设置值(value)的序列化方式。
  • GenericJackson2JsonRedisSerializer 是一个 JSON 序列化器,使用 Jackson 库将对象序列化为 JSON 字符串存储。
  • 这样配置的好处是,值在 Redis 中会以 JSON 格式存储,不仅可读性强,而且便于与其他系统的数据交互。如果需要从 Redis 中获取复杂对象数据,GenericJackson2JsonRedisSerializer 还支持反序列化,将 JSON 字符串转换回 Java 对象。

5.配置哈希键的序列化器

template.setHashKeySerializer(new StringRedisSerializer());
  • setHashKeySerializer 设置哈希数据类型的键(hash key)的序列化器。
  • Redis 的哈希数据类型允许在一个键下存储多个字段(即子键和值对),这些字段可以独立操作。StringRedisSerializer 将哈希键序列化为字符串,使得存储的哈希键具备可读性,方便查看。
  • 例如在 Redis 中,你可以看到每个哈希键的具体内容,这在操作哈希类型时非常直观。

6.配置哈希值的序列化器

template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
  • setHashValueSerializer 设置哈希数据类型的值(hash value)的序列化器。
  • 我们使用 GenericJackson2JsonRedisSerializer 将哈希值序列化为 JSON 格式,以确保哈希类型的值在 Redis 中以 JSON 形式存储。
  • JSON 格式不仅便于阅读,还支持复杂对象的存储,适合项目中有嵌套对象或自定义数据结构的需求。

7.初始化 RedisTemplate 配置

template.afterPropertiesSet();
  • afterPropertiesSet() 方法用于初始化 RedisTemplate 的配置。
  • 这个方法会检查 RedisTemplate 的各个属性是否已正确设置,确保 RedisTemplate 就绪以供使用。这一步是 Spring 处理配置属性的通用做法,避免运行时出现未初始化的错误。

2.1.2.创建 StringRedisTemplate Bean

StringRedisTemplateRedisTemplate 的一个变种,它专门用于存储字符串数据。由于它默认的键和值的序列化方式都是 StringRedisSerializer,可以直接用于存储和读取字符串数据。

StringRedisTemplate 配置代码

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
public class RedisConfig {

    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        return new StringRedisTemplate(redisConnectionFactory);
    }
}

配置解析

  • StringRedisTemplate 继承自 RedisTemplate,主要用于操作 Redis 中的字符串类型数据。
  • StringRedisTemplate 的键和值的序列化方式都是 StringRedisSerializer,这使得所有存储在 Redis 中的数据都是可读的字符串格式。
  • 适用于数据都是简单字符串的场景,由于 StringRedisTemplate 默认的序列化方式已满足大多数字符串操作的需求,不需要额外配置序列化器。
  • 注册为 Spring Bean:同样地,@Bean 注解将 StringRedisTemplate 注册为 Spring 容器中的一个 Bean,其他类可以使用 @Autowired 注入 StringRedisTemplate 实例,进行字符串数据的操作。

2.1.3.常用序列化方式总结

  • StringRedisSerializer

    • 用途:将键或简单的值序列化为字符串。
    • 适用场景:通常用于键的序列化,确保键在 Redis 中以字符串存储,以便于直接查看和管理。
  • GenericJackson2JsonRedisSerializer

    • 用途:将对象序列化为 JSON 格式的字符串,并支持 JSON 反序列化回对象。
    • 适用场景:通常用于值的序列化,尤其是需要存储复杂对象的情况。它可以确保数据的可读性,且 JSON 格式数据跨系统兼容性好。

2.2.CRUD 操作方法 

2.2.1.RedisTemplate 常用方法

通用方法

除了特定数据结构的 opsFor...() 方法,RedisTemplate 还提供了一些通用的 Redis 操作方法:

  • delete(String key):删除键 key 及其对应的值,返回是否成功删除。
  • expire(String key, long timeout, TimeUnit unit):设置键 key 的过期时间。
  • hasKey(String key):检查键 key 是否存在,返回 truefalse
  • keys(String pattern):获取所有符合模式 pattern 的键。
  • persist(String key):移除键 key 的过期时间,使其永久有效。
  • rename(String oldKey, String newKey):将键 oldKey 重命名为 newKey
  • type(String key):返回键 key 的数据类型。

1. opsForValue() - 操作字符串(String)类型的数据

opsForValue() 用于操作 Redis 中的字符串数据,适用于简单的键值对操作。

常用方法

  • set(String key, Object value):将键 key 的值设置为 value,如果键已存在,则覆盖旧值。
  • set(String key, Object value, long timeout, TimeUnit unit):将键 key 的值设置为 value,并指定过期时间 timeout 和时间单位 unit
  • get(String key):获取键 key 对应的值。
  • increment(String key, long delta):将键 key 的值增加 delta,返回增加后的值(适用于整数类型的值)。
  • decrement(String key, long delta):将键 key 的值减少 delta,返回减少后的值。

2. opsForHash() - 操作哈希(Hash)类型的数据

opsForHash() 用于操作 Redis 中的哈希结构(类似于键值对集合),适合存储对象的多个字段或属性。

常用方法

  • put(String key, Object hashKey, Object value):设置哈希表 key 中字段 hashKey 的值为 value
  • putAll(String key, Map<?, ?> map):将整个 map 存入哈希表 key 中。
  • get(String key, Object hashKey):获取哈希表 key 中字段 hashKey 的值。
  • delete(String key, Object... hashKeys):删除哈希表 key 中的一个或多个字段。
  • entries(String key):获取哈希表 key 中的所有键值对,返回一个 Map
  • hasKey(String key, Object hashKey):判断哈希表 key 中是否存在字段 hashKey
  • size(String key):获取哈希表 key 的字段数量。

3. opsForList() - 操作列表(List)类型的数据

opsForList() 用于操作 Redis 中的列表结构,列表数据类型可以存储一个有序的字符串列表(支持从左、右两端插入和弹出)。

常用方法

  • leftPush(String key, Object value):将 value 插入到列表 key 的左侧。
  • rightPush(String key, Object value):将 value 插入到列表 key 的右侧。
  • leftPop(String key):移除并返回列表 key 的左侧第一个元素。
  • rightPop(String key):移除并返回列表 key 的右侧第一个元素。
  • range(String key, long start, long end):获取列表 key 中从 startend 范围内的元素。
  • size(String key):获取列表 key 的长度。
  • set(String key, long index, Object value):将列表 key 中指定索引 index 的元素设置为 value

4. opsForSet() - 操作集合(Set)类型的数据

opsForSet() 用于操作 Redis 中的集合结构,集合中的元素是唯一的,且无序。

常用方法

  • add(String key, Object... values):向集合 key 中添加一个或多个 values,返回添加的元素数量。
  • remove(String key, Object... values):从集合 key 中移除一个或多个元素。
  • members(String key):获取集合 key 中的所有元素。
  • isMember(String key, Object value):判断 value 是否是集合 key 的成员。
  • size(String key):获取集合 key 的元素个数。
  • intersect(String key, String otherKey):返回集合 keyotherKey 的交集。
  • difference(String key, String otherKey):返回集合 keyotherKey 的差集。
  • union(String key, String otherKey):返回集合 keyotherKey 的并集。

5. opsForZSet() - 操作有序集合(Sorted Set/ZSet)类型的数据

opsForZSet() 用于操作 Redis 中的有序集合结构。有序集合中的每个元素都关联一个分数,按分数从小到大排序。

常用方法

  • add(String key, Object value, double score):将 value 添加到有序集合 key 中,并设置分数 score
  • remove(String key, Object... values):从有序集合 key 中移除一个或多个元素。
  • score(String key, Object value):获取有序集合 key 中元素 value 的分数。
  • rank(String key, Object value):返回有序集合 key 中元素 value 的排名(从小到大)。
  • reverseRank(String key, Object value):返回有序集合 key 中元素 value 的排名(从大到小)。
  • range(String key, long start, long end):根据索引范围 startend 获取有序集合 key 中的元素。
  • rangeByScore(String key, double min, double max):根据分数范围 minmax 获取有序集合 key 中的元素。
  • size(String key):获取有序集合 key 的元素个数。

2.2.2. 创建 RedisService 类(编写CRUD 方法)

首先,我们创建一个服务类 RedisService,在这个类中编写对 Redis 数据的 CRUD 操作方法。假设我们已经在 Spring Boot 中配置好了 RedisTemplate,我们可以直接注入它来使用。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class RedisService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 创建或存储数据
    public void saveData(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
    }

    // 创建或存储带过期时间的数据
    public void saveDataWithExpiration(String key, Object value, long timeout, TimeUnit unit) {
        redisTemplate.opsForValue().set(key, value, timeout, unit);
    }

    // 读取数据
    public Object getData(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    // 更新数据(在 Redis 中,更新实际上是直接覆盖旧值)
    public void updateData(String key, Object newValue) {
        redisTemplate.opsForValue().set(key, newValue); // 覆盖旧值
    }

    // 删除数据
    public void deleteData(String key) {
        redisTemplate.delete(key);
    }

    // 检查键是否存在
    public boolean exists(String key) {
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }
}

1. saveData - 创建或存储数据

public void saveData(String key, Object value) {
    redisTemplate.opsForValue().set(key, value);
}
  • 作用:将键 key 的值设置为 value。如果键已存在,则覆盖旧值。
  • 解释opsForValue().set(key, value) 是一个写入操作,使用字符串类型数据结构将 value 存储在 Redis 中的 key 下。
  • 应用场景:适用于需要保存字符串、简单对象或 JSON 格式化对象的数据。

2. saveDataWithExpiration - 创建或存储带过期时间的数据

public void saveDataWithExpiration(String key, Object value, long timeout, TimeUnit unit) {
    redisTemplate.opsForValue().set(key, value, timeout, unit);
}
  • 作用:将键 key 的值设置为 value,并指定过期时间。
  • 解释:该方法将数据保存到 Redis 中,并设置键的过期时间。timeout 表示生存时间,unit 表示时间单位(如 TimeUnit.SECONDS)。
  • 应用场景:适用于存储有时效性的数据,例如用户会话、验证码等。

3. getData - 读取数据

public Object getData(String key) {
    return redisTemplate.opsForValue().get(key);
}
  • 作用:根据键 key 获取 Redis 中存储的值。
  • 解释opsForValue().get(key) 方法从 Redis 中读取 key 的值。如果 key 不存在,返回 null
  • 应用场景:适用于读取简单的键值数据,例如读取用户信息、会话信息等。

4. updateData - 更新数据

public void updateData(String key, Object newValue) {
    redisTemplate.opsForValue().set(key, newValue);
}
  • 作用:更新键 key 的值为 newValue
  • 解释:Redis 没有直接的“更新”操作,使用 set 方法可以实现覆盖更新效果。如果 key 存在,则直接覆盖旧值;如果 key 不存在,则创建新数据。
  • 应用场景:适用于需要修改 Redis 中现有数据的场景。

5. deleteData - 删除数据

public void deleteData(String key) {
    redisTemplate.delete(key);
}
  • 作用:删除 Redis 中指定的键值对。
  • 解释delete(key) 方法会将 Redis 中指定的 key 删除,如果 key 存在则成功删除,如果 key 不存在则无影响。
  • 应用场景:适用于需要清理缓存数据、移除不再需要的数据的场景。

6. exists - 检查键是否存在

public boolean exists(String key) {
    return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
  • 作用:检查键 key 是否存在于 Redis 中。
  • 解释hasKey(key) 方法返回一个 Boolean,表示 key 是否存在。此处使用 Boolean.TRUE.equals(...) 来避免 null 值的问题。
  • 应用场景:适用于在执行操作前检查数据是否已存在,避免无效操作。

 2.3.缓存配置

RedisTemplate组件用于直接操作 Redis 数据,RedisCacheManager组件用于管理 Spring Cache 注解的缓存。

在 Spring Boot 中,将 Redis 作为缓存管理器可以通过 RedisCacheManager 实现,并利用 Spring Cache 的抽象功能。通过这种方式,可以很方便地使用缓存注解(如 @Cacheable@CachePut@CacheEvict)来管理 Redis 缓存。 

配置 Redis 作为缓存管理器

首先,我们需要创建一个配置类,用于设置 Redis 缓存的默认行为,例如键值的序列化方式和缓存条目的过期时间。

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

@Configuration
@EnableCaching // 启用 Spring Cache 缓存功能
public class CacheConfig {

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
        // 配置 Redis 缓存的默认设置
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30)) // 设置缓存过期时间为 30 分钟
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) // 设置键的序列化器
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer())); // 设置值的序列化器

        // 创建 RedisCacheManager 并应用配置
        return RedisCacheManager.builder(redisConnectionFactory)
                .cacheDefaults(config)
                .build();
    }
}

配置类代码详解

1. @Configuration@EnableCaching 注解

@Configuration
@EnableCaching
  • @Configuration:将此类标记为一个配置类,用于定义 Spring Bean 和配置项。
  • @EnableCaching:启用 Spring Cache 的缓存功能。启用后,Spring 会自动检测和管理带有缓存注解(如 @Cacheable@CachePut@CacheEvict)的方法,从而自动实现缓存的存储和读取。

2. 创建 CacheManager Bean

@Bean
public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
  • CacheManager 是 Spring Cache 的核心组件,负责管理缓存的生命周期、存取和删除操作。我们在这里定义了一个 CacheManager Bean,利用 RedisCacheManager 将 Redis 作为缓存的存储介质。

  • RedisConnectionFactory 是 Redis 的连接工厂,用于与 Redis 建立连接。在这里,我们将 RedisConnectionFactory 注入到 RedisCacheManager 中,确保 CacheManager 可以正常连接 Redis 服务器。

3. 配置 Redis 缓存行为(RedisCacheConfiguration

在这部分代码中,我们定义了缓存的默认配置(如过期时间和序列化方式)。这些配置只影响通过 CacheManager 和缓存注解(如 @Cacheable)进行的缓存操作,不会影响 RedisTemplate

RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
        .entryTtl(Duration.ofMinutes(30))
        .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
        .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
  • RedisCacheConfiguration.defaultCacheConfig()

    • RedisCacheConfiguration 是 Redis 缓存的配置类,用于定义缓存的行为。defaultCacheConfig() 方法获取一个默认的缓存配置对象。
    • 我们可以在这个配置对象的基础上定制缓存的行为,例如设置过期时间和序列化器。
  • entryTtl(Duration.ofMinutes(30))

    • entryTtl 方法用于设置缓存条目的过期时间,即缓存的生存时间(TTL)。
    • 在这里,我们将缓存的过期时间设置为 30 分钟,这意味着缓存中的数据在存储 30 分钟后将自动失效。
    • 通过设置合理的过期时间,可以确保缓存中的数据是较新的,有助于数据的一致性。
  • serializeKeysWith( ......StringRedisSerializer()))

    • serializeKeysWith 方法用于设置缓存键(key)的序列化方式。
    • StringRedisSerializer 是一个将数据序列化为字符串的序列化器,适用于缓存键的序列化。使用字符串格式存储键便于在 Redis 中查看缓存键名,方便管理和调试。
  • serializeValuesWith( ....... GenericJackson2JsonRedisSerializer()))

    • serializeValuesWith 方法用于设置缓存值(value)的序列化方式。
    • GenericJackson2JsonRedisSerializer 使用 JSON 格式将对象序列化为 JSON 字符串。JSON 格式具有良好的可读性和跨系统兼容性,非常适合存储复杂的对象数据。
    • 这样配置后,所有通过缓存注解存入 Redis 的缓存数据将以 JSON 格式存储,便于在 Redis 中查看缓存内容,同时支持反序列化为 Java 对象。

4. 创建 RedisCacheManager 实例

return RedisCacheManager.builder(redisConnectionFactory)
        .cacheDefaults(config)
        .build();
  • RedisCacheManager.builder(redisConnectionFactory)

    • 使用 Redis 连接工厂 redisConnectionFactory 创建一个 RedisCacheManager 的构建器。
    • 连接工厂为 RedisCacheManager 提供与 Redis 服务器的连接,使其能够将缓存数据存储到 Redis 中。
  • .cacheDefaults(config)

    • cacheDefaults 方法用于指定 Redis 缓存的默认配置。这里将上面定义的 config(包含序列化方式和过期时间)作为 Redis 缓存的默认配置。
    • 所有使用 RedisCacheManager 存储的缓存数据将遵循该默认配置,使缓存的行为和存储格式一致。
  • .build()

    • 最终调用 build() 方法创建 RedisCacheManager 实例,并将其注册为 CacheManager Bean。
    • 通过 RedisCacheManager 管理缓存后,所有使用缓存注解(如 @Cacheable)的缓存数据将存储在 Redis 中,并按照指定的配置进行序列化和过期管理。

2.4.缓存注解使用

1. @Cacheable 注解

@Cacheable 注解用于将方法的返回值缓存起来。下次调用该方法时,如果参数相同,Spring 会直接从缓存中返回结果,而不再执行方法体。这可以减少对数据库或外部服务的重复访问,提升性能。

使用方法

@Cacheable(value = "userCache", key = "#id", unless = "#result == null")
public User getUserById(Long id) {
    System.out.println("Executing getUserById for id: " + id);
    return findUserInDatabase(id); // 模拟数据库查询
}

参数详解

  • value:指定缓存区域的名称,相当于缓存的命名空间或前缀(例如 userCache)。所有 userCache 缓存区的键都会以 userCache:: 开头。

  • key:指定缓存的键。使用 #参数名 来引用方法的参数,如 #id 表示将 id 参数作为缓存键的一部分。最终 Redis 中的键可能是 userCache::1(假设 id=1)。

  • unless(可选):指定条件,当条件成立时不缓存结果。例如,unless = "#result == null" 表示如果返回值为 null,则不缓存。

应用场景

  • 数据查询操作:适用于频繁查询的操作,比如从数据库中获取用户信息、商品详情等。这些数据变化不频繁,使用 @Cacheable 缓存可以大大减少数据库访问。

  • API 调用:对于一些耗时的第三方 API 请求,可以缓存其返回结果,避免频繁调用外部接口。


2. @CachePut 注解

@CachePut 注解用于更新缓存数据,与 @Cacheable 不同的是,@CachePut 每次都会执行方法体,并将返回值更新到缓存中。它主要用于在更新数据的同时刷新缓存。

使用方法

@CachePut(value = "userCache", key = "#user.id")
public User updateUser(User user) {
    System.out.println("Executing updateUser for id: " + user.getId());
    return updateUserInDatabase(user); // 模拟更新数据库
}

参数详解

  • value:指定缓存区域名称,通常为与查询操作相同的命名空间(例如 userCache)。

  • key:指定缓存的键,通常是方法参数的某个属性,如 #user.id 表示将 user 对象的 id 作为缓存键。

应用场景

  • 数据更新操作:适用于更新数据库信息的场景。在数据库更新成功后,@CachePut 会自动更新缓存中的数据,保证缓存和数据库的数据一致性。

  • 缓存同步@CachePut 保证每次都更新缓存,适合在需要同步缓存与数据库的情况下使用。


3. @CacheEvict 注解

@CacheEvict 注解用于清除缓存中的数据。可以指定删除特定的缓存项,也可以通过配置 allEntries 参数清空整个缓存区域。

使用方法

@CacheEvict(value = "userCache", key = "#id", beforeInvocation = true)
public void deleteUserById(Long id) {
    System.out.println("Executing deleteUserById for id: " + id);
    deleteUserFromDatabase(id); // 模拟从数据库中删除用户
}

参数详解

  • value:指定缓存区域的名称(如 userCache)。

  • key:指定要清除的缓存项的键。例如,key = "#id" 表示删除 userCache 中以 id 为键的缓存项。

  • allEntries(可选):当设置为 true 时,清除整个缓存区域内的所有缓存项。默认为 false

  • beforeInvocation(可选):当设置为 true 时,在方法执行前清除缓存。默认值是 false,即方法成功执行后再清除缓存。这在方法可能抛出异常时非常有用,避免方法失败导致缓存不一致。

应用场景

  • 数据删除操作:适用于在从数据库删除数据时,确保清除对应的缓存项。

  • 清空缓存allEntries = true 时,适用于清空整个缓存区域。例如,定期清空用户缓存区中的所有数据,以确保数据的时效性。

  • 防止缓存不一致beforeInvocation = true 可以在方法执行前清除缓存,适合可能因异常导致数据不一致的场景。

4.综合示例:业务逻辑层

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    // 查询用户信息并缓存结果
    @Cacheable(value = "userCache", key = "#id", unless = "#result == null")
    public User getUserById(Long id) {
        System.out.println("Fetching user from database, id: " + id);
        return findUserInDatabase(id); // 模拟数据库查询
    }

    // 更新用户信息并刷新缓存
    @CachePut(value = "userCache", key = "#user.id")
    public User updateUser(User user) {
        System.out.println("Updating user in database, id: " + user.getId());
        return updateUserInDatabase(user); // 模拟数据库更新
    }

    // 删除用户信息并清除对应缓存
    @CacheEvict(value = "userCache", key = "#id")
    public void deleteUserById(Long id) {
        System.out.println("Deleting user from database, id: " + id);
        deleteUserFromDatabase(id); // 模拟数据库删除
    }

    // 清空整个 userCache 缓存区域
    @CacheEvict(value = "userCache", allEntries = true)
    public void clearAllUserCache() {
        System.out.println("Clearing all entries in userCache");
    }

    // 模拟数据库操作
    private User findUserInDatabase(Long id) {
        return new User(id, "User" + id); // 返回一个模拟的用户对象
    }

    private User updateUserInDatabase(User user) {
        return user; // 返回更新后的用户
    }

    private void deleteUserFromDatabase(Long id) {
        // 模拟删除操作
    }
}
  • UserService 使用场景:是面向应用的业务服务层,专注于处理用户相关的核心业务逻辑,同时结合简单的缓存策略,如缓存数据的查询、更新和删除等。 当你希望业务逻辑代码中自动处理缓存逻辑时,可以使用 UserService 和相关的缓存注解。它适用于绝大多数需要缓存的业务场景,尤其是在缓存操作较为简单的情况下。
  • RedisService 专注于提供对缓存的直接操作,例如存取、更新、删除、批量操作等。它将缓存的操作与核心业务逻辑分离,提供更灵活的缓存管理功能。在一些高级缓存操作中,可能涉及到一些需要手动控制的缓存策略,比如缓存过期时间的设置、缓存清理、批量删除等。

3. Redis 和 MySQL 的协同缓存机制

3.1.缓存策略设计:哪些数据适合缓存

  • 高频查询的数据

    • 频繁查询的数据非常适合缓存,例如用户资料、商品信息、热门文章等。
    • 这些数据的访问频率高,通过缓存可以显著减少对数据库的访问,降低数据库负载,提高响应速度。
    • 示例:在电商平台中,用户反复访问的商品详情可以放入缓存,这样在用户查看商品时,系统会优先从缓存读取数据。
  • 不经常变化的数据

    • 那些变化频率低、数据更新较少的内容非常适合缓存。因为一旦缓存,这些数据可以在较长时间内有效,减少缓存失效的频率。
    • 示例:像一些系统配置项、字典数据等,这些内容的更新较少,可以长期缓存。例如在用户管理系统中,用户权限的基本配置可以缓存。
  • 聚合查询结果

    • 一些复杂的查询可能涉及到多张表和多个条件的组合查询,执行这些查询通常会耗费资源。对于这些复杂查询的结果,可以缓存到 Redis 中,避免每次都执行繁琐的数据库查询。
    • 示例:如果在社交媒体平台中查询用户的好友推荐列表,这个列表可能涉及到多个条件和数据的关联,通过缓存查询结果可以提升查询效率。
  • 实时性要求不高的数据

    • 对于一些实时性要求不高的数据,可以通过缓存来加速访问。例如,用户的浏览历史、日志信息等。
    • 示例:在新闻类应用中,用户的阅读记录可以缓存,因为这些数据的更新对用户影响不大,也不会影响其他用户的体验。

选择缓存数据的关键考量

在选择数据缓存到 Redis 时,需要平衡以下几个方面:

  • 数据访问的频率:高频访问的数据更适合缓存,因为缓存可以显著降低频繁访问对数据库的负担。
  • 数据更新的频率:频繁更新的数据可能不适合缓存,除非有专门的机制保持缓存数据和源数据的一致性。
  • 数据的大小和结构:缓存的大小需要考虑 Redis 的内存限制。对于特别大的数据或复杂的数据结构,缓存设计需要小心,以避免 Redis 内存不足。

 3.2.读写分离

3.2.1. 读取操作:优先使用缓存,加速查询

在读取操作中,我们遵循以下流程:

  1. 查询 Redis 缓存:首先尝试从 Redis 获取数据。
  2. 缓存命中:如果 Redis 中存在目标数据,则直接返回。
  3. 缓存未命中:如果 Redis 中没有数据,则查询 MySQL,将查询结果存入 Redis 供后续使用。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private ProductRepository productRepository; // 使用 JPA Repository 访问 MySQL

    private static final String CACHE_PREFIX = "productCache::";

    // 读取操作 - 查询商品信息
    public Product getProductById(Long productId) {
        // 1. 从缓存中获取数据
        String cacheKey = CACHE_PREFIX + productId;
        Product product = (Product) redisTemplate.opsForValue().get(cacheKey);

        // 2. 如果缓存命中,直接返回
        if (product != null) {
            System.out.println("Cache hit for product ID: " + productId);
            return product;
        }

        // 3. 缓存未命中,查询数据库
        System.out.println("Cache miss for product ID: " + productId + ", querying database...");
        product = productRepository.findById(productId).orElse(null);

        // 4. 数据库有结果,将结果缓存,并设置过期时间
        if (product != null) {
            redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
            System.out.println("Data cached for product ID: " + productId);
        }

        return product;
    }
}

代码详解

  • 缓存查询:代码通过 redisTemplate.opsForValue().get(cacheKey) 从 Redis 获取数据。cacheKey 是由 CACHE_PREFIXproductId 组成的字符串,如 "productCache::1",用于唯一标识商品。

  • 缓存命中:如果 product 不为 null,说明缓存中存在数据,打印日志提示“缓存命中”并直接返回数据。

  • 缓存未命中:如果缓存中没有目标数据,则查询 MySQL 数据库获取商品信息。成功获取到商品数据后,使用 redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS) 将数据缓存到 Redis 中,并设置过期时间为 1 小时。

实际效果

  • 加速响应:缓存命中时,直接从 Redis 返回数据,避免了数据库查询,显著加速响应速度。
  • 减少数据库压力:Redis 缓存可以减少数据库的重复访问,降低数据库负载。

3.2.2. 写入操作:更新 MySQL 并同步更新 Redis 缓存

写操作会直接影响到数据库和缓存的数据一致性。为了保证一致性,我们在写操作中进行以下步骤:

  1. 更新数据库:首先将数据更新到 MySQL 数据库中,确保持久化存储。
  2. 更新缓存:数据库更新成功后,将新数据写入 Redis 中同步更新缓存。
// 写操作 - 更新商品信息
public Product updateProduct(Product product) {
    // 1. 更新数据库中的商品信息
    product = productRepository.save(product);
    System.out.println("Database updated for product ID: " + product.getId());

    // 2. 更新缓存
    String cacheKey = CACHE_PREFIX + product.getId();
    redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
    System.out.println("Cache updated for product ID: " + product.getId());

    return product;
}

代码详解

  • 更新数据库:代码通过 productRepository.save(product) 将商品信息更新到 MySQL 数据库,save 方法会插入或更新商品记录。

  • 同步更新缓存:数据库更新成功后,将最新的商品数据写入 Redis 缓存中。通过 redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS) 设置缓存键值对,并设定 1 小时的过期时间。这样,在下次读取时可以直接从缓存中获取最新数据,保证数据一致性。

实际效果

  • 保持数据一致性:在更新数据库的同时更新缓存,确保缓存和数据库中的数据一致。
  • 减少缓存失效风险:通过同步更新缓存避免了因缓存过期而产生的额外数据库查询。

3.2.3. 删除操作:清除数据库和缓存中的数据

当我们需要删除数据时,不仅要从数据库中删除数据,还要删除缓存中的对应数据,以确保缓存中不会再返回已删除的数据。

// 删除操作 - 删除商品信息
public void deleteProductById(Long productId) {
    // 1. 从数据库删除商品信息
    productRepository.deleteById(productId);
    System.out.println("Database deleted for product ID: " + productId);

    // 2. 删除缓存
    String cacheKey = CACHE_PREFIX + productId;
    redisTemplate.delete(cacheKey);
    System.out.println("Cache deleted for product ID: " + productId);
}

代码详解

  • 删除数据库记录:代码使用 productRepository.deleteById(productId) 从 MySQL 数据库中删除指定的商品记录。

  • 删除缓存:数据库删除成功后,调用 redisTemplate.delete(cacheKey) 清除 Redis 中的缓存数据,避免读取到已被删除的数据。

实际效果

  • 数据一致性:通过同步删除数据库和缓存中的数据,避免缓存和数据库的不一致。
  • 释放缓存空间:清理无用数据,避免 Redis 中存储已删除的数据,节省内存资源。

3.3.缓存预热与过期策略

3.3.1.缓存预热(Cache Preheating)

1. 什么是缓存预热?

缓存预热是指在系统启动或高峰期到来之前,将一些热点数据提前加载到缓存中。这样可以避免系统启动时缓存为空导致的大量请求直接打到数据库,提升系统的响应速度,减少数据库压力。

2. 实现缓存预热的方式

方式一:使用 @PostConstruct 注解预热缓存

@PostConstruct 是一个 Java 标准注解,标注在方法上后,Spring 会在该 Bean 完成初始化后立即调用该方法。我们可以利用这个特性,在系统启动时自动将热点数据加载到缓存。

实现步骤和代码示例

  1. 创建预热类:定义一个预热类,专门负责将热点数据加载到缓存中。
  2. 编写预热逻辑:在 @PostConstruct 方法中实现数据加载到缓存的逻辑。
  3. 配置缓存过期时间:在预热时为缓存数据设置合理的过期时间。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
public class CachePreheat {

    @Autowired
    private ProductService productService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String CACHE_PREFIX = "productCache::";

    // 使用 @PostConstruct 注解使该方法在系统启动时自动执行
    @PostConstruct
    public void preheatCache() {
        System.out.println("开始缓存预热...");

        // 1. 获取需要预热的热点数据,例如热门商品列表
        List<Product> hotProducts = productService.getHotProducts();

        // 2. 将热点数据加载到缓存并设置过期时间
        for (Product product : hotProducts) {
            String cacheKey = CACHE_PREFIX + product.getId();
            redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS); // 设置缓存过期时间为1小时
        }

        System.out.println("缓存预热完成");
    }
}

代码解释

  • @Component:将 CachePreheat 类声明为 Spring 组件,使其被 Spring 容器管理。
  • @PostConstruct:让 preheatCache() 方法在 Spring 容器初始化完成后自动执行,从而在系统启动时加载热点数据到缓存。
  • productService.getHotProducts():调用 ProductService 中的方法获取热点数据列表,例如用户常访问的热门商品。
  • redisTemplate.opsForValue().set(...):将每条热点数据存入 Redis 缓存,并设置过期时间为 1 小时。

方式二:使用 @Scheduled 注解定时预热缓存

使用定时任务可以定期刷新缓存数据,确保热点数据持续更新。例如,每天凌晨重新加载热点数据,保持缓存数据的有效性。

示例代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
public class CachePreheatScheduler {

    @Autowired
    private ProductService productService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String CACHE_PREFIX = "productCache::";

    // 每天凌晨2点执行缓存预热
    @Scheduled(cron = "0 0 2 * * ?")
    public void preheatCache() {
        System.out.println("定时缓存预热开始...");

        // 获取热点数据并存入缓存
        List<Product> hotProducts = productService.getHotProducts();
        for (Product product : hotProducts) {
            String cacheKey = CACHE_PREFIX + product.getId();
            redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS); // 设置1小时的缓存过期时间
        }

        System.out.println("定时缓存预热完成");
    }
}

代码解释

  • @Scheduled(cron = "0 0 2 * * ?"):每天凌晨 2 点执行缓存预热任务,cron 表达式控制执行时间。
  • productService.getHotProducts():调用 ProductService 获取热点数据。
  • redisTemplate.opsForValue().set(...):将每条数据写入 Redis 缓存并设置过期时间,确保在一定时间内缓存数据是有效的。

3.3.2.缓存过期策略(Expiration Strategy)

缓存过期策略是指在缓存中设置数据的生命周期,以便在适当时间自动失效。合理的过期策略可以避免缓存和数据库之间的数据不一致,并减少缓存的内存占用。

1. 为什么需要缓存过期策略?

  • 保证数据一致性:缓存中的数据和数据库数据可能会发生变化,设置缓存的过期时间可以有效减少这种不一致的时间。
  • 释放内存资源:数据过期后自动释放缓存空间,避免缓存数据长期占用内存。
  • 防止缓存雪崩:如果大量缓存项在同一时间过期,会导致系统突然压力增大,因此在设置过期时间时,可以引入随机化避免集中失效。

2. 缓存过期策略的具体实现

2.1 设置合理的 TTL(生存时间)

根据业务需求和数据的更新频率,选择合适的过期时间。对于变化频率高的数据,可以设置较短的 TTL;对于变化少的数据,可以设置较长的 TTL。

代码示例

// 设置商品详情的缓存过期时间为1小时
String cacheKey = CACHE_PREFIX + product.getId();
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);

代码解释

  • redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS):缓存商品详情,并设置生存时间为 1 小时,确保缓存中的数据不会长期存在。
  • 这样可以有效控制缓存生命周期,使得缓存数据在 1 小时后失效,减少数据不一致的风险。

2.2 添加过期时间的随机化

如果大量缓存项设置了相同的过期时间,可能导致它们在同一时间失效,产生缓存雪崩。在基础过期时间上添加一个随机数,避免集中失效。

代码示例

int baseExpireTime = 60 * 60; // 基础过期时间为1小时
int randomTime = new Random().nextInt(300); // 随机增加0~300秒
redisTemplate.opsForValue().set(cacheKey, product, baseExpireTime + randomTime, TimeUnit.SECONDS);

代码解释

  • baseExpireTime = 60 * 60:基础过期时间设为 1 小时。
  • randomTime = new Random().nextInt(300):生成 0 到 300 秒之间的随机数。
  • baseExpireTime + randomTime:在基础过期时间上加一个随机值,防止大量缓存数据同时失效,减少缓存雪崩的风险。

3.3.3.避免数据不一致的策略

缓存与数据库的数据可能会出现不一致的情况,特别是在高并发场景中。为此,常用的策略包括“先更新数据库,后删除缓存”、延时双删策略和使用消息队列同步缓存等方法。

1. 先更新数据库,后删除缓存

这种方法的思路是,在数据更新时,先更新数据库,然后再删除缓存中的数据。这样可以确保缓存和数据库的一致性,因为数据库中的数据是最新的。

示例代码

public Product updateProduct(Product product) {
    // 1. 更新数据库
    product = productRepository.save(product);

    // 2. 删除缓存
    String cacheKey = CACHE_PREFIX + product.getId();
    redisTemplate.delete(cacheKey); // 删除缓存

    return product;
}

代码解释

  • productRepository.save(product):更新数据库记录,确保数据库数据是最新的。
  • redisTemplate.delete(cacheKey):删除 Redis 缓存,确保缓存数据不再存在。

2. 延时双删策略

在高并发情况下,删除缓存后可能会有并发请求查询未更新的数据库,并将旧数据再次写入缓存,导致数据不一致。延时双删策略可以有效解决这个问题。

延时双删的实现步骤

  1. 更新数据库:先更新数据库中的数据。
  2. 第一次删除缓存:更新数据库后立即删除缓存。
  3. 延时操作:线程短暂休眠一段时间,以处理其他并发请求。
  4. 再次删除缓存:延时后再次删除缓存,确保缓存中的旧数据完全清除。

示例代码

public Product updateProduct(Product product) {
    // 1. 更新数据库
    product = productRepository.save(product);

    // 2. 删除缓存
    String cacheKey = CACHE_PREFIX + product.getId();
    redisTemplate.delete(cacheKey);

    // 3. 休眠500毫秒
    try {
        Thread.sleep(500); // 等待可能的并发请求完成
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // 4. 再次删除缓存
    redisTemplate.delete(cacheKey);

    return product;
}

代码解释

  • productRepository.save(product):更新数据库数据。
  • redisTemplate.delete(cacheKey):第一次删除缓存中的数据,防止读取到旧数据。
  • Thread.sleep(500):延时 500 毫秒,确保数据库已更新,其他并发请求已完成。
  • redisTemplate.delete(cacheKey):再次删除缓存,确保缓存不再存有旧数据。

3. 使用消息队列同步缓存

在复杂场景中,可以引入消息队列(如 RabbitMQ、Kafka 等),在数据库更新后发送缓存更新的消息,由消息队列异步同步缓存,确保缓存数据的一致性。

实现步骤和示例代码

  1. 发送缓存更新消息:在数据库更新后,将需要更新的缓存键发送到消息队列中。
  2. 监听并处理消息:在需要同步缓存的服务中,监听消息队列,根据接收到的消息内容更新或删除缓存。

示例代码

 以下的示例代码是已经在Spring boot中集成了RabbitMQ之后的代码。

生产者:在数据库更新后发送缓存更新消息。

public Product updateProduct(Product product) {
    // 更新数据库
    product = productRepository.save(product);

    // 发送缓存更新消息
    cacheUpdateSender.sendCacheUpdateMessage(product.getId());

    return product;
}

消费者:监听队列并更新或删除缓存。

@Component
public class CacheUpdateListener {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ProductRepository productRepository;

    private static final String CACHE_PREFIX = "productCache::";

    @RabbitListener(queues = "cacheUpdateQueue")
    public void handleCacheUpdate(Long productId) {
        // 获取最新的数据
        Product product = productRepository.findById(productId).orElse(null);

        // 更新缓存
        if (product != null) {
            String cacheKey = CACHE_PREFIX + productId;
            redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
        }
    }
}

代码解释

  • cacheUpdateSender.sendCacheUpdateMessage(product.getId()):数据库更新后,发送缓存更新消息到消息队列。
  • @RabbitListener(queues = "cacheUpdateQueue"):监听消息队列,获取缓存更新请求。
  • redisTemplate.opsForValue().set(...):接收到消息后更新 Redis 中的数据,并设置 1 小时的过期时间。

删除缓存后是否需要重新加载缓存?

在一些情况下,删除缓存后并不需要立即重新加载缓存,原因包括:

  • 减少数据库压力:如果每次删除后立即重新加载,可能会增加数据库的查询压力。
  • 热点数据自动回填:对于热点数据,当有用户访问时会自动回填缓存,因此可以根据访问需求动态更新。

3.4.缓存失效后清理缓存

存失效策略主要用于确保在数据更新或删除时,及时清除过期的缓存数据,避免缓存和数据库之间出现不一致。我们可以通过以下两种方式来实现缓存失效:

  • 利用 @CacheEvict 注解:简化缓存失效操作,适合常规的缓存清理需求,包括更新、删除和批量清理。
  • 手动清理缓存:使用 Redis API 手动删除缓存项,适合复杂业务需求下的精准控制,便于处理分布式缓存清理和条件清理的场景。

3.4.1.利用 @CacheEvict 注解实现缓存失效

@CacheEvict 是 Spring Cache 提供的注解,用于在方法执行后清除指定的缓存。它可以让我们在方法调用后自动清除缓存,不需要手动调用 Redis API,从而简化代码。

1. @CacheEvict 注解的常用参数

  • value:指定缓存的名称(即命名空间)。缓存名称在 Redis 中通常作为缓存键的前缀。
  • key:指定要删除的缓存项的键,可以使用 SpEL 表达式(如 #id 表示使用方法参数 id 作为缓存键)。
  • allEntries:设置为 true 时会清空指定缓存区域中的所有缓存项。默认为 false
  • beforeInvocation:设置为 true 时,缓存会在方法执行之前失效;默认为 false,即方法执行成功后才清除缓存。

2. 使用 @CacheEvict 实现缓存失效的场景

@CacheEvict 适合用于以下场景:

  • 更新操作:当数据库中的数据被修改时,需要使缓存中的旧数据失效。
  • 删除操作:当从数据库删除数据时,需要清除缓存中的对应数据,确保不会再访问到已删除的数据。
  • 批量失效:有时需要清空整个缓存区域(如清空用户缓存),可以使用 allEntries=true 实现批量失效。

3. @CacheEvict的具体使用

更新操作中使用 @CacheEvict

在更新操作中,可以使用 @CacheEvict 注解让缓存自动失效。假设我们有一个 ProductService 服务类,当更新产品信息时,我们希望自动清除该产品的缓存。

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;

@Service
public class ProductService {

    @CacheEvict(value = "productCache", key = "#product.id")
    public Product updateProduct(Product product) {
        System.out.println("更新数据库中的产品信息");
        // 这里假设执行数据库更新操作
        return saveProductToDatabase(product); // 模拟数据库更新
    }

    private Product saveProductToDatabase(Product product) {
        // 模拟数据库保存逻辑
        return product;
    }
}

代码解释

  • @CacheEvict(value = "productCache", key = "#product.id"):指定当 updateProduct 方法执行后,清除 productCache 缓存区域中键为 product.id 的缓存项。
  • saveProductToDatabase(product):更新数据库中的产品信息,确保数据库数据是最新的。
  • 效果:调用 updateProduct 方法时,缓存中旧的产品信息会自动失效,下次查询时会从数据库获取最新的数据。

删除操作中使用 @CacheEvict

在删除操作中,可以使用 @CacheEvict 注解让缓存中对应的数据自动失效。假设我们有一个删除产品的方法 deleteProductById,希望删除数据库中的数据后清除缓存。

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;

@Service
public class ProductService {

    @CacheEvict(value = "productCache", key = "#productId")
    public void deleteProductById(Long productId) {
        System.out.println("从数据库中删除产品信息,ID: " + productId);
        // 这里假设执行数据库删除操作
        deleteProductFromDatabase(productId); // 模拟数据库删除
    }

    private void deleteProductFromDatabase(Long productId) {
        // 模拟数据库删除逻辑
    }
}

代码解释

  • @CacheEvict(value = "productCache", key = "#productId"):删除 productCache 中键为 productId 的缓存项。
  • deleteProductFromDatabase(productId):从数据库删除产品信息。
  • 效果:执行 deleteProductById 时,Redis 中缓存的产品数据会自动清除,避免用户访问到已删除的数据。

批量删除缓存中的所有项

在某些场景中(如大规模数据更新或系统重启),我们可能需要清空某个缓存区域的所有缓存项。这时可以使用 allEntries=true 实现批量清除。

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Service;

@Service
public class ProductService {

    @CacheEvict(value = "productCache", allEntries = true)
    public void clearAllProductCache() {
        System.out.println("清空所有产品缓存");
        // 执行批量清除缓存的逻辑
    }
}

代码解释

  • @CacheEvict(value = "productCache", allEntries = true):清空 productCache 缓存区域中的所有缓存项。
  • 效果:执行 clearAllProductCache 方法时,productCache 中的所有缓存项将被清除。

3.4.2.手动清理缓存(在数据库更新后手动清理缓存)

在某些情况下,我们需要更精细地控制缓存清理逻辑。例如,可能需要在执行一系列更新操作后统一清除缓存,或者在复杂业务逻辑中根据特定条件手动清理缓存。此时可以手动调用 Redis API 来删除缓存。

1. 手动清理缓存的场景

手动清理缓存适合以下场景:

  • 复杂的缓存清理逻辑:当需要清除多个缓存项或批量缓存时,手动清理可以提供更高的灵活性。
  • 分布式环境:在分布式系统中,一个服务更新数据后需要通知其他服务清理缓存。
  • 条件清理:当缓存清理条件较复杂时,手动清理可以实现更精准的控制。

假设在更新产品信息后,我们希望手动清理 Redis 中的缓存项,而不是使用 @CacheEvict 注解。

2.手动清理缓存的具体代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String CACHE_PREFIX = "productCache::";

    public Product updateProduct(Product product) {
        System.out.println("更新数据库中的产品信息");
        // 1. 更新数据库中的产品信息
        product = saveProductToDatabase(product);

        // 2. 手动清理缓存
        String cacheKey = CACHE_PREFIX + product.getId();
        redisTemplate.delete(cacheKey);
        System.out.println("手动清理缓存项: " + cacheKey);

        return product;
    }

    private Product saveProductToDatabase(Product product) {
        // 模拟数据库保存逻辑
        return product;
    }
}

代码解释

  • saveProductToDatabase(product):更新数据库中的产品信息。
  • redisTemplate.delete(cacheKey):手动删除 Redis 中对应的缓存项。
  • 效果:在 updateProduct 方法中手动清理缓存,确保缓存中的数据是最新的。可以根据业务需求灵活控制缓存的清理时机。

3.批量清理缓存示例

如果需要清理多个缓存项或指定前缀的缓存,可以使用 RedisTemplate 提供的批量删除方法。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Set;

@Service
public class ProductService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String CACHE_PREFIX = "productCache::";

    public void clearProductCacheByPrefix(String prefix) {
        // 1. 找到所有匹配指定前缀的缓存键
        Set<String> keys = redisTemplate.keys(CACHE_PREFIX + prefix + "*");

        // 2. 批量删除缓存项
        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
            System.out.println("清理缓存项: " + keys);
        }
    }
}

代码解释

  • redisTemplate.keys(CACHE_PREFIX + prefix + "*"):查找所有符合指定前缀的缓存键,返回一个键的集合。
  • redisTemplate.delete(keys):批量删除这些缓存项,确保缓存中不会有过时的数据。
  • 效果:可以灵活地根据前缀清理多个缓存项,例如清理所有以 "productCache::category:" 开头的缓存项。

4. 分布式锁的实现

  • Redis 的 SETNX 和 EXPIRE 实现:通过手动使用 SETNXEXPIRE 实现分布式锁,但在高并发场景中安全性不如 Redisson。
  • Redisson 的 RLock 实现:Redisson 提供了简化的 RLock API,用于安全可靠的分布式锁管理,适合复杂的业务需求,自动续期和异常安全,建议使用 Redisson 实现分布式锁。

Redis 在缓存和服务层的业务逻辑中都能发挥作用,适合那些读写频繁、实时性高的场景。在缓存中,它加速数据访问;在服务层,它为业务逻辑中的高并发控制、分布式锁、计数和限流等需求提供解决方案。  

6.1.使用 SETNXEXPIRE 命令手动实现分布式锁 

1. 分布式锁的原理

Redis 的 SETNXEXPIRE 命令可以实现基础的分布式锁功能:

  • SETNX(SET if Not Exists):尝试设置一个键(代表锁),如果该键不存在,则设置成功,表示锁被成功获取;如果键已存在,表示锁被占用。
  • EXPIRE:为锁设置过期时间,确保锁在持有方崩溃或出错时能自动释放,避免死锁。

实现步骤如下:

  1. 获取锁:使用 SETNX 设置一个锁键。如果返回成功,表示当前客户端获取到锁;否则获取锁失败。
  2. 设置过期时间:在获取锁成功后,使用 EXPIRE 为锁设置一个过期时间,防止锁因意外情况无法释放。
  3. 释放锁:在操作完成后,客户端主动删除锁,释放资源。

2. 在 Spring Boot 中实现 Redis 分布式锁

我们可以通过 StringRedisTemplate 来操作 Redis,创建一个分布式锁服务类 RedisLockService,用于管理锁的获取和释放。

代码示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class RedisLockService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String LOCK_KEY = "lock_key"; // 锁的键名称
    private static final int LOCK_EXPIRE = 10; // 锁的过期时间,10秒

    // 尝试获取锁
    public boolean tryLock() {
        // 使用 SETNX 命令尝试获取锁
        Boolean success = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "1");
        if (Boolean.TRUE.equals(success)) {
            // 获取锁成功后,使用 EXPIRE 设置锁的过期时间
            redisTemplate.expire(LOCK_KEY, LOCK_EXPIRE, TimeUnit.SECONDS);
            return true; // 返回 true 表示成功获取到锁
        }
        return false; // 获取锁失败,返回 false
    }

    // 释放锁
    public void unlock() {
        redisTemplate.delete(LOCK_KEY); // 删除锁键,释放锁资源
    }
}

代码详细解释

  • tryLock() 方法:用于尝试获取锁

    • setIfAbsent(LOCK_KEY, "1"):使用 SETNX 命令尝试设置锁。如果成功返回 true,表示当前客户端获取到锁。如果返回 false,表示锁已被占用。
    • expire(LOCK_KEY, LOCK_EXPIRE, TimeUnit.SECONDS):设置锁的过期时间为 10 秒,防止锁在客户端意外退出时长期占用。
  • unlock() 方法:用于释放锁

    • delete(LOCK_KEY):删除锁键,释放锁资源。确保操作完成后锁被释放,让其他客户端可以继续获取该锁。

3. 使用示例:库存扣减操作

假设我们有一个商品库存扣减的场景,为了防止多个请求同时扣减库存导致超卖问题,可以通过分布式锁来确保同一时间只有一个请求可以扣减库存。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class InventoryService {

    @Autowired
    private RedisLockService redisLockService;

    public String deductInventory(String productId) {
        // 尝试获取锁
        if (redisLockService.tryLock()) {
            try {
                // 执行扣减库存的逻辑
                boolean success = reduceStock(productId);
                return success ? "扣减成功" : "库存不足";
            } finally {
                // 确保操作完成后释放锁
                redisLockService.unlock();
            }
        } else {
            return "请稍后再试"; // 如果获取锁失败,提示用户稍后重试
        }
    }

    private boolean reduceStock(String productId) {
        System.out.println("扣减库存,产品ID: " + productId);
        return true; // 假设库存扣减成功
    }
}

使用示例的代码解释

  • tryLock():调用 tryLock() 方法尝试获取锁,如果成功,则执行扣减库存操作。
  • 业务逻辑:调用 reduceStock(productId) 方法执行库存扣减操作,确保在持有锁的情况下完成操作。
  • 释放锁:操作完成后无论成功与否,调用 unlock() 方法释放锁。

4. 锁的有效期与安全性

由于 SETNXEXPIRE 不是原子操作,存在并发情况下锁的过期时间可能未设置的问题。虽然可以通过 Lua 脚本来实现原子性,但这种实现较为复杂。为确保分布式锁的可靠性和安全性,通常建议在高并发场景中使用 Redisson 来简化操作和提升安全性。

6.2.使用Redisson的RLock实现分布式锁

Redisson 是一个 Redis 客户端,封装了 Redis 的锁机制,并提供了 RLock 接口来管理分布式锁。相比使用 SETNXEXPIRE 组合手动管理锁的方式,Redisson 的分布式锁具备以下优势:

  1. 自动续期:Redisson 的 RLock 支持锁的自动续期,确保在长时间持有锁时不意外失效。
  2. 可重入性:同一线程可以多次获取同一锁,不会造成死锁。
  3. 高可靠性:Redisson 提供了丰富的 API,可以灵活管理锁的超时和等待时间,适合高并发场景。
  • Redisson 的自动续期机制设计的核心是确保锁在“正常持有”时不会意外释放,但一旦持有锁的线程中断或崩溃,续期停止,锁在过期时间到达后自动释放。这样既保证了锁在正常情况下的稳定性,也确保了线程异常时的自动释放。 
  • 在业务逻辑中,可能会出现递归调用或嵌套调用的情况,即一个持有锁的方法在调用的过程中又尝试获取同一锁。支持可重入性后,同一线程在持有锁的情况下可以再次获取锁,不会发生阻塞或死锁
  • 在分布式系统中,业务逻辑通常分为多个层次,每一层可能都有自己的锁需求。可重入性使得在多个层次调用同一锁时不需要担心冲突,同一线程可以在各个层次持有同一把锁

6.2.1.使用步骤 

1. 引入 Redisson 依赖

首先,在 Spring Boot 项目中添加 Redisson 的依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.3</version> <!-- 请使用最新版本 -->
</dependency>

2. 配置 RedissonClient Bean

application.properties 文件中配置 Redis 的连接信息:

spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=yourpassword   # 如果没有设置密码,可以省略这一行
spring.redis.timeout=3000            # 连接超时时间(毫秒)

接下来,创建一个 RedissonConfig 配置类,将 RedissonClient 配置为一个 Spring Bean。通过这个 Bean,可以在项目中方便地获取 RLock 对象,实现分布式锁。

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }
}

配置解释

  • Config config = new Config();:创建 Redisson 的配置对象。
  • config.useSingleServer().setAddress("redis://localhost:6379");:设置 Redis 服务器的地址。
  • return Redisson.create(config);:通过配置创建 RedissonClient 实例,并注册为 Spring Bean。

3. 获取 RLock 对象

在业务代码中,可以通过 RedissonClient 获取 RLock 对象。 RLock 是一个分布式锁接口,提供了丰富的锁管理方法,支持可重入和自动续期功能。

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class LockService {

    @Autowired
    private RedissonClient redissonClient;

    public void lockExample() {
        RLock lock = redissonClient.getLock("my_lock");
        // 使用 lock 进行分布式锁操作
    }
}

6.2.2. RLock 的常用方法和使用示例

RLock 提供了多种方法来管理分布式锁的获取和释放,以下是常用方法及其使用示例:

(1)lock():阻塞获取锁

该方法会阻塞直到获取到锁,适合在没有超时等待要求的场景。

public void lockExample() {
    RLock lock = redissonClient.getLock("my_lock");
    lock.lock(); // 阻塞获取锁
    try {
        // 执行同步的业务逻辑
    } finally {
        lock.unlock(); // 释放锁
    }
}

解释

  • lock.lock():阻塞方式获取锁,如果锁已经被其他线程持有,则等待直至锁被释放。
  • lock.unlock():释放锁,让其他线程可以获取该锁。

(2)tryLock():非阻塞获取锁

tryLock() 方法用于尝试获取锁。如果锁已经被持有,则立即返回 false,不阻塞等待。

public boolean tryLockExample() {
    RLock lock = redissonClient.getLock("my_lock");
    boolean locked = lock.tryLock(); // 非阻塞获取锁
    if (locked) {
        try {
            // 执行同步的业务逻辑
            return true;
        } finally {
            lock.unlock(); // 释放锁
        }
    } else {
        System.out.println("获取锁失败,锁已被持有");
        return false;
    }
}

解释

  • tryLock():立即尝试获取锁,如果锁被持有则返回 false,表示获取锁失败;如果锁可用则返回 true,表示获取锁成功。

(3)tryLock(long waitTime, long leaseTime, TimeUnit unit):带超时的获取锁

这个方法支持等待和超时设置,可以在锁被其他线程持有时等待一定时间。如果在等待时间内未获取到锁则放弃请求,同时设置锁的持有时间。

  • 等待时间:当前线程等待的最大时间。
  • 持有时间:锁成功获取后自动释放的时间。
public boolean tryLockWithTimeoutExample() {
    RLock lock = redissonClient.getLock("my_lock");
    try {
        // 等待时间为5秒,锁的持有时间为10秒
        if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
            try {
                // 执行同步的业务逻辑
                return true;
            } finally {
                lock.unlock(); // 释放锁
            }
        } else {
            System.out.println("获取锁超时,未能成功获取锁");
            return false;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        System.out.println("获取锁时发生异常");
        return false;
    }
}

解释

  • tryLock(5, 10, TimeUnit.SECONDS):尝试获取锁,等待最多 5 秒,如果在 5 秒内获取到锁,持有 10 秒后自动释放。

实际应用示例:在库存扣减中使用 RLock

假设一个电商系统中有一个库存扣减的操作,为了防止超卖,需要确保每次只有一个线程能够扣减库存,可以通过 RLock 实现这一功能。

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class InventoryService {

    @Autowired
    private RedissonClient redissonClient;

    private static final String LOCK_KEY = "inventory_lock"; // 锁的键名称

    public String deductInventory(String productId) {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        try {
            // 尝试加锁,等待时间5秒,锁定时间10秒
            if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
                try {
                    // 执行扣减库存的逻辑
                    boolean success = reduceStock(productId);
                    return success ? "扣减成功" : "库存不足";
                } finally {
                    lock.unlock(); // 释放锁
                }
            } else {
                return "请稍后再试"; // 获取锁失败
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "获取锁失败";
        }
    }

    private boolean reduceStock(String productId) {
        System.out.println("扣减库存,产品ID: " + productId);
        return true; // 假设库存扣减成功
    }
}

代码解释

  • redissonClient.getLock(LOCK_KEY):获取一个分布式锁对象 RLock,锁的名称为 "inventory_lock"
  • tryLock(5, 10, TimeUnit.SECONDS):尝试获取锁,等待时间 5 秒,持有时间 10 秒。
  • 释放锁:通过 lock.unlock() 确保在业务逻辑执行完成后释放锁。

6.3. 加锁后为什么缓存数据仍可能被其他线程修改

这是因为Redis 分布式锁和缓存并没有直接联系,它们是两个独立的系统。锁的机制只是确保在同一时间只有一个线程可以执行某段代码,并不保证锁住的内容不会被其他线程修改。

在分布式锁场景中,缓存和锁是分离的:

  • 锁控制代码逻辑的独占访问:通过 lock.tryLock(),确保只有一个线程可以访问数据库和缓存更新逻辑(即从数据库获取数据并更新缓存)。
  • 缓存并没有被锁住:在 Redis 中,缓存键的读写是独立的,任何线程都可以访问缓存并进行修改,锁并不能限制其他线程对缓存的访问。

5. 缓存穿透、击穿与雪崩的防范

5.1. 缓存穿透

5.1.1什么是缓存穿透

缓存穿透指的是请求的数据在缓存和数据库中都不存在,这种请求直接穿过缓存访问数据库。当恶意请求大量涌入时,缓存层无法拦截这些无效请求,导致数据库承受大量压力,从而影响性能。比如,用户可能不断请求一个不存在的商品 ID,Redis 缓存中不存在此数据,数据库中也没有。当此类请求频繁出现时,会绕过缓存层直接请求数据库,造成缓存穿透。

5.1.2.什么是布隆过滤器

布隆过滤器(Bloom Filter)是一种概率性数据结构,用于判断一个元素是否存在于集合中。它可以在较少的内存占用下实现快速判断,并且在大多数情况下准确可靠。布隆过滤器能够有效防止缓存穿透,拦截无效请求,从而减少数据库访问压力。

布隆过滤器的工作原理

布隆过滤器由一个长度为 m 的位数组(bit array)和 k 个不同的哈希函数组成。布隆过滤器的基本操作步骤如下:

  • 插入元素:当插入一个元素(如 key)时,布隆过滤器会对该元素通过 k 个哈希函数分别计算出 k 个哈希值,然后将位数组中对应的 k 个位置设为 1

  • 查询元素:当需要判断一个元素是否存在时,对该元素进行相同的 k 个哈希计算,检查位数组中对应的 k 个位置。如果所有位置的值都是 1,则表示该元素“可能存在”;如果有任意一个位置的值为 0,则可以确定该元素不存在。

布隆过滤器的误判问题

由于布隆过滤器的概率性,如果查询结果是“存在”,并不能完全保证该数据确实存在(存在一定的误判概率);但如果查询结果为“不存在”,则可以确定该数据绝对不存在。因此,布隆过滤器非常适合用于防止缓存穿透,因为它能高效筛除不存在的数据请求。

布隆过滤器的优势

  • 拦截无效请求:布隆过滤器可以有效拦截不存在的数据请求,将大量无效请求挡在缓存层之外,避免对数据库的冲击。

  • 高效、低成本:布隆过滤器占用的空间非常小,通过位数组和哈希函数实现,查询速度快且内存占用少,适合大规模数据场景。

  • 减少数据库负担:通过提前过滤掉不存在的数据请求,可以显著降低数据库的负载,提升系统性能。 

5.1.3.配置布隆过滤器

Spring Boot 中可以使用 Redisson 提供的布隆过滤器来实现防止缓存穿透。Redisson 是一个强大的 Redis 客户端,支持布隆过滤器等高级功能。我们将以下列代码来演示如何使用布隆过滤器。

代码示例

首先,确保项目中已经添加了 Redisson 依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.15.3</version> <!-- 使用最新版本 -->
</dependency>

然后,我们实现一个布隆过滤器服务 BloomFilterService。在该服务中初始化布隆过滤器,将所有合法的 key 添加进去,并提供判断方法来检测 key 是否存在。

布隆过滤器服务类

import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class BloomFilterService {

    private static final String BLOOM_FILTER_NAME = "productBloomFilter";

    @Autowired
    private RedissonClient redissonClient;

    // 初始化布隆过滤器
    public void initBloomFilter() {
        // 获取 Redis 布隆过滤器对象
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(BLOOM_FILTER_NAME);

        // 初始化布隆过滤器的大小和误判率
        bloomFilter.tryInit(1000000L, 0.01); // 设置预期插入量为100万,误判率为1%

        // 将所有合法的 key 添加到布隆过滤器中
        bloomFilter.add("product_12345");
        bloomFilter.add("product_67890");
        // 可以通过批量读取数据库中的所有商品 ID,将它们批量加入布隆过滤器
    }

    // 判断 key 是否在布隆过滤器中
    public boolean existsInBloomFilter(String key) {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter(BLOOM_FILTER_NAME);
        return bloomFilter.contains(key); // 判断 key 是否可能存在于布隆过滤器中
    }
}

代码详细解释

  • 初始化布隆过滤器

    • 使用 redissonClient.getBloomFilter(BLOOM_FILTER_NAME) 获取布隆过滤器实例。
    • 调用 tryInit 方法设置布隆过滤器的大小(即预期的插入量)和误判率。例如设置为 100 万个 key,误判率为 1%。
    • 误判率越低,布隆过滤器的空间开销越大。因此误判率的选择要在内存占用和准确性之间找到平衡。
  • 加入合法的 key

    • 通过 bloomFilter.add 将所有合法的 key(如数据库中所有商品 ID)加入布隆过滤器。可以从数据库中批量读取这些 key,然后依次加入布隆过滤器。
  • 判断 key 是否存在

    • 在处理请求时,首先调用 existsInBloomFilter 方法检查 key 是否在布隆过滤器中。
    • 如果布隆过滤器返回 false,表示该 key 不存在,不需要再访问数据库和缓存,直接返回空结果。
    • 如果返回 true,表示该 key 可能存在,继续访问缓存或数据库获取数据。

5.1.4.使用布隆过滤器示例

在缓存穿透的防范中,我们可以通过布隆过滤器提前筛除无效请求:

  • 在系统启动时,将所有有效的 key(如数据库中已有商品的 ID)存入布隆过滤器。
  • 在每次查询之前,先判断 key 是否存在于布隆过滤器中:
    • 如果布隆过滤器判断为不存在,则直接返回空结果,不去查询数据库。
    • 如果布隆过滤器判断为存在,则继续检查缓存,如果缓存未命中再去访问数据库
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProductService {

    private static final String CACHE_KEY_PREFIX = "product_cache_";

    @Autowired
    private BloomFilterService bloomFilterService;

    @Autowired
    private StringRedisTemplate redisTemplate;

    public String getProduct(String productId) {
        // 1. 使用布隆过滤器判断是否可能存在该商品
        if (!bloomFilterService.existsInBloomFilter("product_" + productId)) {
            return "商品不存在"; // 布隆过滤器判断不存在,直接返回
        }

        // 2. 查询缓存
        String cacheKey = CACHE_KEY_PREFIX + productId;
        String cachedProduct = redisTemplate.opsForValue().get(cacheKey);

        if (cachedProduct != null) {
            return cachedProduct; // 缓存命中,返回缓存数据
        }

        // 3. 查询数据库(缓存未命中,且布隆过滤器判断可能存在)
        String dbProduct = queryDatabase(productId);

        if (dbProduct != null) {
            // 将查询结果加入缓存,避免下次穿透
            redisTemplate.opsForValue().set(cacheKey, dbProduct);
        }

        return dbProduct != null ? dbProduct : "商品不存在";
    }

    private String queryDatabase(String productId) {
        // 模拟数据库查询
        System.out.println("查询数据库,商品ID:" + productId);
        // 假设数据库中存在商品 "product_12345"
        if ("12345".equals(productId)) {
            return "Product data for " + productId;
        }
        return null;
    }
}

代码解释

  • 布隆过滤器判断:在查询商品之前,首先调用 bloomFilterService.existsInBloomFilter 方法判断该商品 ID 是否可能存在。如果布隆过滤器返回 false,说明数据库中也不存在该数据,可以直接返回“该商品不存在”。

  • 缓存查询:如果布隆过滤器判断数据可能存在,则继续查询 Redis 缓存。

  • 数据库查询:如果缓存未命中(即 Redis 中不存在该商品的缓存数据),则查询数据库获取数据,并将数据缓存到 Redis 中,避免下次请求时再次访问数据库。

5.2.缓存击穿 

5.2.1什么是缓存击穿?

缓存击穿指的是缓存中某个热点数据失效,导致大量请求直接访问数据库的情况。通常在高并发的场景下,一个热点数据被大量访问,突然失效后所有请求同时涌入数据库,增加数据库负担,可能导致性能瓶颈或崩溃。

典型场景

假设一个电商平台有一个非常热门的商品,所有用户都在查询该商品信息。当该商品的缓存突然过期后,大量请求会直接查询数据库,给数据库带来巨大压力。


缓存击穿的防范措施

  • 加锁机制:在缓存失效时,通过加锁确保只有一个线程可以查询数据库并重建缓存,其他线程等待锁释放后再读取缓存。此方法适用于并发访问较高的热点数据。

  • 热点数据永不过期:对极少数热点数据设置永不过期,手动更新这些数据的缓存,确保高频数据不会因缓存过期而直接冲击数据库。此方法适用于访问频繁且数据更新较少的场景。


5.2.2. 使用加锁机制解决缓存击穿

在缓存失效时,可以通过分布式锁确保只有一个线程访问数据库并更新缓存,其他线程等待锁释放后再读取缓存。这样可以避免大量并发请求直接冲击数据库。

实现原理

  1. 查询缓存:先查询缓存,如果缓存命中则直接返回结果。
  2. 加锁:如果缓存未命中,尝试获取分布式锁,确保只有一个线程可以访问数据库。
  3. 双重检查缓存:在获取锁后再次检查缓存,防止在获取锁期间缓存已被其他线程更新。
  4. 查询数据库并更新缓存:如果缓存仍然未命中,查询数据库并将数据写入缓存。
  5. 释放锁:数据库查询和缓存更新完成后,释放锁,让其他线程可以直接读取缓存。

这里为什么加锁之后还能让其他线程修改缓存:因为加锁的意思是对这段代码进行加锁,这段代码的操作只有被加锁的线程能够进行,但缓存是任何线程都可以修改,加锁只是对这段操作加锁,并不是对缓存加锁。 

实现代码示例

以下是一个在 Spring Boot 中使用 Redisson 的加锁机制实现缓存击穿的防范示例:

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class CacheService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    private static final String LOCK_KEY = "product_lock_";
    private static final String CACHE_KEY = "product_cache_";

    public String getProduct(String productId) {
        // 1. 查询缓存
        String cacheValue = redisTemplate.opsForValue().get(CACHE_KEY + productId);
        if (cacheValue != null) {
            return cacheValue; // 缓存命中,直接返回
        }

        // 2. 缓存未命中,尝试加锁
        RLock lock = redissonClient.getLock(LOCK_KEY + productId);
        try {
            if (lock.tryLock(5, 10, TimeUnit.SECONDS)) { // 获取锁,等待5秒,锁定10秒
                // 3. 双重检查缓存是否已经被其他线程更新
                cacheValue = redisTemplate.opsForValue().get(CACHE_KEY + productId);
                if (cacheValue != null) {
                    return cacheValue; // 如果缓存已经被其他线程更新,直接返回
                }

                // 4. 查询数据库
                String dbValue = queryDatabase(productId);

                // 5. 更新缓存,设置过期时间
                redisTemplate.opsForValue().set(CACHE_KEY + productId, dbValue, 10, TimeUnit.MINUTES);

                return dbValue;
            } else {
                // 获取锁失败,等待缓存更新完成后重新获取
                Thread.sleep(100); // 可以增加重试机制
                return redisTemplate.opsForValue().get(CACHE_KEY + productId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock(); // 6. 释放锁
        }
    }

    private String queryDatabase(String productId) {
        // 模拟数据库查询
        System.out.println("查询数据库,商品ID:" + productId);
        return "Product data for " + productId;
    }
}

代码详细解释

  • 查询缓存:首先从缓存中读取数据。如果缓存命中则直接返回结果,避免了不必要的锁操作。

  • 加锁查询数据库

    • 如果缓存未命中,使用 Redisson 获取分布式锁 lock,确保只有一个线程可以查询数据库并更新缓存。
    • 双重检查缓存:在成功加锁后再次检查缓存,防止在等待锁的过程中其他线程已经更新了缓存。
  • 重建缓存:持有锁的线程从数据库中读取数据并更新到缓存中,同时设置缓存的过期时间。

  • 释放锁:缓存更新完成后,释放锁,确保其他线程可以从缓存读取最新数据。

优势

  • 防止缓存击穿:加锁确保在缓存失效时,只有一个线程查询数据库并更新缓存,避免大量并发请求直接冲击数据库。
  • 双重检查减少重复操作:锁内再次检查缓存,确保其他线程不会重复查询数据库,减少数据库压力。

5.2.3. 设置热点数据永不过期

对于少数非常高频访问的热点数据,可以将其缓存设置为永不过期,这样确保数据不会失效,从而避免缓存击穿。

实现原理

  • 热点数据初始化:系统启动时将热点数据加载到缓存中,设置为永不过期。
  • 数据更新策略:通过数据库更新事件或定时任务主动更新热点缓存数据,而不依赖过期机制。

这种方式适合稳定且访问频繁的数据,例如电商首页的推荐商品、分类信息等,可以有效减少缓存失效的概率。

代码示例

以下示例展示如何在 Spring Boot 中将一些热点数据设置为永不过期,并通过手动更新的方式维护缓存数据。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class HotDataCacheService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String CACHE_KEY = "hot_product_cache_";

    public String getHotProduct(String productId) {
        // 查询缓存
        String cacheValue = redisTemplate.opsForValue().get(CACHE_KEY + productId);
        if (cacheValue != null) {
            return cacheValue; // 缓存命中,直接返回
        }

        // 缓存未命中,加载数据库并更新缓存(永不过期)
        String dbValue = queryDatabase(productId);

        // 设置热点数据永不过期
        redisTemplate.opsForValue().set(CACHE_KEY + productId, dbValue); // 不设置过期时间

        return dbValue;
    }

    private String queryDatabase(String productId) {
        // 模拟数据库查询
        System.out.println("查询数据库,商品ID:" + productId);
        return "Hot product data for " + productId;
    }

    // 手动更新缓存
    public void updateHotProductCache(String productId, String productData) {
        redisTemplate.opsForValue().set(CACHE_KEY + productId, productData); // 不设置过期时间
    }
}

代码详细解释

  1. 查询缓存:首先尝试读取缓存数据。如果缓存命中则直接返回结果,减少数据库访问。

  2. 更新缓存:如果缓存未命中,查询数据库并更新缓存,不设置过期时间,确保该热点数据不会因过期而失效。

  3. 手动更新缓存:提供 updateHotProductCache 方法,以便在热点数据有变动时手动更新缓存内容。可以通过数据库更新事件或定时任务主动更新缓存。

优势和适用场景

  • 适合高频访问的热点数据:特别是访问频率很高、不会频繁变化的数据,例如首页推荐商品、热门分类等。
  • 避免缓存失效带来的冲击:热点数据不会因为缓存失效而直接查询数据库,有效减少数据库负载。

5.3.缓存雪崩 

5.3.1.什么是缓存雪崩?

缓存雪崩指的是在某个时间点,大量缓存同时失效,导致所有请求直接访问数据库,给数据库带来巨大的压力,甚至可能导致系统崩溃。这种情况通常发生在以下场景:

  • 缓存服务器宕机:整个缓存服务不可用,所有请求直接落到数据库。
  • 大量缓存集中在同一时间过期:大量缓存设置了相同的过期时间,导致在某一时刻同时失效。

为防止缓存雪崩,可以采取以下措施:

  • 缓存数据的过期时间设置随机化:在缓存过期时间的基础上,加上一个随机值,避免大量缓存同时失效。

  • 缓存预热:在系统启动或高峰期到来之前,提前将热点数据加载到缓存中。

  • 多级缓存:在本地增加一级缓存,如 Guava Cache,减轻对远程缓存的依赖。

  • 限流和降级:在缓存失效时,对数据库的访问进行限流,必要时进行服务降级。


5.3.2. 设置不同的缓存失效时间

原理

为每个缓存数据设置一个基础过期时间,并在此基础上添加一个随机的时间偏移量,使缓存的过期时间分布在一定的范围内。这样可以避免大量缓存数据在同一时刻失效,分散过期时间点,从而减轻数据库的瞬时访问压力。

实现代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Random;
import java.util.concurrent.TimeUnit;

@Service
public class CacheService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String CACHE_KEY_PREFIX = "product_cache_";
    private static final int BASE_EXPIRE_TIME = 10; // 基础过期时间,单位:分钟

    public void cacheProduct(String productId, String productData) {
        // 生成随机过期时间,范围在 BASE_EXPIRE_TIME 到 BASE_EXPIRE_TIME + 5 分钟之间
        int expireTime = BASE_EXPIRE_TIME + new Random().nextInt(5);
        
        // 将数据存入 Redis,并设置不同的过期时间
        redisTemplate.opsForValue().set(CACHE_KEY_PREFIX + productId, productData, expireTime, TimeUnit.MINUTES);
    }

    public String getProduct(String productId) {
        // 从 Redis 中获取数据
        return redisTemplate.opsForValue().get(CACHE_KEY_PREFIX + productId);
    }
}

代码解释

  • 基础过期时间BASE_EXPIRE_TIME 设置为 10 分钟,表示缓存的基础过期时间。

  • 随机过期时间:在 BASE_EXPIRE_TIME 基础上,加上一个 0 到 4 分钟的随机数 new Random().nextInt(5),使得缓存的失效时间在 10 到 14 分钟之间。这样不同的缓存条目会有略微不同的过期时间,避免在同一时刻集中失效。

  • 缓存数据设置redisTemplate.opsForValue().set 方法将数据写入 Redis,并指定过期时间 expireTime。不同的缓存条目会有不同的失效时间,有效降低缓存雪崩的风险。


5.3.3. 缓存预热

缓存预热是指在系统启动时,提前将部分热点数据加载到缓存中,避免在系统运行后产生大量的缓存未命中情况。通过缓存预热,可以保证在系统启动或高峰期来临时,热点数据已经存在于缓存中,减少对数据库的访问压力。

实现思路

  • 在系统启动时:加载一些常用的、访问频率高的数据到缓存中。
  • 定时任务:定期更新缓存中的热点数据,确保缓存中的数据始终有效。

实现代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;

@Service
public class CacheWarmUpService {

    @Autowired
    private CacheService cacheService;

    @PostConstruct
    public void warmUpCache() {
        // 获取热点数据的ID列表
        List<String> hotProductIds = getHotProductIds();

        // 将每个热点数据加载到缓存中
        for (String productId : hotProductIds) {
            String productData = queryDatabase(productId);
            cacheService.cacheProduct(productId, productData); // 将数据预先缓存
        }
    }

    private List<String> getHotProductIds() {
        // 从数据库或配置中获取热点数据ID列表
        return Arrays.asList("1001", "1002", "1003"); // 示例数据
    }

    private String queryDatabase(String productId) {
        // 模拟数据库查询
        return "Product data for " + productId;
    }
}

代码解释

  • @PostConstruct 注解:该注解会在 Spring Boot 容器启动后自动调用 warmUpCache() 方法。这可以确保在系统启动时,热点数据就已经加载到缓存中。

  • 获取热点数据getHotProductIds 方法用于获取高访问量的商品 ID 列表,可以从数据库或配置文件中获取。
  • 缓存热点数据:遍历热点商品 ID,通过 queryDatabase 方法查询数据库,然后调用 cacheService.cacheProduct 方法将数据缓存起来。

 5.3.4. 其他防范方法

1.限流与降级

在缓存雪崩发生时,即大量缓存突然失效,访问量瞬间增加,可以通过限流和降级的方式来保护数据库,避免数据库被大量请求压垮。

限流

限流可以控制进入数据库的请求量,将超出部分的请求暂时阻止或延迟,避免瞬时高并发对数据库造成过大的压力。常用的限流算法包括令牌桶漏桶算法等。

降级

当缓存不可用时,可以考虑提供简化的数据或延迟响应,避免数据库压力过大。例如:

  • 返回缓存中的旧数据,虽然不够实时,但可以减少数据库的访问。
  • 对非核心业务进行降级,暂时返回空结果或简单提示,确保核心业务的正常运行。

2.多级缓存

多级缓存通过本地缓存和远程缓存结合使用的方式,进一步减轻远程缓存的压力,提升系统的访问速度。多级缓存常见的实现方式是本地缓存(如 Caffeine 或 Guava Cache) + Redis 远程缓存

  • 本地缓存:将热点数据存储在应用服务器本地的内存中,访问速度极快,可以应对短期的缓存雪崩。
  • 远程缓存:使用 Redis 作为分布式缓存,缓存更多数据。

6.Redis 高可用架构方案

在 Redis 的高可用技术方案中,高可用指的是在出现节点故障或网络波动时,系统能够持续提供缓存服务,避免服务中断或性能严重下降。高可用的设计保证了即使某个缓存节点故障,缓存服务仍然可以通过故障转移等机制正常工作,减少了单点故障的风险,保证了系统的可靠性和稳定性。

6.1. Redis Sentinel(主从复制故障监控)

Redis Sentinel 是 Redis 官方提供的一种高可用解决方案,主要通过主从复制故障监控来实现。在 Redis Sentinel 架构中,Sentinel 负责监控 Redis 集群中的主从节点,当主节点出现故障时,Sentinel 会自动将一个从节点提升为主节点,保证服务的持续运行。这种架构适用于不需要数据分片的场景,适合数据量较小且对高可用性有要求的系统。


6.1.1.Redis Sentinel 高可用架构原理

Redis Sentinel 通过三个主要功能来实现 Redis 集群的高可用性:

  • 主从复制:在 Sentinel 架构中,Redis 主节点负责处理写请求,并将数据同步到从节点。多个从节点是主节点的备份,只负责同步数据,不参与写操作。当主节点故障时,Sentinel 可以将从节点提升为主节点,从而实现高可用。

  • 故障转移:每个 Sentinel 实例不断发送 PING 命令来检测 Redis 主节点和其他 Sentinel 的状态。如果在设定的时间范围内没有收到主节点的响应,Sentinel 会认为主节点故障,发起故障转移(Failover),选举某个从节点为新主节点。

  • 通知应用程序:当主节点发生变化时,Sentinel 会将新主节点的地址信息推送给 Redis 客户端(应用程序),让客户端感知主节点的变化,避免因主节点切换而导致的连接错误。


6.1.2.在 Spring Boot 中集成 Redis Sentinel

在 Spring Boot 项目中集成 Redis Sentinel 包括以下几个主要步骤:

  • 配置 Redis Sentinel 集群:配置 Redis 主从节点并启动多个 Sentinel 实例监控主节点的状态。
  • 在 Spring Boot 中配置 Sentinel 集群:在 application.properties 中指定 Sentinel 集群的节点信息和主节点名称。Spring Boot 会自动管理主节点切换。
  • 应用程序使用:在 Spring Boot 中直接使用 RedisTemplateStringRedisTemplate 进行 Redis 操作,Sentinel 会自动切换主节点,无需手动干预。
1.搭建并配置 Redis Sentinel 集群

配置 Redis 主从复制

首先需要配置 Redis 的主从复制,使得数据可以从主节点同步到从节点。

  • 主节点配置:假设 Redis 主节点运行在 127.0.0.1:6379,无需额外配置。

  • 从节点配置:在 Redis 从节点的配置文件(如 redis-slave.conf)中,添加如下配置,将该节点设置为主节点的从节点。

    replicaof 127.0.0.1 6379
    

    这样,从节点会自动从主节点复制数据,保证数据一致性。

配置并启动 Redis Sentinel

接着需要在 Redis 服务器上配置 Redis Sentinel。创建或编辑 sentinel.conf 文件,添加以下配置来监控 Redis 主节点。

# 监控的主节点名称和地址
sentinel monitor mymaster 127.0.0.1 6379 2

# 在 5 秒内未响应则判定节点不可达
sentinel down-after-milliseconds mymaster 5000

# 故障转移最大超时 10 秒
sentinel failover-timeout mymaster 10000

# 故障转移时同步新主节点的从节点数量
sentinel parallel-syncs mymaster 1
  • sentinel monitormymaster 是主节点名称,127.0.0.1:6379 是主节点地址,2 表示至少两个 Sentinel 节点判定主节点不可达时才进行故障转移。
  • sentinel down-after-milliseconds:指定 Sentinel 判断主节点失效的超时时间(5 秒)。
  • sentinel failover-timeout:设置故障转移的最大等待时间(10 秒)。
  • sentinel parallel-syncs:指定故障转移完成后,允许多个从节点并行同步新主节点。

启动 Redis Sentinel 服务

在每台需要运行 Sentinel 的服务器上启动 Sentinel 服务。一般至少配置三个 Sentinel 节点,以保证高可用性和故障切换的准确性。

redis-server /path/to/sentinel.conf --sentinel

2.在 Spring Boot 中配置 Redis Sentinel 集成

在 Spring Boot 项目中,直接通过配置文件指定 Redis Sentinel 信息,Spring Boot 会自动识别并连接到 Redis Sentinel 集群。

配置 application.properties

application.properties 中配置 Redis Sentinel 的主节点名称和 Sentinel 节点地址,Spring Boot 将会自动管理主节点切换,无需手动更改主节点地址。

# Redis Sentinel 集群的主节点名称和 Sentinel 节点地址
spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381
spring.redis.password=yourpassword
  • spring.redis.sentinel.master:指定 Redis 主节点名称,必须与 sentinel.conf 中的名称一致。
  • spring.redis.sentinel.nodes:配置所有 Sentinel 节点的 IP 和端口,Spring Boot 会自动连接并监控这些 Sentinel 节点。
  • spring.redis.password:如果 Redis 设置了密码,可以在此处配置。

3.在 Spring Boot 中使用 Redis Sentinel

完成以上配置后,Spring Boot 会自动连接 Redis Sentinel 集群。当主节点发生故障时,Redis Sentinel 会自动将某个从节点提升为新的主节点,Spring Boot 无需手动干预,客户端会自动连接到新的主节点。

示例代码

在 Spring Boot 中,可以使用 StringRedisTemplateRedisTemplate 来操作 Redis 数据。以下是一个简单的 Redis 服务类,展示了如何在应用中使用 Redis Sentinel:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class RedisService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public void saveData(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    public String getData(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }
}

RedisService 中,StringRedisTemplate 负责与 Redis Sentinel 集群交互,执行数据的读写操作。当 Redis Sentinel 发生故障转移时,Spring Boot 会自动更新主节点连接,因此无需额外的逻辑来处理主节点变化。


6.1.3.Redis Sentinel 的工作流程总结

  1. 故障检测:每个 Sentinel 节点不断向主节点、从节点和其他 Sentinel 节点发送 PING 命令,检测节点是否正常响应。

  2. 故障判定:当某个 Sentinel 节点在设定时间内未收到主节点的响应时,会将该主节点标记为主观下线状态(Subjectively Down,简称 SDOWN)。当多个 Sentinel 节点认为主节点不可达时,主节点会被标记为客观下线(Objectively Down,简称 ODOWN),并触发故障转移。

  3. 故障转移:在主节点被判定故障后,Sentinel 集群会选举出一个健康的从节点,将其提升为新的主节点,并重新配置其他从节点连接到该新主节点。

  4. 通知客户端:故障转移完成后,Sentinel 将新主节点的信息通知给 Redis 客户端,使客户端应用自动切换到新的主节点。


6.1.4.Redis Sentinel 的特点

优势:

  • 高可用性:当主节点故障时,Redis Sentinel 可以自动选择从节点进行故障转移,保证服务的持续可用。
  • 自动主节点发现:应用程序无需手动调整主节点地址,Redis Sentinel 在主节点发生变化后自动通知客户端,Spring Boot 会自动连接新的主节点。
  • 适合非分片场景:Redis Sentinel 不支持数据分片,因此适用于数据量相对较小、不需要水平扩展的场景。

局限性:

  • 不支持分片:Redis Sentinel 只适用于单一主从结构,无法分片,数据量过大时需要 Redis Cluster 等其他分布式方案。
  • 哨兵节点的高可用性依赖:为了确保故障切换的准确性和服务的稳定性,通常需要至少三个 Sentinel 节点。

6.2 Redis Cluster(数据分片和自动故障转移)

Redis Cluster 是 Redis 官方提供的一种高可用和高性能的分布式缓存方案,通过数据分片和自动故障转移来实现高可用性。Redis Cluster 将数据分片存储在多个主从节点上,支持水平扩展,适合大数据量和高并发需求的场景。在 Redis Cluster 中,每个主节点负责一部分数据,并有一个或多个从节点作为备份,当某个主节点发生故障时,从节点会自动提升为新的主节点,以确保缓存服务的正常运行。

6.2.1 Redis Cluster 高可用架构原理

Redis Cluster 通过以下几个主要功能来实现高可用性:

  • 数据分片:Redis Cluster 将数据划分为 16384 个哈希槽(Hash Slots),每个节点负责一部分哈希槽,支持水平扩展。
  • 自动故障转移:每个主节点拥有一个或多个从节点。当主节点出现故障时,从节点自动提升为主节点,确保数据的高可用性。
  • 高并发支持:由于 Redis Cluster 分散数据存储,可以处理大规模数据请求,适合高并发场景。

6.2.2 在 Spring Boot 中集成 Redis Cluster

在 Spring Boot 项目中集成 Redis Cluster 包括以下几个主要步骤:

  1. 配置 Redis Cluster 集群:配置并启动多个 Redis 主从节点,创建 Redis Cluster 集群。
  2. 在 Spring Boot 中配置 Cluster 集群信息:在 application.properties 中指定 Redis Cluster 节点信息,Spring Boot 会自动管理节点连接和数据分片。
  3. 应用程序使用:在 Spring Boot 中直接使用 RedisTemplateStringRedisTemplate 进行 Redis 操作,Redis Cluster 会自动进行分片存储和主从切换。
1. 搭建并配置 Redis Cluster 集群

配置 Redis Cluster 的主从节点

在 Redis 集群中,每个节点需要独立的配置文件(例如 redis-7000.confredis-7001.conf 等),以下是示例配置:

# redis-7000.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes

其他节点(7001、7002 等)配置类似,只需修改端口号即可。

启动 Redis 节点

启动所有 Redis 节点,例如 7000 至 7005 端口的 Redis 实例:

redis-server /path/to/redis-7000.conf
redis-server /path/to/redis-7001.conf
# ...依次启动其他节点

创建 Redis 集群

使用 redis-cli 命令行工具将这些节点添加到集群,并指定主从关系:

redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1

此命令会创建 3 个主节点(7000、7001、7002)和 3 个从节点(7003、7004、7005),实现主从配置和数据分片。

2. 在 Spring Boot 中配置 Redis Cluster 集成

在 Spring Boot 中,通过配置文件指定 Redis Cluster 的节点信息和连接属性,Spring Boot 会自动管理 Redis Cluster 集群。

配置 application.properties

# 配置 Redis Cluster 集群节点
spring.redis.cluster.nodes=127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005

# 最大重定向次数(适用于分片数据重定向)
spring.redis.cluster.max-redirects=3

# Redis 密码(如果 Redis 设置了密码)
spring.redis.password=yourpassword
  • spring.redis.cluster.nodes:指定 Redis Cluster 集群中所有节点的 IP 和端口,Spring Boot 会自动连接到这些节点。
  • spring.redis.cluster.max-redirects:设置最大重定向次数,用于分片数据的重定向处理。
  • spring.redis.password:如果 Redis 设置了密码,可以在这里配置。
3. 在 Spring Boot 中使用 Redis Cluster

完成配置后,开发者可以直接使用 RedisTemplateStringRedisTemplate 操作数据。Spring Boot 会根据 Redis Cluster 的节点信息自动进行分片管理和主从切换。

示例代码

以下是一个 Redis 服务类示例,展示了如何在 Spring Boot 中使用 Redis Cluster 进行数据的增删改查操作:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class RedisClusterService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 存储数据到 Redis Cluster
    public void saveData(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    // 从 Redis Cluster 获取数据
    public String getData(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    // 删除 Redis Cluster 中的数据
    public void deleteData(String key) {
        stringRedisTemplate.delete(key);
    }
}
  • 数据存储:使用 StringRedisTemplateopsForValue().set() 方法将数据存储到 Redis Cluster,数据会自动分配到对应的哈希槽。
  • 数据读取:使用 opsForValue().get() 方法读取数据,Redis Cluster 会根据数据的哈希槽位置定位并返回数据。
  • 自动故障转移:当某个主节点发生故障时,Redis Cluster 会自动将从节点提升为主节点,Spring Boot 无需额外配置即可继续使用。

6.2.3 Redis Cluster 的特点

优势

  • 数据分片:Redis Cluster 将数据分片存储在多个节点上,支持水平扩展,适合大数据量场景。
  • 自动故障转移:当主节点故障时,Redis Cluster 会自动提升从节点为主节点,实现自动故障转移,保障服务的高可用性。
  • 高并发支持:分片结构允许 Redis Cluster 处理更多的并发请求,是高并发场景的理想选择。

局限性

  • 跨节点事务支持有限:Redis Cluster 不支持多键操作的跨节点事务,分布在不同节点上的键无法进行原子操作。
  • 节点之间的网络通信:Redis Cluster 的节点间需要互相通信,因此在网络不稳定的环境中可能导致集群不一致。
  • 最低节点要求:Redis Cluster 要求至少 6 个节点(3 个主节点和 3 个从节点)才能实现高可用。

适用场景

  • 大规模缓存系统:在需要处理大数据量的缓存场景中,Redis Cluster 提供数据分片和自动容错机制。
  • 高并发的会话管理:适合处理大量用户会话,特别是在高并发应用中。
  • 需要水平扩展的场景:Redis Cluster 通过分片实现了水平扩展,适合随着业务增长扩展数据容量的场景。

6.2.4.关于槽点的重新分配

1. 添加或删除节点后的槽点分配

当 Redis Cluster 中添加或删除节点时,Redis 不会自动重新分配槽点,因此需要手动进行槽点迁移。解决方法如下:

  • 手动重新分配槽点
    • 使用 redis-trib.rbredis-cli --cluster reshard 命令,将槽点重新分配到新的节点或从即将删除的节点迁出。
    • 该操作需要管理员手动执行,可以通过指定源节点、目标节点和槽数进行槽点迁移。
  • 自动化方案(可选):
    • 通过 Spring Boot 中的定时任务(@Scheduled)检查集群状态,检测到节点增减后,自动调用 redis-trib.rbredis-cli 执行槽点重新分配操作。

2. 负载均衡的槽点分配

Redis Cluster 不支持自动基于负载的槽点分配调整,因此实现负载均衡也需要手动操作。解决方法如下:

  • 手动负载均衡
    • 定期使用 Redis 客户端(如 Jedis)或监控工具检查各节点的负载情况。
    • 如果发现某些节点负载过高,可以手动执行 redis-trib.rbredis-cli --cluster reshard 命令,将部分槽点从负载较高的节点迁移至负载较低的节点,达到负载均衡。
  • 自动化方案(可选):
    • 在 Spring Boot 中通过定时任务(@Scheduled)自动监控节点负载。
    • 结合负载监控逻辑,使用 ProcessBuilder 调用 redis-trib.rb 脚本或 redis-cli 执行槽点迁移,从而实现更灵活的负载均衡自动化。

8. Spring Boot 中的 Redis 消息队列实现

 8.1.发布与订阅(Pub/Sub)

在 Spring Boot 中,利用 Redis 的发布/订阅(Pub/Sub)机制可以实现消息的实时推送与接收。这个过程分为三个主要步骤:

  1. 定义消息订阅者:创建一个订阅者,用来监听 Redis 频道的消息。
  2. 配置 Redis 的消息监听容器:利用 RedisMessageListenerContainer 来绑定频道和订阅者。
  3. 实现消息发布:通过 RedisTemplate 将消息发布到 Redis 频道。

8.1.1.四大组件 

  • 监听者(Listener)

    • 每个监听者是一个自定义的类,用于接收和处理 Redis 频道的消息。
    • 监听者实现了 MessageListener 接口,并重写 onMessage 方法,当 Redis 频道接收到消息时,该方法会被自动调用。
    • 每个监听者通常负责处理特定的频道消息内容,可以包含自定义的业务逻辑。
  • 频道(Channel)

    • Redis 频道是消息的逻辑分组,用于将消息发布到订阅的客户端或服务端。
    • 在 Spring Boot 中,ChannelTopic 表示 Redis 中的一个频道,频道名称通常以字符串表示,如 "channel1""channel2"
    • 不同的频道可以承载不同类型的消息,实现了消息的分组和隔离。
  • 适配器(MessageListenerAdapter)

    • MessageListenerAdapter 是连接 RedisMessageListenerContainer 和自定义监听者(Listener)的桥梁。
    • 每个 MessageListenerAdapter 只能绑定一个监听者,但可以通过 RedisMessageListenerContainer 监听多个频道。
    • 适配器的主要作用是将监听者转换为 Redis 可识别的 MessageListener,以便容器能够调用监听者的 onMessage 方法。
  • 容器(RedisMessageListenerContainer)

    • RedisMessageListenerContainer 是 Redis 消息监听器的核心管理组件。
    • 容器会持续监听 Redis 中的所有绑定频道,当指定频道中有消息发布时,容器会找到绑定的 MessageListenerAdapter
    • 容器通过适配器将频道消息传递给监听者,从而触发监听者的 onMessage 方法。

8.1.2.定义监听者(Listener)

首先,我们定义两个监听者类 Channel1SubscriberChannel2Subscriber。这两个类分别监听 channel1channel2 频道中的消息,并实现特定的处理逻辑。

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class Channel1Subscriber implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(pattern); 
        String messageBody = new String(message.getBody());
        System.out.println("Channel1 Subscriber received message from [" + channel + "]: " + messageBody);
        // 处理来自 channel1 的消息的特定逻辑
    }
}
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;


@Service
public class Channel2Subscriber implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(pattern); 
        String messageBody = new String(message.getBody());
        System.out.println("Channel2 Subscriber received message from [" + channel + "]: " + messageBody);
        // 处理来自 channel2 的消息的特定逻辑
    }
}

代码解释

  • MessageListener 接口:两个监听者都实现了 MessageListener 接口,这样可以使它们具备接收 Redis 消息的能力。
  • onMessage 方法:当指定频道接收到新消息时,Redis 会调用监听者的 onMessage 方法。
    • pattern 参数:表示消息来自的频道名称,通常是字节数组,我们将其转换为字符串。
    • message.getBody():获取消息的内容,将其转换为字符串以便处理。
  • @Service 注解:将监听者标记为 Spring Bean,使其可以被 Spring 容器管理和注入。

通过这个设置,每当 channel1channel2 中有新消息发布时,Channel1SubscriberChannel2Subscriber 会自动接收到该消息并输出到控制台。此时,两个监听者类只是负责接收消息,并执行特定的消息处理逻辑。


8.1.3.配置 Redis 消息监听器容器和适配器

为了让监听者接收消息,需要通过 RedisMessageListenerContainerMessageListenerAdapter 进行配置。这里,我们使用 RedisConfig 配置类来完成这些配置。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfig {

    // 配置 Redis 消息监听器容器
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter channel1ListenerAdapter,
                                                   MessageListenerAdapter channel2ListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        
        // 将 channel1 的适配器与频道绑定
        container.addMessageListener(channel1ListenerAdapter, new ChannelTopic("channel1")); 
        
        // 将 channel2 的适配器与频道绑定
        container.addMessageListener(channel2ListenerAdapter, new ChannelTopic("channel2")); 
        
        return container;
    }

    // 配置 Channel1 的消息监听适配器
    @Bean
    public MessageListenerAdapter channel1ListenerAdapter(Channel1Subscriber channel1Subscriber) {
        return new MessageListenerAdapter(channel1Subscriber);
    }

    // 配置 Channel2 的消息监听适配器
    @Bean
    public MessageListenerAdapter channel2ListenerAdapter(Channel2Subscriber channel2Subscriber) {
        return new MessageListenerAdapter(channel2Subscriber);
    }
}

代码解释

RedisConfig 配置类

  • 使用 @Configuration 注解声明为配置类,Spring 启动时会自动加载此配置类。

RedisMessageListenerContainer 容器

  • 作为 Redis 消息监听的核心容器,负责监听绑定的频道并将消息转发给绑定的监听者。
  • setConnectionFactory:设置连接工厂,连接到 Redis 实例。
  • addMessageListener 方法:绑定频道和适配器,使适配器可以监听特定的频道。
    • 第一个参数为 MessageListenerAdapter,表示监听哪个适配器。
    • 第二个参数为 ChannelTopic,指定频道名称。
  • 通过 addMessageListener(channel1ListenerAdapter, new ChannelTopic("channel1")),将 channel1ListenerAdapter 绑定到 channel1 频道,类似地绑定 channel2 频道和 channel2ListenerAdapter

MessageListenerAdapter

  • channel1ListenerAdapter 方法:将 Channel1Subscriber 监听者适配为 Redis 可用的 MessageListenerAdapter
  • channel2ListenerAdapter 方法:将 Channel2Subscriber 监听者适配为 MessageListenerAdapter
  • 每个 MessageListenerAdapter 都只能绑定一个监听者,确保每个适配器只处理一个监听者的消息接收。

8.1.4. 实现消息发布类

为实现发布消息到 Redis 频道,我们创建一个 MessagePublisher 类,利用 RedisTemplate 提供的 convertAndSend 方法将消息发布到指定的频道。这个发布类可以与 REST API 或其他服务调用相结合,方便消息的动态发布。

代码示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // 发布消息到指定频道
    public void publish(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
        System.out.println("Published message to channel [" + channel + "]: " + message);
    }
}

代码解释

  • MessagePublisher:使用 @Service 注解将该类标记为 Spring 的 Bean,使其可以在其他组件中被注入和调用。

  • RedisTemplate

    • 通过 @Autowired 注入 RedisTemplate,该类提供了操作 Redis 的各种方法,包括发送消息。
    • RedisTemplate 是 Spring Data Redis 提供的用于执行 Redis 操作的工具,支持字符串、哈希、列表、集合等各种类型的操作。
  • publish 方法

    • 参数channel 表示目标频道的名称,message 表示要发送的消息内容。
    • convertAndSend 方法:通过 RedisTemplate.convertAndSend 方法,将消息发送到指定的频道。
      • 第一个参数 channel 是频道名称,第二个参数 message 是要发布的消息内容。
    • 控制台输出:输出消息发布成功的日志,便于观察消息的发送情况。

效果:调用 publish 方法会将消息发布到指定的 Redis 频道,频道名称可以根据需要动态传入。


8.1.5. 测试发布与订阅

为了测试发布和订阅功能是否正常工作,我们可以创建一个 REST 控制器,通过 API 接口触发消息发布,然后观察控制台的输出,以确认订阅者是否接收到了来自不同频道的消息。

代码示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private MessagePublisher messagePublisher;

    // 发布消息的 API 接口
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam String channel, @RequestParam String message) {
        messagePublisher.publish(channel, message);
        return "Message published successfully to channel: " + channel;
    }
}

代码解释

  • MessageController 控制器
    • 使用 @RestController 注解,将该类标记为 REST API 控制层,可以通过 HTTP 请求与客户端交互。
  • sendMessage 方法
    • 使用 @GetMapping 注解暴露一个 GET 请求的 API 接口 /sendMessage,用于向指定频道发送消息。
    • 请求参数
      • channel:指定要发布消息的频道名称。
      • message:指定要发布的消息内容。
    • 方法逻辑:调用 messagePublisher.publish(channel, message) 将消息发布到指定频道,并返回成功消息给客户端。

 8.2.异步处理任务(Stream)

在 Spring Boot 中,利用 Redis Stream 实现异步任务队列是处理高并发任务的一种高效方案。Redis Stream 提供了多消费者、消费者组、消息确认等高级功能,可以适用于订单处理、实时数据处理等需要高并发和高可靠性的场景。我们可以在同一个消费者组中使用多个消费者,并且根据需求来动态分配任务处理逻辑。以下是 Redis Stream 异步任务队列的详细实现和说明。


Redis Stream 异步任务处理流程

  1. 任务生产者:任务生产者将任务消息推送到 Redis Stream 队列中。
  2. 创建消费者组:通过 Redis 的消费者组机制,多个消费者可以从同一个队列中读取任务,并保证每条消息只被一个消费者处理。
  3. 任务消费者:多个消费者从消费者组中取出任务,按需进行任务分配和负载均衡。
  4. 确认机制:消费者处理完任务后,向 Redis 确认已处理该消息,防止重复消费。

8.2.1. 任务生产者:将任务放入 Redis Stream

任务生产者负责将任务消息添加到 Redis Stream 中。任务内容可以是字符串、JSON 或键值对,每条消息都会获得一个唯一的 RecordId,方便后续处理和确认。

代码示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class StreamTaskProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String STREAM_NAME = "taskStream";

    // 将任务消息添加到 Redis Stream
    public RecordId produceTask(String taskData) {
        var record = StreamRecords.objectBacked(taskData).withStreamKey(STREAM_NAME);
        RecordId recordId = redisTemplate.opsForStream().add(record);
        System.out.println("Produced task with ID: " + recordId.getValue() + " and data: " + taskData);
        return recordId;
    }
}

代码解释

  • StreamTaskProducer:作为任务生产者,将任务推送到 Redis Stream。
  • produceTask 方法
    • 参数 taskData 表示任务内容,可以是字符串、JSON 或更复杂的对象。
    • StreamRecords.objectBacked(taskData).withStreamKey(STREAM_NAME) 创建了一个 StreamRecord 对象,将 taskData 添加到 taskStream 队列中。
    • redisTemplate.opsForStream().add(record) 将任务记录添加到 Stream 中并返回一个 RecordId

当调用 produceTask 方法时,任务数据被添加到 taskStream 中,等待消费者处理。


8.2.2. 创建消费者组

为了让多个消费者能够并发消费同一个任务队列的任务,我们可以创建一个消费者组。消费者组负责管理任务的分配、记录消息消费状态、未确认消息等。

代码示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class StreamGroupManager {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String STREAM_NAME = "taskStream";
    private static final String GROUP_NAME = "taskGroup";

    // 创建消费者组
    public void createGroup() {
        try {
            redisTemplate.opsForStream().createGroup(STREAM_NAME, GROUP_NAME);
            System.out.println("Consumer group created: " + GROUP_NAME);
        } catch (Exception e) {
            System.out.println("Group already exists or error: " + e.getMessage());
        }
    }
}

代码解释

  • StreamGroupManager:用于创建和管理消费者组 taskGroup
  • createGroup 方法:使用 redisTemplate.opsForStream().createGroup() 方法创建一个消费者组。若组已存在,则会捕获异常,避免重复创建。

通过调用 createGroup,消费者组 taskGroup 创建成功,使多个消费者可以并发读取任务。


8.2.3. 多个消费者从同一个消费者组中读取和处理任务

在 Redis 中,一个消费者组可以包含多个消费者,每个消费者都有一个唯一的名称。这种设计能够实现任务的自动负载均衡和确认机制。多个消费者可以并发地从同一队列中读取任务,Redis 会根据任务的消费状态自动将任务分配给不同的消费者。

代码示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class ConsumerService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String STREAM_NAME = "taskStream";
    private static final String GROUP_NAME = "taskGroup";

    public void consumeTasks(String consumerName) {
        List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
            Object.class,
            org.springframework.data.redis.connection.stream.Consumer.from(GROUP_NAME, consumerName),
            StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed())
        );

        // 处理读取到的消息
        if (messages != null) {
            for (MapRecord<String, Object, Object> message : messages) {
                System.out.println(consumerName + " processing task ID: " + message.getId() + ", Data: " + message.getValue());
                // 确认消息已被处理
                redisTemplate.opsForStream().acknowledge(GROUP_NAME, message);
            }
        }
    }
}

代码解释

  • ConsumerService:该类通过传入不同的 consumerName 参数来模拟多个消费者的工作。不同的 consumerName 使得消费者组能够识别不同的消费者,并在同一组内为多个消费者分配任务。
  • consumeTasks(String consumerName) 方法
    • Consumer.from(GROUP_NAME, consumerName):指定消费者组 taskGroup 和当前消费者的名称 consumerName,从该组中获取未消费的消息。
    • StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed()):从上一次消费的位置继续消费,保证消息不会被重复消费。
    • 消息确认:每条消息在处理完毕后,通过 acknowledge 方法确认,防止消息被重复分配。

调用 consumeTasks 并传入不同的 consumerName,可以实现多消费者并发从同一任务队列中获取任务,Redis 会自动进行任务的负载均衡。

8.2.4.具体应用示例

在 Spring Boot 中,可以使用 @Scheduled 注解实现定时任务,以便模拟生产者持续产生任务,消费者持续轮询消费任务。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class MessageQueueApplication implements CommandLineRunner {

    @Autowired
    private StreamTaskProducer taskProducer;

    @Autowired
    private StreamGroupManager groupManager;

    @Autowired
    private ConsumerService consumerService;

    private int taskCounter = 1;

    @Override
    public void run(String... args) throws Exception {
        // Step 1: 创建消费者组(如果已经存在则跳过)
        groupManager.createGroup();
    }

    // Step 2: 定时产生任务,每隔5秒产生一个新任务
    @Scheduled(fixedRate = 5000)
    public void produceTasks() {
        String taskData = "Task data " + taskCounter++;
        taskProducer.produceTask(taskData);
    }

    // Step 3: 消费者1轮询消费任务,每隔3秒检查新任务
    @Scheduled(fixedRate = 3000)
    public void consumer1Tasks() {
        consumerService.consumeTasks("consumer1");
    }

    // Step 4: 消费者2轮询消费任务,每隔3秒检查新任务
    @Scheduled(fixedRate = 3000)
    public void consumer2Tasks() {
        consumerService.consumeTasks("consumer2");
    }
}

代码解释

  • @EnableScheduling:启用 Spring 的定时任务功能。
  • @Scheduled 注解:实现定时任务。
    • produceTasks():每隔 5 秒钟调用一次,模拟任务生产者持续产生新任务。
    • consumer1Tasks()consumer2Tasks():分别每隔 3 秒调用一次,模拟两个消费者持续轮询获取任务。

produceTasks() 中,任务计数器 taskCounter 自动递增,生成不同的数据内容,确保每个任务的数据是唯一的。

8.2.5. 不同任务类型的处理

在某些应用场景中,不同类型的任务可能需要不同的处理逻辑。我们可以在任务消息中添加“任务类型”字段,消费者在读取消息后,根据任务类型选择合适的处理方法。

代码示例

1.在任务消息中添加类型字段

Map<String, Object> taskData = new HashMap<>();
taskData.put("type", "TYPE_A"); // 设置任务类型
taskData.put("content", "Task content for TYPE_A");

// 将任务添加到 Stream 中
redisTemplate.opsForStream().add(StreamRecords.mapBacked(taskData).withStreamKey(STREAM_NAME));

2.在消费者中根据任务类型处理不同任务

public void consumeTasks(String consumerName) {
    List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
        Object.class,
        org.springframework.data.redis.connection.stream.Consumer.from(GROUP_NAME, consumerName),
        StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed())
    );

    if (messages != null) {
        for (MapRecord<String, Object, Object> message : messages) {
            // 解析任务类型
            String taskType = (String) message.getValue().get("type");

            // 根据任务类型调用不同的处理方法
            if ("TYPE_A".equals(taskType)) {
                handleTypeATask(message);
            } else if ("TYPE_B".equals(taskType)) {
                handleTypeBTask(message);
            }

            // 确认消息已被处理
            redisTemplate.opsForStream().acknowledge(GROUP_NAME, message);
        }
    }
}

private void handleTypeATask(MapRecord<String, Object, Object> message) {
    System.out.println("Handling TYPE_A task: " + message.getValue().get("content"));
}

private void handleTypeBTask(MapRecord<String, Object, Object> message) {
    System.out.println("Handling TYPE_B task: " + message.getValue().get("content"));
}

代码解释

  • 任务类型字段:在消息数据中添加 type 字段,用于指定任务类型。
  • 任务筛选和处理:在消费时,根据 type 字段的值调用不同的处理方法,如 handleTypeATaskhandleTypeBTask
  • 消息确认:无论任务类型如何,处理完毕后通过 acknowledge 确认消息,防止重复消费。

通过在消息中添加 type 字段,可以实现多种类型任务的处理方案,满足复杂场景的需求。

8.3.任务调度(延迟队列)

通过 Redis 的 Sorted Set 可以轻松实现一个延迟队列,这种延迟队列适用于需要定时触发的任务或批处理任务场景。其基本思路是:在 Sorted Set 中,任务的 score 表示其执行时间戳。通过定期检查,将所有到期任务取出并执行,实现任务的延迟处理。

实现思路

  1. 任务添加:将任务添加到 Redis 的 Sorted Set 中,使用未来的时间戳作为任务的 score 值,表示任务的到期时间。
  2. 任务检查与取出:设置定时任务定期检查 Sorted Set 中到期任务,将 score(时间戳)小于或等于当前时间的任务取出,这些任务即为已到期的任务。
  3. 任务处理与删除:处理到期任务,并将它们从 Sorted Set 中移除,确保任务不会重复执行。

8.3.1.两大组件

为了实现这个延迟队列,我们将定义两个主要组件:

  • DelayedTaskProducer:将任务添加到 Redis 延迟队列中。
  • DelayedTaskScheduler:定期检查 Redis 延迟队列,并处理到期任务。
1. DelayedTaskProducer:延迟任务生产者

这个类负责将任务添加到 Redis 的 Sorted Set 中。我们使用任务的未来时间戳作为 score 来标记任务的到期时间。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class DelayedTaskProducer {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String DELAYED_TASK_QUEUE = "delayed_task_queue";

    /**
     * 添加延迟任务
     *
     *  taskId         任务ID或描述
     *  delayInSeconds 延迟时间(秒)
     */
    public void addDelayedTask(String taskId, long delayInSeconds) {
        // 当前时间 + 延迟时间 = 任务的到期时间戳(秒)
        long score = System.currentTimeMillis() / 1000 + delayInSeconds;
        // 将任务加入 Redis Sorted Set,key 为任务 ID,score 为到期时间戳
        redisTemplate.opsForZSet().add(DELAYED_TASK_QUEUE, taskId, score);
        System.out.println("Added task " + taskId + " with delay " + delayInSeconds + " seconds.");
    }
}

代码解释

  • DELAYED_TASK_QUEUE:这是 Redis 中存储延迟任务的 Sorted Set 键名,所有延迟任务都存储在这个集合中。
  • addDelayedTask 方法
    • 参数 taskId:任务的唯一标识,可以是任务的 ID 或者任务的描述。
    • 参数 delayInSeconds:表示任务的延迟时间,以秒为单位。
    • 计算到期时间 scoreSystem.currentTimeMillis() / 1000 + delayInSeconds 计算未来的时间戳,表示任务的到期时间。
    • 添加到 Sorted Set:使用 redisTemplate.opsForZSet().add() 方法将任务添加到 Sorted Set 中。taskId 作为集合的成员,score 表示任务的到期时间,用于排序。

当你调用 addDelayedTask 方法时,任务会被添加到 Sorted Set 中,并根据 score 进行排序。即:到期时间最近的任务会排在集合前面,确保任务可以按顺序被处理。


2. DelayedTaskScheduler:定期检查和处理到期任务

DelayedTaskScheduler 负责定期检查 Redis 中的 Sorted Set,取出已到期的任务并处理。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Set;

@Service
public class DelayedTaskScheduler {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String DELAYED_TASK_QUEUE = "delayed_task_queue";

    /**
     * 定期检查并处理到期任务
     */
    @Scheduled(fixedRate = 5000) // 每隔5秒执行一次
    public void processDelayedTasks() {
        long now = System.currentTimeMillis() / 1000; // 当前时间戳(秒级)

        // 获取所有到期任务(score 小于等于当前时间的任务)
        Set<Object> taskIds = redisTemplate.opsForZSet().rangeByScore(DELAYED_TASK_QUEUE, 0, now);

        // 遍历每个到期任务,执行处理并移除
        if (taskIds != null && !taskIds.isEmpty()) {
            for (Object taskId : taskIds) {
                // 处理任务逻辑
                System.out.println("Processing delayed task: " + taskId);

                // 从队列中删除已处理的任务
                redisTemplate.opsForZSet().remove(DELAYED_TASK_QUEUE, taskId);
            }
        }
    }
}

代码解释

  • processDelayedTasks 方法:每隔 5 秒运行一次,定期检查 Sorted Set 中是否有到期任务。
    • 当前时间 now:获取当前的时间戳,用于判断任务的到期情况。
    • rangeByScore 方法redisTemplate.opsForZSet().rangeByScore(DELAYED_TASK_QUEUE, 0, now)Sorted Set 中获取所有 score 小于等于当前时间戳的任务,这些任务即已到期。
    • 遍历和处理任务:对于每个已到期的任务,执行相应的处理逻辑(这里以输出日志表示任务的处理过程)。
    • 移除任务:使用 remove 方法将已处理的任务从 Sorted Set 中删除,防止任务被重复处理。

processDelayedTasks 方法确保到期任务在预定时间后自动执行,并且通过定时检查保持任务调度的稳定性。


8.3.2.具体应用示例

在应用的某个部分,可以使用 DelayedTaskProduceraddDelayedTask 方法来添加任务。例如,可以在控制器或服务中使用此方法:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class DelayedTaskInitializer {

    @Autowired
    private DelayedTaskProducer delayedTaskProducer;

    @PostConstruct
    public void init() {
        // 在应用启动时,添加一些延迟任务示例
        delayedTaskProducer.addDelayedTask("task1", 10); // 延迟10秒执行
        delayedTaskProducer.addDelayedTask("task2", 20); // 延迟20秒执行
        delayedTaskProducer.addDelayedTask("task3", 30); // 延迟30秒执行
    }
}

代码解释

  • @PostConstruct 注解init 方法会在 DelayedTaskInitializer Bean 初始化完成后自动执行,因此应用启动时会自动添加示例任务。
  • 任务添加addDelayedTask 方法将三个任务分别添加到延迟队列中,它们的到期时间分别是 10 秒、20 秒和 30 秒。

8.3.3.执行流程总结

1.添加延迟任务

  • 在应用启动时,DelayedTaskInitializer 中的 init 方法通过 @PostConstruct 自动执行,添加延迟任务 task1task2task3
  • 每个任务会被插入 Redis Sorted Set 中,且带有不同的到期时间。

2.定时检测任务

  • DelayedTaskScheduler 中的 processDelayedTasks 方法每 5 秒执行一次。
  • 在每次执行时,processDelayedTasks 会检查 Sorted Set 中所有分数小于等于当前时间的任务,并将这些任务视为到期任务。

3.任务处理和清理

  • 对于每个到期任务,调度器会执行处理逻辑(如打印任务 ID),并将任务从 Sorted Set 中移除,避免任务被重复执行。

9. 性能优化与监控

9.1. 连接池配置

连接池可以帮助管理 Redis 和 MySQL 的连接资源,避免频繁建立和销毁连接,从而减少系统开销,提高效率。合理的连接池配置可以防止资源耗尽,确保系统稳定运行。

9.1.1 Redis 连接池配置

在 Spring Boot 中可以通过 lettucejedis 来配置 Redis 连接池,以下示例基于 lettuce 配置连接池参数。

# Redis 连接配置
spring.redis.host=localhost
spring.redis.port=6379

# Lettuce 连接池配置
spring.redis.lettuce.pool.max-active=10  # 最大连接数
spring.redis.lettuce.pool.max-idle=5     # 最大空闲连接数
spring.redis.lettuce.pool.min-idle=2     # 最小空闲连接数
spring.redis.lettuce.pool.max-wait=1000  # 最大等待时间(毫秒)

代码解释

  • max-active:最大连接数,设置 Redis 可以同时保持的最大连接数。
  • max-idle:最大空闲连接数,连接池中可以保持的最大空闲连接数,超过的连接将会被释放。
  • min-idle:最小空闲连接数,保持的最少空闲连接数,当连接数低于此值时会创建新连接。
  • max-wait:最大等待时间,连接池获取连接的等待时间(毫秒),如果超过时间未获取到连接,则抛出异常。

通过合理配置连接池参数,可以避免 Redis 的连接池资源耗尽,提高应用的并发处理能力。

9.1.2 MySQL 连接池配置

Spring Boot 默认使用 HikariCP 作为 MySQL 数据库的连接池,可以通过以下配置进行优化。

# MySQL 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/mydatabase
spring.datasource.username=root
spring.datasource.password=password

# HikariCP 连接池配置
spring.datasource.hikari.maximum-pool-size=10    # 最大连接数
spring.datasource.hikari.minimum-idle=2         # 最小空闲连接数
spring.datasource.hikari.idle-timeout=30000     # 空闲连接的最大存活时间
spring.datasource.hikari.connection-timeout=30000 # 获取连接的最大等待时间(毫秒)
spring.datasource.hikari.max-lifetime=1800000   # 连接的最大存活时间(毫秒)

代码解释

  • maximum-pool-size:最大连接数,即连接池中允许的最大连接数。
  • minimum-idle:最小空闲连接数,保持的最少空闲连接数,低于此值时会创建新连接。
  • idle-timeout:空闲连接的最大存活时间,超过此时间的空闲连接将被释放。
  • connection-timeout:连接池中获取连接的最大等待时间,如果超过此时间未获取到连接,则抛出异常。
  • max-lifetime:连接的最大存活时间,避免长时间使用的连接失效。

通过配置 HikariCP 参数,可以更好地管理数据库连接池,减少系统在高并发场景下的连接资源问题。


9.2. 性能监控

性能监控可以帮助实时查看系统运行状态,识别性能瓶颈。可以使用 Spring Boot Actuator、Redis CLI 和 MySQL 监控工具来跟踪关键指标,如缓存命中率、连接池状态、数据库响应时间等。

9.2.1 使用 Spring Boot Actuator

Spring Boot Actuator 提供了丰富的监控端点,可以帮助开发者快速了解应用的运行状态。

<!-- 引入 Spring Boot Actuator 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

配置 Actuator 端点

application.properties 中启用 Actuator 所需的端点,以便查看 Redis 和 MySQL 的状态。

management.endpoints.web.exposure.include=health,metrics,beans
management.endpoint.health.show-details=always

使用端点

  • /actuator/health:查看应用的健康状态,包含 Redis 和 MySQL 连接状态。
  • /actuator/metrics:查看应用的各种性能指标,如内存使用、CPU、线程等信息。

9.2.2 Redis CLI 监控 Redis 状态

Redis 提供了 INFO 命令,可以查看缓存的命中率、内存使用等详细信息。

redis-cli INFO

输出中的重要字段:

  • keyspace_hitskeyspace_misses:表示缓存命中和未命中的次数,可以计算缓存命中率:命中率 = keyspace_hits / (keyspace_hits + keyspace_misses)
  • connected_clients:当前连接到 Redis 的客户端数量。
  • used_memory:Redis 使用的内存总量。
  • expired_keys:过期的键数量,有助于判断是否需要优化键的过期策略。

9.2.3 MySQL 监控工具

可以使用 MySQL 的 SHOW STATUS 命令来监控 MySQL 的性能指标,查看连接数、查询数等。

SHOW GLOBAL STATUS;

常用的状态字段:

  • Threads_connected:当前连接的客户端数量。
  • Connections:成功连接的总次数。
  • Queries:服务器处理的查询总数。
  • Slow_queries:执行时间超过 long_query_time 的慢查询总数。

这些信息有助于优化数据库性能,如增加连接池大小或优化查询语句。


9.3. Redis Keyspace Notifications

Redis Keyspace Notifications 是 Redis 提供的键变动通知功能,可以配置 Redis 将键的变动(如过期、删除、更新等)通知到客户端,以便实时监控缓存变化。

9.3.1.配置 Redis Keyspace Notifications

可以在 Redis 配置文件(redis.conf)中启用 Keyspace Notifications,或使用命令行动态配置。

# 监听所有键的过期事件和删除事件
config set notify-keyspace-events Ex
  • Ex:表示启用键的过期事件和删除事件通知。
  • K:表示所有键(keyspace)事件的通知。

9.3.2.Spring Boot 中使用 Redis Keyspace Notifications

在 Spring Boot 中使用 Redis Keyspace Notifications,可以创建一个监听器来接收 Redis 键变动的通知事件。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 监听所有键的过期事件
        container.addMessageListener(listenerAdapter, new PatternTopic("__keyevent@0__:expired"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(new RedisKeyExpirationListener());
    }

    public static class RedisKeyExpirationListener implements MessageListener {
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String expiredKey = message.toString();
            System.out.println("Key expired: " + expiredKey);
            // 在这里处理键过期事件,例如日志记录或重新加载缓存
        }
    }
}

代码解释

  • RedisMessageListenerContainer:配置 Redis 消息监听器容器,允许监听 Redis 中的事件。
  • PatternTopic("__keyevent@0__:expired"):监听 Redis 中 expired 事件,即键过期事件。@0 表示 Redis 数据库索引。
  • RedisKeyExpirationListener:自定义监听器,监听 Redis 键的过期事件。收到过期事件时会调用 onMessage 方法,并打印或记录过期的键。

通过 Keyspace Notifications,可以实时监控 Redis 键的变化,如缓存失效、删除等事件,适合于对缓存内容有严格要求的场景。

10. 应用场景实践

10.1.用户登录与会话管理

在用户登录和会话管理中,通过 Redis 缓存会话信息,可以提高会话的管理效率。Redis 的高性能访问、自动过期机制以及丰富的数据结构,特别适合会话管理。

10.1.1.用户登录与会话管理的实现流程

  1. 用户登录时创建会话:用户成功登录后,将用户信息存入 Redis 缓存中,生成唯一的会话 ID 并设置有效期。
  2. 访问控制:每次用户请求时,通过会话 ID 检查 Redis 中的会话信息,确保用户的会话有效。
  3. 会话续期:当用户在会话有效期内操作时,可以自动延长会话有效期。
  4. 会话删除:当用户登出或会话超时时,从 Redis 中移除会话信息。

10.1.2. 创建用户会话

当用户登录成功时,调用 createSession 方法,在 Redis 中创建会话信息,并设置过期时间。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

@Service
public class SessionService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String SESSION_KEY_PREFIX = "session:"; // 会话键前缀
    private static final long SESSION_TIMEOUT = 30; // 会话超时时间(分钟)

    /**
     * 创建用户会话
     * @param sessionId 会话ID
     * @param userInfo 用户信息对象
     */
    public void createSession(String sessionId, Object userInfo) {
        String sessionKey = SESSION_KEY_PREFIX + sessionId;
        // 存储会话信息,并设置超时时间
        redisTemplate.opsForValue().set(sessionKey, userInfo, SESSION_TIMEOUT, TimeUnit.MINUTES);
        System.out.println("Session created: " + sessionId);
    }
}

代码解释

  • RedisTemplate:Spring Boot 提供的 Redis 操作模板,简化了与 Redis 的交互。
  • SESSION_KEY_PREFIX:会话键的前缀,所有会话数据的键都带有该前缀,以便在 Redis 中区分不同的数据类型。sessionId 作为唯一标识符,将用户信息与特定的会话关联。
  • SESSION_TIMEOUT:会话的超时时间,在用户登录成功后,将会话信息存入 Redis,并设置超时时间(如 30 分钟),超过时间后会话自动失效。
  • createSession 方法
    • sessionKey:Redis 中存储的键,用 sessionId 生成独特键名(例如 session:user123)。
    • redisTemplate.opsForValue().set(...):将 sessionKeyuserInfo 对象绑定,并设置超时时间,使用 TimeUnit.MINUTES 表示 30 分钟后自动过期。

当用户登录成功时,调用 createSession,将会话信息存储到 Redis 中。因为 Redis 高性能访问特性,这样的会话存储操作非常迅速,适合高并发的登录场景。


10.1.3. 获取会话信息

在每次用户请求时,调用 getSession 方法,通过 Redis 查询用户的会话信息,以便验证会话是否有效。

/**
 * 获取用户会话信息
 * @param sessionId 会话ID
 * @return 用户信息对象
 */
public Object getSession(String sessionId) {
    String sessionKey = SESSION_KEY_PREFIX + sessionId;
    return redisTemplate.opsForValue().get(sessionKey);
}

代码解释

  • getSession 方法
    • 根据 sessionId 构造 sessionKey
    • redisTemplate.opsForValue().get(sessionKey) 从 Redis 中获取用户信息。
    • 返回的用户信息对象(userInfo)将用于验证用户的身份、权限等。
    • 如果返回 null,表示会话已过期或无效。

通过 getSession 可以快速获取会话数据,验证会话有效性。


10.1.4. 延长会话有效期

当用户在会话有效期内进行操作时,可以调用 extendSession 方法,延长会话的过期时间,避免用户频繁重新登录。

/**
 * 延长用户会话有效期
 * @param sessionId 会话ID
 */
public void extendSession(String sessionId) {
    String sessionKey = SESSION_KEY_PREFIX + sessionId;
    redisTemplate.expire(sessionKey, SESSION_TIMEOUT, TimeUnit.MINUTES);
    System.out.println("Session extended: " + sessionId);
}

代码解释

  • extendSession 方法
    • redisTemplate.expire(...):重新设置会话的过期时间,延长 SESSION_TIMEOUT 分钟。
    • 每次调用 extendSession 时,Redis 会更新会话的过期时间。
    • 在用户有操作时,通过定期延长有效期,可以有效提高用户体验,避免会话自动过期。

10.1.5. 删除会话信息

在用户主动登出时,调用 deleteSession 删除会话信息,从 Redis 中移除该会话数据。

/**
 * 删除用户会话
 * @param sessionId 会话ID
 */
public void deleteSession(String sessionId) {
    String sessionKey = SESSION_KEY_PREFIX + sessionId;
    redisTemplate.delete(sessionKey);
    System.out.println("Session deleted: " + sessionId);
}

代码解释

  • deleteSession 方法
    • redisTemplate.delete(sessionKey):根据会话 ID 删除 Redis 中对应的会话数据。
    • 当用户退出登录时,删除会话信息,避免无效会话数据滞留在 Redis 中,节省内存空间。

通过 Redis 的自动过期和删除机制,可以实现灵活的会话管理,当用户不再活动或主动退出时,相关会话数据会被自动清理。


10.1.6.应用场景与优势

使用 Redis 管理用户会话可以显著提高系统的响应速度,特别适用于大规模用户并发登录的场景:

  • 高性能访问:Redis 的内存存储和快速响应适合会话管理,确保用户会话数据的高效读写。
  • 自动清理:利用 Redis 的过期机制,自动清理超时会话,避免存储无效数据。
  • 支持分布式部署:在分布式应用中,Redis 提供的会话存储可以在不同节点之间共享,确保会话的一致性和持久性。

 10.2.购物车与库存管理

在电商系统中,购物车与库存管理是高并发场景的核心需求。Redis 提供的哈希结构和原子性操作,非常适合实现高效的购物车管理和库存扣减操作。通过 Redis,可以实现购物车的快速更新,确保用户购物体验流畅,同时利用原子性操作保证库存扣减的一致性。


10.2.1.购物车与库存管理的实现流程

  • 购物车管理:每个用户的购物车可以通过 Redis 哈希结构进行存储,以用户 ID 为前缀的键名为哈希键名,商品 ID 为字段,商品数量为字段值。
  • 库存管理:通过 Redis 的 incrBy 方法进行库存操作,可以确保库存扣减的原子性。
  • 事务控制:在 Redis 中使用事务操作,保证添加商品到购物车和库存扣减的一致性。

我们通过一个 ShoppingCartService 类来实现购物车和库存的操作,包括添加商品到购物车、扣减库存以及将两者操作结合的事务控制。

10.2.2. 购物车管理

Redis 的哈希结构可以将每个用户的购物车作为一个哈希表,键名是 cart:{userId},字段是商品 ID,字段值是商品数量。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class ShoppingCartService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String CART_KEY_PREFIX = "cart:"; // 购物车键前缀

    /**
     * 添加商品到购物车
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 数量
     */
    public void addToCart(String userId, String productId, int quantity) {
        String cartKey = CART_KEY_PREFIX + userId;
        redisTemplate.opsForHash().increment(cartKey, productId, quantity); // 累加商品数量
        System.out.println("Product added to cart: " + productId + ", Quantity: " + quantity);
    }

    /**
     * 查看购物车
     * @param userId 用户ID
     * @return 购物车内容
     */
    public Map<Object, Object> viewCart(String userId) {
        String cartKey = CART_KEY_PREFIX + userId;
        return redisTemplate.opsForHash().entries(cartKey);
    }
}

代码解释

  • CART_KEY_PREFIX:购物车键的前缀,用于区分用户的购物车数据。例如,用户 user1 的购物车键名为 cart:user1
  • addToCart 方法
    • 使用 increment 方法增加商品数量,如果购物车中已有此商品,会累加数量;如果没有,则会新建该商品的记录。
    • 例如:用户 user1 购买商品 product123 数量为 2 时,键 cart:user1 中字段 product123 的值会累加 2。
  • viewCart 方法:使用 entries 获取购物车的所有内容。每个购物车都是一个哈希表,包含商品 ID 和数量的键值对。

在 Redis 中,哈希结构不仅减少了存储空间,还能高效处理购物车中多个商品的数据。


10.2.3. 库存管理

通过 Redis 的原子操作 increment,可以确保库存扣减的操作具有原子性,从而避免超卖问题。每次扣减库存前都会检查是否有足够的库存量。

private static final String STOCK_KEY_PREFIX = "stock:"; // 库存键前缀

/**
 * 减少商品库存
 * @param productId 商品ID
 * @param quantity 数量
 * @return 是否成功
 */
public boolean deductStock(String productId, int quantity) {
    String stockKey = STOCK_KEY_PREFIX + productId;
    Long stock = redisTemplate.opsForValue().increment(stockKey, -quantity); // 扣减库存

    if (stock != null && stock >= 0) {
        System.out.println("Stock deducted for product: " + productId + ", Remaining: " + stock);
        return true;
    } else {
        // 库存不足,回滚扣减操作
        redisTemplate.opsForValue().increment(stockKey, quantity);
        System.out.println("Stock not enough for product: " + productId);
        return false;
    }
}

代码解释

  • STOCK_KEY_PREFIX:库存键的前缀,例如 stock:product123 表示商品 product123 的库存。
  • deductStock 方法
    • 使用 increment 方法原子性地减少库存。-quantity 表示扣减库存。
    • 如果库存足够,返回 true 表示扣减成功;如果库存不足,库存值会回滚,即重新加回 quantity,确保不会产生负库存。
    • 这种原子性操作能有效避免高并发下的超卖问题。

10.2.4. 事务控制:添加商品到购物车并扣减库存

在实际场景中,添加商品到购物车和库存扣减应作为一个事务进行控制。我们可以通过 Redis 的 watch 机制,监视库存变化,并确保购物车和库存操作的一致性。

import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;

public void addToCartAndDeductStock(String userId, String productId, int quantity) {
    redisTemplate.execute(new SessionCallback<Object>() {
        @Override
        public Object execute(RedisOperations operations) {
            String cartKey = CART_KEY_PREFIX + userId;
            String stockKey = STOCK_KEY_PREFIX + productId;

            // 监视库存键,确保库存在操作过程未被其他操作修改
            operations.watch(stockKey);

            // 检查库存是否充足
            Long stock = (Long) operations.opsForValue().get(stockKey);
            if (stock == null || stock < quantity) {
                System.out.println("Stock not enough for product: " + productId);
                return false; // 库存不足,操作终止
            }

            operations.multi(); // 开启事务
            operations.opsForHash().increment(cartKey, productId, quantity); // 添加商品到购物车
            operations.opsForValue().increment(stockKey, -quantity); // 扣减库存
            return operations.exec(); // 执行事务
        }
    });
}

代码解释

  • watch(stockKey):通过 watch 监视库存键 stockKey,确保在事务执行过程中,库存不会被其他操作修改。
  • 库存检查:在事务执行前,先检查 Redis 中的库存是否足够,若不足则终止操作。
  • multi()exec():启动 Redis 事务,先将购物车和库存扣减的指令放入队列,调用 exec 执行事务。若期间库存发生变化,事务将失败,以确保库存和购物车数据的一致性。
  • 事务执行逻辑
    • 使用 increment 将商品添加到购物车。
    • 同时,使用 increment 原子性地减少库存数量。
    • 通过事务控制,确保购物车添加和库存扣减的一致性,防止在高并发情况下库存不足时的错误扣减问题。

10.2.5.应用场景与优势

使用 Redis 进行购物车和库存管理,适合高并发电商场景:

  • 高效购物车管理:Redis 哈希结构可以快速更新和查询购物车中的商品,减少数据库访问压力。
  • 原子性库存操作:Redis 的原子操作确保了库存扣减的并发安全性,避免超卖问题。
  • 事务控制:通过 Redis 事务,可以确保购物车和库存的操作一致性,避免出现库存不足却添加到购物车的情况。

 10.3.数据统计与排行榜

在社交应用、游戏等场景中,经常需要实现实时的数据统计和排行榜功能。Redis 的 Sorted Set 数据结构通过分数排序,为实时更新和快速查询排名提供了理想的解决方案。在 Sorted Set 中,元素是有序的,且可以根据分数动态调整排名,非常适合排行榜、得分统计等需求。


10.3.1.数据统计与排行榜的实现流程

  1. 添加或更新分数:将用户的得分(或其他数据统计值)添加到 Sorted Set 中。如果用户已存在则更新分数,如果用户不存在则添加用户。
  2. 获取排行榜:通过 Sorted Set 的排序特性,可以快速获取前 N 名用户,形成实时排行榜。
  3. 查询用户排名:Redis 提供的 rank 操作可以查询特定用户的当前排名,实现用户排名的实时查询。

以下代码以一个简单的 LeaderboardService 为例,展示了排行榜的增、查操作,包括添加或更新分数、获取前 N 名以及获取用户的实时排名。

10.3.2. 添加或更新分数

使用 Redis 的 ZADD 操作将用户和分数存入 Sorted Set 中。如果用户已存在,则更新分数;如果用户不存在,则添加用户。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class LeaderboardService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String LEADERBOARD_KEY = "leaderboard"; // 排行榜键

    /**
     * 添加或更新用户分数
     * @param userId 用户ID
     * @param score 新增分数
     */
    public void addOrUpdateScore(String userId, double score) {
        redisTemplate.opsForZSet().incrementScore(LEADERBOARD_KEY, userId, score);
        System.out.println("Score updated for user: " + userId + ", Score: " + score);
    }
}

代码解释

  • LEADERBOARD_KEY:用于存储排行榜的 Sorted Set 键名,可以使用业务相关的名称来区分不同排行榜(例如:积分榜、竞赛榜等)。
  • addOrUpdateScore 方法
    • 使用 incrementScore 方法添加或更新用户的分数。
    • userId 表示用户的唯一标识符,score 表示得分。
    • 如果用户已存在于 Sorted Set 中,分数会累加;如果用户不存在则新增该用户的分数记录。
    • 每次分数变动时,Sorted Set 会根据分数自动重新排序,从而确保排行榜始终有序。

10.3.3. 获取排行榜

可以通过 Redis 的 ZRANGE 操作,按分数从高到低获取前 N 名用户,并构成排行榜。此方法适用于展示全局排名。

/**
 * 获取排行榜
 * @param topN 取前N名
 * @return 排名前N的用户ID列表
 */
public Set<Object> getTopN(int topN) {
    return redisTemplate.opsForZSet().reverseRange(LEADERBOARD_KEY, 0, topN - 1);
}

代码解释

  • getTopN 方法
    • 使用 reverseRange 获取 Sorted Set 中前 N 名用户的数据。reverseRange 会按分数从高到低排序。
    • topN 参数指定需要获取的前 N 名用户。
    • 返回的 Set<Object> 中包含前 N 名用户的 ID,可以根据需要进一步查询用户详细信息。

这种方式可以在排行榜页面中直接显示前 N 名用户的实时排名,非常适合用作游戏积分榜、社交平台活跃用户榜等。


10.3.4. 获取用户排名

为了查询特定用户的当前排名,可以使用 Redis 的 ZREVRANK 操作,通过分数从高到低排列,返回用户的名次。

/**
 * 获取用户排名
 * @param userId 用户ID
 * @return 用户排名(1为最高)
 */
public Long getUserRank(String userId) {
    Long rank = redisTemplate.opsForZSet().reverseRank(LEADERBOARD_KEY, userId);
    if (rank != null) {
        System.out.println("Rank for user " + userId + ": " + (rank + 1));
    }
    return rank != null ? rank + 1 : null; // 排名从0开始,需加1
}

代码解释

  • getUserRank 方法
    • 使用 reverseRank 查询指定用户的排名,从分数从高到低排序中获取用户的索引值。
    • 如果用户存在,则返回的索引值加 1 表示用户的实际排名。例如,索引值 0 表示第一名,故 rank + 1 为最终排名。
    • 如果用户不存在,返回 null 表示用户未上榜。

通过实时查询用户排名,可以向用户展示其当前排名位置,增加用户的竞争体验。


10.4.5. 获取用户的分数

有时除了查询用户排名,还需要查询用户的得分。可以使用 Redis 的 ZSCORE 操作获取用户的分数信息。

/**
 * 获取用户的分数
 * @param userId 用户ID
 * @return 用户的分数
 */
public Double getUserScore(String userId) {
    Double score = redisTemplate.opsForZSet().score(LEADERBOARD_KEY, userId);
    if (score != null) {
        System.out.println("Score for user " + userId + ": " + score);
    }
    return score;
}

代码解释

  • getUserScore 方法
    • 使用 score 获取 Sorted Set 中指定用户的分数。
    • 如果用户存在,返回其当前分数;如果用户不存在,返回 null
    • 分数信息可以在用户详情页面、个人排行榜等场景中展示,帮助用户了解自己的得分情况。

10.4.6.应用场景与优势

使用 Redis 的 Sorted Set 实现排行榜和实时数据统计,有以下优势:

  • 实时性强:Redis 的分数排序机制使得排行榜能够实时更新,即用户分数更新后立即调整排名。
  • 高并发性能:Redis 基于内存的数据结构和原子性操作,能够在高并发场景下保持高性能,特别适合游戏、社交平台等。
  • 操作简洁:通过 ZADDZRANGEZSCORE 等操作,能够快速实现排名查询和数据统计,降低开发复杂度。

应用场景

  • 游戏积分排行榜:展示游戏玩家的得分和排名,实时更新玩家的竞争状态。
  • 社交平台活跃度排名:根据活跃度(如发帖、点赞、评论等)进行排序,鼓励用户保持活跃。
  • 商户销售排名:电商平台可以基于商户的销售额,展示月度或年度销售排行榜,激励商家提升业绩。

原文地址:https://blog.csdn.net/m0_73837751/article/details/143511561

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!