Redis分布式锁 分布式锁,是一种思想,它的实现方式有很多。比如,我们将沙滩当做分布式锁的组件,那么它看起来应该是这样的:
在沙滩上踩一脚,留下自己的脚印,就对应了加锁操作。其他进程或者线程,看到沙滩上已经有脚印,证明锁已被别人持有,则等待。
把脚印从沙滩上抹去,就是解锁的过程。
为了避免死锁,我们可以设置一阵风,在单位时间后刮起,将脚印自动抹去。
分布式锁的实现有很多,比如基于数据库、memcached、Redis、系统文件、zookeeper等。它们的核心的理念跟上面的过程大致相同。
Jedis 1、加锁 加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间。
1 SET lock_key random_value NX PX 5000
值得注意的是: random_value
是客户端生成的唯一的字符串。 NX
代表只在键不存在时,才对键进行设置操作。 PX 5000
设置键的过期时间为5000毫秒。
这样,如果上面的命令执行成功,则证明客户端获取到了锁。
2、解锁 解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉。这时候random_value
的作用就体现出来。
为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。
1 2 3 4 5 if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
3、实现 首先,我们在pom文件中,引入Jedis。在这里,笔者用的是最新版本,注意由于版本的不同,API可能有所差异。
1 2 3 4 5 6 7 8 9 10 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.0.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.3.3.RELEASE</version> </dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 server: port: 8100 spring: redis: port: 6379 host: 192.168 .2 .100 database: 6 jedis: pool: max-active: 8 max-wait: -1 max-idle: 8 timeout: 0
加锁的过程很简单,就是通过SET指令来设置值,成功则返回;否则就循环等待,在timeout时间内仍未获取到锁,则获取失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Service public class RedisLock { Logger logger = LoggerFactory.getLogger(this .getClass()); private String lock_key = "redis_lock" ; protected long internalLockLeaseTime = 30000 ; private long timeout = 999999 ; SetParams params = SetParams.setParams().nx().px(internalLockLeaseTime); @Autowired JedisPool jedisPool; public boolean lock (String id) { Jedis jedis = jedisPool.getResource(); Long start = System.currentTimeMillis(); try { for (;;){ String lock = jedis.set(lock_key, id, params); if ("OK" .equals(lock)){ return true ; } long l = System.currentTimeMillis() - start; if (l>=timeout) { return false ; } try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { jedis.close(); } } public boolean unlock (String id) { Jedis jedis = jedisPool.getResource(); String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else" + " return 0 " + "end" ; try { Object result = jedis.eval(script, Collections.singletonList(lock_key), Collections.singletonList(id)); if ("1" .equals(result.toString())){ return true ; } return false ; }finally { jedis.close(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Controller public class IndexController { @Autowired RedisLock redisLock; int count = 0 ; @RequestMapping("/index") @ResponseBody public String index () throws InterruptedException { int clientcount =1000 ; CountDownLatch countDownLatch = new CountDownLatch(clientcount); ExecutorService executorService = Executors.newFixedThreadPool(clientcount); long start = System.currentTimeMillis(); for (int i = 0 ;i<clientcount;i++){ executorService.execute(() -> { String id = IdUtil.getId(); try { redisLock.lock(id); count++; }finally { redisLock.unlock(id); } countDownLatch.countDown(); }); } countDownLatch.await(); long end = System.currentTimeMillis(); logger.info("执行线程数:{},总耗时:{},count数为:{}" ,clientcount,end-start,count); return "Hello" ; } }
至此,单节点Redis的分布式锁的实现就已经完成了。比较简单,但是问题也比较大,最重要的一点是,锁不具有可重入性。
Redission Redisson实现分布式锁的原理 前言: 现在面试,一般都会聊聊分布式系统这块的东西。通常面试官都会从服务框架(Spring Cloud、Dubbo)聊起,一路聊到分布式事务、分布式锁、ZooKeeper等知识。具体的来看看Redis分布式锁的实现原理。
如果在公司里落地生产环境用分布式锁的时候,一定是会用开源类库的,比如Redis分布式锁,一般就是用Redisson 框架就好了,非常的简便易用。
可以去看看Redisson的官网,看看如何在项目中引入Redisson的依赖,然后基于Redis实现分布式锁的加锁与释放锁。
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.10.1</version > </dependency >
简单的使用代码片段 1 2 3 RLock lock = redission.getLock("myLock"); lock.lock();#加锁 lock.unlock();#解锁
还支持redis单实例、redis哨兵、redis cluster、redis master-slave等各种部署架构,都可以完美实现。
Redisson实现分布式锁的底层原理
1.加锁机制 现在某个客户端要加锁。如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。
这里注意 ,仅仅只是选择一台机器!这点很关键!
紧接着,就会发送一段lua脚本到redis上,那段lua脚本如下所示:
为啥要用lua脚本呢?
因为一大坨复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性 。
那么,这段lua脚本是什么意思呢?
**KEYS[1]**代表的是你加锁的那个key,比如说:
RLock lock = redisson.getLock(“myLock”);
这里你自己设置了加锁的那个锁key就是“myLock”。
**ARGV[1]**代表的就是锁key的默认生存时间,默认30秒。
**ARGV[2]**代表的是加锁的客户端的ID,类似于下面这样:
8743c9c0-0795-4907-87fd-6c719a6b4586:1
第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。
如何加锁呢?很简单,用下面的命令:
hset myLock
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。
接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。
2.锁互斥机制 那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?
很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。
接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。
所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。 比如还剩15000毫秒的生存时间。
此时客户端2会进入一个while循环,不停的尝试加锁
3.watch dog自动延期机制 客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?
简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下 ,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
4.可重入加锁机制 那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?
这时我们来分析一下上面那段lua脚本。
第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。
第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此时就会执行可重入加锁的逻辑,他会用:
incrby myLock
8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通过这个命令,对客户端1的加锁次数,累加1。
此时myLock数据结构变为下面这样:
1 2 3 myLock { 8743c9c0-0795-4907-87fd-6c719a6b4586:1 2 }
那个myLock的hash数据结构中的那个客户端ID,就对应着加锁的次数
5.释放锁机制 如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
“del myLock”命令,从redis里删除这个key。
然后呢,另外的客户端2就可以尝试完成加锁了。
这就是所谓的分布式锁的开源Redisson框架的实现机制。
一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。
6.上述Redis分布式锁的缺点 其实上面那种方案最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。
但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。
接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。
此时就会导致多个客户端对一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生 。
所以这个就是redis cluster,或者是redis master-slave架构的主从异步复制 导致的redis分布式锁的最大缺陷:在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。
代码演示 1.首先,通过配置获取RedissonClient客户端的实例,然后getLock
获取锁的实例,进行操作即可。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); config.useSingleServer().setPassword("redis1234"); final RedissonClient client = Redisson.create(config); RLock lock = client.getLock("lock1"); try{ lock.lock(); }finally{ lock.unlock(); } }
2、获取锁实例 我们先来看RLock lock = client.getLock("lock1");
这句代码就是为了获取锁的实例,然后我们可以看到它返回的是一个RedissonLock
对象。
1 2 3 public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); }
在RedissonLock
构造方法中,主要初始化一些属性。
1 2 3 4 5 6 7 8 9 10 11 public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); //命令执行器 this.commandExecutor = commandExecutor; //UUID字符串 this.id = commandExecutor.getConnectionManager().getId(); //内部锁过期时间 this.internalLockLeaseTime = commandExecutor. getConnectionManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; }
3、加锁 当我们调用lock
方法,定位到lockInterruptibly
。在这里,完成了加锁的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { //当前线程ID long threadId = Thread.currentThread().getId(); //尝试获取锁 Long ttl = tryAcquire(leaseTime, unit, threadId); // 如果ttl为空,则证明获取锁成功 if (ttl == null) { return; } //如果获取锁失败,则订阅到对应这个锁的channel RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { //再次尝试获取锁 ttl = tryAcquire(leaseTime, unit, threadId); //ttl为空,说明成功获取锁,返回 if (ttl == null) { break; } //ttl大于0 则等待ttl时间后继续尝试获取 if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { //取消对channel的订阅 unsubscribe(future, threadId); } //get(lockAsync(leaseTime, unit)); }
如上代码,就是加锁的全过程。先调用tryAcquire
来获取锁,如果返回值ttl为空,则证明加锁成功,返回;如果不为空,则证明加锁失败。这时候,它会订阅这个锁的Channel,等待锁释放的消息,然后重新尝试获取锁。流程如下:
获取锁 获取锁的过程是怎样的呢?接下来就要看tryAcquire
方法。在这里,它有两种处理方式,一种是带有过期时间的锁,一种是不带过期时间的锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { //如果带有过期时间,则按照普通方式获取锁 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //先按照30秒的过期时间来执行获取锁的方法 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync( commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间 ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
接着往下看,tryLockInnerAsync
方法是真正执行获取锁的逻辑,它是一段LUA脚本代码。在这里,它使用的是hash数据结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit , long threadId, RedisStrictCommand<T> command ) { internalLockLeaseTime = unit .toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
这段LUA代码看起来并不复杂,有三个判断:
通过exists判断,如果锁不存在,则设置值和过期时间,加锁成功
通过hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功
如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败
加锁成功后,在redis的内存数据中,就有一条hash结构的数据。Key为锁的名称;field为随机字符串+线程ID;值为1。如果同一线程多次调用lock
方法,值递增1。
1 2 3 127.0.0.1:6379> hgetall lock1 1) "b5ae0be4-5623-45a5-8faa-ab7eb167ce87:1" 2) "1"
4、解锁 我们通过调用unlock
方法来解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public RFuture<Void> unlockAsync(final long threadId) { final RPromise<Void> result = new RedissonPromise<Void>(); //解锁方法 RFuture<Boolean> future = unlockInnerAsync(threadId); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { if (!future.isSuccess()) { cancelExpirationRenewal(threadId); result.tryFailure(future.cause()); return; } //获取返回值 Boolean opStatus = future.getNow(); //如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException(" attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } //解锁成功,取消刷新过期时间的那个定时任务 if (opStatus) { cancelExpirationRenewal(null); } result.trySuccess(null); } }); return result; }
然后我们再看unlockInnerAsync
方法。这里也是一段LUA脚本代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 protected RFuture<Boolean> unlockInnerAsync (long threadId ) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
如上代码,就是释放锁的逻辑。同样的,它也是有三个判断:
如果锁已经不存在,通过publish发布锁释放的消息,解锁成功
如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常
通过hincrby递减1,先释放一次锁。若剩余次数还大于0,则证明当前锁是重入锁,刷新过期时间;若剩余次数小于0,删除key并发布锁释放的消息,解锁成功
至此,Redisson中的可重入锁的逻辑,就分析完了。但值得注意的是,上面的两种实现方式都是针对单机Redis实例而进行的。如果我们有多个Redis实例,请参阅Redlock算法 。该算法的具体内容,请参考http://redis.cn/topics/distlock.html