最近打算学习 tornado 的源码,所以就建立一个系列主题 **“深入理解 tornado ”**。 在此记录学习经历及个人见解与大家分享。文中一定会出现理解不到位或理解错误的地方,还请大家多多指教:
进入正题:
tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 wsgi 写的简单服务器,并没有自己实现底层结构。 关于 wsgi 详见之前的文章: 自己写一个 wsgi 服务器运行 Django 、 Tornado 应用)。 那么 tornado.ioloop
就是 tornado web server 最底层的实现。
看 ioloop 之前,我们需要了解一些预备知识,有助于我们理解 ioloop 。
ioloop 的实现基于 epoll ,那么什么是 epoll ? epoll 是 Linux 内核为处理大批量文件描述符而作了改进的 poll 。 那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:
第一种办法虽然可以解决问题,但我们要注意的是对于一个线程\进程同时只能处理一个 socket 通信,其他连接只能被阻塞。 显然这种方式在单进程情况下不现实。
第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。
对于 epoll 的操作,其实也很简单,只要 4 个 API 就可以完全操作它。
用来创建一个 epoll 描述符( 就是创建了一个 epoll )
操作 epoll 中的 event ;可用参数有:
| 参数 | 含义 | | ------------ | ------------ | | EPOLL_CTL_ADD | 添加一个新的 epoll 事件 | | EPOLL_CTL_DEL | 删除一个 epoll 事件 | | EPOLL_CTL_MOD | 改变一个事件的监听方式 |
而事件的监听方式有七种,而我们只需要关心其中的三种:
| 宏定义 | 含义 | | ------------ | ------------ | | EPOLLIN | 缓冲区满,有数据可读 | | EPOLLOUT | 缓冲区空,可写数据 | | EPOLLERR | 发生错误 |
就是让 epoll 开始工作,里面有个参数 timeout ,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。
在监听时有数据活跃的连接时其返回活跃的文件句柄列表(此处为 socket 文件句柄)。
关闭 epoll
现在了解了 epoll 后,我们就可以来看 ioloop 了 (如果对 epoll 还有疑问可以看这两篇资料: epoll 的原理是什么、百度百科: epoll)
很多初学者一定好奇 tornado 运行服务器最后那一句 tornado.ioloop.IOLoop.current().start()
到底是干什么的。 我们先不解释作用,来看看这一句代码背后到底都在干什么。
先贴 ioloop 代码:
from __future__ import absolute_import, division, print_function, with_statement
import datetime
import errno
import functools
import heapq # 最小堆
import itertools
import logging
import numbers
import os
import select
import sys
import threading
import time
import traceback
import math
from tornado.concurrent import TracebackFuture, is_future
from tornado.log import app_log, gen_log
from tornado.platform.auto import set_close_exec, Waker
from tornado import stack_context
from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds
try:
import signal
except ImportError:
signal = None
if PY3:
import _thread as thread
else:
import thread
_POLL_TIMEOUT = 3600.0
class TimeoutError(Exception):
pass
class IOLoop(Configurable):
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
_current = threading.local()
@staticmethod
def instance():
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
@staticmethod
def initialized():
"""Returns true if the singleton instance has been created."""
return hasattr(IOLoop, "_instance")
def install(self):
assert not IOLoop.initialized()
IOLoop._instance = self
@staticmethod
def clear_instance():
"""Clear the global `IOLoop` instance.
.. versionadded:: 4.0
"""
if hasattr(IOLoop, "_instance"):
del IOLoop._instance
@staticmethod
def current(instance=True):
current = getattr(IOLoop._current, "instance", None)
if current is None and instance:
return IOLoop.instance()
return current
def make_current(self):
IOLoop._current.instance = self
@staticmethod
def clear_current():
IOLoop._current.instance = None
@classmethod
def configurable_base(cls):
return IOLoop
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
def initialize(self, make_current=None):
if make_current is None:
if IOLoop.current(instance=False) is None:
self.make_current()
elif make_current:
if IOLoop.current(instance=False) is not None:
raise RuntimeError("current IOLoop already exists")
self.make_current()
def close(self, all_fds=False):
raise NotImplementedError()
def add_handler(self, fd, handler, events):
raise NotImplementedError()
def update_handler(self, fd, events):
raise NotImplementedError()
def remove_handler(self, fd):
raise NotImplementedError()
def set_blocking_signal_threshold(self, seconds, action):
raise NotImplementedError()
def set_blocking_log_threshold(self, seconds):
self.set_blocking_signal_threshold(seconds, self.log_stack)
def log_stack(self, signal, frame):
gen_log.warning('IOLoop blocked for %f seconds in\n%s',
self._blocking_signal_threshold,
''.join(traceback.format_stack(frame)))
def start(self):
raise NotImplementedError()
def _setup_logging(self):
if not any([logging.getLogger().handlers,
logging.getLogger('tornado').handlers,
logging.getLogger('tornado.application').handlers]):
logging.basicConfig()
def stop(self):
raise NotImplementedError()
def run_sync(self, func, timeout=None):
future_cell = [None]
def run():
try:
result = func()
if result is not None:
from tornado.gen import convert_yielded
result = convert_yielded(result)
except Exception:
future_cell[0] = TracebackFuture()
future_cell[0].set_exc_info(sys.exc_info())
else:
if is_future(result):
future_cell[0] = result
else:
future_cell[0] = TracebackFuture()
future_cell[0].set_result(result)
self.add_future(future_cell[0], lambda future: self.stop())
self.add_callback(run)
if timeout is not None:
timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
self.start()
if timeout is not None:
self.remove_timeout(timeout_handle)
if not future_cell[0].done():
raise TimeoutError('Operation timed out after %s seconds' % timeout)
return future_cell[0].result()
def time(self):
return time.time()
...
IOLoop 类首先声明了 epoll 监听事件的宏定义,当然,如前文所说,我们只要关心其中的 EPOLLIN 、 EPOLLOUT 、 EPOLLERR 就行。
类中的方法有很多,看起来有点晕,但其实我们只要关心 IOLoop 核心功能的方法即可,其他的方法在明白核心功能后也就不难理解了。所以接下来我们着重分析核心代码。
instance
、 initialized
、 install
、 clear_instance
、 current
、 make_current
、 clear_current
这些方法不用在意细节,总之现在记住它们都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop 就好。
你一定疑惑 IOLoop 为何没有 __init__
, 其实是因为要初始化成为单例, IOLoop 的 new 函数已经被改写了,同时指定了 initialize
做为它的初始化方法,所以此处没有 __init__
。 说到这, ioloop 的代码里好像没有看到 new
方法,这又是什么情况? 我们先暂时记住这里。
接着我们来看这个初始化方法:
def initialize(self, make_current=None):
if make_current is None:
if IOLoop.current(instance=False) is None:
self.make_current()
elif make_current:
if IOLoop.current(instance=False) is None:
raise RuntimeError("current IOLoop already exists")
self.make_current()
def make_current(self):
IOLoop._current.instance = self
what? 里面只是判断了是否第一次初始化或者调用 self.make_current ()
初始化,而 make_current()
里也仅仅是把实例指定为自己,那么初始化到底去哪了?
然后再看看 start()
、 run()
、 close()
这些关键的方法都成了返回 NotImplementedError
错误,全部未定义?!跟网上搜到的源码分析完全不一样啊。 这时候看下 IOLoop 的继承关系,原来问题出在这里,之前的 tornado.ioloop 继承自 object 所以所有的一切都自己实现,而现在版本的 tornado.ioloop 则继承自 Configurable
看起来现在的 IOLoop 已经成为了一个基类,只定义了接口。 所以接着看 Configurable
代码:
class Configurable(object):
__impl_class = None
__impl_kwargs = None
def __new__(cls, *args, **kwargs):
base = cls.configurable_base()
init_kwargs = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
impl = cls
init_kwargs.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
# initialize vs __init__ chosen for compatibility with AsyncHTTPClient
# singleton magic. If we get rid of that we can switch to __init__
# here too.
instance.initialize(*args, **init_kwargs)
return instance
@classmethod
def configurable_base(cls):
"""Returns the base class of a configurable hierarchy.
This will normally return the class in which it is defined.
(which is *not* necessarily the same as the cls classmethod parameter).
"""
raise NotImplementedError()
@classmethod
def configurable_default(cls):
"""Returns the implementation class to be used if none is configured."""
raise NotImplementedError()
def initialize(self):
"""Initialize a `Configurable` subclass instance.
Configurable classes should use `initialize` instead of ``__init__``.
.. versionchanged:: 4.2
Now accepts positional arguments in addition to keyword arguments.
"""
@classmethod
def configure(cls, impl, **kwargs):
"""Sets the class to use when the base class is instantiated.
Keyword arguments will be saved and added to the arguments passed
to the constructor. This can be used to set global defaults for
some parameters.
"""
base = cls.configurable_base()
if isinstance(impl, (unicode_type, bytes)):
impl = import_object(impl)
if impl is not None and not issubclass(impl, cls):
raise ValueError("Invalid subclass of %s" % cls)
base.__impl_class = impl
base.__impl_kwargs = kwargs
@classmethod
def configured_class(cls):
"""Returns the currently configured class."""
base = cls.configurable_base()
if cls.__impl_class is None:
base.__impl_class = cls.configurable_default()
return base.__impl_class
@classmethod
def _save_configuration(cls):
base = cls.configurable_base()
return (base.__impl_class, base.__impl_kwargs)
@classmethod
def _restore_configuration(cls, saved):
base = cls.configurable_base()
base.__impl_class = saved[0]
base.__impl_kwargs = saved[1]
之前我们寻找的 __new__
出现了! 注意其中这句: impl = cls.configured_class()
impl 在这里就是 epoll ,它的生成函数是 configured_class()
, 而其方法里又有 base.__impl_class = cls.configurable_default()
,调用了 configurable_default()
。而 Configurable
的 configurable_default()
:
v2ex 限制了文章最大长度 20000 ,可以继续看第二部分或直接点击原文阅读
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.