求数组的算术平均,但参数是一个数组,怎么高效实现?

2018-04-06 10:31:39 +08:00
 dwjgwsm

import numpy as np

a=np.arange(10)

print(a)

b=np.array([2,4,3,5,2,1,4,5,3,2])

c=MA(a,b)

要求输出:
[nan nan 1.  nan 3.5 5.  4.5 5.  7.  8.5]

也就是说,数组每个点的算术平均所使用的参数都可能是不同的.我现在用的办法是 map,速度很慢,想破脑袋也想不到更好的办法,不知道有没有什么高效的实现办法?
4635 次点击
所在节点    Python
36 条回复
Kirscheis
2018-04-07 10:12:33 +08:00
@dwjgwsm 我上面说的办法实际上就是线段树稍微改了一下,如果你求和不需要并行加速,那直接用前缀和是最简单的,直接利用 sun(i, j) = sa(j) - sa(i) 的性质就好了

老哥你该复习一下算法了。。
wingkou
2018-04-07 10:25:00 +08:00
@Kirscheis 刚开始没怎么看你的 dp😂,后来看了下,感觉有点奇怪,dp 不是讲状态转移的吗?您的状态转移方程是

`dp [d[i]] [d[i+1]]=sum(a[d[i]:d[i+1]])`?

感觉不是状态转移吧?数组 dp 只在等式左侧出现。感觉只是个优化吧?
另外前缀和除了预处理之外,后面的能并行。
Kirscheis
2018-04-07 10:35:22 +08:00
@wingkou 是的,这只是分治,一开始想写 dp 的,后来想了想不好并行改了,不过那个是我昨晚喝了酒回来写的,又赶着打游戏。。写得很乱
楼主的题目有点特殊,因为他的区间右端点实际上把集合完全分成单个元素了,所以求和的求和也会有大量重合,应该加上 dp[i][k] = dp[i][j] + dp[j][k]的
不好意思,变量名乱写误导了
wingkou
2018-04-07 10:47:28 +08:00
@Kirscheis 啊,对耶!“他的区间右端点实际上把集合完全分成单个元素了”,所以你的`d = sorted(c)`去重之后一定是 range(len(a))😂算是负优化了😂刚刚还没注意到这点。
ipwx
2018-04-07 11:37:22 +08:00
@Kirscheis 老哥,这个问题的另一个难点是如何用 Python 高效地实现楼主的需求。

这么说吧,用 Python 裸写一个 for (n): c[i] = a[i] + b[i] 要比 NumPy 写 c = a + b 至少慢 20 倍。没办法,NumPy 用 C 写的,而且有些操作还有 Intel SIMD 指令集加成,比不过的。

NumPy 的基本操作大致有:任意维张量的(每个对应元素)加减乘除、比较(判等和大小,输出布尔向量),布尔向量当做整形向量参与运算,任意维两个张量后两维、前两维的点积(这个 carefully 优化过,相信是考虑过指令集和 cache line 之类的各种问题的)。

由于这个限制,Python + NumPy 写程序的时候通常会“多做一些运算”,以求更短的执行时间。比如:

flag = (a > b)
c = flag * a + (1 - flag) * b

换成 C 语言你相信这个比 for 循环更快?
- - - -

昨天我就看到这个问题了,但是恕我愚钝,我想不出利用 NumPy 在 Python 里面优雅地解决这个问题的方案。
ipwx
2018-04-07 11:40:20 +08:00
@dwjgwsm 我的看法是你用 @Kirscheis 的思路,上 Cython 吧。。。
RecursiveG
2018-04-07 12:47:45 +08:00
希望楼主解释一下你的“用 map 一个子函数来实现的”具体是怎么实现的,至少我没看出来。
然后你算法的时间复杂度是多少?你期望的算法时间复杂度是多少?你的数据量有多大?
是只需要算法优化,还是需要考虑别的因素?(并行 /GPU etc.)

