Redis的分布式锁, 大家称呼为”占坑”. 一个指定的坑, 只能一个人占, 其他人来了, 只能等这个人完事了, 再进来.
我认为拿数据库来实现分布式锁的思路都差不多, 多个服务端服务器同时向”存锁的数据服务器”发出锁请求, 有且只有一个服务器能成功存入自己的锁信息, 其他服务器获取锁失败.
请求锁流程
首先, 我们要考虑如果服务端服务器挂了, 比如请求锁成功之后, 获得锁的客户端服务器挂了, 这个时候没有缩放锁. 那么就会造成死锁. 这个时候有一种很方便的处理方式, 给Redis锁设置一个时间, expire key time
. 锁自动消除.
好的, 那我们就这样实现一下.
锁接口
package com.redis.lock;
public interface Lock {
boolean lock(String key);
boolean unlock(String key);
}
实现锁接口的具体锁.
这里用的Jedis的2.9.0
的版本, 好像版本如果搞了, 是没有public void set(String key, String value,String nxxx,String expx,int time)
这个方法的. 这里让我搞了好久.如果没有这个方法, 那就意味着, 我们要先set
, 然后在expire
设置过期时间.
这个方法如果里面已经存在该Key, 那么就会返回Null.
jedis.del(key)
如果成功删除了某个值, 就会返回大于0, 如果没有该key, 或者删除失败就会返回0
package com.redis.lock;
import redis.clients.jedis.Jedis;
public class RedisLock implements Lock {
private Jedis jedis;
public RedisLock(Jedis jedis) {
this.jedis = jedis;
}
public boolean lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}
public boolean unlock(String key) {
return jedis.del(key) > 0;
}
}
好了, 一个简略的Redis分布式锁, 其实我们这样就算已经完成了.
现在我们来试一下, 我们通过开启很多线程模仿客户端去先获取一个对象中的值, 再去累加这个值, 然后让这些客户端尝试并行去获取锁, 然后获取锁的客户端才能进行累加. 这样,通过分布式锁, 并行将变为串行. 为了模仿操作的复杂性, 我们通过Thread.sleep(1)中间休眠一秒客户端.
下面是具体的执行类
class Count {
private int num;
public int getNum(){
return this.num;
}
public void setNum(int target){
this.num = target;
}
}
不加分布式锁的客户端.
class NormalClient extends Thread{
private Count count;
public NormalClient(Count count) {
this.count = count;
}
@Override
public void run() {
int num = count.getNum();
try {
Thread.sleep(1);
}catch (InterruptedException e){
e.printStackTrace();
}
count.setNum(num + 1);
System.out.println(count.getNum());
}
}
加锁的客户端
class RedisLockClient extends Thread{
private Lock redisLock;
private final String key ="只能串行执行";
private Count count;
public RedisLockClient(Count count) {
this.count = count;
this.redisLock = new RedisLock(new Jedis("47.93.50.200"));
}
@Override
public void run() {
boolean isLock = redisLock.lock(key);
while (!isLock){
isLock = redisLock.lock(key);
}
try {
System.out.println(getName() + " client do something ing!");
int num = count.getNum();
Thread.sleep(1);
count.setNum(num + 1);
System.out.println(count.getNum());
}catch (InterruptedException e){
e.printStackTrace();
}finally {
redisLock.unlock(key);
}
}
}
主方法.
public class Main {
public static void main(String[] args) throws Exception{
Count count = new Count();
funb(100,count);
//休眠等待100个客户端处理完毕. (这里可以使用CountDownLatch or join()方法. 但尽量简单易懂就不添加这些东西了)
Thread.sleep(10000);
System.out.println("最终 : " + count.getNum());
}
//运行加锁的客户端
private static void func(int nThread, Count count){
for (int i = 0; i < nThread; i++){
new RedisLockClient(count).start();
}
}
//运行不加锁的客户端
private static void funb(int nThread, Count count){
for (int i = 0; i < nThread; i++) {
new NormalClient(count).start();
}
}
}
先运行不加锁的客户端看一下情况.
惨不忍睹. 加到6就完事了.
加上分布式锁运行情况.
串行执行Nice. 结果完全正确.
(我简化了释放锁的操作, 其实如果释放锁失败, 比如网络出现问题, 是可以进行回滚的)
其实呢, 这样还是有缺陷的, 万一这个Redis挂了呢? 这就要自动切换到从库Redis.
但是Redis的主从同步并不是时刻都一致的.
比如碰到了这种情况, 客户端服务器发出锁请求, 请求锁成功, 但是成功之后, 主Redis挂了, 这个时候如果没有其他服务端来请求是不会出现问题的, 但是主Redis挂之后, 其他服务器立马去请求从Redis, 但是从Redis目前是没有锁记录的, 所以此时, 这个新来的请求者也会请求锁成功. 这就会造成, 前一个服务器还有处理完成, 后一个服务器已经开始了处理, 分布式锁被破坏了.
这个时候怎么办呢?
说一下我的想法.
1.请求加锁和释放锁的同时, 对与主从Redis服务器同时请求锁操作. 同时成功才算成功.(其实这个有点像主动帮助AOF进行同步)
2.在服务器感知主Redis已经挂了之后, 切换从Redis的时候, 等待5秒(等待一个锁过期的时间), 再进行操作.
其实我还有其他想法, 但是我觉得这两个是比较简单且有效的. 就说了这两个.
当然, 以上谈的都是比较极端的情况, 大部分业务处理应该不需要这么严格, 但是如果需要, 就要仔细思考去解决它.