先上代码:
import asyncio
import aioredis
# redis 池
redis_pool = {
}
# redis 池读写锁
_lock = asyncio.Lock()
async def get_db(index:str="0"):
global redis_pool
await _lock.acquire() # 请求锁
db = redis_pool.get(index)
print(db)
if db is None:
specify_db = await aioredis.create_redis_pool("redis://localhost",db=int(index))
redis_pool.update({index:specify_db})
_lock.release() # 更新 redis 池完毕,释放锁
return specify_db
_lock.release() # 不需要生成 redis 连接,立即释放锁
return db
async def put_json(key:str,value:str,db_index:str="0",expire=-1):
db = await get_db(db_index)
await db.set(key,value,expire=expire)
async def test():
t1 = asyncio.create_task(get_db("0"))
t2 = asyncio.create_task(get_db("0"))
t3 = asyncio.create_task(get_db("0"))
t4 = asyncio.create_task(get_db("0"))
t5 = asyncio.create_task(get_db("0"))
task_list = [t1, t2, t3, t4, t5]
await asyncio.wait(task_list)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(test())
这是我构建了一个简易的 aioredis 连接池,如果单纯执行这个 test 的时候 lock 可以正确使用,但一旦其他异步代码通过 from import 调用put_json()
那么get_db()
就会抛出... got Future <Future pending> attached to a different loop
,说lock.acquire()
和 其他代码不再同一个事务循环中
而且查阅源码看到 asyncio.Lock()
若不指定 loop 循环就会通过asyncio.get_event_loop()
得到一个新循环,网上对于 asyncio.Lock 对象资料太少了。只有少量的 demo,demo 中是在 create_task 之前就创建好锁,然后参数传递给异步方法,这样做对 demo 有效,但对层次太多的代码很不友好
我现在的问题是我不想从 create_task()前就创建一把锁然后一直用参数传递到 get_db()中,因为这把锁只针对 redis_pool 的读写操作,其他代码根本不需要这把锁,还有有什么其他的办法可以调度协程对某个对象的读写吗
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.