自学内容网 自学内容网

FastApi接口限流

  • 有个接口内容信息比较敏感,遂设置限流操作
    通常的限流操作就包括对某个用户或者某个IP进行限流,每分钟或者每天最多能访问多少次之类的等等。
  • FaslApi中的限流操作开源库SlowApi https://github.com/laurentS/slowapi
    这个库是根据flask-limiter改编而来,应该也是在FastApi中使用人数最多的一个库,但是感觉网上的资料比较少,我仔细看了一下,这个库本身所支持的功能也比较单一,很多个性化的操作还是需要自己来写。
    比如想根据用户类型来进行限流操作分级,官方没有提供支持,这个issue里个老哥实现了。https://github.com/laurentS/slowapi/issues/13
    还有一个问题是虽然默认支持内存计数,但是FastApi开多个workers跑时每个进程都会有自己独立的limit。想实现总体上的限流要加入redis作为backend。
  • SlowApi的简单用法
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI
from starlette.requests import Request
from setting import config, env
from extensions import logger


def get_real_remote_address(request: Request) -> str:
    """
    Returns the ip address for the current request (or 127.0.0.1 if none found)
    """
    real_ip = request.client.host
    if request and request.headers.getlist("X-Forwarded-For"):
        real_ip = request.headers.getlist("X-Forwarded-For")[0]
    logger.info(f"limit_ip: {real_ip}")
    return real_ip


limiter = Limiter(key_func=get_real_remote_address, storage_uri=config[env].LIMIT_STORAGE)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# 接口中
@router.post("/query/detail")
@limiter.limit("200/day")
@limiter.limit("10/minute")
async def get_file_detail(
    request: Request
):

注意Request是必填的,看了一下他的源码,简单来说是通过一个key来生成一个计数器,超过设定的计数即返回code: 429。

  • 甚至可以模仿其操作来实现更加自由的自定义限流
    先准备好Redis,主要是使用Redis中的setex()命令。
import redis
from setting import config, env
from extensions import logger


def create_redis():
    return redis.ConnectionPool(
        host='10.10.10.00',
        port=6379,
        password=111111,
        db=5 if env == "production" else 4,
        decode_responses=True
    )


pool = create_redis()

接口中定义

def get_redis():
    return redis.Redis(connection_pool=pool)


@router.post("/query/detail")
@limiter.limit("200/day")
@limiter.limit("10/minute")
async def get_file_detail(
    request: Request,
    cache=Depends(get_redis)
):
   if not cache.get("LIMITS:total_limit_m"):
       cache.setex('LIMITS:total_limit_m', 60, 0)
   else:
       r_times = int(cache.get("LIMITS:total_limit_m"))
       if r_times > 20:
           return {"msg": "too many requests!"}
       ttl = cache.ttl("LIMITS:total_limit_m")
       cache.setex("LIMITS:total_limit_m", int(ttl), str(r_times + 1))

这样就实现了一个非常简单的限流器,生产环境慎用。严谨的实现要实现其原子操作,这个到后面有时间再填坑。

  • 一些不错的资料

https://blog.csdn.net/cqcre/article/details/124440748
https://developer.aliyun.com/article/974481
https://github.com/jintaekimmm/fastapi-simple-rate-limiter
https://blog.csdn.net/km_bandits/article/details/135568339


原文地址:https://blog.csdn.net/wyh1618/article/details/140700764

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!