分布式锁

分布式锁:核心目标是互斥访问。确保在分布式环境下,同一时间只有一个进程/线程/节点能够访问某个共享资源 (例如:一个文件、数据库中的一行记录、一个共享配置、一个外部 API 调用配额等)。它解决的是资源竞争问题。

通常作用于一个具体的资源或一个临界区代码段。锁的范围相对较小和具体。

分布式锁通常必须满足

  • 互斥性:同一时刻只能有一个客户持有锁,其他客户等待。
  • 可靠性:锁不会无缘无故丢失,即使节点宕机,甚至是网络故障也要保证锁的可靠性
  • 性能:高并发场景下依然能快速争抢和释放锁

锁的实现方式有下面几种方案

  • 基于数据库实现的分布式锁
  • 基于缓存 redis 实现的分布式锁
  • 基于分布式协调软件 zookeeper 实现的分布式锁
  • 基于分布式 kv 存储 etcd 实现的分布式锁

基于数据库做的分布式锁原理也很简单,数据库本身会有锁的机制,创建一张锁表,为申请者在锁表里建立一条记录,记录建立成功则获得锁,消除记录则释放锁。

缺点吗?就是数据库的效率并不高,并且容易产生单点故障以及死锁的问题,如果在一个小项目或者 io 特别低的项目,这种方案完全没有问题。

使用缓存作为分布式锁,因为缓存放置在内存中所以无需磁盘的 io 问题,效率会非常的高,Redis 通常可以使用 setnx(key,value) 函数来实现分布式锁,其中 key 表示锁 id,value = currentTime + timeOut,表示当前时间 + 超时时间,超过超时时间则锁自动释放,setnx 返回 01 0 表示获取锁失败,1 表示获取锁成功。当0时可以设置一定的重试时间间隔,再次尝试获取锁,当1时则直接返回成功。再通过 Lua 脚本确保只有锁拥有者可以解锁,防范误删。

值得一提的是,redis 去实现的分布式锁实现了等待队列的方案,如果未获得锁,则将当前线程加入到等待队列中,当锁释放时,等待队列中的线程会重新尝试获取锁,顺序执行。

redis 等缓存去充当分布式锁的时候,优点在于,性能高,可以跨集群部署,核心就是不同的集群部署相同的数据,然后数据自动同步。

通过 redis 等缓存去实现的分布式锁最不靠谱的地方在于,通过超时时间去释放锁,万一没有执行完成就释放了锁呢?

zookeeper 使用树形结构去实现分布式锁,zk 的树形结构主要有四种,分别是持久节点,这也是 zk 的模式结构,持久顺序节点,根据创建的时间去节点自动进行排序,临时节点,当客户端与 ZooKeeper 断开连接后,该进程创建的临时节点就会被删除。临时顺序节点,而使用 zk 去实现分布式锁是使用的最后一种结构---临时顺序节点

实现锁的方法:

  1. 创建一个持久节点,并设置其值为锁的标识符。在与该方法对应的持久节点的目录下,为每个进程创建一个临时顺序节点。
  2. 每个进程获取所有临时节点列表,对比自己的编号是否最小,若最小,则获得锁。
  3. 若本进程对应的临时节点编号不是最小的,则继续判断:

若本进程为读请求,则向比自己序号小的最后一个写请求节点注册 watch 监听,当 监听到该节点释放锁后,则获取锁;

若本进程为写请求,则向比自己序号小的最后一个读请求节点注册 watch 监听,当 监听到该节点释放锁后,获取锁。

zk 解决分布式锁单点故障、不可重入、死锁等问题,不过因为频繁的创建临时顺序节点,性能比缓存实现的分布式锁差。

不过 zk 由于解决了分布式锁的大多数问题,并且也没有数据库和缓存的众多问题,实现起来也比较方便,各大框架都已经封装好了对 zk 的使用,所以分布式锁的首选就是 zookeeper。

分布式锁 go 语言实战

通常在具体实践中,自己去实现一个分布式锁麻烦又不可靠,所以通常,我们会使用中间件去实现分布式锁,比如 zookeeper、redis、etcd 等等。

下面讲解一下使用 go 语言去调用该中间件的 go driver 去实现具体分布式锁的方法。

redis

使用 SETNX 命令及过期时间 (TTL) 实现锁以及避免死锁这两个功能。

