python 如何并发采集设备信息

2013-07-11 11:03:09 +08:00
 lvii
各位兄台,厂里有个用 snmp 协议采集温度信息的活,需求是这样子:

1. 有 100 多台设备
2. 每隔 10 秒取一次数据
3. 将取到的数据写入数据库

我现在只实现了采集单台设备信息并写入数据库的功能
要并发采集 100 台设备的信息,不知道要怎么解决

使用 time snmpwalk 测试采集单台设置数据,要 real 0m2.269s
现在只写了一个简单的 for 循环轮循发起请求,但这样的同步模式,前五台设备采集好数据,就已经过了 10s 了
就进入了下一个采集时间片。这样明显杯具了

昨天搜了一下 python 并发操作的一些概念:

1. 使用多进程
2. 使用多线程
3. 使用协程

对于多线程,多进程能理解,但是 同步,异步 和 阻塞和非阻塞和协程关系不是很懂
如果在这个需求里面,批量发送 snmp 请求应该是可以异步操作的
但是取回数据之后,要写回数据库,这个是不是就会被阻塞了?

之前没有写过这种并发的例子。各位兄台,可否指点下,我对需求的分析是否正确
可否有简单的实例参考一下。谢谢
7150 次点击
所在节点    Python
14 条回复
reusFork
2013-07-11 11:36:58 +08:00
可以用标准库里的asyncore,或者gevent之类,可以同时进行多个网络io
lvii
2013-07-11 12:02:56 +08:00
对 100 条记录的批量插入数据库操作是阻塞的还是非阻塞的?
lookhi
2013-07-11 13:53:03 +08:00
从解决问题的态度。
起100个线程
likuku
2013-07-11 14:23:52 +08:00
@lvii 建议是每条记录在生成时的时间戳也当成记录内容的一部分,

之后记录导入数据库后,记录产生时刻以记录内的时间戳信息为准,

这样,你记录迟一点/异步 导入数据库也可,但每条记录所记的真实/原始采样时戳都正确。
likuku
2013-07-11 14:27:56 +08:00
@lvii
@lookhi 的「起100个线程」,也是好办法,但设备越来越多,这状况只会更严重。

建议是否可能改当前的“拉数据”的方法为“推数据”,每个设备本地有个小监控设备,每10秒采样,状态可以先缓存本地,之后每分钟(可以加个随机的秒,已避免成DDOS)将数据集中推给搜集中心数据库。
timonwong
2013-07-11 15:35:59 +08:00
这种pull式获取传感器信息的,还是很好做,首先就不用考虑写一个高性能服务器需要做的活,因为这就是个客户端,实在不行在N台机器上开N个客户端分别获取不同组的信息最后统一写入到数据库(也可以写一个任务分配服务器来完成任务派发)。

再就是客户端编写,开N个线程其实也没啥,因为你点数不多(100多个),间隙也比较长(10s),那2s多的过程大部分都耗在传输上了(还有可能是结点部分处理相当相当慢,如果是数据量相当大导致的慢,我的假设没用,这段文本也就对你毫无用途了)。

数据库写入肯定是需要时间的,你可以另外开一个线程/进程/whatever干这种事情,写入速度要看你的数据量,数据量大还有更多的课题要做,几句话肯定说不完。

如果不需要考虑数据恢复的问题,可以在客户端内部维护一个list, 以一定的时间间隙,将这个list整个拿出来, 原来的list引用到另外一个新空list上(还考虑同步问题,暴力法就是用锁,当然锁都是要你命一万的),然后把整个list的数据bulk insert到数据库里。另外可以用message queue来完成数据写入工作:客户端发送采集到的数据到mq里(生产者), 一个单独的进程获取mq的数据,完成写入工作(消费者),这种模型简单不易出错。
BOYPT
2013-07-12 13:04:32 +08:00
你的数据库不会慢到写入也要等几秒吧,不然阻塞一下有什么要紧的。
BOYPT
2013-07-12 13:07:23 +08:00
当然要是你数据库真那么慢也没关系啊,专门一个线程负责管理写入队列,其他收集者拿到数据后塞给这个写入者,就完了吧……标准的教科书式生产者消费者模型。
lvii
2013-07-15 10:54:00 +08:00
各位兄台,我的需求是这样的:

1. 一共有 100 多台设备,需要每隔 10s 获取一遍信息
2. 主动监控,因为要监控的设备没法编程,不支持 SNMP 的 trap,只支持 snmpwalk 获取 OID 信息
3. 每个设备获取 7 个 OID 信息,其中最后一个最耗时,需要 2.2~2.4s 左右
4. 将返回的信息格式化一下,插入 mysql 数据库

所以,@likuku @timonwon 兄,被动监控,没法搞啊

我用 gevent 模块测试了一下获取一台机器的信息:

oid_list=['.1.3.6.1.4.1.30966.4.2.1.22',
'.1.3.6.1.4.1.30966.4.2.1.23',
'.1.3.6.1.4.1.30966.4.2.1.24',
'.1.3.6.1.4.1.30966.4.2.1.25',
'.1.3.6.1.4.1.30966.4.2.1.7',
'.1.3.6.1.4.1.30966.4.2.1.13',
'.1.3.6.1.4.1.30966.4.2.1.1' ]

def get_snmp_info(oid,ip):
'''
net-snmp 官方 python 接口:
http://www.net-snmp.org/wiki/index.php/Python_Bindings
https://net-snmp.svn.sourceforge.net/svnroot/net-snmp/trunk/net-snmp/python/README
'''
return netsnmp.snmpwalk(oid, Version=2, DestHost=ip, Community="public")

start_time=time.time()
threads = []

for oid_text in oid_list:
threads.append(gevent.spawn(get_snmp_info,oid_text,desthost))
gevent.run()
#gevent.joinall(threads)

print time.time()-start_time


问题:

耗时:0.000214099884033

>>> threads[0].value ## spawn() 后,函数并没有在后台执行,没法获取返回值
>>> threads[0].run() ## 需要手动调用 run() 方法进行执行
>>> threads[0].value
('23',)

gevent.spawn() 的 thread 没有执行,需要手动调用 threads[x].run() 方法来触发
这样又和取消 #gevent.joinall(threads) 的注释,效果一样了,不是异步的模式
和同步模式没有区别

耗时:4.58578181267
测试 2台设备 gevent.joinall(threads) 耗时 4.5s 这样子如果并发采集 100
多台,时间不够用

gevent.joinall() 手册描述:gevent.joinall() waits for them to complete
使用 joinall() 方法,就要等待所有的 spawn 的调用都执行完,这种等待是不是一种阻塞
lvii
2013-07-15 10:55:14 +08:00
v2ex 代码格式有问题,我代码贴到这里了:http://ix.io/6F5
timonwong
2013-07-15 11:26:11 +08:00
@lvii 我没有说被动啊,你的需求其实是客户端啊,因为是你主动请求设备的数据。。。
最后那个OID那里,耗时长的原因是传输速度慢而不是数据量极大吧,数据量极大写起来就比较麻烦,如果只是传输速度慢(同时数据量也小)完全可以依赖上下文切换切到其它任务去。

还有两点:
1. 没有必要对一台设备的7次请求都分别做成线程, 直接顺序请求7次就可以了,单台设备的请求封在一个线程内(一个线程获取设备的7个OID)
2. 还有如果你的joinall()调用时机也不对,joinall()就是等待所有线程完成,你对单台设备的7次请求就等待其完成,跟做成顺序执行7次(无线程)没有什么区别,反而可能由于上下文切换速度还慢点,如果不需要同步,开了之后不用等待,只有程序退出的时候可能你需要所有操作完成join一下。
heqing
2013-07-15 11:55:02 +08:00
单取一台设备信息需要多少时间?
lvii
2013-07-15 13:59:03 +08:00
@timonwong 兄,这7个 OID 中,前 6个都是毫秒级别,最后一个 OID 耗时 2s
因为这个 OID 需要遍历设备的所有信息,我需要的是其中的一段。

about 1:
@timonwong 兄,gevent.spawn() 开启的是一个线程?
我不知道怎么把对 7个 OID 的请求封装在一个线程里面。
net-snmp.snmpwalk() 方法每次只能调用一个 OID 不能批量一次性调用多个 OID

about 2:
我想如果买个设备的信息采集可以控制在 2.4s 并且每个设备可以并行采集是没问题的。
我 joinall() 的目的是要获取执行的返回结果,不然我后面的格式化数据和保存数据的逻辑
没法写了。

起始我想要的模型:

采集设备1 (7个OID) 2.4s 返回结果,保存到字典
采集设备2 (7个OID) 2.4s 返回结果,保存到字典
...
采集设备n (7个OID) 2.4s 返回结果,保存到字典

如果上面的采集,可以并行异步进行:

最短时间 = 单台采集时间 + for轮循时间

@heqing 兄,一台设备平均 2.4s
timonwong
2013-07-15 14:57:14 +08:00
@lvii
我先建议你换成多线程模型,net-snmp的python绑定与gevent的兼容性未知,当然net-snmp的python绑定是否完全线程安全也未知,如果不是线程安全,也好做,使用多进程模型即可。

相当基本过程(多线程), 伪代码:

def worker_proc(equip_id, q):

oids = oids_for_equip_id(equip_id)
for oid in ords:
snmp_walk(oid)

data = pre_process_data()
q.put(data)

end


import threading

# Consumer queue
q= Queue()
def data_consumer(q):
# 这里可以考虑维护个buffer_list, 维持一个超时时间
# 在超时或者buffer_list的大小超过阈值时, bulk insert到数据库
data = q.get()
save_to_db(data)


# bookkeeping
threads = []
# Workers
for equip_id in equips:
t = threading.Thread(target=worker_proc, args=(equip_id,q))
t.start()
threads.append(t)

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/75439

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX