def configurable_default(cls):
    """Returns the implementation class to be used if none is configured.

    This base version is abstract: it always raises and must be
    overridden.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
显然也是个接口,那么我们再回头看 ioloop 的 configurable_default():
def configurable_default(cls):
    """Return the IOLoop implementation class for the current platform.

    The choice is driven by what the ``select`` module exposes: epoll is
    preferred, then kqueue, and plain select is the fallback.  Each
    platform module is imported only when its branch is taken.
    """
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop
原来这是个工厂函数,根据不同的操作系统返回不同的事件循环实现类( linux 返回 epoll , mac / BSD 返回 kqueue ,其他就返回普通的 select 。 kqueue 的作用基本等同于 epoll , 只是不同系统各自的实现)
现在线索转移到了 tornado.platform.epoll.EPollIOLoop 上,我们再来看看 EPollIOLoop:
import select
from tornado.ioloop import PollIOLoop
class EPollIOLoop(PollIOLoop):
    """PollIOLoop subclass that uses ``select.epoll()`` as its poller."""

    def initialize(self, **kwargs):
        """Create an epoll object and pass it to PollIOLoop as ``impl``."""
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
EPollIOLoop 完全继承自 PollIOLoop(注意这里是 PollIOLoop 不是 IOLoop),并只是在初始化时指定了 impl 是 epoll ,所以看起来我们用 IOLoop 初始化,最后初始化的其实就是这个 PollIOLoop。所以接下来,我们真正需要理解和阅读的内容应该都在这里:
class PollIOLoop(IOLoop):
    """Base class for IOLoops built around a select-like function.

    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
    `tornado.platform.select.SelectIOLoop` (all platforms).
    """

    def initialize(self, impl, time_func=None, **kwargs):
        """Set up loop state around the poller ``impl``.

        ``impl`` must provide ``register``/``modify``/``unregister``/
        ``poll``/``close``.  ``time_func`` defaults to ``time.time``.
        """
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._callback_lock = threading.Lock()
        self._timeouts = []
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def close(self, all_fds=False):
        """Close the waker and the poller and drop queued work.

        With ``all_fds`` true, every registered fd object is also passed
        to ``close_fd``.  After this, ``add_callback`` raises.
        """
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            # Values are (fd object, handler) pairs; only the fd is closed.
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close()
        self._callbacks = None
        self._timeouts = None

    def add_handler(self, fd, handler, events):
        """Register ``handler`` for ``events`` on ``fd`` (ERROR is always added)."""
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        """Change the events watched on ``fd`` (ERROR is always added)."""
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        """Forget ``fd``: drop its handler, pending events and registration.

        Failures from the poller's ``unregister`` are logged at debug
        level and otherwise ignored.
        """
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        """Record ``seconds`` and install ``action`` as the SIGALRM handler.

        ``start`` arms ``ITIMER_REAL`` with this value after each poll and
        clears it before polling.  A ``None`` action installs ``SIG_DFL``.
        Logs an error and does nothing when ``signal.setitimer`` is
        unavailable.
        """
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)

    def start(self):
        """Run the loop until ``_running`` is cleared (see ``stop``).

        Each iteration runs the queued callbacks and due timeouts, polls
        the poller, then dispatches the returned events to their handlers.
        """
        # NOTE(review): the setup code is elided in this excerpt.  The names
        # old_current and old_wakeup_fd used in the finally clause below are
        # not defined in the visible code.
        ...
        try:
            while True:
                # Prevent IO event starvation by delaying new callbacks
                # to the next iteration of the event loop.
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                # Add any timeouts that have come due to the callback list.
                # Do not run anything until we have determined which ones
                # are ready, so timeouts that call add_timeout cannot
                # schedule anything in this iteration.
                due_timeouts = []
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            # The timeout was cancelled. Note that the
                            # cancellation check is repeated below for timeouts
                            # that are cancelled by another timeout or callback.
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        # Clean up the timeout queue when it gets large and it's
                        # more than half cancellations.
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                # Closures may be holding on to a lot of memory, so allow
                # them to be freed before we go into our poll wait.
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                    # If any callbacks or timeouts called add_callback,
                    # we don't want to wait in poll() before we run them.
                    poll_timeout = 0.0
                elif self._timeouts:
                    # If there are any timeouts, schedule the first one.
                    # Use self.time() instead of 'now' to account for time
                    # spent running callbacks.
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    # No timeouts and no callbacks, so use the default.
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    # Depending on python version and IOLoop implementation,
                    # different exception types may be thrown and there are
                    # two ways EINTR might be signaled:
                    # * e.errno == errno.EINTR
                    # * e.args is like (errno.EINTR, 'Interrupted system call')
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # Pop one fd at a time from the set of pending fds and run
                # its handler. Since that handler may perform actions on
                # other file descriptors, there may be reentrant calls to
                # this IOLoop that update self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)

    def stop(self):
        """Ask the loop to exit and wake it in case it is waiting in poll."""
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        """Return the current time from the configured ``time_func``."""
        return self.time_func()

    def call_at(self, deadline, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` for ``deadline``.

        Returns the ``_Timeout`` object pushed onto the timeout heap; pass
        it to ``remove_timeout`` to cancel.
        """
        timeout = _Timeout(
            deadline,
            functools.partial(stack_context.wrap(callback), *args, **kwargs),
            self)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        """Cancel ``timeout`` by clearing its callback; ``start`` discards it later."""
        # Removing from a heap is complicated, so just leave the defunct
        # timeout object in the queue (see discussion in
        # http://docs.python.org/library/heapq.html).
        # If this turns out to be a problem, we could add a garbage
        # collection pass whenever there are too many dead timeouts.
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        """Queue ``callback(*args, **kwargs)`` for the next loop iteration.

        Raises:
            RuntimeError: if the loop is closing.
        """
        with self._callback_lock:
            if self._closing:
                raise RuntimeError("IOLoop is closing")
            list_empty = not self._callbacks
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
            if list_empty and thread.get_ident() != self._thread_ident:
                # If we're in the IOLoop's thread, we know it's not currently
                # polling. If we're not, and we added the first callback to an
                # empty list, we may need to wake it up (it may wake up on its
                # own, but an occasional extra wake is harmless). Waking
                # up a polling IOLoop is relatively expensive, so we try to
                # avoid it when we can.
                self._waker.wake()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Queue ``callback`` from a signal handler without taking the lock
        when already on the loop's thread."""
        with stack_context.NullContext():
            if thread.get_ident() != self._thread_ident:
                # if the signal is handled on another thread, we can add
                # it normally (modulo the NullContext)
                self.add_callback(callback, *args, **kwargs)
            else:
                # If we're on the IOLoop's thread, we cannot use
                # the regular add_callback because it may deadlock on
                # _callback_lock. Blindly insert into self._callbacks.
                # This is safe because the GIL makes list.append atomic.
                # One subtlety is that if the signal interrupted the
                # _callback_lock block in IOLoop.start, we may modify
                # either the old or new version of self._callbacks,
                # but either way will work.
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
果然, PollIOLoop 继承自 IOLoop 并实现了它的所有接口,现在我们终于可以进入真正的正题了:joy:
首先要看的是关于 epoll 操作的方法,还记得前文说过的 epoll 只需要四个 api 就能完全操作嘛? 我们来看 PollIOLoop 的实现:
def add_handler(self, fd, handler, events):
    """Register ``handler`` for ``events`` on ``fd``.

    The handler is stored (wrapped by ``stack_context.wrap``) together
    with the fd object returned by ``split_fd``, and the fd is registered
    with the poller.  ``self.ERROR`` is always added to the event mask.
    """
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))
    self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
    """Change the event mask watched on ``fd``.

    ``self.ERROR`` is always added to ``events`` before the poller's
    ``modify`` is called.
    """
    fd, obj = self.split_fd(fd)
    self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
    """Stop watching ``fd``.

    Drops the stored handler and any pending event for the fd, then
    unregisters it from the poller.  An error from ``unregister`` is
    logged at debug level and otherwise ignored.
    """
    fd, obj = self.split_fd(fd)
    self._handlers.pop(fd, None)
    self._events.pop(fd, None)
    try:
        self._impl.unregister(fd)
    except Exception:
        gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
