aiohttp 怎么复用连接池

2017-06-11 16:14:48 +08:00
 resolvewang

最近,在看 python 的异步编程( asyncio )部分,在使用 aiomysql 的时候遇到了困难,已经困惑我两三天了。可能是自己资质愚钝,看了 aiomysql 的官网例子( https://github.com/aio-libs/aiomysql/tree/master/examples ),我还是没能弄懂怎么才能多个数据库操作中复用 pool。下面是我的代码

import aiomysql
import asyncio


async def select(loop, sql):
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='123456',
                                      db='test', loop=loop)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql)
            r = await cur.fetchone()
            print(r)


async def insert(loop, sql):
    pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                      user='root', password='123456',
                                      db='test', loop=loop)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql)
            await conn.commit()


async def main(loop):
    c1 = select(loop=loop, sql='select * from minifw')
    c2 = insert(loop=loop, sql="insert into minifw (name) values ('hello')")
    tasks = [
        asyncio.ensure_future(c1),
        asyncio.ensure_future(c2)
    ]
    return await asyncio.gather(*tasks)

if __name__ == '__main__':
    cur_loop = asyncio.get_event_loop()
    cur_loop.run_until_complete(main(cur_loop))



上面这样做的话,每次做数据库操作的时候,应该都会执行一次 create_pool这个操作。现在我的问题是应该怎么改上面的代码,让连接池可以复用啊?

以前没怎么接触异步编程,希望大家能解答一下我的疑惑,感谢

6270 次点击
所在节点    问与答
12 条回复
qs
2017-06-11 18:46:22 +08:00
pool 放到全局变量里 创建前先判断下 pool 是否可用 具体可以看看单例的写法
resolvewang
2017-06-11 18:59:53 +08:00
@qs 能否改一下给我看看,我用全局变量试过,应该是我能力不够吧,改过后调试不通。感谢
messense
2017-06-11 21:18:32 +08:00
把 pool 放到 main 里面?

import aiomysql
import asyncio


async def select(pool, sql):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql)
r = await cur.fetchone()
print(r)


async def insert(pool, sql):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql)
await conn.commit()


async def main(loop):
pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
user='root', password='123456',
db='test', loop=loop)
c1 = select(pool, sql='select * from minifw')
c2 = insert(pool, sql="insert into minifw (name) values ('hello')")
tasks = [
asyncio.ensure_future(c1),
asyncio.ensure_future(c2)
]
return await asyncio.gather(*tasks)

if __name__ == '__main__':
cur_loop = asyncio.get_event_loop()
cur_loop.run_until_complete(main(cur_loop))
resolvewang
2017-06-11 22:18:54 +08:00
@messense 感谢,这样确实是可以的。我咋就没想到。
resolvewang
2017-06-11 22:34:11 +08:00
@messense 大神,是否还有别的方法,可以把 pool 从 main() 中抽出来,这样写我又遇到问题了,比如我要写单元测试用例,应该怎么写啊,我写的运行使用不行

<pre>
import asyncio
import unittest
import aiomysql
from minifw.db import base_db


class TestDB(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.pool = aiomysql.create_pool(host='127.0.0.1', port=3306,
user='root', password='123456',
db='test', loop=self.loop)

def test_select(self):
sql = 'select * from minifw where id = (%s)'
rs = self.loop.run_until_complete(base_db.select(self.pool, sql, args=(1,), size=1))
self.assertEqual(len(rs), 1)

def test_insert(self):
sql = 'insert into `minifw` (`name`) values (?)'
rs = self.loop.run_until_complete(base_db.insert(self.pool, sql, args=('test_val',)))
self.assertEqual(rs, 1)

def tearDown(self):
self.loop.close()
del self.loop

if __name__ == '__main__':
unittest.main()
</pre>

这里的问题就是这个 pool 我不知道咋处理,如果我要运行 unnittest.main(),就会报错 `AttributeError: '_PoolContextManager' object has no attribute 'acquire'`
messense
2017-06-11 23:40:15 +08:00
aiomysql.create_pool 需要 await 吧?没有 await 它返回的是个 coroutine,试试:

self.pool = self.loop.run_until_complete(aiomysql.create_pool(...))
resolvewang
2017-06-12 09:55:40 +08:00
@messense 感谢您的耐心回复!通过你的回复,我对 asyncio 的理解进了一步,前两天看了差不多一天的文档,都还是有点懵。现在还有两个问题,我还是不是很懂,希望您能再帮忙解一下惑。

`
async def select(pool, sql, args=(), size=None):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql.replace('?', '%s'), args)
if size:
r = await cur.fetchmany(size)
else:
r = await cur.fetchall()
return r
`

我在做关于这段代码的单元测试的时候,虽然执行正确,但是有一个警告,` ResourceWarning: Unclosed connection <aiomysql.connection.Connection object at 0x1030d1710>
ResourceWarning)`

大概意思就是数据库连接没有关闭吧。但是这里用了上下文管理器啊,它不应该会关闭连接吗?我如果在 return 语句之前,加上 conn.close(), 就不会报这个警告了。

`
def test_select(self):
sql = 'select * from minifw where id = (%s)'
rs = self.loop.run_until_complete(base_db.select(self.pool, sql, args=(1,), size=1))
self.assertEqual(len(rs), 1)
`
这段代码是测试 select。


另外还有一个问题,我想问问
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)


self.loop = asyncio.get_event_loop()
的区别,因为在我执行 unnittest.main()的时候,用前者就可以执行,用后者就会报错,`Event loop is closed`,如果是测试单个数据库操作,后者就不会报错。

期望您的回复,感恩
messense
2017-06-12 12:08:54 +08:00
按道理说会自动关闭连接,不清楚 aiomysql 的具体实现细节。

ef tearDown(self):
self.loop.close()
del self.loop

你 tearDown 里面把 loop close 了。。。
messense
2017-06-12 12:10:19 +08:00
Python 3.6 以后已经不推荐传递 event loop 了,需要用到 event loop 的时候调用 asyncio.get_event_loop() 就好了。
resolvewang
2017-06-12 13:42:41 +08:00
@messense 我一直以为 teardown 和 setup 在 unnittest.main()中只会运行一遍。。。

event loop 在程序的整个生命流程中,如果被 close 了,那么就不能使用 asyncio.get_event_loop()获取了吗?根据上面的代码,感觉是这样的。那这也就是说,我们只能在程序不用 event loop 的时候再关吗?

感觉自己对 event loop 模型还不是特别清楚。
messense
2017-06-12 15:16:26 +08:00
一般来说不需要手动 close event loop
resolvewang
2017-06-12 16:40:48 +08:00
@messense 好的,明白了,感谢

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/367612

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX