深入理解 tornado 之 底层 ioloop 实现(三)

2016-06-07 15:57:14 +08:00
 rapospectre

承接之前的文章:深入理解 tornado 之 底层 ioloop 实现(二)

start

ioloop 最核心的部分:

def start(self):
        if self._running:       # 判断是否已经运行
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            self._stopped = False  # 设置停止为假
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()  # 获得当前线程标识符
        self._running = True # 设置运行

        old_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:
                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:
                old_wakeup_fd = None

        try:
            while True:  # 服务器进程正式开始,类似于其他服务器的 serve_forever
                with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据
                    callbacks = self._callbacks # 读取 _callbacks
                    self._callbacks = []. # 清空 _callbacks
                due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务
                if self._timeouts: # 判断 _timeouts 里是否有数据
                    now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时
                    while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的
                        if self._timeouts[0].callback is None: # 超时任务无回调
                            heapq.heappop(self._timeouts) # 直接弹出
                            self._cancellations -= 1 # 超时计数器 - 1
                        elif self._timeouts[0].deadline <= now: # 判断最小的数据是否超时
                            due_timeouts.append(heapq.heappop(self._timeouts)) # 超时就加到已超时列表里。
                        else:
                            break # 因为最小堆,如果没超时就直接退出循环( 后面的数据必定未超时 )
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):  # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化

                for callback in callbacks:
                    self._run_callback(callback) # 运行 callbacks 里所有的 calllback
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback) # 运行所有已过期任务的 callback
                callbacks = callback = due_timeouts = timeout = None # 释放内存

                if self._callbacks: # _callbacks 里有数据时
                    poll_timeout = 0.0 # 设置 epoll_wait 时间为 0 ( 立即返回 )
                elif self._timeouts: # _timeouts 里有数据时
                    poll_timeout = self._timeouts[0].deadline - self.time() 
					# 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
					# 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600 ,则用 3600 ,如果最小过期时间小于 0 就设置为 0 立即返回。
                else:
                    poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间

                if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续
                    break

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 )

                try:
                    event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0) #  epoll_wait 结束, 再设置 signal alarm
                self._events.update(event_pairs) # 将活跃事件加入 _events
                while self._events:
                    fd, events = self._events.popitem() # 循环弹出事件
                    try:
                        fd_obj, handler_func = self._handlers[fd] # 处理事件
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            pass
                        else:
						    self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            self._stopped = False # 确保发生异常也继续运行
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm
            IOLoop._current.instance = old_current 
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)   # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路

最后来看 stop:

stop

def stop(self):
	self._running = False
	self._stopped = True
	self._waker.wake()

这个很简单,设置判断条件,然后调用 self._waker.wake() 向 pipe 写入随意字符释放 pipe 。 over !

总结

噗,写了这么长,终于写完了。 经过分析,我们可以看到, ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。

最后,感谢大家不辞辛苦看到这,文中理解有误的地方还请多多指教!:pray:

原文地址

作者:rapospectre

3488 次点击
所在节点    Python
2 条回复
micyng
2016-06-07 20:49:34 +08:00
最后一点写错了,不是释放 pipe ,而是利用 pipe 的 fd 唤醒 ioloop 事件循环
rapospectre
2016-06-08 09:27:03 +08:00
@micyng 谢谢指正!已经修改

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/284153

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX