flask 项目需要访问阿里云上部署的 kubeflow api 服务, 这个 api 参数需要使用 query 传递, 因 query 构造过长报了 413,后考虑拆分 query 并发访问服务.
大致代码抽象如下
import asyncio
import json
import re
import functools
from typing import Dict, List
import aiohttp
from loguru import logger
def kubeflow_auth_with_async(func):
"""做 kubeflow 的 auth"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
async with aiohttp.ClientSession() as session:
# login
payload = {
"username": conf.USERNAME,
"password": conf.PASSWORD,
}
await session.post(conf.AIX_AUTH_URL, data=payload)
# get req
# async with session.get(conf.PIPELINE_URL) as response:
# text = await response.text()
response = await session.get(conf.PIPELINE_URL)
text = await response.text()
pattern = r"/dex/auth/aix\?req="
index_beg = re.search(pattern, text).span()
index_end = text.find('"', index_beg[1])
req = text[index_beg[1] : index_end]
params = {"req": req}
# login kubeflow with aix
await session.get(conf.DEX_AUTH_URL, params=params)
return await func(session, *args, **kwargs)
return wrapper
async def async_get_runs(
session: aiohttp.ClientSession,
page_size=1,
page_token=None,
experiment_id=None,
filters=None,
sort_by="created_at desc",
):
if not filters:
filters = {}
query = {
"page_size": page_size,
"page_token": page_token,
"sort_by": sort_by,
"resource_reference_key.type": "EXPERIMENT",
"resource_reference_key.id": experiment_id,
"filter": json.dumps(filters),
}
response = await session.get(
url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
)
# async with session.get(
# url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
# ) as response:
# response.raise_for_status()
# return await response.json()
response.raise_for_status()
return await response.json()
@kubeflow_auth_with_async
async def gather_fetch_runs(
session: aiohttp.ClientSession,
runs_lst:List[str],
):
exp_id = "xxxxx"
tasks = []
for sub_runs in runs_lst:
filters: dict = {
"predicates": [
{
"key": "id",
"op": FILTER_OPERATIONS.IN.value,
"string_values": {"values": sub_runs},
},
]
}
tasks.append(
async_get_runs(
session,
page_size=len(sub_runs),
experiment_id=exp_id,
filters=filters,
)
)
return await asyncio.gather(*tasks)
res = []
for r in asyncio.run(gather_fetch_runs(["xxx","xxx","xx"])):
res.extend(r.get("runs", []))
出现的报错,不知道怎么贴图,手动概括一下异常
服务稳定运行一段时间后,会出现突然 500 刷新又可以坚挺一段时间
aiohttp.client_exceptions.ContentypTypeError
从报错信息上看像是 session 失效导致的,被 auth 重定向到了登录页. contentType 变成了 text/html
而请求的 url 重定向到了登录的 url,已经不是我传入的那个
因为没试过 async 函数的装饰器,不知道是不是这个问题,
还有就是 clientSession 的连接池是不是保存了 session 的状态. 装饰器每次都重新请求了.这个 session 按道理不应该还是一样的.
也试了,没有这个问题,不过访问次数多起来之后容易莫名 GG,大概率是被自己家部署的 kubeflow 反爬了?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.