epoll_ctl:这三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 操作(即 EPOLL_CTL_ADD 、 EPOLL_CTL_MOD 、 EPOLL_CTL_DEL )。 所以这三个方法实现了 epoll 的 epoll_ctl 。
epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) 。 这个相当于 epoll_create 。
epoll_wait: epoll_wait 操作则在 start() 中:event_pairs = self._impl.poll(poll_timeout)
epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close 方法内调用 self._impl.close() 完成。
接下来看 PollIOLoop 的初始化方法中作了什么:
def initialize(self, impl, time_func=None, **kwargs):
    """Set up the loop's state around the poller ``impl`` (annotated copy)."""
    super(PollIOLoop, self).initialize(**kwargs)
    self._impl = impl  # the poller to use (EPollIOLoop passes select.epoll())
    if hasattr(self._impl, 'fileno'):
        # Mark the poller's fd close-on-exec (FD_CLOEXEC) so it is closed
        # automatically when the process calls exec.
        set_close_exec(self._impl.fileno())
    self.time_func = time_func or time.time  # function used to get the current time
    # fd -> (fd object, wrapped handler) for every fd registered with the poller
    self._handlers = {}
    # fd -> event mask returned by poll() and not yet dispatched
    self._events = {}
    # Callbacks queued by add_callback, run on the next loop iteration
    # (these are not the per-fd handlers; those live in _handlers).
    self._callbacks = []
    # Thread lock (threading.Lock, not a process lock) guarding _callbacks
    self._callback_lock = threading.Lock()
    # heapq heap of _Timeout objects; the head is checked against the
    # current time to find due timeouts (each carries a callback)
    self._timeouts = []
    # Number of cancelled timeouts still sitting in _timeouts
    self._cancellations = 0
    self._running = False  # whether the ioloop is running
    self._stopped = False  # whether stop() has been requested
    self._closing = False  # whether the ioloop is being closed
    # Compared against thread.get_ident() to tell whether a caller is on
    # the loop's own thread
    self._thread_ident = None
    # Seconds used to arm the ITIMER_REAL alarm around handler execution;
    # the alarm is cleared while waiting in poll.  None disables it.
    self._blocking_signal_threshold = None
    # Counter related to timeouts.  The article's author notes being unsure
    # of its exact role and asks whether it relates to _cancellations.
    # NOTE(review): presumably consumed by _Timeout, which receives this
    # loop in call_at -- confirm against the _Timeout definition.
    self._timeout_counter = itertools.count()
    # Waker wraps a pipe; the loop watches the pipe's read end so that it
    # can be woken from poll by a write to the other end.
    self._waker = Waker()
    # Add the pipe's read end to the poller; only READ events matter here.
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)
除了注释中的解释,还有几点补充:
close_exec 的作用: 子进程在 fork 出来的时候,使用了写时复制( COW , Copy-On-Write )方式获得父进程的数据空间、 堆和栈副本,这其中也包括文件描述符。刚刚 fork 成功时,父子进程中相同的文件描述符指向系统文件表中的同一项。接着,一般我们会调用 exec 执行另一个程序,此时会用全新的程序替换子进程的正文、数据、堆和栈等。此时保存文件描述符的变量当然也不存在了,我们在新程序里就无法关闭这些无用的文件描述符了。所以通常的做法是 fork 之后、 exec 之前,在子进程中手动 close 掉无用的文件描述符。而 close-on-exec 标志( FD_CLOEXEC )让内核替我们完成这件事:对某个 fd 设置该标志后,进程调用 exec 时内核会自动关闭它。所以 set_close_exec 并不是"关闭 + 执行",而是"在 exec 时自动关闭"的意思——它只是用 fcntl 给 fd 加上 FD_CLOEXEC 标志。 详情可以查看: 关于 linux 进程间的 close-on-exec 机制
Waker(): Waker 封装了对于管道 pipe 的操作:
def set_close_exec(fd):
    """Set the close-on-exec flag (``FD_CLOEXEC``) on ``fd``.

    The existing descriptor flags are read first and preserved.
    """
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
def _set_nonblocking(fd):
    """Set ``O_NONBLOCK`` on ``fd``, preserving its other status flags."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
class Waker(interface.Waker):
    """Waker built on an OS pipe.

    Writing a byte to the write end makes the read end readable, which is
    what a loop watching ``fileno()`` for READ events reacts to.
    """

    def __init__(self):
        """Create the pipe and wrap both ends in unbuffered file objects."""
        r, w = os.pipe()
        # Both ends are made non-blocking and close-on-exec.
        _set_nonblocking(r)
        _set_nonblocking(w)
        set_close_exec(r)
        set_close_exec(w)
        # Third argument 0 means unbuffered.
        self.reader = os.fdopen(r, "rb", 0)
        self.writer = os.fdopen(w, "wb", 0)

    def fileno(self):
        """Return the file descriptor of the pipe's read end."""
        return self.reader.fileno()

    def write_fileno(self):
        """Return the file descriptor of the pipe's write end."""
        return self.writer.fileno()

    def wake(self):
        """Write one byte to the pipe; an ``IOError`` is ignored."""
        try:
            self.writer.write(b"x")
        except IOError:
            pass

    def consume(self):
        """Read from the pipe until a read returns nothing or raises ``IOError``."""
        try:
            while True:
                result = self.reader.read()
                if not result:
                    break
        except IOError:
            pass

    def close(self):
        """Close both ends of the pipe."""
        self.reader.close()
        self.writer.close()
```
可以看到 waker 通过 os.pipe() 创建了一个管道,拿到读、 写两端,并都设置了非阻塞和 close_exec 。 注意 wake(self) 方法中的 self.writer.write(b"x"):它直接向管道中写入一个任意字节,使管道的读端变为可读,从而把正在 poll( epoll_wait )上等待的 ioloop 唤醒;随后 consume() 会把管道里的数据读空。
作者:rapospectre
1
rapospectre OP 噗,,还有一部分发不出来了,不能短时间内频繁发布主 @_@
|