Java-分布式锁
2023-04-27 09:25:24
全家桶分布式锁
线程安全有两种方式:ReentrantLock、synchronized
Mysql解决并发性问题1、jvm本地锁(ReentrantLock、synchronized) 600三种情况导致锁失效- 多例模式(解决方案:service业务使用单例)
- 事务 (解决方案:可以使用提交阅读,READ_UNCOMMITTED)
- 集群部署(解决方案:使用增删时自动锁定,即使用sql语句)
解决方案:三种情况下锁失效的问题
问题:- 锁的范围问题(表级锁和行级锁)where中的过滤条件列,如果使用索引,锁行,不能使用索引,锁表。(详见悲观锁)
- 多个记录相同的商品
- 库存前后的状态无法记录
- 查询或更新锁的条件必须是索引字段
- 查询或更新的条件必须是准确的具体值(否则会变成表级锁)
- 性能问题
- 对于死锁问题,对于多个数据加锁,操作对象加锁顺序应一致。
- 库存操作要统一:select ...for update 与普通selectt一起使用...一个没有锁。
- 高并发性能极低
- ABA问题
- 读写分离导致乐观锁不可靠,因为从IO过多,网络延迟。
性能:sql>悲观锁>jvm锁>乐观锁
如果追求终极性能,业务场景简单,不需要记录数据前后的变化。 优先:sql。
如果并发量低(多读),竞争不是很激烈。优先:乐观锁
如果并发量高,通常会发生冲突。此时,如果选择乐观锁,将导致业务代码不断重试。优先级:mysql悲观锁
jvm本地锁不推荐
乐观锁1redis、JVM本地锁机制(集群不可用)2、乐观锁rediswatch:如果在事务中,可以监控一个或多个key值(exec)执行前,如果key的值发生变化,事务执行将被取消
multi:开始一个事务;
exec:执行一项事务;
问题:性能极低 400 不推荐3、分布式锁 跨进程 跨服务 跨服务器场景:超卖(NoSQL)、缓存击穿
实现分布式锁的方法1、基于redis的实现特点:1、独家排他使用 setnx
2、防死锁的发生
如果redis客户端程序从redis服务中获得锁后立即停机,产生死锁。
解决方案:为锁增加过期时间。
不能重新进入:确保可重新进入解决死锁(A方法中调B方法是同一锁,产生死锁)
3、原子性
在获取锁和过期时间之间实现原子性:set key value ex 3 nx
在判断和释放锁之间实现原子性:lua脚本
4、防误删除:一个请求删除另一个锁(提供uuid)
先判断,再删除
5、确保可重入性 hash+lua脚本实现可重入性
6、自动续期 :判断它是否是你自己的锁,如果你发现它是你自己的锁过期时间
7、在集群情况下,机制失效:
- 客户端程序10010,从主中获取锁
- 还没来得急同步数据,主挂了
- 所以主要是从升级开始
- 客户端程序10086从新主中获得锁,导致锁机制失效
1、应用程序获取系统当前时间
2、应用程序使用相同的kv值从多个redis实例中依次获取锁。如果某个节点在一定时间以上仍未获得锁,则直接放弃。尽快从下一个健康的redis节点获得锁,以避免被停机节点堵塞。
3、计算获取锁的消耗时间=客户端程序系统当前时间-step1中的时间。获取锁的消耗时间小于总锁定时间(30s),超过一半的节点成功获取锁,认为获取锁是成功的
4、剩余的锁定时间=总锁定时间-获取锁的消耗时间
5、如果获取锁失败,将锁释放到所有节点。
Rentrantlock底层原理
Reentrantlock可以重新加入加锁过程.lock() -> NonfairSync.lock()->AQS.acquire(1)->NonfairSync.tryAcquire(1)->Sync.nonfairTryAcquire(1)1、CAS获取锁,如果没有线程占用锁(state == 0) state设置为1,锁定成功,记录当前线程为锁定线程(两次)、如果state的值不是0,则表示锁已被占用,则判断当前线程是否有锁线程,如果是,则重新进入(state+1)3、加锁失败,入队等待Reentrantlock,可以重新加入解锁过程.unlock()->AQS.release()->Sync.tryRelease(1)1、判断当前线程是否有锁定线程,否则抛出异常2、判断state的值在state值减1后是否为0,为0则解锁成功,返回true3、如果减1后的值不为0,则返回false参照Reeentrantlock中的不公平可重入锁实现分布式可重入锁:Hash+Lua脚本
参考Reeetrantlock中的不公平可重入锁,实现redis分布式可重入锁:hash+lua脚本
加锁:
1、判断锁是否存在(exists),直接获得锁hset/hexists key field value
2、如果锁存在,判断它是否是你自己的锁(hexists),如果是自己的锁,重新进入,增加一个(hincrby key field increment)
3、如果锁不是你自己的锁,否则再试一次:递归和循环
if redis.call('exists','lock') == 0then redis.call('hset','lock',uuid,1)redis.call('expire','lock',30)return 1elseif redis.call('hexists','lock',uuid) == 1thenredis.call('hincrby','lock',uuid,1)redis.call('expire','lock',30)return 1elsereturn 0end写法2if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1then redis.call('hincrby',KEYS[1],ARGV[1],1)redis.call('expire',KEYS[1],ARGV[2])return 1elsereturn 0endkeys(静态):lock,argv(动态):uuid,30if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end
解锁:
1、判断你的锁是否存在,如果不存在,返回nil
2、如果你的锁存在,减少1(hincrby -1)判断-1后的值是否为0,0后释放锁(del)返回1
3、不为0则返回0
if redis.call('hexists',KEYS[1],ARGV[1]) == 0then return nil;elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0thenreturnn redis.call('del',KEYS[1])elsereturn 0endif redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincyby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end
自动续期
定时任务(时间驱动)+lua脚本+
lua脚本一次向redis发送多个指令,redis单线程执行遵守onedis-by-one规则
script:lua脚本字符串
numkeys:key列表中的元素数量
key列表:空格分割 从1开始
arg列表:空格分割 从1开始
变量:
全局变量:a = 1
局部变量:local a = 2;
分支控制
if 条件
then
代码块
else
代码块
end
if redis.call('get','lock') == uuidthen return redis.call('del','lock')elsereturn 0end
操作
- 加锁,setnx
- 重试,递归
- 解锁,del
Redison是基于Redis实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅为Java提供了一系列常见的分布式对象,还提供了许多分布式服务。Redison提供了使用Redis最简单、最方便的方法。Redison的目的是促进用户对Redis的关注分离,使用户能够更集中精力处理业务逻辑。
Redisson和jedis和letuce一样是redis客户端,但Redisson功能更强大。
官方文档:https://www.tulingxueyuan.cn/d/file/p/20230427/1qochqm32go class='language-plain'><!-- 以后使用redisson作为所有分布式锁、分布式对象等功能框架--><dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.0</version></dependency>
配置redisson,程序化的配置方法是通过构建config对象实例来实现
package com.example.distribyted.lock.config;import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RedissonConfig { @Bean public RedissonClient redissonClient(){ Config config = new Config(); config .useSingleServer()///设置redis模式(单机、集群) .setAddress("redis://127.0.0.1:6379");///// .setDatabase设置redis数据库编号//// .setUsername().setPassword()设置用户名和密码//// .setConnectionMinimumIdleSize(10);///连接池最小空闲线程数/// .setConnectionPoolSize(50);///// .setIdleConnectionTimeout(60000);//线程超时时间/// .setConnectTimeout();////客户端程序获得redis连接的超时间/// .setTimeout();//超时响应 RedissonClient redissonClient = Redisson.create(); return redissonClient; }}
可重入锁
@ResponseBody@GetMapping("/hello")public String hello(){ //获得一把锁 RLock lock = redissonClient.getLock("my-lock"); //加锁 lock.lock(); ///锁的自动续期,如果业务执行时间过长,锁续期会在运行期间自动延长30秒,不用担心业务时间长,锁会自动过期 ///只要加锁业务运行完成,就不会给当前的锁续期。即使不手动解锁,默认锁也会在30秒后自动删除 try { System.out.println()成功加锁,执行业务... "+Thread.currentThread().getId()); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { ///手动解锁 System.out.println("解锁..."+Thread.currentThread().getId()); lock.unlock(); } return "hello";}
- lock.lock()默认情况下,锁的过期时间为30s。只要锁设置成功,定时任务就会启动,每10秒自动续期至30s。
- lock.lock(10,TimeUnit.SECONDS),默认锁的过期时间是我们指定的时间。
基于Redisson分布式的Redison可重新进入公平锁也实现了java.util.concurrent.locks.Lock
接口的一种RLock
对象。还提供了异步(Async)、反射式(Reactive)与RxJava2标准接口。它保证了当多个Redisson客户端线程同时要求加锁时,优先分配给先发出要求的线程。所有要求线程将在一个队列中排队。当一个线程停机时,Redisson将等待5秒,然后继续下一个线程。也就是说,如果前面有5个线程处于等待状态,后面的线程将等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock");// fairlock最常见的使用方法.lock();
读写锁
基于Redisson分布式的Redison可以重新读写锁ReadWritelock Java对象已经实现java.util.concurrent.locks.ReadWriteLock
接口。读锁和写锁都继承了RLock
接口。
分布式可重新进入读写锁,允许同时加锁多个读写锁和一个写锁。
读锁
@GetMapping("/read")public String readValue(){ RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock"); String s = ""; //加读锁 RLock rLock = lock.readLock(); rLock.lock(); try { System.out.println(“读锁加锁成功”+Thread.currentThread().getId()); s = redisTemplate.opsForValue().get("writeValue"); Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); System.out.println(“读锁释放”+Thread.currentThread().getId()); } return s;}
写锁
@GetMapping("/write")public String writeValue(){ // 获取一把锁 RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock"); String s = ""; // 加写锁 RLock rLock = lock.writeLock(); try { //1、改数据加写锁,阅读数据和阅读锁 rLock.lock(); System.out.println(”写锁加锁成功..."+Thread.currentThread().getId()); s = UUID.randomUUID().toString(); Thread.sleep(30000); redisTemplate.opsForValue().set("writeValue",s); } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); System.out.println(“写锁释放”+Thread.currentThread().getId()); } return s;}
测试
- 先添加写锁,再添加读锁。此时,数据不会立即添加读锁,而是需要等待写锁释放才能添加读锁
- 先加读锁,再加写锁:有读锁,写锁需要等
- 先加读锁,再加读锁:并发读锁相当于无锁模式,同时成功加锁
Redison基于Redis的分布式信号量(Semaphore)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
类似的界面和用法。它还提供异步(Async)、反射式(Reactive)与RxJava2标准接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore");semaphore.acquire();/或者semaphore.acquireAsync();semaphore.acquire(23);semaphore.tryAcquire();/或者semaphore.tryAcquireAsync();semaphore.tryAcquire(23, TimeUnit.SECONDS);/或者semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);semaphore.release(10);semaphore.release();/或者semaphore.releaseAsync();
Redison小结
redisson:Java客户端redis,分布式锁
玩法
2、3基于zookeeper/etcd实现3、实现基于mysql(唯一键索引)思路:
加锁:Insert into tb_lock(lock_name) values('lock‘)执行成功意味着获取锁并成功释放锁:执行成功请求并执行业务操作。执行完成后,通过delete删除相应的记录进行重试:递归1、独占排他互斥,唯一键索引2、防死锁:客户端程序获取锁后,客户端程序服务器停机。给锁一个获取锁的时间列,定时任务发现解锁(服务定时器检查)不能重复:可以重新进入,可以参考redis分布式锁的hash+lua脚本,先记录服务信息和线程信息 3.重新进入次数实现、防误删除:利用id的独特性防止4、原子性:新增删除自带锁,也可以借助MySQL的悲观锁5、可重复入:6、自动续期 服务器中的定时器重置锁定系统的时间为7、单机故障,mysql主备搭建
总结
1、简易程序:mysql>redis>zookeeper
2、性能
redis>zookeeper>mysql
3、可靠性
zk>redis=mysql