SpringBoot 使用 Zookeeper 与 Redis 实现分布式锁

SpringBoot 使用 Zookeeper 与 Redis 实现分布式锁

文章目录

  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。


系统环境:

  • Redis 版本:5.0.7
  • Java JDK 版本:1.8
  • Redisson 版本:3.13.1
  • SpringBoot 版本:2.3.12.RELEASE
  • Zookeeper 版本:zookeeper-3.4.14

参考地址:

示例项目地址:

一、简介

1、什么是分布式锁

       理解分布式锁之前,首先了解下什么是分布式架构,所谓 分布式架构 简单来说就是将相同或者不同服务部署在不同的服务器上,一个服务负责一个或多个功能,服务间通过网络协议进行通信。

       那么,再来谈谈什么是分布式锁,所谓 分布式锁 就是 控制分布式服务间同步访问共享资源的一种方式,在分布式系统中,常常需要协调他们的动作。如果不同的服务或是同一个服务的不同实例之间共享了一个或一组资源,那么访问这些资源的时候,往往需要 互斥 来防止彼此干扰来 保证一致性,在这种情况下,便需要使用到分布式锁。

2、分布式锁使用场景

上面介绍了下什么是分布式锁,那么再说下它的使用场景:

       比如,要开发商城系统时候使用 分布式架构,在分布式架构情况下的服务一般有多个实例。这里这个系统中的 商品 都会存在 库存库存服务 也是分别有两个,分别为 服务实例A服务实例B 实例,在同一时间内 用户A用户B 同时对 商品A 进行下单,比如当前 商品A库存为 100,然后执行 两个下单命令 后正常来说 剩余库存应当为 98, 不过,因为是分布式系统,两个用户执行下单后,下单指令被负载均衡到 服务实例A服务实例B,两个实例同时对库存进行扣减操作,不过问题来了,它们在执行扣减前得先获取当前商品库存数量,于是它们都同时获取 当前库存数 100,然后 服务实例A 执行扣减库存 100-1=99服务实例B 也执行扣减库存 100-1=99,最终 商品A 库存为 99

出现 上面的情况,其最主要的 原因就是两个实例之前没有通信,无法得知在执行扣减库存时候,是否存另一个实例也正在操作更改库存数量。所以,这时候如果有一个 中间件 可以存储这个公共的锁,这两个实例都通过这个 中间件 获取这把锁,谁能得到这把锁就能操作库存的更改,增加或者扣减库存,这样就能解决这个问题,这个就是分布式锁。

3、分布式锁的实现方式

而分布式锁常用中间件有如下三种:

  • 基于 Redis 实现;
  • 基于 Zookeeper 实现;
  • 基于数据库实现(性能差,问题多,所以不推荐,所以不过多叙述);

4、分布式锁具备条件

  • 具备可重入特性;
  • 具备锁失效机制,防止死锁;
  • 由谁加锁,就应该由谁解锁;
  • 高可用、高性能的获取锁与释放锁;
  • 具备互斥性,统一时间只能有一个实例获取锁;
  • 具有原子性,加锁和设置超时时间等操作应该为一个原子操作;

5、Redis 简介

什么是 Redis:

       Redis 是一个高性能的 Key-Value 数据库,它是完全开源免费的,而且 Redis 是一个 NoSQL 类型数据库,是为了解决 高并发、高扩展,大数据存储 等一系列的问题而产生的数据库解决方案,是一个非关系型的数据库。

使用 Redis 如何实现分布式锁:

在 Java 的 Spring 框架中,常与 spring-data-redis 组件结合 操作 Redis,虽然里面有很多直接操作 Redis 的方法,不过遗憾的是其中并 没有 Redis 锁 的方法实现,其实现还是需要自己进行一些封装,且封装好的方法大多数都是针对 Redis 单节点 的,对于 Redis 集群 可能实现起来比较困难。所以,常常需要使用 Redis 锁时,更推荐使用另一款组件 Redisson,该组件已经封装好了实现 Redis 分布式锁 的方法,我们可以轻松调用,且其也实现了 红锁(支持 Redis 集群)公平锁 等等很多。Redisson 实现分布式锁的原理这里不过多叙述,推荐看 Redisson 实现分布式锁 这篇文章。

6、Zookeeper 简介

什么是 Zookeeper:

       ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括 配置维护域名服务分布式同步组服务等。

Zookeeper 多层级节点:

       Zookeeper 提供一个多层级的节点命名空间 znode,每个节点都用一个以斜杠 / 分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。

       例如 /foo/doo 这个表示一个 znode,它的父节点为 /foo,父父节点为 /,而 / 为根节点没有父节点。与文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据而目录节点不行。Zookeeper 为了保证 高吞吐低延迟,在内存中维护了这个树状的目录结构,这种特性使得 Zookeeper不能用于存放大量的数据,每个节点的存放数据上限为 1M。

       为了保证高可用 Zookeeper 需要以 集群 形态来部署,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么 Zookeeper 本身仍然是可用的。客户端在使用 Zookeeper 时,需要知道集群机器列表,通过与集群中的某一台机器建立 TCP 连接来使用服务,客户端使用这个 TCP 链接来发送请求、获取结果、获取监听事件以及发送心跳包。如果这个连接异常断开了,客户端可以连接到另外的机器上。

Zookeeper 的四种节点类型:

  • 持久节点(PERSISTENT):创建节点的客户端与 Zookeeper 断开连接后,该节点依旧存在,这是默认的节点类型。
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):创建节点时 Zookeeper 会根据创建的时间顺序给该节点名称进行编号。
  • 临时节点(EPHEMERAL):与持久节点相反,当创建节点的客户端与 Zookeeper 断开连接后,临时节点会被删除。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):临时顺序节点结合和临时节点和顺序节点的特点,在创建节点时 Zookeeper 会根据创建的时间顺序给该节点名称进行编号,当创建节点的客户端与 Zookeeper 断开连接后,临时节点会被删除。

Zookeeper 的事件监听:

通过 Zookeeper 的事件监听机制可以让客户端收到节点状态变化。主要的事件类型有节点数据变化、节点的删除和创建。

使用 Zookeeper 如何实现分布式锁:

内容过多,这里不过多叙述,推荐看 10分钟看懂!ZooKeeper 典型应用场景:分布式锁 这篇文章。

二、使用 Spring 的 spring-data-redis 实现分布式锁

       Spring 官方项目中存在 spring-data-redis 项目,内部封装了 Lettuce 客户端,所以它是一个非常利于操作 Redis 的组件,再结合 Redis 连接池能够非常方便的操作 Redis,这里我们使用该组件实现分布式锁。

需要注意的是,此方案不支持 Redis 集群模式

1、Maven 引入相关依赖

Maven 中引入需要的相关依赖,主要如下:

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4    <modelVersion>4.0.0</modelVersion>
 5    
 6    <parent>
 7        <groupId>org.springframework.boot</groupId>
 8        <artifactId>spring-boot-starter-parent</artifactId>
 9        <version>2.3.12.RELEASE</version>
10    </parent>
11
12    <groupId>mydlq.club</groupId>
13    <artifactId>springboot-interface-idempotency</artifactId>
14    <version>0.0.1</version>
15    <name>springboot-interface-idempotency</name>
16    <description>springboot interface idempotency</description>
17
18    <properties>
19        <java.version>1.8</java.version>
20    </properties>
21
22    <dependencies>
23        <!--Lombok,业界常用基础组件,方便实现log方法与实体类get、set等-->
24        <dependency>
25            <groupId>org.projectlombok</groupId>
26            <artifactId>lombok</artifactId>
27            <optional>true</optional>
28        </dependency>
29        <!--SpringBoot-->
30        <dependency>
31            <groupId>org.springframework.boot</groupId>
32            <artifactId>spring-boot-starter-web</artifactId>
33        </dependency>
34        <!--Spring-data-redis-->
35        <dependency>
36            <groupId>org.springframework.boot</groupId>
37            <artifactId>spring-boot-starter-data-redis</artifactId>
38        </dependency>
39         <!--为了使用 lettuce 线程池,必须使用该依赖-->
40        <dependency>
41            <groupId>org.apache.commons</groupId>
42            <artifactId>commons-pool2</artifactId>
43        </dependency>
44    </dependencies>
45
46    <build>
47        <plugins>
48            <plugin>
49                <groupId>org.springframework.boot</groupId>
50                <artifactId>spring-boot-maven-plugin</artifactId>
51            </plugin>
52        </plugins>
53    </build>
54
55</project>

2、配置文件中设置相关参数

在 SpringBoot 的 applicaiton.yaml 配置文件中,设置连接 Redis 和连接池的参数,配置如下:

 1spring:
 2  redis:
 3    database: 0             #数据库
 4    #password: 123456       #数据库密码
 5    timeout: 1000           #超时时间
 6    host: 127.0.0.1         #Reids主机地址
 7    port: 6379              #Redis端口号
 8    lettuce:                #使用 lettuce 连接池
 9      pool:
10        max-active: 20      #连接池最大连接数(使用负值表示没有限制)
11        max-wait: -1        #连接池最大阻塞等待时间(使用负值表示没有限制)
12        min-idle: 0         #连接池中的最大空闲连接
13        max-idle: 10        #连接池中的最小空闲连接

3、创建分布式锁操作类

创建用于操作 Redis 实现分布式锁的方法,主要代码如下:

 1import lombok.extern.slf4j.Slf4j;
 2import org.springframework.beans.factory.annotation.Autowired;
 3import org.springframework.data.redis.core.StringRedisTemplate;
 4import org.springframework.data.redis.core.script.DefaultRedisScript;
 5import org.springframework.data.redis.core.script.RedisScript;
 6import org.springframework.stereotype.Service;
 7import java.util.Arrays;
 8import java.util.concurrent.TimeUnit;
 9
10@Slf4j
11@Service
12public class RedisLock {
13
14    /**
15     * RedisTemplate 对象
16     */
17    @Autowired
18    private StringRedisTemplate redisTemplate;
19
20    /**
21     * 获取分布式锁
22     *
23     * @param key            分布式锁 key
24     * @param value          分布式锁 value
25     * @param expireTime     锁的超时时间,防止死锁
26     * @param expireTimeUnit 锁的超时时间单位
27     * @return 是否成功获取分布式锁
28     */
29    public boolean lock(String key, String value, int expireTime, TimeUnit expireTimeUnit) {
30        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, expireTimeUnit);
31        if (Boolean.TRUE.equals(result)) {
32            log.info("线程 {} - 申请锁( {} | {} )成功", Thread.currentThread().getName(), key, value);
33            return true;
34        }
35        log.info("线程 {} - 申请锁( {} | {} )失败", Thread.currentThread().getName(), key, value);
36        return false;
37    }
38
39    /**
40     * 尝试获取分布式锁,并设置获取锁的超时时间
41     *
42     * @param key                分布式锁 key
43     * @param value              分布式锁 value
44     * @param expireTime         锁的超时时间,防止死锁
45     * @param expireTimeUnit     锁的超时时间单位
46     * @param acquireTimeout     尝试获取锁的等待时间,如果在时间范围内获取锁失败,就结束获取锁
47     * @param acquireTimeoutUnit 尝试获取锁的等待时间单位
48     * @return 是否成功获取分布式锁
49     */
50    public boolean tryLock(String key, String value, int expireTime, TimeUnit expireTimeUnit, int acquireTimeout, TimeUnit acquireTimeoutUnit) {
51        try {
52            // 尝试自旋获取锁,等待配置的一段时间,如果在时间范围内获取锁失败,就结束获取锁
53            long end = System.currentTimeMillis() + acquireTimeoutUnit.toMillis(acquireTimeout);
54            while (System.currentTimeMillis() < end) {
55                // 尝试获取锁
56                Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, expireTimeUnit);
57                // 验证是否成功获取锁
58                if (Boolean.TRUE.equals(result)) {
59                    log.info("线程 {} - 申请锁( {} | {} )成功", Thread.currentThread().getName(), key, value);
60                    return true;
61                }
62                // 睡眠 50 毫秒
63                Thread.sleep(50);
64            }
65        } catch (InterruptedException e) {
66            Thread.currentThread().interrupt();
67        } catch (Exception e) {
68            log.error("申请锁(" + key + "|" + value + ")错误:", e);
69        }
70        log.info("线程 {} - 申请锁( {} | {} )失败", Thread.currentThread().getName(), key, value);
71        return false;
72    }
73
74    /**
75     * 释放锁
76     *
77     * @param key   设置分布式锁 key
78     * @param value 设置分布式锁 value
79     */
80    public void unLock(String key, String value) {
81        String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
82        RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
83        Long result = redisTemplate.execute(redisScript, Arrays.asList(key, value));
84        if (result != null && result != 0L) {
85            log.info("线程 {} - 解锁({} | {}})成功", Thread.currentThread().getName(), key, value);
86        } else {
87            log.info("线程 {} - 解锁({} | {})失败!", Thread.currentThread().getName(), key, value);
88        }
89    }
90
91}

这里的 unLock 方法中需要执行多个操作,所以这里使用 Lua 脚本保证执行的原子性。

4、创建 Controller 类并使用分布式锁

创建一个用于 测试的 Controller 类 来提供测试接口,里面使用线程池来模拟多个线程调用,并且使用上面类中 获取锁解锁 的方法。

 1import lombok.extern.slf4j.Slf4j;
 2import org.springframework.beans.factory.annotation.Autowired;
 3import org.springframework.web.bind.annotation.GetMapping;
 4import org.springframework.web.bind.annotation.RestController;
 5import java.util.UUID;
 6import java.util.concurrent.ExecutorService;
 7import java.util.concurrent.Executors;
 8import java.util.concurrent.TimeUnit;
 9
10/**
11 * Redis Lock 测试接口
12 *
13 * @author mydlq
14 */
15@Slf4j
16@RestController
17public class LockController {
18
19    /**
20     * Redis 分布式锁操作类
21     */
22    @Autowired
23    private RedisLock redisLock;
24
25    /**
26     * 线程池
27     */
28    ExecutorService executor = Executors.newFixedThreadPool(10);
29
30    @GetMapping("/lock")
31    public void lockTest() {
32        for (int i = 0; i < 100; i++) {
33            executor.submit(() -> {
34                // 生成 6 位的 UUID,将该 UUID 作为 Redis 的值。
35                // 注:设置个 UUID 随机值充当 Redis 存入的 Value 是为了保证,在分布式环境且存在多实例情况下,
36                // 进行加锁和解锁操作的都是相同的进程(同一个实例),这样能够避免该锁被别的进程(实例)执行解锁操作。
37                String value = UUID.randomUUID().toString().substring(0, 6);
38                // 获取分布式锁,设置锁超时时间为 10 秒
39                boolean execute = redisLock.lock("test", value, 10, TimeUnit.SECONDS);
40                // 如果获取锁成功,则执行对应逻辑
41                if (execute) {
42                    log.info("线程 {} - 获取分布式锁,执行对应逻辑", Thread.currentThread().getName());
43                    // 执行完成,释放分布式锁
44                    redisLock.unLock("test", value);
45                }
46            });
47        }
48    }
49
50    @GetMapping("/tryLock")
51    public void tryLockTest() {
52        for (int i = 0; i < 10; i++) {
53            executor.submit(() -> {
54                // 生成 6 位的 UUID,将该 UUID 作为 Redis 的值。
55                // 注:设置个 UUID 随机值充当 Redis 存入的 Value 是为了保证,在分布式环境且存在多实例情况下,
56                // 进行加锁和解锁操作的都是相同的进程(同一个实例),这样能够避免该锁被别的进程(实例)执行解锁操作。
57                String value = UUID.randomUUID().toString().substring(0, 6);
58                // 获取分布式锁,设置锁超时时间为 10 秒,尝试获取锁的时间为 2 秒(2秒内没有获取锁就会获取锁失败)
59                boolean execute = redisLock.tryLock("test", value, 10, TimeUnit.SECONDS, 2, TimeUnit.SECONDS);
60                // 如果获取锁成功,则执行对应逻辑
61                if (execute) {
62                    log.info("线程 {} - 获取分布式锁,执行对应逻辑", Thread.currentThread().getName());
63                    // 执行完成,释放分布式锁
64                    redisLock.unLock("test", value);
65                }
66            });
67        }
68    }
69
70}

注意:上面设置 UUID 作为 Redis 的值,主要是是为了保证加锁和解锁的进程都是同一个,避免在分布式多实例下,"实例2"加的锁被"实例1"给解开这种情况。如果 不设置 value 或者在 多实例 中都设置 相同值,那么可能发生下面情况,这里进行一下描述来加深理解。

一个应用存在多个实例,分别为实例1、实例2,然后执行过程中遇到如下情况:

  • 1、 实例1执行业务逻辑,获取分布式锁成功(设置 key=test、value=不设置、过期时间=10 秒),然后正常执行接下来的业务逻辑代码。
  • 2、 实例1执行时间超过了 10 秒还没有执行完成,由于锁超过了指定超时时间,Redis 自动对其进行了删除(即释放了锁)。
  • 3、 实例2执行业务逻辑,也执行和实例1相同的方法,因为实例1已经释放了锁,所以锁获取成功(key=test、value=不设置、过期时间=10 秒),然后也正常执行接下来的业务逻辑。
  • 4、 比如这时,实例1在又经过 2 秒时间后,终于执行完业务逻辑,然后执行释放锁(key=test、value=不设置),这时候相当于将未完成业务逻辑的实例2的锁给释放掉了。
  • 5、 由于实例2的锁被实例1给释放掉了,所以如果这时候再进来新的请求,也调用该业务方法,那么也能顺利拿到锁执行业务逻辑,这样导致锁失去它的作用了。

所以,分布式锁一定要保证,哪个进程加的锁就该由哪个进行进行锁的释放,这里是分布式多实例情况,所以在执行加锁逻辑时,一定要设置个 UUID 唯一值来充当锁的值,在解锁时候也带上该值,来保证加锁和解锁的是在同一实例中,从而避免上述情况发生。

5、SpringBoot 启动类

 1import org.springframework.boot.SpringApplication;
 2import org.springframework.boot.autoconfigure.SpringBootApplication;
 3
 4@SpringBootApplication
 5public class Application {
 6
 7    public static void main(String[] args) {
 8        SpringApplication.run(Application.class, args);
 9    }
10
11}

三、使用 Redis 工具 Redisson 实现分布式锁

       在上面例子中使用了 spring-data-redis 组件实现了分布式锁,不过在 Java 还有很多其它好用的 工具包能够直接操作 Redis,比如 JedisRedisson 等,这些工具对操作 Redis 进行了很多方法封装,非常好用,其中 Redis 官方比较推荐使用 Redission 这个工具,该工具对操作 Redis 进行很多封装,功能丰富,这里介绍下如何使用 Redisson 来实现 分布式锁

       这里说的 Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的 分布式Java 常用对象,还提供了许多分布式服务与使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

1、Maven 引入相关依赖

Maven 中引入需要的相关依赖,主要如下:

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4    <modelVersion>4.0.0</modelVersion>
 5    
 6    <parent>
 7        <groupId>org.springframework.boot</groupId>
 8        <artifactId>spring-boot-starter-parent</artifactId>
 9        <version>2.3.12.RELEASE</version>
10    </parent>
11
12    <groupId>mydlq.club</groupId>
13    <artifactId>springboot-redisson-lock-example</artifactId>
14    <version>0.0.1</version>
15    <name>springboot-redisson-lock-example</name>
16    <description>redisson distributed lock demo</description>
17
18    <properties>
19        <java.version>1.8</java.version>
20    </properties>
21
22    <dependencies>
23        <!--Lombok,业界常用基础组件,方便实现log方法与实体类get、set等-->
24        <dependency>
25            <groupId>org.projectlombok</groupId>
26            <artifactId>lombok</artifactId>
27            <optional>true</optional>
28        </dependency>
29        <!--SpringBoot-->
30        <dependency>
31            <groupId>org.springframework.boot</groupId>
32            <artifactId>spring-boot-starter-web</artifactId>
33        </dependency>
34        <dependency>
35            <groupId>org.springframework.boot</groupId>
36            <artifactId>spring-boot-configuration-processor</artifactId>
37            <optional>true</optional>
38        </dependency>
39        <!--Redisson-->
40        <dependency>
41            <groupId>org.redisson</groupId>
42            <artifactId>redisson-spring-boot-starter</artifactId>
43            <version>3.13.1</version>
44        </dependency>
45    </dependencies>
46
47    <build>
48        <plugins>
49            <plugin>
50                <groupId>org.springframework.boot</groupId>
51                <artifactId>spring-boot-maven-plugin</artifactId>
52            </plugin>
53        </plugins>
54    </build>
55
56</project>

2、创建 Redisson 配置文件

resources 文件夹里面创建一个 config 文件夹,里面创建一个名为 redisson-single.yamlRedisson 配置文件:

注意:下面配置中 address: redis://IP:Port 是必须配置的,还有就是如果 Redis 没有配置密码,则设置 password 参数为 null,不能为 ""

redisson-single.yaml

 1#单机模式
 2singleServerConfig:
 3  # 连接空闲超时,单位:毫秒
 4  idleConnectionTimeout: 10000
 5  # 连接超时,单位:毫秒
 6  connectTimeout: 10000
 7  # 命令等待超时,单位:毫秒
 8  timeout: 3000
 9  # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
10  # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
11  retryAttempts: 3
12  # 命令重试发送时间间隔,单位:毫秒
13  retryInterval: 1000
14  # 密码
15  password: null
16  # 单个连接最大订阅数量
17  subscriptionsPerConnection: 5
18  # 客户端名称
19  clientName: null
20  # 节点地址
21  address: redis://127.0.0.1:6379
22  # 发布和订阅连接的最小空闲连接数
23  subscriptionConnectionMinimumIdleSize: 1
24  # 发布和订阅连接池大小
25  subscriptionConnectionPoolSize: 50
26  # 最小空闲连接数
27  connectionMinimumIdleSize: 32
28  # 连接池大小
29  connectionPoolSize: 64
30  # 数据库编号
31  database: 1
32  # DNS监测时间间隔,单位:毫秒
33  dnsMonitoringInterval: 5000
34# 线程池数量,默认值: 当前处理核数量 * 2
35#threads: 0
36# Netty线程池数量,默认值: 当前处理核数量 * 2
37#nettyThreads: 0
38# 编码
39codec: !<org.redisson.codec.JsonJacksonCodec> {}
40# 传输模式
41transportMode : "NIO"

3、SpringBoot 配置文件引入 Redisson 配置文件

在 SpringBoot 的 application 配置文件中,设置 redisson 配置文件 的地址:

1### 载入 Redisson 配置文件
2spring:
3  redis:
4    redisson:
5      config: classpath:config/redisson-single.yaml

4、创建 Controller 类并使用分布式锁

创建一个用于 测试的 Controller 类 来提供测试接口,里面使用线程池来模拟多个线程调用,并且使用 Redisson 锁方法实现 分布式锁

 1import lombok.extern.slf4j.Slf4j;
 2import org.redisson.api.RLock;
 3import org.redisson.api.RedissonClient;
 4import org.springframework.beans.factory.annotation.Autowired;
 5import org.springframework.web.bind.annotation.GetMapping;
 6import org.springframework.web.bind.annotation.RestController;
 7import java.util.concurrent.ExecutorService;
 8import java.util.concurrent.Executors;
 9import java.util.concurrent.TimeUnit;
10
11@Slf4j
12@RestController
13public class LockController {
14
15    /** Redisson 对象 */
16    @Autowired
17    private RedissonClient redissonClient;
18
19    /** 线程池 */
20    ExecutorService executor = Executors.newFixedThreadPool(10);
21
22    @GetMapping("/lock")
23    public void lockTest() {
24        for (int i = 0; i < 1000; i++) {
25            executor.submit(() -> {
26                // 获取锁对象(可以为"可重入锁"、"公平锁",如果redis是集群模式,还可以使用"红锁")
27                //RLock lock = redissonClient.getFairLock("test");  //公平锁
28                RLock lock = redissonClient.getLock("test");     //可重入锁
29                try {
30                    // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
31                    boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
32                    // 如果获取锁成功,则执行对应逻辑
33                    if (res) {
34                        log.info("获取分布式锁,执行对应逻辑1");
35                        log.info("获取分布式锁,执行对应逻辑2");
36                        log.info("获取分布式锁,执行对应逻辑3");
37                    }
38                } catch (InterruptedException e) {
39                    log.error("", e);
40                    Thread.currentThread().interrupt();
41                } finally {
42                    lock.unlock();
43                }
44            });
45        }
46    }
47
48}

5、SpringBoot 启动类

 1import org.springframework.boot.SpringApplication;
 2import org.springframework.boot.autoconfigure.SpringBootApplication;
 3
 4@SpringBootApplication
 5public class Application {
 6
 7    public static void main(String[] args) {
 8        SpringApplication.run(Application.class, args);
 9    }
10
11}

四、使用 Zookeeper 实现分布式锁

1、Maven 引入相关依赖

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4    <modelVersion>4.0.0</modelVersion>
 5
 6    <parent>
 7        <groupId>org.springframework.boot</groupId>
 8        <artifactId>spring-boot-starter-parent</artifactId>
 9        <version>2.3.12.RELEASE</version>
10    </parent>
11
12    <groupId>mydlq.club</groupId>
13    <artifactId>springboot-zookeeper-lock-example</artifactId>
14    <version>0.0.1</version>
15    <name>springboot-zookeeper-lock-example</name>
16    <description>zookeeper distributed lock demo</description>
17
18    <properties>
19        <java.version>1.8</java.version>
20    </properties>
21
22    <dependencies>
23        <!-- SpringBoot -->
24        <dependency>
25            <groupId>org.springframework.boot</groupId>
26            <artifactId>spring-boot-starter-web</artifactId>
27        </dependency>
28        <dependency>
29            <groupId>org.springframework.boot</groupId>
30            <artifactId>spring-boot-configuration-processor</artifactId>
31            <optional>true</optional>
32        </dependency>
33        <!-- Lombok -->
34        <dependency>
35            <groupId>org.projectlombok</groupId>
36            <artifactId>lombok</artifactId>
37            <optional>true</optional>
38        </dependency>
39        <!-- Curator & Zookeeper -->
40        <dependency>
41            <groupId>org.apache.curator</groupId>
42            <artifactId>curator-recipes</artifactId>
43            <version>5.1.0</version>
44        </dependency>
45        <!-- 创建 Zookeeper 客户端依赖,一定要和 Zookeeper Server 版本保持一致 -->
46        <dependency>
47            <groupId>org.apache.zookeeper</groupId>
48            <artifactId>zookeeper</artifactId>
49            <version>3.4.14</version>
50            <exclusions>
51                <!--因为 zk 包使用的是 log4j 日志,和 springboot 的logback 日志冲突 -->
52                <exclusion>
53                    <groupId>org.slf4j</groupId>
54                    <artifactId>slf4j-log4j12</artifactId>
55                </exclusion>
56            </exclusions>
57        </dependency>
58    </dependencies>
59
60    <build>
61        <plugins>
62            <plugin>
63                <groupId>org.springframework.boot</groupId>
64                <artifactId>spring-boot-maven-plugin</artifactId>
65            </plugin>
66        </plugins>
67    </build>
68
69</project>

2、配置连接 Zookeeper 参数

在 SpringBoot 的 applicaiton 配置文件中,配置连接 Zookeeper 的参数,后面会配置一个读取配置的类来专门获取这些参数,配置如下:

1zookeeper:
2  address: 127.0.0.1:2181     #zookeeper Server地址,如果有多个,使用","隔离
3                              #例如 ip1:port1,ip2:port2,ip3:port3
4  retryCount: 5               #重试次数
5  elapsedTimeMs: 5000         #重试间隔时间
6  sessionTimeoutMs: 30000     #Session超时时间
7  connectionTimeoutMs: 10000  #连接超时时间

3、创建读取配置文件中参数的类

application 配置文件中读取用于连接 Zookeeper Server 的参数:

 1import lombok.Data;
 2import org.springframework.boot.context.properties.ConfigurationProperties;
 3import org.springframework.context.annotation.Configuration;
 4
 5@Data
 6@Configuration
 7@ConfigurationProperties(prefix = "zookeeper")
 8public class ZookeeperProperties {
 9
10    /** 重试次数 */
11    private int retryCount;
12
13    /** 重试间隔时间 */
14    private int elapsedTimeMs;
15
16    /**连接地址 */
17    private String address;
18
19    /**Session过期时间 */
20    private int sessionTimeoutMs;
21
22    /**连接超时时间 */
23    private int connectionTimeoutMs;
24
25}

4、创建连接 Zookeeper 的 Curator 客户端类

创建连接 ZookeeperCurator 客户端的配置类,并设置初始化时进行连接:

 1import org.apache.curator.retry.RetryNTimes;
 2import org.apache.curator.framework.CuratorFramework;
 3import org.springframework.context.annotation.Bean;
 4import org.springframework.context.annotation.Configuration;
 5import org.apache.curator.framework.CuratorFrameworkFactory;
 6
 7@Configuration
 8public class ZookeeperConfig {
 9
10    /**
11     * 创建 CuratorFramework 对象并连接 Zookeeper
12     *
13     * @param zookeeperProperties 从 Spring 容器载入 zookeeperProperties Bean 对象,读取连接 ZK 的参数
14     * @return CuratorFramework
15     */
16    @Bean(initMethod = "start")
17    public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
18        return CuratorFrameworkFactory.newClient(
19                zookeeperProperties.getAddress(),
20                zookeeperProperties.getSessionTimeoutMs(),
21                zookeeperProperties.getConnectionTimeoutMs(),
22                new RetryNTimes(zookeeperProperties.getRetryCount(),
23                        zookeeperProperties.getElapsedTimeMs()));
24    }
25
26}

5、创建分布式锁工具类

 1import lombok.extern.slf4j.Slf4j;
 2import org.apache.curator.framework.CuratorFramework;
 3import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 4import org.springframework.beans.factory.annotation.Autowired;
 5import org.springframework.stereotype.Component;
 6import java.util.concurrent.TimeUnit;
 7
 8@Slf4j
 9@Component
10public class LockUtil {
11
12    @Autowired
13    CuratorFramework curatorFramework;
14
15    /**
16     * 节点名称
17     */
18    public static final String NODE_PATH = "/lock-space/%s";
19
20    /**
21     * 尝试获取分布式锁
22     *
23     * @param key        分布式锁 key
24     * @param expireTime 超时时间
25     * @param timeUnit   时间单位
26     * @return 超时时间单位
27     */
28    public InterProcessMutex tryLock(String key, int expireTime, TimeUnit timeUnit) {
29        try {
30            InterProcessMutex mutex = new InterProcessMutex(curatorFramework, String.format(NODE_PATH, key));
31            boolean locked = mutex.acquire(expireTime, timeUnit);
32            if (locked) {
33                log.info("申请锁(" + key + ")成功");
34                return mutex;
35            }
36        } catch (Exception e) {
37            log.error("申请锁(" + key + ")失败,错误:{}", e);
38        }
39        log.warn("申请锁(" + key + ")失败");
40        return null;
41    }
42
43    /**
44     * 释放锁
45     *
46     * @param key          分布式锁 key
47     * @param lockInstance InterProcessMutex 实例
48     */
49    public void unLock(String key, InterProcessMutex lockInstance) {
50        try {
51            lockInstance.release();
52            log.info("解锁(" + key + ")成功");
53        } catch (Exception e) {
54            log.error("解锁(" + key + ")失败!");
55        }
56    }
57
58}

6、创建 Controller 类并使用分布式锁

创建一个用于 测试的 Controller 类 来提供测试接口,里面使用线程池来模拟多个线程调用,并且使用 Curator 锁方法实现 分布式锁

 1import lombok.extern.slf4j.Slf4j;
 2import mydlq.club.lock.utils.LockUtil;
 3import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 4import org.springframework.beans.factory.annotation.Autowired;
 5import org.springframework.web.bind.annotation.GetMapping;
 6import org.springframework.web.bind.annotation.RestController;
 7import java.util.concurrent.ExecutorService;
 8import java.util.concurrent.Executors;
 9import java.util.concurrent.TimeUnit;
