redis主从复制模式下使用分布式锁:
go协程对redis主服务器setnx,把主服务器的setnx后的结果同步到从服务器

带来的问题:

  1. master节点宕机,从节点没来得及同步到setnx的数据
  2. 网络故障,setnx的数据已过期

非redis主从模式,redis服务器处于同等地位,go协程对所有redis服务器setnx,把set到多数redis服务器的go协程视为拿锁成功

扣减库存程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (s *Service) Sell(ctx context.Context, request *inventorypb.SellInfoRequest) (*emptypb.Empty, error) {
// 必须所有商品扣减成功,不能第一件扣除,第二件失败
tx := s.DB.Begin()
for _, goodsInfo := range request.GoodsInfo {

mutex := s.RedSync.NewMutex(fmt.Sprintf("inv_%d", goodsInfo.GoodsId))
if err := mutex.Lock(); err != nil {
s.Logger.Error("can not get redis lock", zap.Error(err))
return nil, status.Error(codes.Internal, "can not get redis lock")
}

var inv model.Inventory
res := s.DB.Where(&model.Inventory{GoodsID: goodsInfo.GoodsId}).First(&inv)
if res.Error != nil {
s.Logger.Error("can not get inv", zap.Error(res.Error))
return nil, status.Error(codes.Internal, "")
}
if res.RowsAffected == 0 {
// 没有该商品的库存信息
tx.Rollback()
return nil, status.Error(codes.NotFound, "")
}
if inv.Stocks < goodsInfo.Num {
// 现有库存小于购买数量
tx.Rollback()
return nil, status.Error(codes.ResourceExhausted, "")
}
// 数据库真实扣减小于预期扣减,超卖问题
inv.Stocks -= goodsInfo.Num
tx.Save(&inv)

if ok, err := mutex.Unlock(); !ok || err != nil {
s.Logger.Error("can not unlock redis")
return nil, status.Error(codes.Internal, "")
}
}
tx.Commit()

return &emptypb.Empty{}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// redsync lock源码
func (m *Mutex) LockContext(ctx context.Context) error {
// 生成随机value防止被其他协程删除
value, err := m.genValueFunc()
if err != nil {
return err
}

// tries为拿锁失败尝试次数
for i := 0; i < m.tries; i++ {
if i != 0 {
// 拿锁失败sleep一段时间
time.Sleep(m.delayFunc(i))
}

start := time.Now()

// 对每一台redis集群进行setnx
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.acquire(ctx, pool, value)
})
if n == 0 && err != nil {
return err
}

now := time.Now()
// expiry:超时时间,防止拿锁后宕机其他协程无法拿锁导致死锁
// 超时时间 - 拿锁时间 - 时钟飘逸(服务器间有时钟差) = 剩余时间
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.factor)))
// quorum:len(redis集群数)/2 + 1,redis服务器数应为奇数
// 若对大多数redis服务器setnx成功并且key没有过时,才算拿锁成功
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
// 否则释放锁
_, _ = m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}

return ErrFailed
}

func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
type result struct {
Status bool
Err error
}

ch := make(chan result)
// 对每台redis服务器异步拿锁,actFn就是setnx
for _, pool := range m.pools {
go func(pool redis.Pool) {
r := result{}
r.Status, r.Err = actFn(pool)
ch <- r
}(pool)
}
n := 0
var err error
for range m.pools {
r := <-ch
// 拿锁成功
if r.Status {
n++
} else if r.Err != nil {
err = multierror.Append(err, r.Err)
}
}
return n, err
}