V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
ohayoo
V2EX  ›  Python

新手求助一个蛋疼的问题

  •  
  •   ohayoo · 2023-12-18 11:31:11 +08:00 · 1453 次点击
    这是一个创建于 370 天前的主题,其中的信息可能已经有所发展或是发生改变。

    之前自己遇到简单的需求需要异步并发的时候,经过 v2 大佬的指点之后用的 asyncio.Queue + aiohttp 搞定了,大概步骤是这样:(在下不是专业程序员,可能写的东西看起来像玩具,大佬们见谅)

    async def worker(session):
        while True:
            uri = await queue.get()
            uri, status_code = await delete_url(uri, session)
            ......
    
    
    async def delete_url(uri, session):
        ......
        async with session.delete(url, headers=headers) as response:
            status_code = response.status
            return uri, status_code
    
    
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = []
            for _ in range(max_worker):
                task = asyncio.create_task(worker(session))
                ......
    

    早几天看到有个大佬编的那个 [ Python 潮流周刊] 里面推荐一个异步队列 saq

    github 地址: https://github.com/tobymao/saq

    由于没玩过异步任务队列,就很想试试,结果遇到这样一个问题

    async def delete_url(ctx, *, uri, session):
        async with session.delete(url, headers=headers) as response:
            status_code = response.status
            return uri, status_code
    
    
    
    settings = {
        "functions": [delete_url],
        "concurrency": 50
    }
    
    
    async def main():
        async with aiohttp.ClientSession() as session:
            with open(filename, 'r') as f:
                for line in f:
                    uri = line.strip()
                    job = await queue.enqueue("delete_url", uri=uri, session=session)
    

    session=session 不能这样传参,因为 saq 的这个 queue.enqueue 只能接收可序列化的作为参数, 而 aiohttp.ClientSession 不是一个可以 JSON 序列化的。这个 session 又不能写全局变量,要 session 共享的话,又不能写在 delete_url 函数里面,想问问大佬,这种情况要咋处理啊?

    NessajCN
        1
    NessajCN  
       2023-12-18 11:39:57 +08:00
    写个类,session 是成员变量,delete_url 是成员函数
    fzzff
        2
    fzzff  
       2023-12-18 13:14:57 +08:00
    session 为什么不能写为全局变量?
    Vegetable
        3
    Vegetable  
       2023-12-18 15:18:51 +08:00   ❤️ 1
    草草看了一下这个 saq ,他通过 redis 通信,和你本身实现的异步并不搭配,如果你之前能用 asyncio.queue ,证明你本来只是一个单进程的程序,这时候你在程序内部共享 session 是不错的做法。
    但是 saq 通过 redis 通信,就意味着他的设计目的是实现分布式结构,分布式想共享对象就需要序列化了。但分布式之下共享 session 这个事儿其实没什么必要做了,至少不是一个好的方案。
    ClericPy
        4
    ClericPy  
       2023-12-18 21:56:26 +08:00
    消费者那边每个进程共享一个 Session
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   833 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 21:18 · PVG 05:18 · LAX 13:18 · JFK 16:18
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.