10
11@Slf4j
12@RestController
13public class LockController {
14
15    /**
16     * curatorFramework对象
17     */
18    @Autowired
19    private LockUtil lockUtil;
20
21    /**
22     * 线程池
23     */
24    ExecutorService executor = Executors.newFixedThreadPool(10);
25
26    @GetMapping("/lock")
27    public void lockTest() {
28        for (int i = 0; i < 1000; i++) {
29            executor.submit(() -> {
30                try {
31                    String key = "test";
32                    // 获取锁
33                    InterProcessMutex lock = lockUtil.tryLock(key, 10, TimeUnit.SECONDS);
34                    if (lock != null) {
35                        // 如果获取锁成功,则执行对应逻辑
36                        log.info("获取分布式锁,执行对应逻辑1");
37                        log.info("获取分布式锁,执行对应逻辑2");
38                        log.info("获取分布式锁,执行对应逻辑3");
39                        // 释放锁
40                        lockUtil.unLock(key, lock);
41                    }
42                } catch (Exception e) {
43                    log.error("", e);
44                }
45            });
46        }
47    }
48
49}

7、SpringBoot 启动类

 1import org.springframework.boot.SpringApplication;
 2import org.springframework.boot.autoconfigure.SpringBootApplication;
 3
 4@SpringBootApplication
 5public class Application {
 6
 7    public static void main(String[] args) {
 8        SpringApplication.run(Application.class, args);
 9    }
10
11}

四、其它

1、Redisson 连接 Redis 主从、哨兵、集群的配置文件

主从配置文件 redisson-master-slave.yaml

 1masterSlaveServersConfig:
 2  idleConnectionTimeout: 10000
 3  pingTimeout: 1000
 4  connectTimeout: 10000
 5  timeout: 3000
 6  retryAttempts: 3
 7  retryInterval: 1500
 8  reconnectionTimeout: 3000
 9  failedAttempts: 3
10  password: null
11  subscriptionsPerConnection: 5
12  clientName: null
13  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
14  slaveSubscriptionConnectionMinimumIdleSize: 1
15  slaveSubscriptionConnectionPoolSize: 50
16  slaveConnectionMinimumIdleSize: 32
17  slaveConnectionPoolSize: 64
18  masterConnectionMinimumIdleSize: 32
19  masterConnectionPoolSize: 64
20  readMode: "SLAVE"
21  slaveAddresses:
22    - "redis://127.0.0.1:6381"
23    - "redis://127.0.0.1:6380"
24  masterAddress: "redis://127.0.0.1:6379"
25  database: 0
26threads: 0
27nettyThreads: 0
28codec: !<org.redisson.codec.JsonJacksonCodec> {}
29"transportMode":"NIO"

哨兵配置文件 redisson-sentinel.yaml

 1sentinelServersConfig:
 2  idleConnectionTimeout: 10000
 3  pingTimeout: 1000
 4  connectTimeout: 10000
 5  timeout: 3000
 6  retryAttempts: 3
 7  retryInterval: 1500
 8  reconnectionTimeout: 3000
 9  failedAttempts: 3
10  password: null
11  subscriptionsPerConnection: 5
12  clientName: null
13  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
14  slaveSubscriptionConnectionMinimumIdleSize: 1
15  slaveSubscriptionConnectionPoolSize: 50
16  slaveConnectionMinimumIdleSize: 32
17  slaveConnectionPoolSize: 64
18  masterConnectionMinimumIdleSize: 32
19  masterConnectionPoolSize: 64
20  readMode: "SLAVE"
21  sentinelAddresses:
22    - "redis://127.0.0.1:7001"
23    - "redis://127.0.0.1:7002"
24  masterName: "mymaster"
25  database: 0
26threads: 0
27nettyThreads: 0
28codec: !<org.redisson.codec.JsonJacksonCodec> {}
29"transportMode":"NIO"

集群配置文件 redisson-cluster.yaml

 1clusterServersConfig:
 2  idleConnectionTimeout: 10000
 3  pingTimeout: 1000
 4  connectTimeout: 10000
 5  timeout: 3000
 6  retryAttempts: 3
 7  retryInterval: 1500
 8  reconnectionTimeout: 3000
 9  failedAttempts: 3
10  password: null
11  subscriptionsPerConnection: 5
12  clientName: null
13  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
14  slaveSubscriptionConnectionMinimumIdleSize: 1
15  slaveSubscriptionConnectionPoolSize: 50
16  slaveConnectionMinimumIdleSize: 32
17  slaveConnectionPoolSize: 64
18  masterConnectionMinimumIdleSize: 32
19  masterConnectionPoolSize: 64
20  readMode: "SLAVE"
21  nodeAddresses:
22    - "redis://127.0.0.1:7001"
23    - "redis://127.0.0.1:7002"
24    - "redis://127.0.0.1:7003"
25  scanInterval: 1000
26threads: 0
27nettyThreads: 0
28codec: !<org.redisson.codec.JsonJacksonCodec> {}
29"transportMode":"NIO"

---END---


  !版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。