多进程下载网络 拥堵问题咨询

2017-12-05 09:14:21 +08:00
 wangyongbo

我需要下载约 10 万个小文件(每个 1M 左右。)

写了一个脚本, 开 8 个进程,每个进程用 10 个线程下载。

问题是: 无论如何修改进程,线程数,下载速度都没有太大变化,每秒 4 个左右。 而且本机的网络会很卡,打开网页很慢,,ping 192.168.1.1 (网关) 都要几百毫秒 。 这是为什么? 应该怎么优化? 同路由器链接的其他机器,网络不受影响。 我的机器配置: i7-3667u,256GB SSD, 8G 内存,ubuntu 14.04 64 位系统

64 bytes from 192.168.1.1: icmp_seq=372 ttl=64 time=210 ms
64 bytes from 192.168.1.1: icmp_seq=373 ttl=64 time=136 ms
64 bytes from 192.168.1.1: icmp_seq=374 ttl=64 time=166 ms
64 bytes from 192.168.1.1: icmp_seq=375 ttl=64 time=139 ms
64 bytes from 192.168.1.1: icmp_seq=376 ttl=64 time=369 ms
64 bytes from 192.168.1.1: icmp_seq=377 ttl=64 time=537 ms
64 bytes from 192.168.1.1: icmp_seq=378 ttl=64 time=536 ms
64 bytes from 192.168.1.1: icmp_seq=379 ttl=64 time=582 ms
64 bytes from 192.168.1.1: icmp_seq=380 ttl=64 time=461 ms
64 bytes from 192.168.1.1: icmp_seq=381 ttl=64 time=363 ms
64 bytes from 192.168.1.1: icmp_seq=382 ttl=64 time=309 ms
64 bytes from 192.168.1.1: icmp_seq=383 ttl=64 time=277 ms
64 bytes from 192.168.1.1: icmp_seq=384 ttl=64 time=312 ms
64 bytes from 192.168.1.1: icmp_seq=385 ttl=64 time=268 ms
64 bytes from 192.168.1.1: icmp_seq=386 ttl=64 time=392 ms
64 bytes from 192.168.1.1: icmp_seq=387 ttl=64 time=400 ms
64 bytes from 192.168.1.1: icmp_seq=388 ttl=64 time=266 ms
64 bytes from 192.168.1.1: icmp_seq=389 ttl=64 time=551 ms
64 bytes from 192.168.1.1: icmp_seq=390 ttl=64 time=382 ms
64 bytes from 192.168.1.1: icmp_seq=391 ttl=64 time=289 ms
64 bytes from 192.168.1.1: icmp_seq=392 ttl=64 time=309 ms
64 bytes from 192.168.1.1: icmp_seq=393 ttl=64 time=445 ms
64 bytes from 192.168.1.1: icmp_seq=394 ttl=64 time=411 ms
64 bytes from 192.168.1.1: icmp_seq=395 ttl=64 time=274 ms
64 bytes from 192.168.1.1: icmp_seq=396 ttl=64 time=262 ms
64 bytes from 192.168.1.1: icmp_seq=397 ttl=64 time=282 ms
64 bytes from 192.168.1.1: icmp_seq=398 ttl=64 time=588 ms
64 bytes from 192.168.1.1: icmp_seq=399 ttl=64 time=614 ms
64 bytes from 192.168.1.1: icmp_seq=400 ttl=64 time=222 ms
64 bytes from 192.168.1.1: icmp_seq=401 ttl=64 time=238 ms
64 bytes from 192.168.1.1: icmp_seq=402 ttl=64 time=335 ms
64 bytes from 192.168.1.1: icmp_seq=403 ttl=64 time=461 ms
64 bytes from 192.168.1.1: icmp_seq=404 ttl=64 time=491 ms
#coding=utf8

import threading

import multiprocessing
import time
import Queue
import os
import re
import requests

#SHARE_Q = Queue.Queue()  #构造一个不限制大小的的队列
SHARE_Q = multiprocessing.Queue()  #构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 2   #设置线程个数
_WORKER_PROCESS_NUM = 8

class MyThread(threading.Thread) :

    def __init__(self, func, q) :
        super(MyThread, self).__init__()
        self.func = func
        self.queue = q

    def run(self) :
        self.func(self.queue)

def worker(q):
    session = requests.Session()
    while not q.empty():
        url, name = q.get() #获得任务
        print "Processing : ", url, name
        with open(name, 'w') as f:
            f.write(session.get(url).content)

def main() :
    global SHARE_Q


for root,dirs,files in os.walk('data'):
    for filespath in files:
        url = os.path.join(root,filespath)
        url = os.path.abspath(url)
        temp = []
        if url.endswith('m3u8'):
            path = os.path.dirname(url)
            new_m3u8_path = os.path.join(path, 'new_video.m3u8')
            content = open(url).read()
            for x in content.split('\n'):
                if x.startswith('http://'):
                    x = x.split('?')[0]
                    _, name = os.path.split(x)
                    save_path = os.path.join(path, name)
                    temp.append(name)
                    if not os.path.exists(save_path) or os.path.getsize(save_path) == 0:
                        #print 'download', x, save_path
                        #os.system('wget %s -O %s' % (x, save_path))
                        SHARE_Q.put([x, save_path])
                        #with open(save_path, 'w') as f:
                            #f.write(requests.get(x).content)
                else:
                    temp.append(x)
            with open(new_m3u8_path, 'w') as f:
                f.write("\n".join(temp))
            #exit()

processs = []
print 'Queue length', SHARE_Q.qsize()

def func(q, max_thread_num):
    threads = []
    for i in xrange(max_thread_num) :
        thread = MyThread(worker, q)
        thread.start()
        threads.append(thread)
    for thread in threads :
        thread.join()

for i in xrange(_WORKER_PROCESS_NUM):
    p = multiprocessing.Process(target=func, args=(SHARE_Q, _WORKER_THREAD_NUM))
    p.start()
    processs.append(p)

while True:
    time.sleep(10)
    print 'Queue length', SHARE_Q.qsize()

for p in processs:
    p.join()

4224 次点击
所在节点    Python
13 条回复
shoaly
2017-12-05 09:22:14 +08:00
楼主如果只是为了下载程序的话, 研究一下 aria2c, 他支持文件形式的批量导入下载链接的....你只需要给他一个 1w 文件的链接即可....多线程, 多文件同时下载占满带宽不是问题
nine99
2017-12-05 09:25:25 +08:00
带宽没跑满吗?我也差不多的情况,开 5 个线程跑就满了。
inmyfree
2017-12-05 09:27:04 +08:00
磁盘 io,CPU 负载,网络流量都不贴下啊
wangyongbo
2017-12-05 09:29:34 +08:00
带宽没有跑满。 磁盘 io, cpu, 网络流量 都很低,
realpg
2017-12-05 10:06:42 +08:00
在服务器上用 tar 把十万个小文件压缩成一个大文件
下载下来
再解压
wysnylc
2017-12-05 10:43:09 +08:00
BBR
zjsxwc
2017-12-05 10:50:09 +08:00
我怎么觉得每秒下载速度 4M 很正常....
wangyongbo
2017-12-05 12:18:44 +08:00
我是要下载 勤思考研的视频。
每个视频都是一个 m3u8 的文件,每个视频分割成了 200 多个小文件。 需要先把这些小文件下载下来, 再用 ffmpeg 合成一个新文件。

这些小文件一共有 10 多万。下载了约 7 个小时才下载完成。
aru
2017-12-05 12:24:05 +08:00
@wangyongbo
是不是你用的无线网络下载?
如果你的网络带宽没问题,可能是网卡跑满了,你换成有线再试试
sujin190
2017-12-05 13:16:15 +08:00
如果用无线网络的话,普通路由 4M 已经满带宽了
guyskk0x0
2017-12-08 09:52:26 +08:00
电脑路由器都重启一下
linw1995
2017-12-08 10:04:59 +08:00
可以试下用协程,用 asyncio 和 aiohttp 试一下。我用这两个库写过下载 m3u8 的视频的脚本,下载速度很快。
120
2017-12-13 18:31:53 +08:00
路由器是水星 mw300R ( 300M ), 笔记本通过 wifi 无线连接 路由器。
能换个好点的路由器吗?
能换个支持 QoS 的路由器吗。
能用有线连接吗?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/412023

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX