35人参与 • 2025-08-12 • Redis
我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。
结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。
暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:
没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:
select * from u_car where car_id = 10;
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1
在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败
乐观锁不是本次的终点,但还是简单说下;
public boolean lock(string key, v v, int expiretime){ //获取锁 //在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有bug //后来redis将加锁和设置时间用同一个命令 //这里是重点,redis.setnx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了 boolean b = false; try { b = redis.setnx(key, v, expiretime); } catch (exception e) { log.error(e.getmessage(), e); } return b; } public boolean unlock(string key){ return redis.delete(key); } }
但是这样写还是存在bug的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setnx(key, v, expiretime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的bug;
实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置n秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;
//引入依赖 <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency> <dependency> <groupid>org.redisson</groupid> <artifactid>redisson-spring-boot-starter</artifactid> <version>3.10.6</version> </dependency>
redisson支持单点、集群等模式,这里选择单点的。
spring: redis: host: 127.0.0.1 port: 6379 password:
@configuration public class redisconfig { @value("${spring.redis.host}") private string host; @bean(name = {"redistemplate", "stringredistemplate"}) public stringredistemplate stringredistemplate(redisconnectionfactory factory) { stringredistemplate redistemplate = new stringredistemplate(); redistemplate.setconnectionfactory(factory); return redistemplate; } @bean public redisson redisson() { config config = new config(); config.usesingleserver().setaddress("redis://" + host + ":6379"); return (redisson) redisson.create(config); } }
private logger log = loggerfactory.getlogger(getclass()); @resource private redisson redisson; //加锁 public boolean lock(string key,long waittime,long leasetime){ boolean b = false; try { rlock rlock = redisson.getlock(key); //说下参数 waittime:锁的存活时间 leasetime:锁的延长时间 后面的参数是单位 b = rlock.trylock(waittime,leasetime,timeunit.seconds); } catch (exception e) { log.error(e.getmessage(), e); } } return b; } //释放锁 public void unlock(string key){ try { rlock rlock = redisson.getlock(key); if(null!=lock){ lock.unlock(); lock.forceunlock(); filelog.info("unlock succesed"); } } catch (exception e) { filelog.error(e.getmessage(), e); } }
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception { long time = unit.tomillis(waittime); long current = system.currenttimemillis(); long threadid = thread.currentthread().getid(); //尝试获取锁,如果没取到锁,则获取锁的剩余超时时间 long ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } //如果waittime已经超时了,就返回false time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(threadid); return false; } current = system.currenttimemillis(); rfuture<redissonlockentry> subscribefuture = subscribe(threadid); if (!await(subscribefuture, time, timeunit.milliseconds)) { if (!subscribefuture.cancel(false)) { subscribefuture.oncomplete((res, e) -> { if (e == null) { unsubscribe(subscribefuture, threadid); } }); } acquirefailed(threadid); return false; } try { time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(threadid); return false; } //进入死循环,反复去调用tryacquire尝试获取锁,ttl为null时就是别的线程已经unlock了 while (true) { long currenttime = system.currenttimemillis(); ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(threadid); return false; } // waiting for message currenttime = system.currenttimemillis(); if (ttl >= 0 && ttl < time) { getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } else { getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds); } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(threadid); return false; } } } finally { unsubscribe(subscribefuture, threadid); } // return get(trylockasync(waittime, leasetime, unit)); }
可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。
上面的lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:
第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key argv[2],1)加锁和设置redis call(pexpire key argv[1])存活时间;
当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的id,很明显不是;
则进入到最后的return返回该key的剩余存活时间
当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间
所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒
所以,总体流程应该是这样的。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论