再通过 Lua 脚本确保只有锁拥有者可以解锁,防范误删。

应用场景:电商秒杀等高并发但一致性要求不高的场景。

示例代码:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

var ctx = context.Background()

func acquireLock(client *redis.Client, key, value string, ttl time.Duration) (bool, error) {
    ok, err := client.SetNX(ctx, key, value, ttl).Result()
    return ok, err
}

func releaseLock(client *redis.Client, key, value string) error {
  // lua 脚本,表示确认是否是锁的设置者
    script := `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) end`
    _, err := client.Eval(ctx, script, []string{key}, value).Result()
    return err
}

func main() {
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    defer client.Close()

    key := "lock-now"
    value := "client1"// Unique ID
    ttl := 5 * time.Second // 超时设置时间5秒

    if ok, err := acquireLock(client, key, value, ttl); ok && err == nil { // 尝试获取锁
        fmt.Println("任务开始")
        time.Sleep(2 * time.Second) // 模拟任务耗时
        releaseLock(client, key, value) // 释放锁
        fmt.Println("释放锁")
    } else {
        fmt.Println("错误:", err)
    }
}

zookeeper

通过临时有序节点机制进行排队式锁竞争,保障严格一致性。每个客户端创建节点后,检查自己编号是否最小,从而决定是否获得锁

应用场景:强一致性诉求,如金融、关键位置的调度等。需要严格一致性要求。

示例代码:

package main

import (
    "fmt"
    "sort"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

func acquireLock(conn *zk.Conn, path string) (string, error) {
    node, err := conn.Create(path+"/lock-", nil, zk.FlagEphemeral|zk.FlagSequence)
    
    if err != nil {
        return"", err
    }

    for {
        kids, _, err := conn.Children(path)
        if err != nil {
            return"", err
        }
        sort.Strings(kids)
        if path+"/"+kids[0] == node {
            return node, nil// You’re up!
        }
        prev := kids[0] // Watch the guy in front
        for i, k := range kids {
            if path+"/"+k == node {
                prev = kids[i-1]
                break
            }
        }
        _, _, ch, _ := conn.Get(path + "/" + prev)
        <-ch // Wait for them to leave
    }
}

func main() {
    conn, _, err := zk.Connect([]string{"localhost:2181"}, 5*time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    path := "/locks"
    if node, err := acquireLock(conn, path); err == nil {
        fmt.Println("Locked:", node)
        time.Sleep(2 * time.Second)
        conn.Delete(node, -1)
        fmt.Println("Unlocked!")
    } else {
        fmt.Println("Oops:", err)
    }
}

etcd

etcd 采用租约 (lease) 与键竞争机制,客户端只要持有租约且键未被他人占用,即可获取锁。

应用场景:云原生、Kubernetes 周边应用,兼顾性能与一致性。

示例代码:

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/client/v3"
)

func acquireLock(cli *clientv3.Client, key string, ttl int64) (*clientv3.LeaseGrantResponse, error) {
    
    lease, err := cli.Grant(context.Background(), ttl)
    
    if err != nil {
        returnnil, err
    }

    txn := cli.Txn(context.Background()).
        If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
        Then(clientv3.OpPut(key, "locked", clientv3.WithLease(lease.ID)))
    
    resp, err := txn.Commit()
    
    if err != nil || !resp.Succeeded {
        returnnil, fmt.Errorf("lock failed")
    }

    return lease, nil
}

func main() {
    cli, _ := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    defer cli.Close()

    key := "/desk_lock"
    
    if lease, err := acquireLock(cli, key, 10); err == nil {
        fmt.Println("Desk’s mine!")
        time.Sleep(2 * time.Second)
        cli.Revoke(context.Background(), lease.ID)
        fmt.Println("Desk’s free!")
    } else {
        fmt.Println("No desk:", err)
    }
}

三者对比

工具 特点 优劣权衡 典型场景
Redis 超高并发、简单部署 一致性略弱 秒杀系统
ZooKeeper 强一致性、公平排队 部署和维护复杂 关键资源调度
etcd Go 原生、云原生契合 高压下有延迟风险 K8s 周边

工程实践

常见优化方法

细颗粒度分布式锁

细分锁,不能全局一把锁,一把锁的性能很差

控制超时以及重试

// 一定要设置锁的存活时间 ttl ,防止死锁
func tryLock(client *redis.Client, key string, ttl time.Duration, retries int) (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), ttl)
    defer cancel()
    backoff := 100 * time.Millisecond
    for i := 0; i < retries; i++ {
        if ok, err := acquireLock(client, key, "client-123", ttl); ok && err == nil {
            return true, nil
        }
        time.Sleep(backoff)
        backoff *= 2
    }
    return false, fmt.Errorf("gave up after %d tries", retries)
}

监控锁的使用情况

用 Prometheus 等埋点的方法去追踪锁请求/释放延时,发现瓶颈。

常见错误

  • 使用 redis 锁时,锁被误删的情况,解决方案就是使用 lua 脚本去限制只能设置者才能删除锁
  • ZooKeeper 网络波动时锁丢失,解决方案是,增加断线重连和状态二次确认机制
    func lockWithRetry(conn *zk.Conn, path string) (string, error) {
        for {
          node, err := acquireLock(conn, path)
          // 状态二次确认
          if err == nil && conn.State() == zk.StateConnected {
              return node, nil
          }
          time.Sleep(time.Second)
          // 重连
          conn, _, _ = zk.Connect([]string{"localhost:2181"}, 5*time.Second)
        }
      }
    
  • etcd 高并发下租约阻塞:提前分配租约,缓存复用。
        type LeasePool struct {
        leases []clientv3.LeaseID
          sync.Mutex
        }
    
      func (p *LeasePool) Get(cli *clientv3.Client, ttl int64) (clientv3.LeaseID, error) {
          p.Lock()
          defer p.Unlock()
          if len(p.leases) > 0 {
              id := p.leases[0]
              p.leases = p.leases[1:]
              return id, nil
          }
          lease, err := cli.Grant(context.Background(), ttl)
          return lease.ID, err
      }
    

电商秒杀防超卖

package main

import (
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

type Shop struct {
    client *redis.Client
}

func (s *Shop) Buy(itemID, userID string) (bool, error) {
    lockKey := fmt.Sprintf("lock:%s", itemID)
    uuid := userID + "-" + fmt.Sprint(time.Now().UnixNano())
    ttl := 5 * time.Second

    // 获取锁
    if ok, err := acquireLock(s.client, lockKey, uuid, ttl); !ok || err != nil {
        returnfalse, err
    }
    // 最后释放锁
    defer releaseLock(s.client, lockKey, uuid)

    stockKey := fmt.Sprintf("stock:%s", itemID)
    stock, _ := s.client.Get(context.Background(), stockKey).Int()
    if stock <= 0 {
        returnfalse, nil
    }
    s.client.Decr(context.Background(), stockKey)
    returntrue, nil
}

func main() {
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    shop := &Shop{client}
    client.Set(context.Background(), "stock:item1", 5, 0) // 5 units
    for i := 0; i < 10; i++ {
        gofunc(id int) {
            if ok, _ := shop.Buy("item1", fmt.Sprintf("user%d", id)); ok {
                fmt.Printf("User %d scored!\n", id)
            } else {
                fmt.Printf("User %d out of luck\n", id)
            }
        }(i)
    }
    time.Sleep(2 * time.Second)
}

分布式任务调度唯一执行

基于 etcd,为定时任务 (如日志清理) 提供 “全局唯一运行” 保障,防止重复执行

package main

import (
    "fmt"
    "time"

    "go.etcd.io/etcd/client/v3"
)

type Scheduler struct {
    client *clientv3.Client
}

func (s *Scheduler) Run(taskID string) error {
    key := fmt.Sprintf("/lock/%s", taskID)
    lease, err := acquireLock(s.client, key, 10)
    if err != nil {
        return err
    }
    defer s.client.Revoke(context.Background(), lease.ID)

    fmt.Printf("Running %s\n", taskID)
    time.Sleep(2 * time.Second) // Fake work
    fmt.Printf("%s done\n", taskID)
    returnnil
}

func main() {
    cli, _ := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
    defer cli.Close()
    s := &Scheduler{cli}
    for i := 0; i < 3; i++ {
        gofunc() {
            s.Run("cleanup")
        }()
    }
    time.Sleep(5 * time.Second)
}

参考资料

  • https://mp.weixin.qq.com/s/FsOkz265kFMh_fuQZYDlvA