一、背景
限流對于一個微服務架構系統來說具有非常重要的意義,否則其中的某個微服務將成為整個系統隱藏的雪崩因素,為什么這么說?
舉例來講,某個SAAS平臺有100多個微服務應用,但是作為底層的某個或某幾個應用來說,將會被所有上層應用頻繁調用,業務高峰期時,如果底層應用不做限流處理,該應用必將面臨著巨大的壓力,尤其是那些個別被高頻調用的接口來說,最直接的表現就是導致后續新進來的請求阻塞、排隊、響應超時...最后直到該服務所在JVM資源被耗盡。
二、限流概述
在大多數的微服務架構在設計之初,比如在技術選型階段,架構師會從一個全局的視角去規劃技術棧的組合,比如結合當前產品的現狀考慮是使用dubbo?還是springcloud?作為微服務治理的底層框架。甚至為了滿足快速的上線、迭代和交付,直接以springboot為基座進行開發,后續再引入新的技術棧等...
所以在談論某個業務場景具體的技術解決方案時不可一概而論,而是需要結合產品和業務的現狀綜合評估,以限流來說,在下面的不同的技術架構下具體在選擇的時候可能也不一樣。
2.1 dubbo 服務治理模式
選擇dubbo框架作為基礎服務治理對于那種偏向內部平臺的應用還是不錯的,dubbo底層走netty,這一點相比http協議來說,在一定場景下還是具有優勢的,如果選擇dubbo,在選擇限流方案上可以做如下的參考。
2.1.1 dubbo框架級限流
dubbo官方提供了完善的服務治理,能夠滿足大多數開發場景中的需求,針對限流這個場景,具體來說包括如下手段,具體的配置,可以參考官方手冊;
客戶端限流
信號量限流 (通過統計的方式)
連接數限流 (socket->tcp)
服務端限流
線程池限流 (隔離手段)
信號量限流 (非隔離手段)
接收數限流 (socket->tcp)
2.1.2 線程池設置
多線程并發操作一定離不開線程池,Dubbo自身提供了支持了四種線程池類型支持。生產者
2.1.3 集成第三方組件
如果是springboot框架的項目,可以考慮直接引入地方的組件或SDK,比如hystrix,guava,sentinel原生SDK等,如果技術實力足夠強甚至可以考慮自己造輪子。
2.2 springcloud 服務治理模式
如果你的服務治理框架選用的是springcloud或springcloud-alibaba,其框架自身的生態中已經包含了相應的限流組件,可以實現開箱即用,下面列舉幾種常用的基于springcloud框架的限流組件。
2.2.1 hystrix
Hystrix是Netflix開源的一款容錯框架,在springcloud早期推出市場的時候,作為springcloud生態中用于限流、熔斷、降級的一款組件。
Hystrix提供了限流功能,在springcloud架構的系統中,可以在網關啟用Hystrix,進行限流處理,每個微服務也可以各自啟用Hystrix進行限流。
Hystrix默認使用線程隔離模式,可以通過線程數+隊列大小進行限流,具體參數配置可以參考官網相關資料。
2.2.2 sentinel
Sentinel 號稱分布式系統的流量防衛兵,屬于springcloud-alibaba生態中的重要組件,面向分布式服務架構的流量控制組件,主要以流量為切入點,從限流、流量整形、熔斷降級、系統負載保護、熱點防護等多個維度來幫助開發者保障微服務的穩定性。
2.3 網關層限流
隨著微服務規模的增加,整個系統中很多微服務都需要實現限流這種需求時,就可以考慮在網關這一層進行限流了,通常來說,網關層的限流面向的是通用的業務,比如那些惡意的請求,爬蟲,攻擊等,簡單來說,網關層面的限流提供了一層對系統整體的保護措施。
三、常用限流策略
3.1 限流常用的算法
不管是哪種限流組件,其底層的限流實現算法大同小異,這里列舉幾種常用的限流算法以供了解。
3.1.1 令牌桶算法
令牌桶算法是目前應用最為廣泛的限流算法,顧名思義,它有以下兩個關鍵角色:
令牌 :獲取到令牌的Request才會被處理,其他Requests要么排隊要么被直接丟棄;
桶 :用來裝令牌的地方,所有Request都從這個桶里面獲取令牌

令牌桶主要涉及到2個過程,即令牌的生成,令牌的獲取
3.1.2 漏桶算法
漏桶算法的前半段和令牌桶類似,但是操作的對象不同,結合下圖進行理解。
令牌桶是將令牌放入桶里,而漏桶是將訪問請求的數據包放到桶里。同樣的是,如果桶滿了,那么后面新來的數據包將被丟棄。

3.1.3 滑動時間窗口
根據下圖,簡單描述下滑動時間窗口這種過程:
黑色大框為時間窗口,可以設定窗口時間單位為5秒,它會隨著時間推移向后滑動。我們將窗口內的時間劃分為五個小格子,每個格子代表1秒鐘,同時這個格子還包含一個計數器,用來計算在當前時間內訪問的請求數量。那么這個時間窗口內的總訪問量就是所有格子計數器累加后的數值;
比如說,我們在每一秒內有5個用戶訪問,第5秒內有10個用戶訪問,那么在0到5秒這個時間窗口內訪問量就是15。如果我們的接口設置了時間窗口內訪問上限是20,那么當時間到第六秒的時候,這個時間窗口內的計數總和就變成了10,因為1秒的格子已經退出了時間窗口,因此在第六秒內可以接收的訪問量就是20-10=10個;

滑動窗口其實也是一種計算器算法,它有一個顯著特點,當時間窗口的跨度越長時,限流效果就越平滑。打個比方,如果當前時間窗口只有兩秒,而訪問請求全部集中在第一秒的時候,當時間向后滑動一秒后,當前窗口的計數量將發生較大的變化,拉長時間窗口可以降低這種情況的發生概率
四、通用限流實現方案
拋開網關層的限流先不說,在微服務應用中,考慮到技術棧的組合,團隊人員的開發水平,以及易維護性等因素,一個比較通用的做法是,利用AOP技術+自定義注解實現對特定的方法或接口進行限流,下面基于這個思路來分別介紹下幾種常用的限流方案的實現。
4.1 基于guava限流實現
guava為谷歌開源的一個比較實用的組件,利用這個組件可以幫助開發人員完成常規的限流操作,接下來看具體的實現步驟。
4.1.1 引入guava依賴
版本可以選擇更高的或其他版本
com.google.guava guava 23.0
4.1.2 自定義限流注解
自定義一個限流用的注解,后面在需要限流的方法或接口上面只需添加該注解即可;
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; @Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceRateConfigAnno{ StringlimitType(); doublelimitCount()default5d; }
4.1.3 限流AOP類
通過AOP前置通知的方式攔截添加了上述自定義限流注解的方法,解析注解中的屬性值,并以該屬性值作為guava提供的限流參數,該類為整個實現的核心所在。
importcom.alibaba.fastjson2.JSONObject; importcom.google.common.util.concurrent.RateLimiter; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Before; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.stereotype.Component; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.ServletOutputStream; importjavax.servlet.http.HttpServletResponse; importjava.io.IOException; importjava.lang.reflect.Method; importjava.util.Objects; @Aspect @Component publicclassGuavaLimitAop{ privatestaticLoggerlogger=LoggerFactory.getLogger(GuavaLimitAop.class); @Before("execution(@RateConfigAnno**(..))") publicvoidlimit(JoinPointjoinPoint){ //1、獲取當前的調用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ return; } //2、從方法注解定義上獲取限流的類型 StringlimitType=currentMethod.getAnnotation(RateConfigAnno.class).limitType(); doublelimitCount=currentMethod.getAnnotation(RateConfigAnno.class).limitCount(); //使用guava的令牌桶算法獲取一個令牌,獲取不到先等待 RateLimiterrateLimiter=RateLimitHelper.getRateLimiter(limitType,limitCount); booleanb=rateLimiter.tryAcquire(); if(b){ System.out.println("獲取到令牌"); }else{ HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse(); JSONObjectjsonObject=newJSONObject(); jsonObject.put("success",false); jsonObject.put("msg","限流中"); try{ output(resp,jsonObject.toJSONString()); }catch(Exceptione){ logger.error("error,e:{}",e); } } } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{ response.setContentType("application/json;charset=UTF-8"); ServletOutputStreamoutputStream=null; try{ outputStream=response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); }catch(IOExceptione){ e.printStackTrace(); }finally{ outputStream.flush(); outputStream.close(); } } }
其中限流的核心API即為RateLimiter這個對象,涉及到的RateLimitHelper類如下
importcom.google.common.util.concurrent.RateLimiter;
importjava.util.HashMap;
importjava.util.Map;
publicclassRateLimitHelper{
privateRateLimitHelper(){}
privatestaticMaprateMap=newHashMap<>();
publicstaticRateLimitergetRateLimiter(StringlimitType,doublelimitCount){
RateLimiterrateLimiter=rateMap.get(limitType);
if(rateLimiter==null){
rateLimiter=RateLimiter.create(limitCount);
rateMap.put(limitType,rateLimiter);
}
returnrateLimiter;
}
}
4.1.4 測試接口
下面添加一個測試接口,測試一下上面的代碼是否生效
@RestController
publicclassOrderController{
//localhost:8081/save
@GetMapping("/save")
@RateConfigAnno(limitType="saveOrder",limitCount=1)
publicStringsave(){
return"success";
}
}
在接口中為了模擬出效果,我們將參數設置的非常小,即QPS為1,可以預想當每秒請求超過1時將會出現被限流的提示,啟動工程并驗證接口,每秒1次的請求,可以正常得到結果,效果如下:

快速刷接口,將會看到下面的效果

4.2 基于sentinel限流實現
在不少同學的意識中,sentinel通常是需要結合springcloud-alibaba框架一起實用的,而且與框架集成之后,可以配合控制臺一起使用達到更好的效果,實際上,sentinel官方也提供了相對原生的SDK可供使用,接下來就以這種方式進行整合。
4.2.1 引入sentinel核心依賴包
com.alibaba.csp sentinel-core 1.8.0
4.2.2 自定義限流注解
可以根據需要,添加更多的屬性
importjava.lang.annotation.ElementType;
importjava.lang.annotation.Retention;
importjava.lang.annotation.RetentionPolicy;
importjava.lang.annotation.Target;
@Target(value=ElementType.METHOD)
@Retention(value=RetentionPolicy.RUNTIME)
public@interfaceSentinelLimitAnnotation{
StringresourceName();
intlimitCount()default5;
}
4.2.3 自定義AOP類實現限流
該類的實現思路與上述使用guava類似,不同的是,這里使用的是sentinel原生的限流相關的API,對此不夠屬性的可以查閱官方的文檔進行學習,這里就不展開來說了。
importcom.alibaba.csp.sentinel.Entry;
importcom.alibaba.csp.sentinel.SphU;
importcom.alibaba.csp.sentinel.Tracer;
importcom.alibaba.csp.sentinel.slots.block.BlockException;
importcom.alibaba.csp.sentinel.slots.block.RuleConstant;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
importorg.apache.commons.lang3.StringUtils;
importorg.aspectj.lang.JoinPoint;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.springframework.stereotype.Component;
importjava.lang.reflect.Method;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Objects;
@Aspect
@Component
publicclassSentinelMethodLimitAop{
privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){
Listrules=newArrayList<>();
FlowRulerule=newFlowRule();
//設置受保護的資源
rule.setResource(resourceName);
//設置流控規則QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//設置受保護的資源閾值
rule.setCount(limitCount);
rules.add(rule);
//加載配置好的規則
FlowRuleManager.loadRules(rules);
}
@Pointcut(value="@annotation(com.congge.sentinel.SentinelLimitAnnotation)")
publicvoidrateLimit(){
}
@Around("rateLimit()")
publicObjectaround(ProceedingJoinPointjoinPoint){
//1、獲取當前的調用方法
MethodcurrentMethod=getCurrentMethod(joinPoint);
if(Objects.isNull(currentMethod)){
returnnull;
}
//2、從方法注解定義上獲取限流的類型
StringresourceName=currentMethod.getAnnotation(SentinelLimitAnnotation.class).resourceName();
if(StringUtils.isEmpty(resourceName)){
thrownewRuntimeException("資源名稱為空");
}
intlimitCount=currentMethod.getAnnotation(SentinelLimitAnnotation.class).limitCount();
initFlowRule(resourceName,limitCount);
Entryentry=null;
Objectresult=null;
try{
entry=SphU.entry(resourceName);
try{
result=joinPoint.proceed();
}catch(Throwablethrowable){
throwable.printStackTrace();
}
}catch(BlockExceptionex){
//資源訪問阻止,被限流或被降級
//在此處進行相應的處理操作
System.out.println("blocked");
return"被限流了";
}catch(Exceptione){
Tracer.traceEntry(e,entry);
}finally{
if(entry!=null){
entry.exit();
}
}
returnresult;
}
privateMethodgetCurrentMethod(JoinPointjoinPoint){
Method[]methods=joinPoint.getTarget().getClass().getMethods();
Methodtarget=null;
for(Methodmethod:methods){
if(method.getName().equals(joinPoint.getSignature().getName())){
target=method;
break;
}
}
returntarget;
}
}
4.2.4 自定義測試接口
為了模擬效果,這里將QPS的數量設置為1
//localhost:8081/limit
@GetMapping("/limit")
@SentinelLimitAnnotation(limitCount=1,resourceName="sentinelLimit")
publicStringsentinelLimit(){
return"sentinelLimit";
}
啟動工程之后,瀏覽器調用接口測試一下,每秒一個請求,可以正常通過

快速刷接口,超過每秒1次時,效果如下

這里只是為了演示出效果,建議在真實的項目中使用時,對返回結果做一個封裝。
4.3 基于redis+lua限流實現
redis是線程安全的,天然具有線程安全的特性,支持原子性操作,限流服務不僅需要承接超高QPS,還要保證限流邏輯的執行層面具備線程安全的特性,利用Redis這些特性做限流,既能保證線程安全,也能保證性能?;趓edis的限流實現完整流程如下圖:

結合上面的流程圖,這里梳理出一個整體的實現思路:
編寫lua腳本,指定入參的限流規則,比如對特定的接口限流時,可以根據某個或幾個參數進行判定,調用該接口的請求,在一定的時間窗口內監控請求次數;
既然是限流,最好能夠通用,可將限流規則應用到任何接口上,那么最合適的方式就是通過自定義注解形式切入;
提供一個配置類,被spring的容器管理,redisTemplate中提供了DefaultRedisScript這個bean;
提供一個能動態解析接口參數的類,根據接口參數進行規則匹配后觸發限流;
4.3.1 引入redis依賴
org.springframework.boot spring-boot-starter-data-redis
4.3.2 自定義注解
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public@interfaceRedisLimitAnnotation{
/**
*key
*/
Stringkey()default"";
/**
*Key的前綴
*/
Stringprefix()default"";
/**
*一定時間內最多訪問次數
*/
intcount();
/**
*給定的時間范圍單位(秒)
*/
intperiod();
/**
*限流的類型(用戶自定義key或者請求ip)
*/
LimitTypelimitType()defaultLimitType.CUSTOMER;
}
4.3.3 自定義redis配置類
importorg.springframework.context.annotation.Bean;
importorg.springframework.core.io.ClassPathResource;
importorg.springframework.data.redis.connection.RedisConnectionFactory;
importorg.springframework.data.redis.core.RedisTemplate;
importorg.springframework.data.redis.core.script.DefaultRedisScript;
importorg.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
importorg.springframework.data.redis.serializer.StringRedisSerializer;
importorg.springframework.scripting.support.ResourceScriptSource;
importorg.springframework.stereotype.Component;
importjava.io.Serializable;
@Component
publicclassRedisConfiguration{
@Bean
publicDefaultRedisScriptredisluaScript(){
DefaultRedisScriptredisScript=newDefaultRedisScript<>();
redisScript.setScriptSource(newResourceScriptSource(newClassPathResource("limit.lua")));
redisScript.setResultType(Number.class);
returnredisScript;
}
@Bean("redisTemplate")
publicRedisTemplateredisTemplate(RedisConnectionFactoryredisConnectionFactory){
RedisTemplateredisTemplate=newRedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializerjackson2JsonRedisSerializer=newJackson2JsonRedisSerializer(Object.class);
//設置value的序列化方式為JSOn
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//設置key的序列化方式為String
redisTemplate.setKeySerializer(newStringRedisSerializer());
redisTemplate.setHashKeySerializer(newStringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
returnredisTemplate;
}
}
4.3.4 自定義限流AOP類
importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.aspectj.lang.reflect.MethodSignature; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.context.annotation.Configuration; importorg.springframework.data.redis.core.RedisTemplate; importorg.springframework.data.redis.core.script.DefaultRedisScript; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.http.HttpServletRequest; importjava.io.Serializable; importjava.lang.reflect.Method; importjava.util.Collections; importjava.util.List; @Aspect @Configuration publicclassLimitRestAspect{ privatestaticfinalLoggerlogger=LoggerFactory.getLogger(LimitRestAspect.class); @Autowired privateRedisTemplateredisTemplate; @Autowired privateDefaultRedisScript redisluaScript; @Pointcut(value="@annotation(com.congge.config.limit.RedisLimitAnnotation)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectinterceptor(ProceedingJoinPointjoinPoint)throwsThrowable{ MethodSignaturesignature=(MethodSignature)joinPoint.getSignature(); Methodmethod=signature.getMethod(); Class>targetClass=method.getDeclaringClass(); RedisLimitAnnotationrateLimit=method.getAnnotation(RedisLimitAnnotation.class); if(rateLimit!=null){ HttpServletRequestrequest=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getRequest(); StringipAddress=getIpAddr(request); StringBufferstringBuffer=newStringBuffer(); stringBuffer.append(ipAddress).append("-") .append(targetClass.getName()).append("-") .append(method.getName()).append("-") .append(rateLimit.key()); List keys=Collections.singletonList(stringBuffer.toString()); //調用lua腳本,獲取返回結果,這里即為請求的次數 Numbernumber=redisTemplate.execute( redisluaScript, keys, rateLimit.count(), rateLimit.period() ); if(number!=null&&number.intValue()!=0&&number.intValue()<=?rateLimit.count())?{ ????????????????logger.info("限流時間段內訪問了第:{}?次",?number.toString()); ????????????????return?joinPoint.proceed(); ????????????} ????????}?else?{ ????????????return?joinPoint.proceed(); ????????} ????????throw?new?RuntimeException("訪問頻率過快,被限流了"); ????} ? ????/** ?????*?獲取請求的IP方法 ?????*?@param?request ?????*?@return ?????*/ ????private?static?String?getIpAddr(HttpServletRequest?request)?{ ????????String?ipAddress?=?null; ????????try?{ ????????????ipAddress?=?request.getHeader("x-forwarded-for"); ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getHeader("Proxy-Client-IP"); ????????????} ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getHeader("WL-Proxy-Client-IP"); ????????????} ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getRemoteAddr(); ????????????} ????????????//?對于通過多個代理的情況,第一個IP為客戶端真實IP,多個IP按照','分割 ????????????if?(ipAddress?!=?null?&&?ipAddress.length()?>15){ if(ipAddress.indexOf(",")>0){ ipAddress=ipAddress.substring(0,ipAddress.indexOf(",")); } } }catch(Exceptione){ ipAddress=""; } returnipAddress; } }
該類要做的事情和上面的兩種限流措施類似,不過在這里核心的限流是通過讀取lua腳步,通過參數傳遞給lua腳步實現的。
4.3.5 自定義lua腳本
在工程的resources目錄下,添加如下的lua腳本
localkey="rate.limit:"..KEYS[1]
locallimit=tonumber(ARGV[1])
localcurrent=tonumber(redis.call('get',key)or"0")
ifcurrent+1>limitthen
return0
else
--沒有超閾值,將當前訪問數量+1,并設置2秒過期(可根據自己的業務情況調整)
redis.call("INCRBY",key,"1")
redis.call("expire",key,"2")
returncurrent+1
end
4.3.6 添加測試接口
@RestController
publicclassRedisController{
//localhost:8081/redis/limit
@GetMapping("/redis/limit")
@RedisLimitAnnotation(key="queryFromRedis",period=1,count=1)
publicStringqueryFromRedis(){
return"success";
}
}
為了模擬效果,這里將QPS設置為1 ,啟動工程后(提前啟動redis服務),調用一下接口,正常的效果如下:

快速刷接口,超過每秒1次的請求時看到如下效果

五、自定義starter限流實現
上面通過案例介紹了幾種常用的限流實現,不過細心的同學可以看到,這些限流的實現都是在具體的工程模塊中嵌入的,事實上,在真實的微服務開發中,一個項目可能包含了眾多的微服務模塊,為了減少重復造輪子,避免每個微服務模塊中單獨實現,可以考慮將限流的邏輯實現封裝成一個SDK,即作為一個springboot的starter的方式被其他微服務模塊進行引用即可。這也是目前很多生產實踐中比較通用的做法,接下來看看具體的實現吧。
5.1 前置準備
創建一個空的springboot工程,工程目錄結構如下圖,目錄說明:
annotation:存放自定義的限流相關的注解;
aop:存放不同的限流實現,比如基于guava的aop,基于sentinel的aop實現等;
spring.factories:自定義待裝配的aop實現類;

5.2 代碼整合完成步驟
5.2.1 導入基礎的依賴
這里包括如下幾個必須的依賴,其他的依賴可以結合自身的情況合理選擇;
spring-boot-starter;
guava;
spring-boot-autoconfigure;
sentinel-core;
org.springframework.boot spring-boot-starter-parent 2.2.1.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-aop log4j log4j 1.2.17 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.projectlombok lombok com.google.guava guava 23.0 org.springframework.boot spring-boot-autoconfigure 2.2.1.RELEASE org.springframework.boot spring-boot-configuration-processor 2.2.1.RELEASE com.alibaba.csp sentinel-core 1.8.0 org.apache.commons commons-lang3 3.4 com.alibaba.fastjson2 fastjson2 2.0.22 src/main/resources **/**
5.2.2 自定義注解
目前該SDK支持三種限流方式,即后續其他微服務工程中可以通過添加這3種注解即可實現限流,分別是基于guava的令牌桶,基于sentinel的限流,基于java自帶的Semaphore限流,三個自定義注解類如下:
令牌桶
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public@interfaceTokenBucketLimiter{
intvalue()default50;
}
Semaphore
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public@interfaceShLimiter{
intvalue()default50;
}
sentinel
@Target(value=ElementType.METHOD)
@Retention(value=RetentionPolicy.RUNTIME)
public@interfaceSentinelLimiter{
StringresourceName();
intlimitCount()default50;
}
5.2.3 限流實現AOP類
具體的限流在AOP中進行實現,思路和上一章節類似,即通過環繞通知的方式,先解析那些添加了限流注解的方法,然后解析里面的參數,進行限流的業務實現。
基于guava的aop實現
importcom.alibaba.fastjson2.JSONObject;
importcom.congge.annotation.TokenBucketLimiter;
importcom.google.common.util.concurrent.RateLimiter;
importlombok.extern.slf4j.Slf4j;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.springframework.cglib.core.ReflectUtils;
importorg.springframework.stereotype.Component;
importorg.springframework.web.context.request.RequestContextHolder;
importorg.springframework.web.context.request.ServletRequestAttributes;
importjavax.servlet.ServletOutputStream;
importjavax.servlet.http.HttpServletResponse;
importjava.io.IOException;
importjava.lang.reflect.Method;
importjava.util.Arrays;
importjava.util.Map;
importjava.util.concurrent.ConcurrentHashMap;
@Aspect
@Component
@Slf4j
publicclassGuavaLimiterAop{
privatefinalMaprateLimiters=newConcurrentHashMap();
@Pointcut("@annotation(com.congge.annotation.TokenBucketLimiter)")
publicvoidaspect(){
}
@Around(value="aspect()")
publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{
log.debug("準備限流");
Objecttarget=point.getTarget();
StringtargetName=target.getClass().getName();
StringmethodName=point.getSignature().getName();
Object[]arguments=point.getArgs();
Class>targetClass=Class.forName(targetName);
Class>[]argTypes=ReflectUtils.getClasses(arguments);
Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes);
//獲取目標method上的限流注解@Limiter
TokenBucketLimiterlimiter=method.getAnnotation(TokenBucketLimiter.class);
RateLimiterrateLimiter=null;
Objectresult=null;
if(null!=limiter){
//以class+method+parameters為key,避免重載、重寫帶來的混亂
Stringkey=targetName+"."+methodName+Arrays.toString(argTypes);
rateLimiter=rateLimiters.get(key);
if(null==rateLimiter){
//獲取限定的流量
//為了防止并發
rateLimiters.putIfAbsent(key,RateLimiter.create(limiter.value()));
rateLimiter=rateLimiters.get(key);
}
booleanb=rateLimiter.tryAcquire();
if(b){
log.debug("得到令牌,準備執行業務");
result=point.proceed();
}else{
HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse();
JSONObjectjsonObject=newJSONObject();
jsonObject.put("success",false);
jsonObject.put("msg","限流中");
try{
output(resp,jsonObject.toJSONString());
}catch(Exceptione){
log.error("error,e:{}",e);
}
}
}else{
result=point.proceed();
}
log.debug("退出限流");
returnresult;
}
publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{
response.setContentType("application/json;charset=UTF-8");
ServletOutputStreamoutputStream=null;
try{
outputStream=response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
}catch(IOExceptione){
e.printStackTrace();
}finally{
outputStream.flush();
outputStream.close();
}
}
}
基于Semaphore的aop實現
importcom.congge.annotation.ShLimiter;
importlombok.extern.slf4j.Slf4j;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.cglib.core.ReflectUtils;
importorg.springframework.stereotype.Component;
importjava.lang.reflect.Method;
importjava.util.Arrays;
importjava.util.Map;
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.concurrent.Semaphore;
@Aspect
@Component
@Slf4j
publicclassSemaphoreLimiterAop{
privatefinalMapsemaphores=newConcurrentHashMap();
privatefinalstaticLoggerLOG=LoggerFactory.getLogger(SemaphoreLimiterAop.class);
@Pointcut("@annotation(com.congge.annotation.ShLimiter)")
publicvoidaspect(){
}
@Around(value="aspect()")
publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{
log.debug("進入限流aop");
Objecttarget=point.getTarget();
StringtargetName=target.getClass().getName();
StringmethodName=point.getSignature().getName();
Object[]arguments=point.getArgs();
Class>targetClass=Class.forName(targetName);
Class>[]argTypes=ReflectUtils.getClasses(arguments);
Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes);
//獲取目標method上的限流注解@Limiter
ShLimiterlimiter=method.getAnnotation(ShLimiter.class);
Objectresult=null;
if(null!=limiter){
//以class+method+parameters為key,避免重載、重寫帶來的混亂
Stringkey=targetName+"."+methodName+Arrays.toString(argTypes);
//獲取限定的流量
Semaphoresemaphore=semaphores.get(key);
if(null==semaphore){
semaphores.putIfAbsent(key,newSemaphore(limiter.value()));
semaphore=semaphores.get(key);
}
try{
semaphore.acquire();
result=point.proceed();
}finally{
if(null!=semaphore){
semaphore.release();
}
}
}else{
result=point.proceed();
}
log.debug("退出限流");
returnresult;
}
}
基于sentinel的aop實現
importcom.alibaba.csp.sentinel.Entry;
importcom.alibaba.csp.sentinel.SphU;
importcom.alibaba.csp.sentinel.Tracer;
importcom.alibaba.csp.sentinel.slots.block.BlockException;
importcom.alibaba.csp.sentinel.slots.block.RuleConstant;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
importcom.congge.annotation.SentinelLimiter;
importorg.apache.commons.lang3.StringUtils;
importorg.aspectj.lang.JoinPoint;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.springframework.stereotype.Component;
importjava.lang.reflect.Method;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Objects;
@Aspect
@Component
publicclassSentinelLimiterAop{
privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){
Listrules=newArrayList<>();
FlowRulerule=newFlowRule();
//設置受保護的資源
rule.setResource(resourceName);
//設置流控規則QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//設置受保護的資源閾值
rule.setCount(limitCount);
rules.add(rule);
//加載配置好的規則
FlowRuleManager.loadRules(rules);
}
@Pointcut(value="@annotation(com.congge.annotation.SentinelLimiter)")
publicvoidrateLimit(){
}
@Around("rateLimit()")
publicObjectaround(ProceedingJoinPointjoinPoint){
//1、獲取當前的調用方法
MethodcurrentMethod=getCurrentMethod(joinPoint);
if(Objects.isNull(currentMethod)){
returnnull;
}
//2、從方法注解定義上獲取限流的類型
StringresourceName=currentMethod.getAnnotation(SentinelLimiter.class).resourceName();
if(StringUtils.isEmpty(resourceName)){
thrownewRuntimeException("資源名稱為空");
}
intlimitCount=currentMethod.getAnnotation(SentinelLimiter.class).limitCount();
initFlowRule(resourceName,limitCount);
Entryentry=null;
Objectresult=null;
try{
entry=SphU.entry(resourceName);
try{
result=joinPoint.proceed();
}catch(Throwablethrowable){
throwable.printStackTrace();
}
}catch(BlockExceptionex){
//資源訪問阻止,被限流或被降級
//在此處進行相應的處理操作
System.out.println("blocked");
return"被限流了";
}catch(Exceptione){
Tracer.traceEntry(e,entry);
}finally{
if(entry!=null){
entry.exit();
}
}
returnresult;
}
privateMethodgetCurrentMethod(JoinPointjoinPoint){
Method[]methods=joinPoint.getTarget().getClass().getMethods();
Methodtarget=null;
for(Methodmethod:methods){
if(method.getName().equals(joinPoint.getSignature().getName())){
target=method;
break;
}
}
returntarget;
}
}
5.2.4 配置自動裝配AOP實現
在resources目錄下創建上述的spring.factories文件,內容如下,通過這種方式配置后,其他應用模塊引入了當前的SDK的jar之后,就可以實現開箱即用了;
org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.congge.aop.SemaphoreLimiterAop, com.congge.aop.GuavaLimiterAop, com.congge.aop.SemaphoreLimiterAop
5.2.5 將工程打成jar進行安裝
這一步比較簡單就跳過了

5.2.6 在其他的工程中引入上述SDK
cm.congge biz-limit 1.0-SNAPSHOT
5.2.7 編寫測試接口
在其他工程中,編寫一個測試接口,并使用上面的注解,這里以guava的限流注解為例進行說明
importcom.congge.annotation.TokenBucketLimiter;
importorg.springframework.web.bind.annotation.GetMapping;
importorg.springframework.web.bind.annotation.RestController;
@RestController
publicclassSdkController{
//localhost:8081/query
@GetMapping("/query")
@TokenBucketLimiter(1)
publicStringqueryUser(){
return"queryUser";
}
}
5.2.8 功能測試
啟動當前的工程后,正常調用接口,每秒一次的請求,可以正常得到結果

快速刷接口,QPS超過1之后,將會觸發限流,看到如下效果

通過上面這種方式,也可以得到預期的效果,其他兩種限流注解有興趣的同學也可以繼續測試驗證,篇幅原因就不再贅述了。
上述通過starter的方式實現了一種更優雅的限流集成方式,也是生產中比較推薦的一種方式,不過當前的案例還比較粗糙,需要使用的同學還需根據自己的情況完善里面的邏輯,進一步的封裝以期得到更好的效果。
審核編輯:劉清
-
接收機
+關注
關注
9文章
1244瀏覽量
56232 -
計數器
+關注
關注
32文章
2316瀏覽量
98180 -
JVM
+關注
關注
0文章
161瀏覽量
13037 -
QPS
+關注
關注
0文章
24瀏覽量
9085 -
負載保護器
+關注
關注
0文章
4瀏覽量
5482 -
SpringBoot
+關注
關注
0文章
177瀏覽量
688
原文標題:SpringBoot 通用限流方案(VIP珍藏版)
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
SpringBoot應用啟動運行run方法
Springboot是如何獲取自定義異常并進行返回的
Spring Boot 系列(八)@ControllerAdvice 攔截異常并統一處理
關于SpringBoot如何優雅的全局異常處理
公司這套架構統一處理try...catch真香!
什么是 SpringBoot?
SpringBoot統一功能處理
SpringBoot攔截器與統一功能處理實戰
springboot統一異常處理
評論