异步与多 worker 的问题

2017-10-24 11:08:57 +08:00
 ray1888

现在在写一个服务端的程序,主要有三个操作,1.是等待网络请求接收数据(使用 socket 而不是 http ),2.是对数据转格式,第三个把整理好格式的数据写入数据库中。现在是想把 1,2,3 都写到一个协程中,然后开多个 worker 来进行处理,怎样可以使协程开出多个 worker 来应对多并发呢?目前打算使用 asyncio,如果不支持可以考虑 tornado,求各位看看有什么解决方法

1659 次点击
所在节点    Python
2 条回复
hcnhcn012
2017-10-24 11:43:36 +08:00
Twisted
hook923
2017-10-28 00:28:59 +08:00
我有个类似的做法,我是这样解决的。
因为我是多个线程共享一个数据库连接,每个线程都 execute,最后一起提交数据库。因此我在 savedata 中加了个锁

import threading
lock = threading.Lock()

from concurrent.futures import ThreadPoolExecutor

max_workers=64
sock_pool =ThreadPoolExecutor(max_workers=max_workers) #注意这 3 个 max_workers 不必都相同的
chgdata_pool = ThreadPoolExecutor(max_workers=max_workers)
chgdata_future = []
savedata_pool = ThreadPoolExecutor(max_workers=200)
savedata_future = []

def sock(参数):
接收 shock 数据的代码
chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托一个清洗数据的函数 chgdata
其它代码
def chgdata(参数):
清洗数据的代码
savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托一个保存数据的函数 savedata
其它代码
def savedata(参数):
保存语句生成
lock.acquire() #加个互斥锁
保存到数据库
lock.release() #释放锁
其它代码

if __name__ == '__main__':
执行 sock()之前的代码
sock_future = sock_pool.submit(sock,参数) for 参数 in 列表] ## 多个 sock 接收数据
for f in sock_future:
f.result()
for f in chgdata_future:
f.result()
for f in saveda'ta_future:
f.result()

conn.commit() #。这步你可以视你的实际需求放在 savadata 中。

每个 sock 接收数据后传递给 chgdata,不必等待 chgdata。每个 chgdata 清洗数据后传递给 savedata,不必等待 savedata。
这应该是楼主想要的效果

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/400171

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX