I have a job that reads a file and writes the data into a database, and I wanted to try coroutines for it.
The coroutine version:
import asyncio
import re
from pathlib import Path

from sqlalchemy import select
from sqlalchemy.orm import selectinload

# AsyncSession, InfoModel, AModel, BModel, ModelBase and settings come from
# my own project modules


async def parse_text(file_path: Path, context_qs: list[asyncio.Queue]):
    ql = len(context_qs)
    i = 0
    # After putting `step` items into one Queue, switch to the next one
    step = 2
    with open(file_path, encoding="utf8") as f:
        for text in f:
            if i // step == ql:
                i = 0
            context_q = context_qs[i // step]
            text = re.findall(r"\d+", text)
            if text:
                # assemble the parsed numbers into a dict
                context = {"info": ..., "header": ...}
                await context_q.put(context)
                # If I don't join() here, it just stays inside this for loop
                # and never gets out (see the queue-pattern sketch after this
                # listing)
                await context_q.join()
                i = i + 1
    # End marker: tell every consumer of this file that there is no more data
    for context_q in context_qs:
        await context_q.put({"end": True})
    return
async def write_db(context_q: asyncio.Queue, model: ModelBase):
    async with AsyncSession() as session:
        while True:
            context = await context_q.get()
            if context.get("end"):
                # end marker from parse_text: this consumer is done
                return
            info, obj = None, None
            try:
                if context["info"]:
                    info = await session.execute(
                        select(InfoModel).filter(
                            InfoModel.attr == context["info"]
                        )
                    )
                    info = info.scalars().one_or_none()
                    if not info:
                        info = InfoModel(attr=context["info"])
                        session.add(info)
                if context["header"]:
                    obj = await session.execute(
                        select(model).filter(
                            model.header == context["header"]
                        ).options(selectinload(getattr(model, "info")))
                    )
                    obj = obj.scalars().one_or_none()
                    if not obj:
                        obj = model(header=context["header"])
                        session.add(obj)
                if obj and info:
                    if info not in obj.info:
                        obj.info.append(info)
                        session.add(obj)
                await session.commit()
            except Exception as e:
                await session.rollback()
                raise e
            else:
                context_q.task_done()
async def main():
    # Each file-reading/parsing coroutine feeds c_q_count database-writing coroutines
    c_q_count = 3
    a_context_qs = [asyncio.Queue() for _ in range(c_q_count)]
    b_context_qs = [asyncio.Queue() for _ in range(c_q_count)]
    tasks = [
        asyncio.create_task(
            parse_text(Path("a.txt"), a_context_qs)
        ),
        asyncio.create_task(
            parse_text(Path("b.txt"), b_context_qs)
        ),
    ]
    for i in range(c_q_count):
        tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
        tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main(), debug=settings.DEBUG)
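For reference, this is the usual asyncio queue pattern I was trying to follow: the producer awaits join() only once, after everything has been enqueued, and the workers are cancelled afterwards. A minimal sketch with made-up names, not my actual code:

import asyncio

async def producer(queue: asyncio.Queue, items):
    for item in items:
        await queue.put(item)   # keep enqueueing; no per-item wait
    await queue.join()          # wait once, until every item has been processed

async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        # ... do the real per-item work here ...
        queue.task_done()       # mark this item as processed

async def demo():
    queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]
    await producer(queue, range(10))
    for w in workers:           # workers loop forever, so cancel them at the end
        w.cancel()

asyncio.run(demo())

The difference from my parse_text above is that I await join() after every single put().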
The version without coroutines:
def sync_read_file():
    af = Path("a.txt").open(encoding="utf8")
    bf = Path("b.txt").open(encoding="utf8")
    with Session() as session:
        while True:
            if af:
                text = af.readline()
                if not text:        # readline() returns "" at EOF, it does not raise
                    af.close()
                    af = None
                else:
                    context = parse_text(text)   # sync version of the parsing above
                    sync_write_db(session, context, AModel)
            if bf:
                text = bf.readline()
                if not text:
                    bf.close()
                    bf = None
                else:
                    context = parse_text(text)
                    sync_write_db(session, context, BModel)
            if not af and not bf:
                return
def sync_write_db(session, context, model):
    info, obj = None, None
    try:
        if context["info"]:
            info = session.execute(
                select(InfoModel).filter(
                    InfoModel.attr == context["info"]
                )
            )
            info = info.scalars().one_or_none()
            if not info:
                info = InfoModel(attr=context["info"])
                session.add(info)
        if context["header"]:
            obj = session.execute(
                select(model).filter(model.header == context["header"])
            )
            obj = obj.scalars().one_or_none()
            if not obj:
                obj = model(header=context["header"])
                session.add(obj)
        if obj and info:
            if info not in obj.info:
                obj.info.append(info)
                session.add(obj)
        session.commit()
    except Exception as e:
        session.rollback()
        raise e
if __name__ == '__main__':
    sync_read_file()
With the coroutine version, each table gets 400-odd rows written per second; after switching to the synchronous single-threaded version it is still 400-odd rows per second.
Is there something wrong with how I am using coroutines, or is there some other cause?
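For what it's worth, one way I could try to narrow it down is to time the database round-trips separately from the parsing. A sketch only; timed_section is a hypothetical helper, not something in the code above:

import time
from collections import defaultdict
from contextlib import contextmanager

totals = defaultdict(float)

@contextmanager
def timed_section(label):
    # accumulate wall-clock time spent inside each labelled block
    start = time.perf_counter()
    try:
        yield
    finally:
        totals[label] += time.perf_counter() - start

# e.g. inside write_db / sync_write_db:
#     with timed_section("select"):
#         ... run the two SELECTs ...
#     with timed_section("commit"):
#         ... (await) session.commit() ...
# then print dict(totals) at the end of the run to see whether the per-row
# database round-trips account for most of the time.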