mysql悲观锁

1
2
3
4
5
6
7
8
9
10
11
12
-- 关闭事务自动提交
set autocommit=0;
-- for update为行锁,仅对当前行有效
select * from inventory where id=1 for update;
-- 另一个连接等待
select * from inventory where id=1 for update;
-- 释放锁
commit;
-- 在无索引的情况下升级为表锁
select * from inventory where stocks=100 for update;
-- 另一个连接等待,不使用for update就不会锁住
select * from inventory where stocks=200 for update;

悲观锁实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// SellInfoRequest为购买的商品列表
func (s *Service) Sell(ctx context.Context,
request *inventorypb.SellInfoRequest) (*emptypb.Empty, error) {
// 必须所有商品扣减成功,不能第一件扣除,第二件失败
tx := s.DB.Begin()
for _, goodsInfo := range request.GoodsInfo {
var inv model.Inventory
// 使用for update悲观锁实现分布式锁,提交事物释放锁
res := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(
&model.Inventory{GoodsID: goodsInfo.GoodsId}).First(&inv)
if res.Error != nil {
tx.Rollback()
s.Logger.Error("can not get inv", zap.Error(res.Error))
return nil, status.Error(codes.Internal, "")
}
if res.RowsAffected == 0 {
// 没有该商品的库存信息
tx.Rollback()
return nil, status.Error(codes.NotFound, "")
}
if inv.Stocks < goodsInfo.Num {
// 现有库存小于购买数量
tx.Rollback()
return nil, status.Error(codes.ResourceExhausted, "")
}
// 数据库真实扣减小于预期扣减,超卖问题
inv.Stocks -= goodsInfo.Num
// 需要使用begin返回的对象操作
tx.Save(&inv)
}
tx.Commit()

return &emptypb.Empty{}, nil
}

mysql乐观锁

1
update inventory set stocks = stocks - 10, version = version + 1 where id = 1 and version = version; 

乐观锁实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (s *Service) Sell(ctx context.Context, 
request *inventorypb.SellInfoRequest) (*emptypb.Empty, error) {
// 必须所有商品扣减成功,不能第一件扣除,第二件失败
tx := s.DB.Begin()
for _, goodsInfo := range request.GoodsInfo {
var inv model.Inventory
for {
// 使用乐观锁实现分布式锁,这里查询不要使用事务的对象,因为未提交拿不到最新的version
res := s.DB.Where(&model.Inventory{GoodsID: goodsInfo.GoodsId}).First(&inv)
if res.Error != nil {
s.Logger.Error("can not get inv", zap.Error(res.Error))
return nil, status.Error(codes.Internal, "")
}
if res.RowsAffected == 0 {
// 没有该商品的库存信息
tx.Rollback()
return nil, status.Error(codes.NotFound, "")
}
if inv.Stocks < goodsInfo.Num {
// 现有库存小于购买数量
tx.Rollback()
return nil, status.Error(codes.ResourceExhausted, "")
}
// 数据库真实扣减小于预期扣减,超卖问题
inv.Stocks -= goodsInfo.Num
res = tx.Exec("update inventory set stocks = ?, version = ? where goods_id = ? and version = ?",
inv.Stocks, inv.Version+1, goodsInfo.GoodsId, inv.Version)
if res.Error != nil {
s.Logger.Error("can not update inv", zap.Error(res.Error))
return nil, status.Error(codes.Internal, "")
}
if res.RowsAffected == 0 {
// 扣减失败重试
s.Logger.Info("sell failed")
} else {
break
}
}

}
tx.Commit()

return &emptypb.Empty{}, nil
}