普通算法有前缀和 O(n)或者线段树 O(nlogn)(本质都是区间和问题)
@Kirscheis 有 O(nlogn)的可以并行的算法( taken as-is )
或者直接根据 b 数组构造一个 n*n 的矩阵 Q 使得 c=aQ 然后用矩阵乘法。
dwjgwsm
2018-04-07 15:17:01 +08:00
@RecursiveG 我先把我写的函数放上来吧,当时我觉得没必要贴一坨代码.刚从外面回来,楼上的各位回复还没仔细看.贴完了,在慢慢看
dwjgwsm
2018-04-07 15:25:09 +08:00
def MA(npdata,narr): #narr 是一个和 npdata 等长的 ndarray 数组
L=len(npdata)
j=np.arange(1,L+1) #当时还不知道有 enumerate 这个东西,所以传了一个定位数组进去(从 aijam 那儿学了一招,谢谢!)
def IMA(n, k):
if k < n or n<0 or np.isnan(n):
return np.nan
return np.mean(npdata[k - int(n):k])

return np.array(list(map(IMA, narr, j)))

其实就是和 aijam 一样的遍历算法. 我待会儿试试用列表推导和 map 哪个快,之前网上说应该是 map 快.
之前没有装 cython.昨天买了一个大硬盘重装了系统,把 cython 装上去了(因为 vs 的缘故).准备上 cython 看看.回头报告结果
dwjgwsm
2018-04-07 15:34:47 +08:00
@RecursiveG 数据长度几百,但是调用频率特别高,还有其他十几个类似的函数,所以总体运行下来慢的要死,必须想办法从各种角度优化
jmc891205
2018-04-07 23:31:11 +08:00
纠结列表推导和 map 哪个快毫无意义
就算上了 C,前缀和和线段树也是比遍历快的多的算法
dwjgwsm
2018-04-08 09:36:17 +08:00
@Kirscheis 谢谢你讲的这么详细,我不是科班出身啊,没学过这些算法.不过大家说的前缀和,线段树,dp,我网上大致看了一下.大体的思路就是把数据分割小,避免重复计算.回头去买本算法的书学一下吧.我觉得作用可能不会很大,因为我的情况不是数据长度很大. 先说一下测试结果:
测试了长度为 10 万的数据,
1.在 python 中,map 和列表推导速度基本相同,大概是 2 秒
2.在参数检查中 not np.isnan()会将速度降低 40%左右,这是之前没注意到的
3.对比 cython 和 python,cython 列表推导只比 python 快 12%(想试一下 map,结果还不知道怎么实现,cython 中好像无法实现子函数,编译报错)
ipwx
2018-04-08 09:49:42 +08:00
@dwjgwsm
1、Cython 可以写函数。
2、Cython 最好不要用 map、列表推导之类的,直接用 for。
3、Cython 性能提升的关键是用上 C 一样的指针或者数组直接读写,不要用 Python 对象。
4、Cython 支持对 NumPy 数组进行直接读写。当然,不是直接用 NumPy 数组对象,你得查文档。
dwjgwsm
2018-04-08 12:33:52 +08:00
@ipwx 试过了,基本上看不出提升效果.

import numpy as np
cimport numpy as np
cimport cython
np.seterr(invalid='ignore')
cdef int CMAX(int x,int y):
return x if x>y else y
cdef int CMIN(int x,int y):
return y if x>y else x

DTYPE1 = np.float
DTYPE2 = np.int
ctypedef np.float_t DTYPE1_t
ctypedef np.int_t DTYPE2_t
@cython.boundscheck(False)
@cython.wraparound(False)
def MA(np.ndarray[DTYPE1_t, ndim=1] npdata,np.ndarray[DTYPE2_t, ndim=1] n):
cdef int L=npdata.shape[0]
cdef int i=0
cdef np.ndarray res=np.zeros(L)
for i from 0<=i<L:
try:
res[i]=np.mean(npdata[i + 1 - n[i]: i + 1])
except:
res[i] =np.nan
return res
jmc891205
2018-04-08 22:45:36 +08:00
@dwjgwsm 你对每一个位置都用 np.mean 来求平均值 最坏情况下要做多少次多余的加法你知道不?
ruoyu0088
2018-04-10 07:09:34 +08:00
用一个累加数组,可以提高计算速度:

import numpy as np
a=np.arange(10)
b=np.array([2,4,3,5,2,1,4,5,3,2])

ac = np.r_[0, np.cumsum(a)]

be = np.arange(1, b.shape[0] + 1)
bs = be - b
res = (ac[be] - ac[bs]) / b

res[bs<0] = np.nan

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/444619

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX