SpringBoot 结合 Redis 实现滑动时间窗口限流

SpringBoot 结合 Redis 实现滑动时间窗口限流

文章目录

  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。

系统环境:

  • SpringBoot 版本: 3.3.2
  • JAVA JDK 版本: openjdk 21.0.2

参考地址:

一、什么是滑动时间窗口限流

滑动时间窗口是一种限流算法,它可以在一段时间内限制请求的数量,以保护后端服务免受大流量冲击。

滑动时间窗口限流算法会划分一个特定大小的时间窗口,然后这个时间窗口随时间轴移动,每隔一段时间或者每次请求时就会将时间窗口内的请求次数进行累加求和,如果总的请求次数超过了预设的阈值,那么后续执行的请求将会被拒绝执行。

滑动时间窗口限流算法的主要优点是可以在单位时间内平滑的控制流量,相较于固定时间窗口限流仅能简单地设置固定的请求数或速率相比,滑动时间窗口可以更为灵活的应对突发流量或峰值流量,而不会因为固定速率的限制而浪费或降低系统性能。

二、滑动时间窗口限流的应用场景

滑动时间窗口限流算法适用的场景主要有以下几个:

  • API限流: 控制 API 调用频率,防止滥用。
  • 防止恶意攻击 防止恶意用户对服务进行洪水攻击。
  • 应对突发流量 滑动时间窗口算法可以更好地应对突发流量,平滑处理访问请求。
  • 控制服务降级: 在高负载的服务中,控制服务的降级,保证核心功能的正常运行。
  • 控制高并发访问: 对于一些高并发的系统,如电商秒杀、抢票等场景,滑动时间窗口限流可以有效地控制单位时间内的访问量,防止服务器过载。

三、滑动时间窗口的实现原理

3.1 滑动时间窗口算法原理

时间滑动窗口算法是一种用于实现平滑限流的技术,它不像固定时间窗口那样定义明确的起点和终点,而是采用一种动态的方式。具体来说,该算法的工作原理如下:

  • ⑴ 确定当前时间点: 每次处理请求时,都将当前请求的时间点视为统计时间窗口的终点。
  • ⑵ 计算窗口起点: 从这个终点时间向前回溯一个预设的时间间隔 (即时间窗口长度),得到窗口的起点。
  • ⑶ 统计请求次数: 统计从窗口起点到当前时间点 (即窗口终点) 这段时间内的请求数量。
  • ⑷ 检查阈值: 最后,判断这段时间内的请求总数是否超过了设定的限流阈值。

这种方法可以更准确地控制单位时间内通过系统的请求数量,避免了固定窗口方法中可能出现的突发流量和临界问题。

3.2 滑动时间窗口算法图示

滑动时间窗口算法需要定义一个 时间窗口大小,并且设置时间窗口内 限流次数。然后以当前请求时间作为时间窗口的 终点,以终点时间点向前推时间窗口长度的时间点作为时间窗口的 起点,最后统计从时间窗口起点到终点这段时间内的 请求次数,是否达到了 限流次数。具体如下图所示:

滑动时间窗口限流图示

也就是说,在每次进行请求时,先判断窗口内总的请求次数是否达到了限流次数,如果没有则使窗口内的请求次数值 +1,否则进行提醒已经达到了请求上限。

四、Redis 实现滑动时间窗口流程

4.1 ZSET 集合的几个特性

使用 Redis 的 ZSET 有序集合来实现滑动窗口限流是一种常见的做法。这种实现方式利用了 ZSET 的几个关键特性:

  • ZSET 集合中的值不能重复,保证了每个请求只能被记录一次,避免了重复计数的问题。
  • ZSET 集合可以使用 ZADD 命令,向集合中添加数据,并可以指定时间戳作为分数(score)。
  • ZSET 集合可以按照分数 (score) 排序,也就是说使用时间戳作为分数,则可以按照时间戳进行排序。
  • ZSET 集合可以使用 ZREMRANGEBYSCORE 命令,移除那些已经超出滑动窗口时间范围的旧请求。
  • ZSET 集合可以使用 ZCARD 命令,统计集合中的元素数量。

4.2 ZSET 集合实现的步骤

使用 Redis 的 ZSET 有序集合来实现滑动窗口限流是一种有效的方法。下面是基于 ZSET 实现滑动窗口限流的大致步骤:

  • ⑴ 计算时间窗口开始位置: 每次处理新请求时,首先计算当前时间 (即窗口的末尾) 减去设定的时间窗口大小,从而得出当前时间窗口的开始位置。
  • ⑵ 清理窗口前的历史数据: 由于只需要考虑当前时间窗口内的请求次数,因此需要清除窗口开始位置之前的所有请求数据。 。
  • ⑶ 计算窗口内的请求次数: 清理掉事件窗口之前的请求数据后,剩余的就是当前时间窗口内的请求次数,直接统计这个数值即可。
  • ⑷ 判断是否是否达到限流: 判断当前时间窗口内的请求次数是否达到限流阈值,给出处理逻辑,允许请求或者拒绝执行。

将上述步骤应用到 Redis 的 ZSET 集合中,具体操作如下:

  • ⑴ 定义参数: 定义 "时间窗口大小"、"当前时间"、"请求次数阈值" 三个参数;
  • ⑵ 计算时间窗口的开始位置: 用当前时间减去时间窗口大小来计算时间窗口的开始位置。
  • ⑶ 清理窗口前的历史数据: 使用 ZREMRANGEBYSCORE 命令移除所有时间戳小于当前时间窗口开始位置前的元素。
  • ⑷ 计算窗口内的请求次数: 使用 ZCARD 命令统计当前时间窗口内的元素数量;
  • ⑸ 判断是否达到限流阈值: 判断当前时间窗口内的请求次数是否达到限流阈值。如果达到阈值,则返回 0 表示请求被拒绝;否则返回 1 表示请求通过,并将当前请求的时间戳使用 ZADD 命令添加到 ZSET 集合中。

需要注意的是,虽然这种方法适用于请求次数较少的情况,但在高并发环境下 (比如每秒上万次请求),可能会导致 Redis 的性能瓶颈。这是因为每次请求都需要执行 ZADD、ZREMRANGEBYSCORE 和 ZCARD 操作,这可能会导致 Redis 请求过于频繁,从而增加 Redis 服务器的负担。对于高并发场景,可以考虑使用更高效的算法或者数据结构来减少 Redis 的负载。

4.3 使用 Redis Lua 脚本实现

在使用 Redis ZSET 实现滑动窗口限流时,可以通过使用 Redis Lua 脚本来确保操作的原子性,从而避免在高并发情况下出现多个请求同时通过限流的问题。具体的 Lua 脚本内容如下:

 1-- ====== 滑动时间窗口计算 Lua 脚本 ======
 2-- KEYS[1]: zset集合Key名称
 3-- ARGV[1]: 当前时间(窗口终点)
 4-- ARGV[2]: 事件窗口大小
 5-- ARGV[3]: 限流阈值
 6-- ARGV[4]: 随机值(由于ZSET中的值不能重复,所以需要加上随机值来保证唯一性)
 7
 8-- 参数设置
 9local key = KEYS[1]
10local current_time = tonumber(ARGV[1])
11local window_length = tonumber(ARGV[2])
12local limit_number = tonumber(ARGV[3])
13local random_val = ARGV[4]
14
15-- 计算窗口的起始时间(窗口起点)
16local window_start = current_time - window_length
17
18-- 移除窗口时间之前的全部请求次数记录
19redis.call('zremrangebyscore', key, '-inf', window_start)
20
21-- 统计当前事件窗口中的总请求次数
22local current_requests = redis.call('zcard', key)
23
24-- 如果当前窗口内请求次数小于阈值,则添加当前时间到窗口内并返回1,否则返回0
25if current_requests < limit_number then
26    -- 添加当前请求到窗口内,并且设置score为当前事件,设置value为随机值
27    redis.call('zadd', key, current_time, random_val)
28    -- 为了避免Key永不过期,这里设置Key的过期时间为1小时
29    redis.call('expire', key, 3600)
30    return 1
31else
32    return 0
33end

五、SpringBoot+Redis 实现滑动时间窗口示例

这里给出一个 SpringBoot 结合 Redis,以 Lua 脚本的方式实现滑动时间窗口限流的示例。示例整体步骤如下:

  • 在 Maven 的 pom.xml 中引入示例中需要使用到的相关依赖;
  • application.yml 配置文件中设置 Redis 连接参数;
  • 创建 Redis 配置类,并且设置 RedisTemplate 的序列化方式;
  • 创建一个可以作用在方法上的限流注解 @RateLimit,以及当达到限流后抛出的限流异常类;
  • 创建全局异常处理类,用于处理限流异常,返回客户端限流的错误信息;
  • 创建一个 Redis Lua 脚本,并在脚本中实现限流的核心逻辑,从而确保 Redis 相关操作的原子性;
  • 创建限流实现类,并定义 Lua 脚本中使用到的参数,其中包含调用 Lua 脚本执行限流的逻辑;
  • 创建限流 AOP 切面类,用于拦截添加了 @RateLimit 注解的方法,并在逻辑中设置达到限流阈值后抛出限流异常;
  • 创建用于测试限流的 Controller 类,里面定义一个测试的方法,并且在方法上添加 @RateLimit 注解;
  • 启动 SpringBoot 应用,访问测试接口,测试限流是否生效。

5.1 Maven 引入相关依赖

pom.xml 文件中引入 SpringBoot、Redis、AOP 的依赖,如下所示:

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4    <modelVersion>4.0.0</modelVersion>
 5
 6    <parent>
 7        <groupId>org.springframework.boot</groupId>
 8        <artifactId>spring-boot-starter-parent</artifactId>
 9        <version>3.3.2</version>
10        <relativePath/>
11    </parent>
12
13    <groupId>club.mydlq</groupId>
14    <artifactId>spring-boot-windows-limit</artifactId>
15    <version>0.0.1-SNAPSHOT</version>
16    <name>spring-boot-windows-limit</name>
17    <description>sliding windows limit flow</description>
18
19    <properties>
20        <java.version>21</java.version>
21        <maven.compiler.source>21</maven.compiler.source>
22        <maven.compiler.target>21</maven.compiler.target>
23    </properties>
24
25    <dependencies>
26        <dependency>
27            <groupId>org.springframework.boot</groupId>
28            <artifactId>spring-boot-starter-web</artifactId>
29        </dependency>
30        <dependency>
31            <groupId>org.springframework.boot</groupId>
32            <artifactId>spring-boot-starter-aop</artifactId>
33        </dependency>
34        <dependency>
35            <groupId>org.springframework.boot</groupId>
36            <artifactId>spring-boot-starter-data-redis</artifactId>
37        </dependency>
38    </dependencies>
39
40    <build>
41        <plugins>
42            <plugin>
43                <groupId>org.springframework.boot</groupId>
44                <artifactId>spring-boot-maven-plugin</artifactId>
45            </plugin>
46        </plugins>
47    </build>
48
49</project>

5.2 配置 Redis 连接参数

创建 application.yml 配置文件,配置 Redis 集群的连接参数,如下所示:

 1spring:
 2  application:
 3    name: spring-boot-windows-limit
 4  data:
 5    redis:
 6      host: 127.0.0.1
 7      port: 6379
 8      database: 0
 9      lettuce:
10        pool:
11          max-active: 100
12          max-idle: 20
13          min-idle: 0
14          max-wait: 2000

5.3 创建 Redis 配置类

创建 Redis 配置类,设置 RedisTemplate 的序列化方式。

注: 如果这里不设置序列化方式,会导致 RedisTemplate 执行 lua 脚本时出现参数错误。

 1import org.springframework.context.annotation.Bean;
 2import org.springframework.context.annotation.Configuration;
 3import org.springframework.data.redis.connection.RedisConnectionFactory;
 4import org.springframework.data.redis.core.RedisTemplate;
 5import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
 6import org.springframework.data.redis.serializer.RedisSerializer;
 7import org.springframework.data.redis.serializer.StringRedisSerializer;
 8
 9/**
10 * Redis 配置
11 *
12 * @author mydlq
13 */
14@Configuration
15public class RedisConfig {
16
17    @Bean("rateLimitRedisTemplate")
18    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
19        // 字符串序列化器
20        RedisSerializer<String> stringRedisSerializer = new StringRedisSerializer();
21        // JSON序列化器
22        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
23        // 配置 RedisTemplate 序列化方式
24        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
25        redisTemplate.setConnectionFactory(factory);
26        redisTemplate.setKeySerializer(stringRedisSerializer);
27        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
28        redisTemplate.setHashKeySerializer(stringRedisSerializer);
29        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
30        return redisTemplate;
31    }
32
33}

5.4 创建限流注解

创建限流注解 @RateLimit,这样设置限流只对添加了当前注解的方法生效。

 1import java.lang.annotation.ElementType;
 2import java.lang.annotation.Retention;
 3import java.lang.annotation.RetentionPolicy;
 4import java.lang.annotation.Target;
 5
 6/**
 7 * 限流注解
 8 *
 9 * @author mydlq
10 */
11@Target(ElementType.METHOD)
12@Retention(RetentionPolicy.RUNTIME)
13public @interface RateLimit {
14    /**
15     * 限流的key,如果不指定,则使用类名+方法名
16     */
17    String key() default "";
18
19    /**
20     * 时间窗口大小(即限流时间,单位秒,默认60秒)
21     */
22    int time() default 60;
23
24    /**
25     * 限制次数
26     */
27    int limit();
28}

5.5 创建限流异常类

创建一个继承 RuntimeException 的限流异常类,用于当请求达到限制的次数时,抛出该异常信息。

 1/**
 2 * 限流异常
 3 *
 4 * @author mydlq
 5 */
 6public class RateLimitException extends RuntimeException {
 7
 8    public RateLimitException(String message) {
 9        super(message);
10    }
11
12}

5.6 创建全局异常处理类

创建一个全局异常处理类,在类中添加一个用于处理限流异常的方法,这样当请求达到限制的次数时,会抛出该异常,然后由全局异常处理类进行处理,返回客户端对应的错误信息。

 1import club.mydlq.rate.limit.exception.RateLimitException;
 2import org.springframework.http.HttpStatus;
 3import org.springframework.http.ResponseEntity;
 4import org.springframework.web.bind.annotation.ExceptionHandler;
 5import org.springframework.web.bind.annotation.RestControllerAdvice;
 6
 7/**
 8 * 全局异常处理
 9 *
10 * @author mydlq
11 */
12@RestControllerAdvice
13public class GlobalExceptionHandler {
14
15    /**
16     * 限流异常处理
17     * @param ex 限流异常
18     * @return 限流后响应的信息
19     */
20    @ExceptionHandler(RateLimitException.class)
21    public ResponseEntity<String> handleRateLimitExceededException(RateLimitException ex) {
22        // 返回 HTTP 状态码 429 和自定义的限流错误消息
23        return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(ex.getMessage());
24    }
25
26}

5.7 创建滑动窗口限流 Lua 脚本

/resorces/redis 目录下创建 slidingWindow.lua 脚本,用于实现滑动窗口限流。

 1-- ====== 滑动时间窗口计算 Lua 脚本 ======
 2-- KEYS[1]: zset集合Key名称
 3-- ARGV[1]: 当前时间(窗口终点)
 4-- ARGV[2]: 事件窗口大小
 5-- ARGV[3]: 限流阈值
 6-- ARGV[4]: 随机值(由于ZSET中的值不能重复,所以需要加上随机值来保证唯一性)
 7
 8-- 参数设置
 9local key = KEYS[1]
10local current_time = tonumber(ARGV[1])
11local window_length = tonumber(ARGV[2])
12local limit_number = tonumber(ARGV[3])
13local random_val = ARGV[4]
14
15-- 计算窗口的起始时间(窗口起点)
16local window_start = current_time - window_length
17
18-- 移除窗口时间之前的全部请求次数记录
19redis.call('zremrangebyscore', key, '-inf', window_start)
20
21-- 统计当前事件窗口中的总请求次数
22local current_requests = redis.call('zcard', key)
23
24-- 如果当前窗口内请求次数小于阈值,则添加当前时间到窗口内并返回1,否则返回0
25if current_requests < limit_number then
26    -- 添加当前请求到窗口内,并且设置score为当前事件,设置value为随机值
27    redis.call('zadd', key, current_time, random_val)
28    -- 为了避免Key永不过期,这里设置Key的过期时间为1小时
29    redis.call('expire', key, 3600)
30    return 1
31else
32    return 0
33end

5.8 创建滑动窗口限流类

创建滑动窗口限流类,用于执行 Lua 脚本,实现滑动窗口限流。

 1package club.mydlq.rate.limit.common;
 2
 3import jakarta.annotation.Resource;
 4import org.springframework.core.io.ResourceLoader;
 5import org.springframework.data.redis.core.RedisTemplate;
 6import org.springframework.data.redis.core.script.DefaultRedisScript;
 7import org.springframework.stereotype.Component;
 8import java.util.Collections;
 9import java.util.UUID;
10import java.util.concurrent.TimeUnit;
11
12/**
13 * 时间窗口算法实现的限流器
14 *
15 * @author mydlq
16 */
17@Component
18public class SlidingWindowRateLimiter {
19
20    @Resource
21    private ResourceLoader resourceLoader;
22    @Resource(name = "rateLimitRedisTemplate")
23    private RedisTemplate<String, Object> redisTemplate;
24
25    /**
26     * 通过滑动窗口算法实现限流
27     *
28     * @param key   KEY
29     * @param limit 指定时间内允许的请求次数
30     * @param time  时间窗口大小,单位秒
31     * @return true 表示通过,false 表示被限流
32     */
33    public boolean allowRequest(String key, int time, int limit) {
34        // 当前时间戳(窗口终点)
35        long currentTime = System.currentTimeMillis();
36        // 时间窗口大小(默认为秒,所以需要转换为毫秒数)
37        long windowsSize = TimeUnit.SECONDS.toMillis(time);
38        // 随机值(由于ZSET中的值不能重复,所以需要加上随机值来保证唯一性)
39        String randomVal = UUID.randomUUID().toString();
40
41        // 设置 lua 脚本对象
42        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
43        script.setResultType(Long.class);
44        script.setLocation(resourceLoader.getResource("classpath:/redis/slidingWindow.lua"));
45
46        // 设置 lua 脚本中使用到的 ARGV 参数数组
47        Object[] argv = new Object[] {currentTime, windowsSize, limit, randomVal};
48
49        // 执行 Lua 脚本
50        Long result = redisTemplate.execute(script, Collections.singletonList(key), argv);
51
52        // 判断结果,如果结果为null或者1,则表示通过
53        return result == null || result == 1L;
54    }
55
56}

5.9 创建限流 AOP 切面

创建限流 AOP 切面,用于拦截限流注解的方法,并执行限流逻辑。

 1import club.mydlq.rate.limit.exception.RateLimitException;
 2import io.micrometer.common.util.StringUtils;
 3import jakarta.annotation.Resource;
 4import org.aspectj.lang.ProceedingJoinPoint;
 5import org.aspectj.lang.annotation.Around;
 6import org.aspectj.lang.annotation.Aspect;
 7import org.aspectj.lang.reflect.MethodSignature;
 8import org.springframework.stereotype.Component;
 9
10/**
11 * 限流 Aspect
12 *
13 * @author mydlq
14 */
15@Aspect
16@Component
17public class RateLimitAspect {
18
19    @Resource
20    private SlidingWindowRateLimiter slidingWindowRateLimiter;
21
22    private static final String OPEN_RATE_LIMIT = "test,limit:%s:%s";
23
24    @Around("@annotation(rateLimit)")
25    public Object rateLimitAdvice(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
26        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
27        // 获取类名
28        String className = signature.getDeclaringType().getSimpleName();
29        // 获取方法名
30        String methodName = signature.getName();
31
32        // 拼接限流key
33        String key = StringUtils.isBlank(rateLimit.key()) ?
34                OPEN_RATE_LIMIT + className + "_" + methodName :
35                rateLimit.key();
36
37        // 判断是否达到限流阈值,是则抛出限流异常
38        boolean allowRequest = slidingWindowRateLimiter.allowRequest(key, rateLimit.time(), rateLimit.limit());
39        if (!allowRequest) {
40            throw new RateLimitException("请求过于频繁,请稍后再试");
41        }
42
43        return joinPoint.proceed();
44    }
45
46}

5.10 创建限流测试 Controller 类

创建测试的 Controller 类,用于后续对限流进行测试。

 1import club.mydlq.rate.limit.common.RateLimit;
 2import org.springframework.http.ResponseEntity;
 3import org.springframework.web.bind.annotation.GetMapping;
 4import org.springframework.web.bind.annotation.RequestMapping;
 5import org.springframework.web.bind.annotation.RestController;
 6
 7/**
 8 * 限流测试
 9 *
10 * @author mydlq
11 */
12@RestController
13@RequestMapping("/test")
14public class LimitTestController {
15
16    @RateLimit(key = "test-key", time = 10, limit = 5)
17    @GetMapping("/query")
18    public ResponseEntity<String> testLimit() {
19        return ResponseEntity.ok("OK");
20    }
21
22}

5.11 创建启动类

 1import org.springframework.boot.SpringApplication;
 2import org.springframework.boot.autoconfigure.SpringBootApplication;
 3
 4@SpringBootApplication
 5public class Application {
 6
 7    public static void main(String[] args) {
 8        SpringApplication.run(Application.class, args);
 9    }
10
11}

5.12 测试限流效果

在上面我们创建了一个用于测试的 Controller 类,提拱了一个限流的接口 testLimit,并且使用 @RateLimit 注解设置了限流的时间为 10 秒,限流的次数为 5 次。

接下来我们启动 Spring Boot 项目,访问 http://localhost:8080/test/query 接口,连续 5 次访问,可以看到返回结果都为 请求成功,然后再次访问,可以看到返回结果为 请求过于频繁,请稍后再试,说明限流生效。

---END---


  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。