问大家一个问题
if hasattr(asyncio, 'run'):
    # The standard library already provides asyncio.run; use it as-is.
    # pylint: disable=invalid-name
    asyncio_run = asyncio.run  # type: ignore
else:
    _T = TypeVar('_T')

    def asyncio_run(main: Awaitable[_T], *, debug: bool = False) -> _T:
        """Run ``main`` to completion on a fresh event loop.

        Minimal stand-in for ``asyncio.run`` (available since 3.7).
        The call blocks until ``main`` has finished and hands back
        whatever it returned.

        Args:
            main: The awaitable to drive to completion.
            debug: Whether to switch the new loop into debug mode.

        Returns:
            The result produced by ``main``.
        """
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        event_loop.set_debug(debug)
        try:
            outcome = event_loop.run_until_complete(main)
        finally:
            # Always detach and close the loop, even when ``main`` raised.
            asyncio.set_event_loop(None)
            event_loop.close()
        return outcome
async def setup_and_run_hass(config_dir: str,
                             args: argparse.Namespace) -> int:
    """Set up Home Assistant and run it until it exits.

    Args:
        config_dir: Configuration directory handed to the bootstrap code.
        args: Parsed command-line arguments. This function reads
            ``demo_mode``, ``verbose``, ``skip_pip``, ``log_rotate_days``,
            ``log_file``, ``log_no_color`` and ``open_ui``.

    Returns:
        Whatever ``hass.async_run()`` returns; the caller treats it as
        the process exit code.
    """
    from homeassistant import bootstrap, core

    hass = core.HomeAssistant()

    if args.demo_mode:
        config = {
            'frontend': {},
            'demo': {}
        }  # type: Dict[str, Any]
        # Await the setup exactly like the config-file branch below.
        # Without ``await`` the coroutine object is created and thrown
        # away, so the demo configuration would never be loaded before
        # ``hass.async_run()`` starts.
        await bootstrap.async_from_config_dict(
            config, hass, config_dir=config_dir, verbose=args.verbose,
            skip_pip=args.skip_pip, log_rotate_days=args.log_rotate_days,
            log_file=args.log_file, log_no_color=args.log_no_color)
    else:
        config_file = ensure_config_file(config_dir)
        print('Config directory:', config_dir)
        await bootstrap.async_from_config_file(
            config_file, hass, verbose=args.verbose, skip_pip=args.skip_pip,
            log_rotate_days=args.log_rotate_days, log_file=args.log_file,
            log_no_color=args.log_no_color)

    if args.open_ui:
        # Imported here to avoid importing asyncio before monkey patch
        from homeassistant.util.async_ import run_callback_threadsafe

        def open_browser(_: Any) -> None:
            """Open the web interface in a browser."""
            if hass.config.api is not None:
                import webbrowser
                webbrowser.open(hass.config.api.base_url)

        # Register open_browser as a one-shot listener for the start event.
        # NOTE(review): this coroutine presumably already runs on
        # ``hass.loop``; confirm run_callback_threadsafe may be called
        # from the loop's own thread.
        run_callback_threadsafe(
            hass.loop,
            hass.bus.async_listen_once,
            EVENT_HOMEASSISTANT_START, open_browser
        )

    return await hass.async_run()
# asyncio_run starts the event loop and blocks here until the
# setup_and_run_hass coroutine has finished, then yields its return value.
exit_code = asyncio_run(setup_and_run_hass(config_dir, args))
restart_requested = exit_code == RESTART_EXIT_CODE
if restart_requested and not args.runner:
    try_to_restart()
这里的 exit_code = asyncio_run(setup_and_run_hass(config_dir, args)) 作用是什么?怎么感觉还是要等 setup_and_run_hass 跑完了才会继续往下运行?
也就是说,针对这样一个函数使用 async 的目的是什么?
import asyncio
if hasattr(asyncio, 'run'):
    # The standard library already provides asyncio.run; use it as-is.
    # pylint: disable=invalid-name
    asyncio_run = asyncio.run  # type: ignore
else:
    def asyncio_run(main, debug=False):
        """Run ``main`` to completion on a fresh event loop.

        Minimal stand-in for ``asyncio.run`` (available since 3.7).
        Blocks until ``main`` has finished and returns its result;
        ``debug`` toggles the new loop's debug mode.
        """
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        event_loop.set_debug(debug)
        try:
            outcome = event_loop.run_until_complete(main)
        finally:
            # Always detach and close the loop, even when ``main`` raised.
            asyncio.set_event_loop(None)
            event_loop.close()
        return outcome
async def slow_operation_1(m):
    """Sleep for ``m`` seconds, report completion, and return ``m``."""
    await asyncio.sleep(m)
    message = 'Slow operation {} complete'.format(m)
    print(message)
    return m
async def slow_operation_2(n):
    """Sleep for ``n`` seconds, then chain into ``slow_operation_1``.

    After reporting its own completion it awaits
    ``slow_operation_1(m=n+1)`` and returns that coroutine's result.
    """
    await asyncio.sleep(n)
    print('Slow operation {} complete'.format(n))
    follow_up = slow_operation_1(m=n+1)
    return await follow_up
# asyncio_run blocks until the whole coroutine chain has finished:
# slow_operation_2(2) sleeps, then awaits slow_operation_1(3), whose
# return value (3) becomes the result printed below.
result = asyncio_run(slow_operation_2(2))
print(result)
类似于这段代码。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.