求问 Python 大神,多进程处理文本内数据

2017-01-02 00:11:51 +08:00
 fjhh
我的程序要实现的功能很简单,就是打开一个写有 1-500000 的 test.txt 的文档,数文档中奇数和偶数的个数,并写入一个字典
用多进程的方法,就是把这 500000 个数分成 n 份,每个进程处理 500000/n 个数
总之最后发现, num_of_process 越大,速度越慢,当等于 1 的时候,速度最快。这是为何?按理多进程,时间应变小。程序要做哪方面优化?

受人之托,特来求问。惭愧了,妹子竟然写的如此一手好代码……感谢。

from multiprocessing import pool
import time
import os
import copy
import multiprocessing


labels= {'0': 0, '1': 0}
train_set = 'test.txt'
num_of_process = 2
def statistics(file,label):
num=int(file)
if num%2==0:
label['0']+=1
else:
label['1']+=1

def union_dict(objs):
_keys = set(sum([obj.keys() for obj in objs], []))
_total = {}
for _key in _keys:
_total[_key] = sum([obj.get(_key, 0) for obj in objs])
return _total

def myprocess(data,i):
labelnew=copy.deepcopy(labels)
for afile in data[i*len(data)/num_of_process:(i+1)*len(data)/num_of_process]:
statistics(afile, labelnew)
return labelnew

if __name__=='__main__':
e1 = time.time()
pool = multiprocessing.Pool(processes = multiprocessing.cpu_count())#processes=4 in my mac
result_list=[]
data_train=open(train_set,'r').readlines()
for i in xrange(num_of_process):
#multiprocessing.Process(myprocess(target=myprocess,args=[data_train,i]))
result=pool.apply_async(myprocess,(data_train,i))
result_list.append(result.get())

print result_list

pool.close()
pool.join()
e2 = time.time()
print float(e2 - e1)
6956 次点击
所在节点    Python
18 条回复
ipwx
2017-01-02 00:17:22 +08:00
机械硬盘随机读取需要寻道,速度远低于顺序读取。所以你进程分得越多越慢。

话说现在的程序员都不知道硬盘 IO 是大部分程序的瓶颈了吗?
ipwx
2017-01-02 00:20:45 +08:00
。。。抱歉没看你的程序就回复了。你的程序的问题不在于读写,大概在于进程间数据的拷贝。你的 data_train 在 pool 建立之后才读取的,目测数据要拷贝到子进程里面。

如果是 *nix ,你可以试试把读取数据放到 pool 创建之前,也许有效。实在不行你用 os.fork 。
ipwx
2017-01-02 00:23:46 +08:00
话说最后吐槽一句:你都把文件整个读到内存里面了还分进程个屁啊,统计奇数和偶数明明是 O(n) 的算法,你读取文件也是 O(n) 的操作,干嘛不一个进程直接边读边统计,还要 readlines 读到内存里面做啥。

如果你想要每个进程分别读取部分数据(一开始我看你的题目就是这么猜想的,所以直接回答了一楼),那就会遇到机械硬盘的瓶颈,还是没有意义。

所以结论就是,你这例子根本什么意义都没有,就算进程越多越慢也不能说明任何问题。
likuku
2017-01-02 00:55:26 +08:00
建议可以这样玩:
1.探测当前系统 CPU 核心数 m ,
2.将文件切分成 m 份存硬盘上,根据 m 份文件名生成任务列表,
3.根据任务 /文件名列表创建最大进程数为 j=m 或者 j= m+2 的多进程池
4.进程池发动任务,呼叫任务进程执行(每进程的结果写入独立某 db/某文件),记得一定得丢到后台去跑
4.1 任务进程先检查当前系统里是否正在执行的任务进程 =j or > j (避免超载),若有空闲,则立即执行,否则等待 0.1 秒,循环检查
5.任务进程结束,汇总结果数据
#3 ( j 得实际测试,根据之前玩 freebsd / gentoo 时,编译软件包 编译器推荐使用 cpu 核数 +2 的并行进程数; 但去年按以上思路作过某实用小应用,实际却是在 4 核机器上最大并行开 40 个任务是刚能榨干算力)
likuku
2017-01-02 00:56:53 +08:00
@fjhh "num_of_process 越大,速度越慢,当等于 1 的时候,速度最快" 很怀疑这个运行环境是单核心 /CPU
CosimoZi
2017-01-02 01:16:41 +08:00
日志统计的瓶颈一般都在 io ,多进程没有用
appleorchard2000
2017-01-02 01:36:11 +08:00
看了一下,估计是因为数据总量太小,所以分解的成本超过了带来的收益
eyp82
2017-01-02 07:36:51 +08:00
@appleorchard2000 赞同, 50w 数据, 一个进程都不够吃的
zmj1316
2017-01-02 08:44:22 +08:00
这种 IO 瓶颈的加 CPU 也没用啊
gdsagdada
2017-01-02 09:24:33 +08:00
妹子竟然写的如此一手好代码, no picture you say a egg
gdsagdada
2017-01-02 09:25:26 +08:00
an
yanzixuan
2017-01-02 09:46:06 +08:00
现成的 spark 能干的事情为啥还要自己造轮子?
P0P
2017-01-02 13:44:33 +08:00
python 的解释器还是有全局锁的 https://wiki.python.org/moin/GlobalInterpreterLock ,所以 python 的多线程效率一直不怎么样
raiz
2017-01-02 16:08:01 +08:00
@P0P 不要赖全局锁好不好,明明她用的是多进程。。
最费时的 IO 操作在进程分支之前已经做了,而每个 worker 进程需要的运算又很少,就一个 M 个判断,所以时间主要都拿去做进程空间拷贝,数据拷贝之类的工作了。

让妹子注册个账号好沟通一点哦 : p
22too
2017-01-02 16:28:14 +08:00
你这个不是磁盘 io 问题啊。
首先,你只打开了一次文件,所以这个耗时应该是一个定值。
然后你要的结果是统计,所以你需要把数据分成均等的几份,交给多进程处理,这个也没有问题。
多进程处理的结果,你放到一个 dict 中。这样相当于多个进程修改一个数据,必然会越处理越慢啊。因为一个修改的时候,另外一个必须等待,所以问题在这里。
wind3110991
2017-01-02 18:13:46 +08:00
和楼上说的一样,分布式情境下推荐用 spark 吧
j5shi
2017-01-02 21:45:33 +08:00
@P0P 正解
fjhh
2017-01-04 08:48:41 +08:00
感谢各位,朋友说换至单位 32 核服务器运行单位项目,速度有明显改善。
我猜测大致如 ipwx 和 appleorchard2000 等朋友们解说,数据总量太小,所以分解的成本超过了带来的收益。
再谢感谢大家。祝大家 2017 一切顺利。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/331643

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX