深入理解 tornado 之 底层 ioloop 实现(二)

2016-06-06 21:14:50 +08:00
 rapospectre
def configurable_default(cls):
        """Returns the implementation class to be used if none is configured."""
        raise NotImplementedError()

显然也是个接口,那么我们再回头看 ioloop 的 configurable_default():

def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

原来这是个工厂函数,根据不同的操作系统返回不同的事件池( linux 就是 epoll , mac 返回 kqueue ,其他就返回普通的 select 。 kqueue 基本等同于 epoll , 只是不同系统对其的不同实现)

现在线索转移到了 tornado.platform.epoll.EPollIOLoop 上,我们再来看看 EPollIOLoop:

tornado.platform.epoll.EPollIOLoop

import select

from tornado.ioloop import PollIOLoop


class EPollIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)

EPollIOLoop 完全继承自 PollIOLoop注意这里是 PollIOLoop 不是 IOLoop)并只是在初始化时指定了 impl 是 epoll ,所以看起来我们用 IOLoop 初始化最后初始化的其实就是这个 PollIOLoop,所以接下来,我们真正需要理解和阅读的内容应该都在这里:

tornado.ioloop.PollIOLoop

class PollIOLoop(IOLoop):
    """Base class for IOLoops built around a select-like function.

    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
    `tornado.platform.select.SelectIOLoop` (all platforms).
    """
    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._callback_lock = threading.Lock()
        self._timeouts = []
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close()
        self._callbacks = None
        self._timeouts = None

    def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)

    def start(self):
        ...

        try:
            while True:
                # Prevent IO event starvation by delaying new callbacks
                # to the next iteration of the event loop.
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                # Add any timeouts that have come due to the callback list.
                # Do not run anything until we have determined which ones
                # are ready, so timeouts that call add_timeout cannot
                # schedule anything in this iteration.
                due_timeouts = []
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            # The timeout was cancelled.  Note that the
                            # cancellation check is repeated below for timeouts
                            # that are cancelled by another timeout or callback.
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        # Clean up the timeout queue when it gets large and it's
                        # more than half cancellations.
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                # Closures may be holding on to a lot of memory, so allow
                # them to be freed before we go into our poll wait.
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                    # If any callbacks or timeouts called add_callback,
                    # we don't want to wait in poll() before we run them.
                    poll_timeout = 0.0
                elif self._timeouts:
                    # If there are any timeouts, schedule the first one.
                    # Use self.time() instead of 'now' to account for time
                    # spent running callbacks.
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    # No timeouts and no callbacks, so use the default.
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    # Depending on python version and IOLoop implementation,
                    # different exception types may be thrown and there are
                    # two ways EINTR might be signaled:
                    # * e.errno == errno.EINTR
                    # * e.args is like (errno.EINTR, 'Interrupted system call')
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # Pop one fd at a time from the set of pending fds and run
                # its handler. Since that handler may perform actions on
                # other file descriptors, there may be reentrant calls to
                # this IOLoop that update self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)

    def stop(self):
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        return self.time_func()

    def call_at(self, deadline, callback, *args, **kwargs):
        timeout = _Timeout(
            deadline,
            functools.partial(stack_context.wrap(callback), *args, **kwargs),
            self)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        # Removing from a heap is complicated, so just leave the defunct
        # timeout object in the queue (see discussion in
        # http://docs.python.org/library/heapq.html).
        # If this turns out to be a problem, we could add a garbage
        # collection pass whenever there are too many dead timeouts.
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        with self._callback_lock:
            if self._closing:
                raise RuntimeError("IOLoop is closing")
            list_empty = not self._callbacks
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
            if list_empty and thread.get_ident() != self._thread_ident:
                # If we're in the IOLoop's thread, we know it's not currently
                # polling.  If we're not, and we added the first callback to an
                # empty list, we may need to wake it up (it may wake up on its
                # own, but an occasional extra wake is harmless).  Waking
                # up a polling IOLoop is relatively expensive, so we try to
                # avoid it when we can.
                self._waker.wake()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        with stack_context.NullContext():
            if thread.get_ident() != self._thread_ident:
                # if the signal is handled on another thread, we can add
                # it normally (modulo the NullContext)
                self.add_callback(callback, *args, **kwargs)
            else:
                # If we're on the IOLoop's thread, we cannot use
                # the regular add_callback because it may deadlock on
                # _callback_lock.  Blindly insert into self._callbacks.
                # This is safe because the GIL makes list.append atomic.
                # One subtlety is that if the signal interrupted the
                # _callback_lock block in IOLoop.start, we may modify
                # either the old or new version of self._callbacks,
                # but either way will work.
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))

果然, PollIOLoop 继承自 IOLoop 并实现了它的所有接口,现在我们终于可以进入真正的正题了:joy:

ioloop 分析

首先要看的是关于 epoll 操作的方法,还记得前文说过的 epoll 只需要四个 api 就能完全操作嘛? 我们来看 PollIOLoop 的实现:

epoll 操作

def add_handler(self, fd, handler, events):
	fd, obj = self.split_fd(fd)
	self._handlers[fd] = (obj, stack_context.wrap(handler))
	self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):
	fd, obj = self.split_fd(fd)
	self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):
	fd, obj = self.split_fd(fd)
	self._handlers.pop(fd, None)
	self._events.pop(fd, None)
	try:
		self._impl.unregister(fd)
		except Exception:
			gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

epoll_ctl:这个三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 参数。 所以这三个方法实现了 epoll 的 epoll_ctl 。

epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create 。

epoll_wait: epoll_wait 操作则在 start() 中:event_pairs = self._impl.poll(poll_timeout)

epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。

initialize

接下来看 PollIOLoop 的初始化方法中作了什么:

def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl                         # 指定 epoll
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())   # fork 后关闭无用文件描述符
        self.time_func = time_func or time.time   # 指定获取当前时间的函数
        self._handlers = {}                       # handler 的字典,储存被 epoll 监听的 handler ,与打开它的文件描述符 ( file descriptor 简称 fd ) 一一对应
        self._events = {}                         # event 的字典,储存 epoll 返回的活跃的 fd event pairs
        self._callbacks = []                      # 储存各个 fd 回调函数的列表
        self._callback_lock = threading.Lock()    # 指定进程锁
        self._timeouts = []                       # 将是一个最小堆结构,按照超时时间从小到大排列的 fd 的任务堆( 通常这个任务都会包含一个 callback )
        self._cancellations = 0                   # 关于 timeout 的计数器
        self._running = False                     # ioloop 是否在运行
        self._stopped = False                     # ioloop 是否停止
        self._closing = False                     # ioloop 是否关闭
        self._thread_ident = None                 #  当前线程堆标识符 ( thread identify )
        self._blocking_signal_threshold = None    # 系统信号, 主要用来在 epoll_wait 时判断是否会有 signal alarm 打断 epoll
        self._timeout_counter = itertools.count() # 超时计数器 ( 暂时不是很明白具体作用,好像和前面的 _cancellations 有关系? 请大神讲讲)
        self._waker = Waker()                     # 一个 waker 类,主要是对于管道 pipe 的操作,因为 ioloop 属于底层的数据操作,这里 epoll 监听的是 pipe
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)               # 将管道加入 epoll 监听,对于 web server 初始化时只需要关心 READ 事件

除了注释中的解释,还有几点补充:

  1. close_exec 的作用: 子进程在 fork 出来的时候,使用了写时复制( COW , Copy-On-Write )方式获得父进程的数据空间、 堆和栈副本,这其中也包括文件描述符。刚刚 fork 成功时,父子进程中相同的文件描述符指向系统文件表中的同一项,接着,一般我们会调用 exec 执行另一个程序,此时会用全新的程序替换子进程的正文,数据,堆和栈等。此时保存文件描述符的变量当然也不存在了,我们就无法关闭无用的文件描述符了。所以通常我们会 fork 子进程后在子进程中直接执行 close 关掉无用的文件描述符,然后再执行 exec 。 所以 close_exec 执行的其实就是 关闭 + 执行的作用。 详情可以查看: 关于 linux 进程间的 close-on-exec 机制

  2. Waker(): Waker 封装了对于管道 pipe 的操作:

    def set_close_exec(fd):
    	flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    	fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    
    def _set_nonblocking(fd):
    	flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    	fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    
    class Waker(interface.Waker):
    	def __init__(self):
    		r, w = os.pipe()
    		_set_nonblocking(r)
    		_set_nonblocking(w)
    		set_close_exec(r)
    		set_close_exec(w)
    		self.reader = os.fdopen(r, "rb", 0)
    		self.writer = os.fdopen(w, "wb", 0)
    
    	def fileno(self):
    		return self.reader.fileno()
    
    	def write_fileno(self):
    		return self.writer.fileno()
    
    	def wake(self):
    		try:
    			self.writer.write(b"x")
    		except IOError:
    			pass
    
    	def consume(self):
    		try:
    			while True:
    				result = self.reader.read()
    				if not result:
    					break
    		except IOError:
    			pass
    
    	def close(self):
    		self.reader.close()
    		self.writer.close()
    ```
    

可以看到 waker 把 pipe 分为读、 写两个管道并都设置了非阻塞和 close_exec。 注意wake(self)方法中:self.writer.write(b"x") 直接向管道中写入随意字符从而释放管道。

原文地址

作者:rapospectre

3340 次点击
所在节点    Python
1 条回复
rapospectre
2016-06-06 22:18:31 +08:00
噗,,还有一部分发不出来了,不能短时间内频繁发布主 @_@

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/283943

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX