服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - Redis分布式限流组件设计与使用实例

Redis分布式限流组件设计与使用实例

2021-09-16 17:34程序员小强 Redis

本文主要讲解基于 自定义注解+Aop+反射+Redis+Lua表达式 实现的限流设计方案。实现的限流设计与实际使用。具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文主要讲解基于 自定义注解+aop+反射+redis+lua表达式 实现的限流设计方案。实现的限流设计与实际使用。

1.背景

在互联网开发中经常遇到需要限流的场景一般分为两种

  • 业务场景需要(比如:5分钟内发送验证码不超过xxx次);
  • 对流量大的功能流量削峰;

一般我们衡量系统处理能力的指标是每秒的qps或者tps,假设系统每秒的流量阈值是2000,
理论上第2001个请求进来时,那么这个请求就需要被限流。

本文演示项目使用的是 springboot 项目,项目构建以及其他配置,这里不做演示。文末附限流demo源码

 

2.redis计数器限流设计

本文演示项目使用的是 springboot 项目,这里仅挑选了重点实现代码展示,
项目构建以及其他配置,这里不做演示,详细配置请参考源码demo工程。

 

2.1lua脚本

lua 是一种轻量小巧的脚本语言可以理解为就是一组命令。
使用redis的计数器达到限流的效果,表面上redis自带命令多个组合也可以支持了,那为什么还要用lua呢?
因为要保证原子性,这也是使用redis+lua表达式原因,一组命令要么全成功,要么全失败。
相比redis事务,lua脚本的优点:

  • 减少网络开销:多个请求通过脚本一次发送,减少网络延迟
  • 原子操作:将脚本作为一个整体执行,中间不会插入其他命令,无需使用事务
  • 复用:客户端发送的脚本永久存在redis中,其他客户端可以复用脚本
  • 可嵌入性:可嵌入java,c#等多种编程语言,支持不同操作系统跨平台交互

实现限流lua脚本示例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 定义计数变量
local count
# 获取调用脚本时传入的第一个key值(用作限流的 key)
count = redis.call('get',keys[1])
# 限流最大值比较,若超过最大值,则直接返回
if count and tonumber(count) > tonumber(argv[1]) then
return count;
end
# incr 命令 执行计算器累加
count = redis.call('incr',keys[1])
# 从第一次调用开始限流,并设置失效时间
if tonumber(count) == 1 then
redis.call('expire',keys[1],argv[2])
end
return count;

参数说明

  • keys[1] - redis的key
  • argv[1] - 限流次数
  • argv[2] - 失效时间

 

2.2自定义注解

支持范围:任意接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * 描述: 限流注解
 *
 * @author 程序员小强
 **/
@target({elementtype.type, elementtype.method})
@retention(retentionpolicy.runtime)
public @interface ratelimit {
 
    /**
     * 限流唯一标示 key
     * 若同时使用 keyfiled 则当前 key作为前缀
     */
    string key();
 
    /**
     * 限流时间-单位:秒数
     * 默认 60s
     */
    int time() default 60;
 
    /**
     * 限流次数
     * 失效时间段内最大放行次数
     */
    int count();
 
    /**
     * 可作为限流key-参数类中属性名,动态值
     * 示例:phone、userid 等
     */
    string keyfield() default "";
 
    /**
     * 超过最大访问次数后的,提示内容
     */
    string msg() default "over the max request times please try again";
 
}

属性介绍

  • key - 必填,限流key唯一标识,redis存储key
  • time -过期时间,单位 秒,默认60s
  • count - 必填,失效时间段内最大放行次数
  • keyfield - 动态限流key,比如参数是一个自定义的类,里面有属性userid 等。可以使用keyfield=“userid”,

这样生成的key为参数中userid的值。一般与key属性组合使用。不支持java基本类型参数,
仅支持参数是一个对象的接口。

msg - 超过限流的提示内容

示例:

?
1
@ratelimit(key = "limit-phone-key", time = 300, count = 10, keyfield = "phone", msg = "5分钟内,验证码最多发送10次")

含义 - 5分钟内根据手机号限流10次
rediskey- limit-phone-key:后面拼接的是参数中phone的值。

 

2.3限流组件

这里用的是jedis客户端,配置就不列在这里的,详见源码,文末附源码地址

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
 * redis限流组件
 *
 * @author 程序员小强
 */
@component
public class redisratelimitcomponent {
    private static final logger logger = loggerfactory.getlogger(redisratelimitcomponent.class);
 
    private jedispool jedispool;
 
    @autowired
    public redisratelimitcomponent(jedispool jedispool) {
        this.jedispool = jedispool;
    }
 
    /**
     * 限流方法
     * 1.执行 lua 表达式
     * 2.通过 lua 表达式实现-限流计数器
     *
     * @param rediskey
     * @param time           超时时间-秒数
     * @param ratelimitcount 限流次数
     */
    public long ratelimit(string rediskey, integer time, integer ratelimitcount) {
        jedis jedis = null;
        try {
            jedis = jedispool.getresource();
            object obj = jedis.evalsha(jedis.scriptload(this.buildluascript()), collections.singletonlist(rediskey),
                    arrays.aslist(string.valueof(ratelimitcount), string.valueof(time)));
            return long.valueof(obj.tostring());
        } catch (jedisexception ex) {
            logger.error("[ executelua ] >> messages:{}", ex.getmessage(), ex);
            throw new ratelimitexception("[ redisratelimitcomponent ] >> jedis run lua script exception" + ex.getmessage());
        } finally {
            if (jedis != null) {
                if (jedis.isconnected()) {
                    jedis.close();
                }
            }
        }
    }
 
    /**
     * 构建lua 表达式
     * keys[1] -- 参数key
     * argv[1]-- 失效时间段内最大放行次数
     * argv[2]-- 失效时间|秒
     */
    private string buildluascript() {
        stringbuilder luabuilder = new stringbuilder();
        //定义变量
        luabuilder.append("local count");
        //获取调用脚本时传入的第一个key值(用作限流的 key)
        luabuilder.append(" count = redis.call('get',keys[1])");
        // 获取调用脚本时传入的第一个参数值(限流大小)-- 调用不超过最大值,则直接返回
        luabuilder.append(" if count and tonumber(count) > tonumber(argv[1]) then");
        luabuilder.append(" return count;");
        luabuilder.append(" end");
        //执行计算器自增
        luabuilder.append(" count = redis.call('incr',keys[1])");
        //从第一次调用开始限流
        luabuilder.append(" if tonumber(count) == 1 then");
        //设置过期时间
        luabuilder.append(" redis.call('expire',keys[1],argv[2])");
        luabuilder.append(" end");
        luabuilder.append(" return count;");
        return luabuilder.tostring();
    }
}

 

2.4限流切面实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * 描述:限流切面实现
 *
 * @author 程序员小强
 **/
@aspect
@configuration
public class ratelimitaspect {
    private static final logger logger = loggerfactory.getlogger(ratelimitaspect.class);
 
    private redisratelimitcomponent redisratelimitcomponent;
 
    @autowired
    public ratelimitaspect(redisratelimitcomponent redisratelimitcomponent) {
        this.redisratelimitcomponent = redisratelimitcomponent;
    }
 
    /**
     * 匹配所有使用以下注解的方法
     *
     * @see ratelimit
     */
    @pointcut("@annotation(com.example.ratelimit.annotation.ratelimit)")
    public void pointcut() {
    }
 
    @around("pointcut()&&@annotation(ratelimit)")
    public object logaround(proceedingjoinpoint joinpoint, ratelimit ratelimit) throws throwable {
        methodsignature signature = (methodsignature) joinpoint.getsignature();
        string methodname = signature.getmethod().getname();
 
        //组装限流key
        string ratelimitkey = this.getratelimitkey(joinpoint, ratelimit);
 
        //限流组件-通过计数方式限流
        long count = redisratelimitcomponent.ratelimit(ratelimitkey, ratelimit.time(), ratelimit.count());
        logger.debug("[ ratelimit ] method={},ratelimitkey={},count={}", methodname, ratelimitkey, count);
 
        if (null != count && count.intvalue() <= ratelimit.count()) {
            //未超过限流次数-执行业务方法
            return joinpoint.proceed();
        } else {
            //超过限流次数
            logger.info("[ ratelimit ] >> over the max request times method={},ratelimitkey={},currentcount={},ratelimitcount={}",
                    methodname, ratelimitkey, count, ratelimit.count());
            throw new ratelimitexception(ratelimit.msg());
        }
    }
 
    /**
     * 获取限流key
     * 默认取 ratelimit > key 属性值
     * 若设置了 keyfield 则从参数中获取该字段的值拼接到key中
     * 示例:user_phone_login_max_times:13235777777
     *
     * @param joinpoint
     * @param ratelimit
     */
    private string getratelimitkey(proceedingjoinpoint joinpoint, ratelimit ratelimit) {
        string fieldname = ratelimit.keyfield();
        if ("".equals(fieldname)) {
            return ratelimit.key();
        }
 
        //处理自定义-参数名-动态属性key
        stringbuilder ratelimitkeybuilder = new stringbuilder(ratelimit.key());
        for (object obj : joinpoint.getargs()) {
            if (null == obj) {
                continue;
            }
            //过滤基本类型参数
            if (reflectionutil.isbasetype(obj.getclass())) {
                continue;
            }
            //属性值
            object fieldvalue = reflectionutil.getfieldbyclazz(fieldname, obj);
            if (null != fieldvalue) {
                ratelimitkeybuilder.append(":").append(fieldvalue.tostring());
                break;
            }
        }
        return ratelimitkeybuilder.tostring();
    }
}

由于演示项目中做了统一异常处理
在限流切面这里未做异常捕获,若超过最大限流次数会抛出自定义限流异常。可以根据业务自行处理。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
 * 反射工具
 *
 * @author 程序员小强
 */
public class reflectionutil {
 
    private static final logger logger = loggerfactory.getlogger(reflectionutil.class);
 
    /**
     * 根据属性名获取属性元素,
     * 包括各种安全范围和所有父类
     *
     * @param fieldname
     * @param object
     * @return
     */
    public static object getfieldbyclazz(string fieldname, object object) {
        field field = null;
        class<?> clazz = object.getclass();
        try {
            for (; clazz != object.class; clazz = clazz.getsuperclass()) {
                try {
                    //子类中查询不到属性-继续向父类查
                    field = clazz.getdeclaredfield(fieldname);
                } catch (nosuchfieldexception ignored) {
                }
            }
            if (null == field) {
                return null;
            }
            field.setaccessible(true);
            return field.get(object);
        } catch (exception e) {
            //通过反射获取 属性值失败
            logger.error("[ reflectionutil ] >> [getfieldbyclazz] fieldname:{} ", fieldname, e);
        }
        return null;
    }
 
    /**
     * 判断对象属性是否是基本数据类型,包括是否包括string | bigdecimal
     *
     * @param clazz
     * @return
     */
    public static boolean isbasetype(class clazz) {
        if (null == clazz) {
            return false;
        }
        //基本类型
        if (clazz.isprimitive()) {
            return true;
        }
        //string
        if (clazz.equals(string.class)) {
            return true;
        }
        //integer
        if (clazz.equals(integer.class)) {
            return true;
        }
        //boolean
        if (clazz.equals(boolean.class)) {
            return true;
        }
        //bigdecimal
        if (clazz.equals(bigdecimal.class)) {
            return true;
        }
        //byte
        if (clazz.equals(byte.class)) {
            return true;
        }
        //long
        if (clazz.equals(long.class)) {
            return true;
        }
        //double
        if (clazz.equals(double.class)) {
            return true;
        }
        //float
        if (clazz.equals(float.class)) {
            return true;
        }
        //character
        if (clazz.equals(character.class)) {
            return true;
        }
        //short
        return clazz.equals(short.class);
    }
}

 

3.测试一下

基本属性已经配置好了,写个接口测试一下。

 

3.1方法限流示例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * 计数器
 * 演示 demo 为了方便计数
 */
private static final atomicinteger counter = new atomicinteger();   
 
/**
 * 普通限流
 * <p>
 * 30 秒中,可以访问10次
 */
@requestmapping("/limittest")
@ratelimit(key = "limit-test-key", time = 30, count = 10)
public response limittest() {
    map<string, object> datamap = new hashmap<>();
    datamap.put("date", dateformatutils.format(new date(), "yyyy-mm-dd hh:mm:ss.sss"));
    datamap.put("times", counter.incrementandget());
    return response.success(datamap);
}

Redis分布式限流组件设计与使用实例

 

3.2动态入参限流示例

3.2.1场景一:5分钟内,方法最多访问10次,根据入参手机号限流

入参类

?
1
2
3
4
5
6
7
public class userphonecaptcharateparam implements serializable {
 
    private static final long serialversionuid = -1l;
 
