服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - 如何使用Redis实现电商系统的库存扣减

如何使用Redis实现电商系统的库存扣减

2022-02-10 17:49PHP开源社区 Redis

在日常开发中有很多地方都有类似扣减库存的操作,本文主要介绍了如何使用Redis实现电商系统的库存扣减,具有一定的参考价值,感兴趣的可以了解一下

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

使用mysql数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。
还是使用数据库,但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。
将库存放到redis使用redis的incrby特性来扣减库存。

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。

基于数据库单库存

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:
update number set x=x-1 where x > 0

MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。

当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

基于redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

基于redis实现扣减库存的具体实现

我们使用redis的lua脚本来实现扣减库存
由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

初始化库存回调函数(IStockCallback )

?
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * 获取库存回调
 * @author yuhao.wang
 */
public interface IStockCallback {
 
 /**
  * 获取库存
  * @return
  */
 int getStock();
}

扣减库存服务(StockService)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/**
 * 扣库存
 *
 * @author yuhao.wang
 */
@Service
public class StockService {
    Logger logger = LoggerFactory.getLogger(StockService.class);
 
    /**
     * 不限库存
     */
    public static final long UNINITIALIZED_STOCK = -3L;
 
    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;
 
    static {
        /**
         *
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock)0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *   -3:库存未初始化
         *   -2:库存不足
         *   -1:不限库存
         *   大于等于0:剩余库存(扣减之后剩余的库存)
         *      redis缓存的库存(value)是-1表示不限库存,直接返回1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }
 
    /**
     * @param key           库存key
     * @param expire        库存有效时间,单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
 
        }
        return stock;
    }
 
    /**
     * 加库存(还原库存)
     *
     * @param key    库存key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {
 
        return addStock(key, null, num);
    }
 
    /**
     * 加库存
     *
     * @param key    库存key
     * @param expire 过期时间(秒)
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判断key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }
 
        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }
 
        return num;
    }
 
    /**
     * 获取库存
     *
     * @param key 库存key
     * @return -1:不限库存; 大于等于0:剩余库存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }
 
    /**
     * 扣库存
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的ARGV参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));
 
        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }
 
                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
 
}

调用

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * @author yuhao.wang
 */
@RestController
public class StockController {
 
    @Autowired
    private StockService stockService;
 
    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }
 
    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }
 
    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.getStock(redisKey);
    }
 
    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.addStock(redisKey, 2);
    }
}

到此这篇关于如何使用Redis实现电商系统的库存扣减的文章就介绍到这了,更多相关Redis 库存扣减内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://juejin.cn/post/7049205829316116511

延伸 · 阅读

精彩推荐
  • RedisRedis如何实现数据库读写分离详解

    Redis如何实现数据库读写分离详解

    Redis的主从架构,能帮助我们实现读多,写少的情况,下面这篇文章主要给大家介绍了关于Redis如何实现数据库读写分离的相关资料,文中通过示例代码介绍...

    罗兵漂流记6092019-11-11
  • RedisRedis的配置、启动、操作和关闭方法

    Redis的配置、启动、操作和关闭方法

    今天小编就为大家分享一篇Redis的配置、启动、操作和关闭方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 ...

    大道化简5312019-11-14
  • Redis详解Redis复制原理

    详解Redis复制原理

    与大多数db一样,Redis也提供了复制机制,以满足故障恢复和负载均衡等需求。复制也是Redis高可用的基础,哨兵和集群都是建立在复制基础上实现高可用的...

    李留广10222021-08-09
  • RedisRedis全量复制与部分复制示例详解

    Redis全量复制与部分复制示例详解

    这篇文章主要给大家介绍了关于Redis全量复制与部分复制的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis爬虫具有一定的参考学习...

    豆子先生5052019-11-27
  • Redisredis中如何使用lua脚本让你的灵活性提高5个逼格详解

    redis中如何使用lua脚本让你的灵活性提高5个逼格详解

    这篇文章主要给大家介绍了关于redis中如何使用lua脚本让你的灵活性提高5个逼格的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具...

    一线码农5812019-11-18
  • Redisredis 交集、并集、差集的具体使用

    redis 交集、并集、差集的具体使用

    这篇文章主要介绍了redis 交集、并集、差集的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友...

    xiaojin21cen10152021-07-27
  • RedisRedis 事务知识点相关总结

    Redis 事务知识点相关总结

    这篇文章主要介绍了Redis 事务相关总结,帮助大家更好的理解和学习使用Redis,感兴趣的朋友可以了解下...

    AsiaYe8232021-07-28
  • Redisredis实现排行榜功能

    redis实现排行榜功能

    排行榜在很多地方都能使用到,redis的zset可以很方便地用来实现排行榜功能,本文就来简单的介绍一下如何使用,具有一定的参考价值,感兴趣的小伙伴们...

    乘月归5022021-08-05