Redis实现分布式锁

 

Redis的分布式锁, 大家称呼为”占坑”. 一个指定的坑, 只能一个人占, 其他人来了, 只能等这个人完事了, 再进来.

我认为拿数据库来实现分布式锁的思路都差不多, 多个服务端服务器同时向”存锁的数据服务器”发出锁请求, 有且只有一个服务器能成功存入自己的锁信息, 其他服务器获取锁失败.

请求锁流程

Redis锁

首先, 我们要考虑如果服务端服务器挂了, 比如请求锁成功之后, 获得锁的客户端服务器挂了, 这个时候没有缩放锁. 那么就会造成死锁. 这个时候有一种很方便的处理方式, 给Redis锁设置一个时间, expire key time. 锁自动消除.

好的, 那我们就这样实现一下.

锁接口

package com.redis.lock;

public interface Lock {
    boolean lock(String key);
    boolean unlock(String key);
}

实现锁接口的具体锁.

这里用的Jedis的2.9.0的版本, 好像版本如果搞了, 是没有public void set(String key, String value,String nxxx,String expx,int time)这个方法的. 这里让我搞了好久.如果没有这个方法, 那就意味着, 我们要先set, 然后在expire设置过期时间.

这个方法如果里面已经存在该Key, 那么就会返回Null.

jedis.del(key) 如果成功删除了某个值, 就会返回大于0, 如果没有该key, 或者删除失败就会返回0

package com.redis.lock;

import redis.clients.jedis.Jedis;

public class RedisLock implements Lock {
    private Jedis jedis;

    public RedisLock(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean lock(String key) {
        return jedis.set(key, "", "nx", "ex", 5L) != null;
    }

    public boolean unlock(String key) {
        return jedis.del(key) > 0;
    }
}

好了, 一个简略的Redis分布式锁, 其实我们这样就算已经完成了.

现在我们来试一下, 我们通过开启很多线程模仿客户端去先获取一个对象中的值, 再去累加这个值, 然后让这些客户端尝试并行去获取锁, 然后获取锁的客户端才能进行累加. 这样,通过分布式锁, 并行将变为串行. 为了模仿操作的复杂性, 我们通过Thread.sleep(1)中间休眠一秒客户端.

下面是具体的执行类

class Count {
    private int num;

    public int getNum(){
        return this.num;
    }

    public void setNum(int target){
        this.num = target;
    }
}

不加分布式锁的客户端.

class NormalClient extends Thread{

    private Count count;

    public NormalClient(Count count) {
        this.count = count;
    }

    @Override
    public void run() {
        int num = count.getNum();
        try {
            Thread.sleep(1);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        count.setNum(num + 1);
        System.out.println(count.getNum());
    }
}

加锁的客户端

class RedisLockClient extends Thread{
    private Lock redisLock;
    private final String key ="只能串行执行";
    private Count count;


    public RedisLockClient(Count count) {
        this.count = count;
        this.redisLock = new RedisLock(new Jedis("47.93.50.200"));
    }

    @Override
    public void run() {
        boolean isLock = redisLock.lock(key);
        while (!isLock){
            isLock = redisLock.lock(key);
        }
        try {
            System.out.println(getName() + " client do something ing!");
            int num = count.getNum();
            Thread.sleep(1);
            count.setNum(num + 1);
            System.out.println(count.getNum());
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            redisLock.unlock(key);
        }
    }
}

主方法.

public class Main {
    public static void main(String[] args) throws Exception{
        Count count = new Count();
        funb(100,count);
		//休眠等待100个客户端处理完毕.  (这里可以使用CountDownLatch  or join()方法. 但尽量简单易懂就不添加这些东西了)
        Thread.sleep(10000);
        System.out.println("最终 : " + count.getNum());
    }

	//运行加锁的客户端
    private static void func(int nThread, Count count){
        for (int i = 0; i < nThread; i++){
            new RedisLockClient(count).start();
        }
    }

	//运行不加锁的客户端
    private static void funb(int nThread, Count count){
        for (int i = 0; i < nThread; i++) {
            new NormalClient(count).start();
        }
    }
}

先运行不加锁的客户端看一下情况.

没加锁的情况.gif

惨不忍睹. 加到6就完事了.

加上分布式锁运行情况.

分布式锁运行结果.gif

串行执行Nice. 结果完全正确.

(我简化了释放锁的操作, 其实如果释放锁失败, 比如网络出现问题, 是可以进行回滚的)

其实呢, 这样还是有缺陷的, 万一这个Redis挂了呢? 这就要自动切换到从库Redis.

但是Redis的主从同步并不是时刻都一致的.

比如碰到了这种情况, 客户端服务器发出锁请求, 请求锁成功, 但是成功之后, 主Redis挂了, 这个时候如果没有其他服务端来请求是不会出现问题的, 但是主Redis挂之后, 其他服务器立马去请求从Redis, 但是从Redis目前是没有锁记录的, 所以此时, 这个新来的请求者也会请求锁成功. 这就会造成, 前一个服务器还有处理完成, 后一个服务器已经开始了处理, 分布式锁被破坏了.

这个时候怎么办呢?

说一下我的想法.

1.请求加锁和释放锁的同时, 对与主从Redis服务器同时请求锁操作. 同时成功才算成功.(其实这个有点像主动帮助AOF进行同步)

2.在服务器感知主Redis已经挂了之后, 切换从Redis的时候, 等待5秒(等待一个锁过期的时间), 再进行操作.

其实我还有其他想法, 但是我觉得这两个是比较简单且有效的. 就说了这两个.

当然, 以上谈的都是比较极端的情况, 大部分业务处理应该不需要这么严格, 但是如果需要, 就要仔细思考去解决它.