为什么需要分布式锁
用户下单
锁住 uid,防止重复下单。
库存扣减
锁住库存,防止超卖。
余额扣减
锁住账户,防止并发操作。 分布式系统中共享同一个资源时往往需要分布式锁来保证变更资源一致性。
分布式锁需要具备特性
排他性
锁的基本特性,并且只能被第一个持有者持有。
防死锁
高并发场景下临界资源一旦发生死锁非常难以排查,通常可以通过设置超时时间到期自动释放锁来规避。
可重入
锁持有者支持可重入,防止锁持有者再次重入时锁被超时释放。
高性能高可用
锁是代码运行的关键前置节点,一旦不可用则业务直接就报故障了。高并发场景下,高性能高可用是基本要求。
实现 Redis 锁应先掌握哪些知识点
set 命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
EX
second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX
millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX
:只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX
:只在键已经存在时,才对键进行设置操作。
Redis.lua 脚本
使用 Redis lua 脚本能将一系列命令操作封装成 pipline 实现整体操作的原子性。
go-zero 分布式锁 RedisLock 源码分析
core/stores/redis/redislock.go
加锁流程
--KEYS[1]:锁key--ARGV[1]:锁value,随机字符串--ARGV[2]:过期时间--判断锁key持有的value是否等于传入的value--如果相等说明是再次获取锁并更新获取时间,防止重入时过期--这里说明是“可重入锁”ifredis.call("GET",KEYS[1])==ARGV[1]then--设置redis.call("SET",KEYS[1],ARGV[1],"PX",ARGV[2])return"OK"else--锁key.value不等于传入的value则说明是第一次获取锁--SETkeyvalueNXPXtimeout:当key不存在时才设置key的值--设置成功会自动返回“OK”,设置失败返回“NULLBulkReply”--为什么这里要加“NX”呢,因为需要防止把别人的锁给覆盖了returnredis.call("SET",KEYS[1],ARGV[1],"NX","PX",ARGV[2])end
解锁流程
--释放锁--不可以释放别人的锁ifredis.call("GET",KEYS[1])==ARGV[1]then--执行成功返回“1”returnredis.call("DEL",KEYS[1])elsereturn0end
源码解析
packageredisimport("math/rand""strconv""sync/atomic""time"red"github.com/go-redis/redis""github.com/tal-tech/go-zero/core/logx")const(letters="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"lockCommand=`ifredis.call("GET",KEYS[1])==ARGV[1]thenredis.call("SET",KEYS[1],ARGV[1],"PX",ARGV[2])return"OK"elsereturnredis.call("SET",KEYS[1],ARGV[1],"NX","PX",ARGV[2])end`delCommand=`ifredis.call("GET",KEYS[1])==ARGV[1]thenreturnredis.call("DEL",KEYS[1])elsereturn0end`randomLen=16//默认超时时间,防止死锁tolerance=500//millisecondsmillisPerSecond=1000)//ARedisLockisaredislock.typeRedisLockstruct{//redis客户端store*Redis//超时时间secondsuint32//锁keykeystring//锁value,防止锁被别人获取到idstring}funcinit(){rand.Seed(time.Now().UnixNano())}//NewRedisLockreturnsaRedisLock.funcNewRedisLock(store*Redis,keystring)*RedisLock{return&RedisLock{store:store,key:key,//获取锁时,锁的值通过随机字符串生成//实际上go-zero提供更加高效的随机字符串生成方式//见core/stringx/random.go:Randnid:randomStr(randomLen),}}//Acquireacquiresthelock.//加锁func(rl*RedisLock)Acquire()(bool,error){//获取过期时间seconds:=atomic.LoadUint32(&rl.seconds)//默认锁过期时间为500ms,防止死锁resp,err:=rl.store.Eval(lockCommand,[]string{rl.key},[]string{rl.id,strconv.Itoa(int(seconds)*millisPerSecond+tolerance),})iferr==red.Nil{returnfalse,nil}elseiferr!=nil{logx.Errorf("Erroronacquiringlockfor%s,%s",rl.key,err.Error())returnfalse,err}elseifresp==nil{returnfalse,nil}reply,ok:=resp.(string)ifok&&reply=="OK"{returntrue,nil}logx.Errorf("Unknownreplywhenacquiringlockfor%s:%v",rl.key,resp)returnfalse,nil}//Releasereleasesthelock.//释放锁func(rl*RedisLock)Release()(bool,error){resp,err:=rl.store.Eval(delCommand,[]string{rl.key},[]string{rl.id})iferr!=nil{returnfalse,err}reply,ok:=resp.(int64)if!ok{returnfalse,nil}returnreply==1,nil}//SetExpiresetstheexpire.//需要注意的是需要在Acquire()之前调用//不然默认为500ms自动释放func(rl*RedisLock)SetExpire(secondsint){atomic.StoreUint32(&rl.seconds,uint32(seconds))}funcrandomStr(nint)string{b:=make([]byte,n)fori:=rangeb{b[i]=letters[rand.Intn(len(letters))]}returnstring(b)}
关于分布式锁还有哪些实现方案
etcd
redis redlock
项目地址
https://github.com/zeromicro/go-zero
欢迎使用 go-zero
并 star 支持我们!
微信交流群
关注『微服务实践』公众号并点击 交流群 获取社区群二维码。