请教 aiohttp clientSession 认证过期带来的重定向问题

2022-01-12 10:51:06 +08:00
 tomtao00001

使用 aiohttp 的背景:

flask 项目需要访问阿里云上部署的 kubeflow api 服务, 这个 api 参数需要使用 query 传递, 因 query 构造过长报了 413,后考虑拆分 query 并发访问服务.

尝试 1 aiphttp + asyncio

大致代码抽象如下

import asyncio
import json
import re
import functools
from typing import Dict, List

import aiohttp
from loguru import logger


def kubeflow_auth_with_async(func):
    """做 kubeflow 的 auth"""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):

        async with aiohttp.ClientSession() as session:
            # login
            payload = {
                "username": conf.USERNAME,
                "password": conf.PASSWORD,
            }
            await session.post(conf.AIX_AUTH_URL, data=payload)
            # get req
            # async with session.get(conf.PIPELINE_URL) as response:
            #     text = await response.text()
            response = await session.get(conf.PIPELINE_URL)
            text = await response.text()
            pattern = r"/dex/auth/aix\?req="
            index_beg = re.search(pattern, text).span()
            index_end = text.find('"', index_beg[1])
            req = text[index_beg[1] : index_end]
            params = {"req": req}
            # login kubeflow with aix
            await session.get(conf.DEX_AUTH_URL, params=params)

            return await func(session, *args, **kwargs)

    return wrapper


async def async_get_runs(
    session: aiohttp.ClientSession,
    page_size=1,
    page_token=None,
    experiment_id=None,
    filters=None,
    sort_by="created_at desc",
):

    if not filters:
        filters = {}

    query = {
        "page_size": page_size,
        "page_token": page_token,
        "sort_by": sort_by,
        "resource_reference_key.type": "EXPERIMENT",
        "resource_reference_key.id": experiment_id,
        "filter": json.dumps(filters),
    }
    response = await session.get(
        url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
    )  

    # async with session.get(
    #     url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
    # ) as response:
    # response.raise_for_status()
    # return await response.json()

    response.raise_for_status()
    return await response.json()


@kubeflow_auth_with_async
async def gather_fetch_runs(
    session: aiohttp.ClientSession,
    runs_lst:List[str],
):
    exp_id = "xxxxx"
    tasks = []
    for sub_runs in runs_lst:
        filters: dict = {
            "predicates": [
                {
                    "key": "id",
                    "op": FILTER_OPERATIONS.IN.value,
                    "string_values": {"values": sub_runs}, 
                },
            ]
        }
        tasks.append(
            async_get_runs(
                session,
                page_size=len(sub_runs),
                experiment_id=exp_id,
                filters=filters,
            )
        )

    return await asyncio.gather(*tasks)


res = []
for r in asyncio.run(gather_fetch_runs(["xxx","xxx","xx"])):
    res.extend(r.get("runs", []))

请教 V 站大佬

出现的报错,不知道怎么贴图,手动概括一下异常

服务稳定运行一段时间后,会出现突然 500 刷新又可以坚挺一段时间
aiohttp.client_exceptions.ContentypTypeError
从报错信息上看像是 session 失效导致的,被 auth 重定向到了登录页. contentType 变成了 text/html
而请求的 url 重定向到了登录的 url,已经不是我传入的那个

因为没试过 async 函数的装饰器,不知道是不是这个问题,
还有就是 clientSession 的连接池是不是保存了 session 的状态. 装饰器每次都重新请求了.这个 session 按道理不应该还是一样的.

尝试 2 gevent + request

也试了,没有这个问题,不过访问次数多起来之后容易莫名 GG,大概率是被自己家部署的 kubeflow 反爬了?

1872 次点击
所在节点    Python
4 条回复
amlee
2022-01-15 06:48:07 +08:00
你可能是想在装饰函数里面获取 token ,然后 session 发出的每次请求的请求头都要带上这个 token ?
如果我理解没错的话,装饰器里面的 wrapper 只执行一次的,你可以看看服务端的 token 过期时间。
tomtao00001
2022-01-17 15:17:08 +08:00
@amlee emm 服务器每次获取请求后, 内部实现都会用这个装饰器, 您指的 wrapper 执行一次, 我不是很理解, 表达的是 wrapper 函数外的作用域只执行一次么? 如果是这个意思 , 对于目前这个方式来讲 它应该是不影响的对么? 不知道我理解的对不对.
amlee
2022-01-18 19:40:41 +08:00
@tomtao00001 之前看你代码不仔细,我上面那个回答是错误的,不要理会上面那个回答了。

我重新读了一遍你的代码,你说的“每次获取请求,内部函数都会用这个装饰器”跟你的代码行为有误差。

你通过 gather_fetch_runs 构建一组了协程对象并发执行,但这一组协程对象通过 gather_fetch_runs 的 @kubeflow_auth_with_async 装饰器赋予了同一个 session 对象。当这一组协程并发运行足够长的时间,登录会超时。而你的登录状态是保存在同一个 session 中的,这一组协程共同使用这个 session ,所以登录超时以后这个 session 失效,你这一组并发的协程也会请求失败。

而你说的刷新又好了的情况,或许是重新运行了一遍 gather_fetch_runs ,这会导致重新构建一组协程,重新构建一个已登录的 session
tomtao00001
2022-01-19 17:15:03 +08:00
@amlee 好的 非常感谢回复 最后还是换了一种方式 这个就直接放弃了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/827759

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX