V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
lieh222
V2EX  ›  Python

求解协程+多进程的正确使用姿势

  •  
  •   lieh222 · 2018-08-06 17:00:59 +08:00 · 3715 次点击
    这是一个创建于 2342 天前的主题,其中的信息可能已经有所发展或是发生改变。

    用 asyncio 做了一个 UDP 传输性能测试工具,目前单进程服务端性能不够,流量大的时候处理不过来,服务端用的 asyncio.DatagramProtocol,怎么变成多进程的呢?试试了一下抢占式的写法,运行报错了,运行起来也只有一个进程工作,上代码


    import asyncio
    import time
    import socket
    from multiprocessing import Process
    
    loop = asyncio.get_event_loop()
    size = 0
    
    class ServerProtocol(asyncio.DatagramProtocol):
    
        def connection_made(self, transport):
            self.transport = transport
    
        def datagram_received(self, data, addr, args=None):
            global size
            data = data.decode()
            message = data
            index = data.find('\n')
            if index > 0:
                filename = data[0:index]
                data = data[index+1::]
                size += len(data)
            task = self.WriteToFile(filename, data)
            asyncio.run_coroutine_threadsafe(task, loop)
    
        async def WriteToFile(self, f, data):
            await asyncio.sleep(1)
            return True
    
    async def print_size():
        global size
        while True:
            await asyncio.sleep(1)
            print(size)
    
    def start(sock):
        listen = loop.create_datagram_endpoint(
            ServerProtocol,
            sock = sock
        )
        transport, protocol = loop.run_until_complete(listen)
    
        task = print_size()
        asyncio.run_coroutine_threadsafe(task, loop)
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        transport.close()
        loop.close()
        sock.close()
    
    if __name__ == '__main__':
        print("Starting UDP server")
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind(('0.0.0.0', 9873))
        for i in range(1):
            t = Process(target=start, args=(sock,))
            t.deamon = True
            t.start()
        start(sock)
    

    报错信息

    Exception in callback BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)
    handle: <Handle BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)>
    Traceback (most recent call last):
      File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
        key = self._selector.get_key(fd)
      File "/usr/local/python36/lib/python3.6/selectors.py", line 191, in get_key
        raise KeyError("{!r} is not registered".format(fileobj)) from None
    KeyError: '6 is not registered'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/python36/lib/python3.6/asyncio/events.py", line 145, in _run
        self._callback(*self._args)
      File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 267, in _add_reader
        (handle, None))
      File "/usr/local/python36/lib/python3.6/selectors.py", line 412, in register
        self._epoll.register(key.fd, epoll_events)
    FileExistsError: [Errno 17] File exists
    
    3 条回复    2018-08-07 10:50:13 +08:00
    lieh222
        1
    lieh222  
    OP
       2018-08-07 08:39:57 +08:00
    这。。。咋没人呢
    lolizeppelin
        2
    lolizeppelin  
       2018-08-07 09:00:04 +08:00 via Android
    把 fork 搞明白

    一多进城就 multiprocessing
    有没有想过要看 multiprocessing 的源码?
    lieh222
        3
    lieh222  
    OP
       2018-08-07 10:50:13 +08:00
    @lolizeppelin 感谢回复,已经解决,因为多进程中共用了一个事件循环,add_reader 重复注册了 socket.fileno,所以报错了,用 new_event_loop 解决,与 fork、mutiprocessing 的用法和是否看了源码没有关系,另外我认为像 fork,mutiprocessing 这种系统接口函数和系统接口的高级封装直接用就是了,除非遇到相关必须要解决的问题和抱着学习的目的,没有必要看这种源码
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1662 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 16:49 · PVG 00:49 · LAX 08:49 · JFK 11:49
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.