项目中的服务使用 tornado
来搭建,但是对于 tornado
如何启动还没有仔细研究过。
在某次问题排查时,仔细研究了 tornado
是如何启动的。
在 tornado 的大部分文档是建议使用
from tornado.ioloop import IOLoop
IOLoop.current().start()
上述程序会启动后端的 HTTPServer
。HTTPServer
派生自 TCPServer
。如果需要依赖这个功能来调用
start()
,那么在 TCPServer
的实现中,start()
一定会被调用。为了看清楚这个调用关系,
在 start
函数入口加一句抛异常。但是奇怪的是:服务启动后,异常并没有抛出,tornado 服务运行正常。
测试
为了验证 tornado 到底是怎么启动服务的,写了简单的服务,代码如下,保存为 app.py.
python app.py # 启动 8000 端口的服务
from tornado.web import Application, RequestHandler
from tornado.ioloop import IOLoop
class Handler(RequestHandler):
def get(self):
print("Handle request")
self.write("Bingo!")
url_specs = [
("/", Handler),
("/a", Handler),
]
app = Application(url_specs)
port = 8000
app.listen(port)
print(f"Listen on port {port}")
IOLoop.current().start()
刨根问底
其实 app.listen()
的实现就是创建 HTTPServer
,然后调用 TCPServer
的
listen
接口。这里的 listen
实现:
def listen(self, port: int, address: str = "") -> None:
"""Starts accepting connections on the given port.
This method may be called more than once to listen
on multiple ports.
`listen` takes effect immediately; it is not necessary to call
`TCPServer.start` afterwards. It is, however, necessary to start
the `.IOLoop`.
"""
print(f"Listen for tcp server")
sockets = bind_sockets(port, address=address)
self.add_sockets(sockets)
这里其实开始绑定端口。这里的 add_sockets
函数比较关键,因为是它开始把
socket 上事件挂载到 IOLoop.current()
上。
def add_sockets(self, sockets: Iterable[socket.socket]) -> None:
"""Makes this server start accepting connections on
the given sockets.
The ``sockets`` parameter is a list of socket objects such as
those returned by `~tornado.netutil.bind_sockets`.
`add_sockets` is typically used in combination with that
method and `tornado.process.fork_processes` to provide greater
control over the initialization of a multi-process server.
"""
for sock in sockets:
self._sockets[sock.fileno()] = sock
self._handlers[sock.fileno()] = add_accept_handler(
sock, self._handle_connection
)
add_accept_handler
是开始挂载事件:
def add_accept_handler(
sock: socket.socket, callback: Callable[[socket.socket, Any], None]
) -> Callable[[], None]:
"""Adds an `.IOLoop` event handler to accept new connections
on ``sock``.
When a connection is accepted, ``callback(connection, address)``
will
be run (``connection`` is a socket object, and ``address`` is the
address of the other end of the connection). Note that this
signature
is different from the ``callback(fd, events)`` signature used for
`.IOLoop` handlers.
A callable is returned which, when called, will remove the `.IOLoop`
event handler and stop processing further incoming connections.
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1)
has been removed.
.. versionchanged:: 5.0
A callable is returned (``None`` was returned before).
"""
io_loop = IOLoop.current()
removed = [False]
def accept_handler(fd: socket.socket, events: int) -> None:
# More connections may come in while we're handling callbacks;
# to prevent starvation of other tasks we must limit the number
# of connections we accept at a time. Ideally we would accept
# up to the number of connections that were waiting when we
# entered this method, but this information is not available
# (and rearranging this method to call accept() as many times
# as possible before running any callbacks would have adverse
# effects on load balancing in multiprocess configurations).
# Instead, we use the (default) listen backlog as a rough
# heuristic for the number of connections we can reasonably
# accept at once.
for i in range(_DEFAULT_BACKLOG):
if removed[0]:
# The socket was probably closed
return
try:
connection, address = sock.accept()
except BlockingIOError:
# EWOULDBLOCK indicates we have accepted every
# connection that is available.
return
except ConnectionAbortedError:
# ECONNABORTED indicates that there was a connection
# but it was closed while still in the accept queue.
# (observed on FreeBSD).
continue
callback(connection, address)
def remove_handler() -> None:
io_loop.remove_handler(sock)
removed[0] = True
io_loop.add_handler(sock, accept_handler, IOLoop.READ)
return remove_handler
回过头,我们再来看 IOLoop.current().start()
做的事情:
@staticmethod
def current(instance: bool = True) -> Optional["IOLoop"]:
"""Returns the current thread's `IOLoop`.
If an `IOLoop` is currently running or has been marked as
current by `make_current`, returns that instance. If there is
no current `IOLoop` and ``instance`` is true, creates one.
.. versionchanged:: 4.1
Added ``instance`` argument to control the fallback to
`IOLoop.instance()`.
.. versionchanged:: 5.0
On Python 3, control of the current `IOLoop` is delegated
to `asyncio`, with this and other methods as pass-through
accessors.
The ``instance`` argument now controls whether an `IOLoop`
is created automatically when there is none, instead of
whether we fall back to `IOLoop.instance()` (which is now
an alias for this method). ``instance=False`` is deprecated,
since even if we do not create an `IOLoop`, this method
may initialize the asyncio loop.
"""
try:
loop = asyncio.get_event_loop()
except (RuntimeError, AssertionError):
if not instance:
return None
raise
try:
return IOLoop._ioloop_for_asyncio[loop]
except KeyError:
if instance:
from tornado.platform.asyncio import AsyncIOMainLoop
current = AsyncIOMainLoop(make_current=True)
# type: Optional[IOLoop]
else:
current = None
return current
def start(self) -> None:
"""Starts the timer."""
# Looking up the IOLoop here allows to first instantiate the
# PeriodicCallback in another thread, then start it using
# IOLoop.add_callback().
self.io_loop = IOLoop.current()
self._running = True
self._next_timeout = self.io_loop.time()
self._schedule_next()
本质上做的事就是将 asyncio.get_event_loop()
包装到 AsyncIOMainLoop
。
这个 AsyncIOMainLoop
的 start
函数是调用 event_loop
的 run_forever
。
通过以上分析后,其实我们并不需要调用 IOLoop.current().start()
,我们可以直接利用
asyncio
的接口来启动即可。本质是 tornado 把 tcp 的事件绑定到 event loop 上。
最终代码
import asyncio
from tornado.web import Application, RequestHandler
class Handler(RequestHandler):
def get(self):
print("Handle request")
self.write("Bingo!")
url_specs = [
("/", Handler),
("/a", Handler),
]
app = Application(url_specs)
port = 8000
app.listen(port)
print(f"Listen on port {port}")
asyncio.get_event_loop().run_forever()
get_event_loop
get_event_loop
又在做什么事呢?
如下两个文件:asyncio/events.py
和 asyncio/unix_events.py
中描述怎么拿到对应的 event_loop
base_events.py
: run_forever
, run_until_complete
stop
IOLoop.current().stop()
实际上是 Mark 标记,然后由具体的 eventloop 来关闭
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
self._check_runnung()
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()
old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
finally:
self._stopping = False
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)
总结
- 启动实际上
listen
: 把时间挂载到event_loop
中start
调度下一个 event
- 结束
- 标记,依赖底层的 event loop 去执行。按照这个逻辑,岂不是只要事件不退出,这个 stop 就结束不了?