    private string phone;
    //省略 get/set
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static final map<string, atomicinteger> count_phone_map = new hashmap<>();
 
 
/**
 * 根据手机号限流-限制验证码发送次数
 * <p>
 * 示例:5分钟内,验证码最多发送10次
 */
@requestmapping("/limitbyphone")
@ratelimit(key = "limit-phone-key", time = 300, count = 10, keyfield = "phone", msg = "5分钟内,验证码最多发送10次")
public response limitbyphone(userphonecaptcharateparam param) {
    map<string, object> datamap = new hashmap<>();
    datamap.put("date", dateformatutils.format(new date(), "yyyy-mm-dd hh:mm:ss.sss"));
    if (count_phone_map.containskey(param.getphone())) {
        count_phone_map.get(param.getphone()).incrementandget();
    } else {
        count_phone_map.put(param.getphone(), new atomicinteger(1));
    }
    datamap.put("times", count_phone_map.get(param.getphone()).intvalue());
    datamap.put("reqparam", param);
    return response.success(datamap);
}

Redis分布式限流组件设计与使用实例

3.2.2场景二:根据订单id限流

入参类

?
1
2
3
4
5
6
7
8
@data
public class orderrateparam implements serializable {
 
    private static final long serialversionuid = -1l;
 
    private string orderid;
    //省略 getset
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static final map<string, atomicinteger> count_order_map = new hashmap<>();
 
/**
 * 根据订单id限流示例
 * <p>
 * 300 秒中,可以访问10次
 */
@requestmapping("/limitbyorderid")
@ratelimit(key = "limit-order-key", time = 300, count = 10, keyfield = "orderid", msg = "订单飞走了,请稍后再试!")
public response limitbyorderid(orderrateparam param) {
    map<string, object> datamap = new hashmap<>();
    datamap.put("date", dateformatutils.format(new date(), "yyyy-mm-dd hh:mm:ss.sss"));
    if (count_order_map.containskey(param.getorderid())) {
        count_order_map.get(param.getorderid()).incrementandget();
    } else {
        count_order_map.put(param.getorderid(), new atomicinteger(1));
    }
    datamap.put("times", count_order_map.get(param.getorderid()).intvalue());
    datamap.put("reqparam", param);
    return response.success(datamap);
}

Redis分布式限流组件设计与使用实例

 

4.其它扩展

根据ip限流

在key中拼接ip即可;

 

5.源码地址

传送门

到此这篇关于redis分布式限流组件设计与使用实例的文章就介绍到这了,更多相关redis分布式限流内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://xqiangme.blog.csdn.net/article/details/107303043

延伸 · 阅读

精彩推荐
  • RedisRedis集群的5种使用方式,各自优缺点分析

    Redis集群的5种使用方式,各自优缺点分析

    Redis 多副本,采用主从(replication)部署结构,相较于单副本而言最大的特点就是主从实例间数据实时同步,并且提供数据持久化和备份策略。...

    优知学院4082021-08-10
  • RedisRedis 6.X Cluster 集群搭建

    Redis 6.X Cluster 集群搭建

    码哥带大家完成在 CentOS 7 中安装 Redis 6.x 教程。在学习 Redis Cluster 集群之前,我们需要先搭建一套集群环境。机器有限,实现目标是一台机器上搭建 6 个节...

    码哥字节15752021-04-07
  • Redis关于Redis数据库入门详细介绍

    关于Redis数据库入门详细介绍

    大家好,本篇文章主要讲的是关于Redis数据库入门详细介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览...

    沃尔码6982022-01-24
  • Redis如何使用Redis锁处理并发问题详解

    如何使用Redis锁处理并发问题详解

    这篇文章主要给大家介绍了关于如何使用Redis锁处理并发问题的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习...

    haofly4522019-11-26
  • Redis详解三分钟快速搭建分布式高可用的Redis集群

    详解三分钟快速搭建分布式高可用的Redis集群

    这篇文章主要介绍了详解三分钟快速搭建分布式高可用的Redis集群,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,...

    万猫学社4502021-07-25
  • Redis《面试八股文》之 Redis十六卷

    《面试八股文》之 Redis十六卷

    redis 作为我们最常用的内存数据库,很多地方你都能够发现它的身影,比如说登录信息的存储,分布式锁的使用,其经常被我们当做缓存去使用。...

    moon聊技术8182021-07-26
  • Redisredis缓存存储Session原理机制

    redis缓存存储Session原理机制

    这篇文章主要为大家介绍了redis缓存存储Session原理机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪...

    程序媛张小妍9252021-11-25
  • RedisRedis Template实现分布式锁的实例代码

    Redis Template实现分布式锁的实例代码

    这篇文章主要介绍了Redis Template实现分布式锁,需要的朋友可以参考下 ...

    晴天小哥哥2592019-11-18