V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
wangyongbo
V2EX  ›  Python

多进程下载网络 拥堵问题咨询

  •  
  •   wangyongbo · 2017-12-05 09:14:21 +08:00 · 4190 次点击
    这是一个创建于 2546 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我需要下载约 10 万个小文件(每个 1M 左右。)

    写了一个脚本, 开 8 个进程,每个进程用 10 个线程下载。

    问题是: 无论如何修改进程,线程数,下载速度都没有太大变化,每秒 4 个左右。 而且本机的网络会很卡,打开网页很慢,,ping 192.168.1.1 (网关) 都要几百毫秒 。 这是为什么? 应该怎么优化? 同路由器链接的其他机器,网络不受影响。 我的机器配置: i7-3667u,256GB SSD, 8G 内存,ubuntu 14.04 64 位系统

    64 bytes from 192.168.1.1: icmp_seq=372 ttl=64 time=210 ms
    64 bytes from 192.168.1.1: icmp_seq=373 ttl=64 time=136 ms
    64 bytes from 192.168.1.1: icmp_seq=374 ttl=64 time=166 ms
    64 bytes from 192.168.1.1: icmp_seq=375 ttl=64 time=139 ms
    64 bytes from 192.168.1.1: icmp_seq=376 ttl=64 time=369 ms
    64 bytes from 192.168.1.1: icmp_seq=377 ttl=64 time=537 ms
    64 bytes from 192.168.1.1: icmp_seq=378 ttl=64 time=536 ms
    64 bytes from 192.168.1.1: icmp_seq=379 ttl=64 time=582 ms
    64 bytes from 192.168.1.1: icmp_seq=380 ttl=64 time=461 ms
    64 bytes from 192.168.1.1: icmp_seq=381 ttl=64 time=363 ms
    64 bytes from 192.168.1.1: icmp_seq=382 ttl=64 time=309 ms
    64 bytes from 192.168.1.1: icmp_seq=383 ttl=64 time=277 ms
    64 bytes from 192.168.1.1: icmp_seq=384 ttl=64 time=312 ms
    64 bytes from 192.168.1.1: icmp_seq=385 ttl=64 time=268 ms
    64 bytes from 192.168.1.1: icmp_seq=386 ttl=64 time=392 ms
    64 bytes from 192.168.1.1: icmp_seq=387 ttl=64 time=400 ms
    64 bytes from 192.168.1.1: icmp_seq=388 ttl=64 time=266 ms
    64 bytes from 192.168.1.1: icmp_seq=389 ttl=64 time=551 ms
    64 bytes from 192.168.1.1: icmp_seq=390 ttl=64 time=382 ms
    64 bytes from 192.168.1.1: icmp_seq=391 ttl=64 time=289 ms
    64 bytes from 192.168.1.1: icmp_seq=392 ttl=64 time=309 ms
    64 bytes from 192.168.1.1: icmp_seq=393 ttl=64 time=445 ms
    64 bytes from 192.168.1.1: icmp_seq=394 ttl=64 time=411 ms
    64 bytes from 192.168.1.1: icmp_seq=395 ttl=64 time=274 ms
    64 bytes from 192.168.1.1: icmp_seq=396 ttl=64 time=262 ms
    64 bytes from 192.168.1.1: icmp_seq=397 ttl=64 time=282 ms
    64 bytes from 192.168.1.1: icmp_seq=398 ttl=64 time=588 ms
    64 bytes from 192.168.1.1: icmp_seq=399 ttl=64 time=614 ms
    64 bytes from 192.168.1.1: icmp_seq=400 ttl=64 time=222 ms
    64 bytes from 192.168.1.1: icmp_seq=401 ttl=64 time=238 ms
    64 bytes from 192.168.1.1: icmp_seq=402 ttl=64 time=335 ms
    64 bytes from 192.168.1.1: icmp_seq=403 ttl=64 time=461 ms
    64 bytes from 192.168.1.1: icmp_seq=404 ttl=64 time=491 ms
    
    #coding=utf8
    
    import threading
    
    import multiprocessing
    import time
    import Queue
    import os
    import re
    import requests
    
    #SHARE_Q = Queue.Queue()  #构造一个不限制大小的的队列
    SHARE_Q = multiprocessing.Queue()  #构造一个不限制大小的的队列
    _WORKER_THREAD_NUM = 2   #设置线程个数
    _WORKER_PROCESS_NUM = 8
    
    class MyThread(threading.Thread) :
    
        def __init__(self, func, q) :
            super(MyThread, self).__init__()
            self.func = func
            self.queue = q
    
        def run(self) :
            self.func(self.queue)
    
    def worker(q):
        session = requests.Session()
        while not q.empty():
            url, name = q.get() #获得任务
            print "Processing : ", url, name
            with open(name, 'w') as f:
                f.write(session.get(url).content)
    
    def main() :
        global SHARE_Q
    
    
    for root,dirs,files in os.walk('data'):
        for filespath in files:
            url = os.path.join(root,filespath)
            url = os.path.abspath(url)
            temp = []
            if url.endswith('m3u8'):
                path = os.path.dirname(url)
                new_m3u8_path = os.path.join(path, 'new_video.m3u8')
                content = open(url).read()
                for x in content.split('\n'):
                    if x.startswith('http://'):
                        x = x.split('?')[0]
                        _, name = os.path.split(x)
                        save_path = os.path.join(path, name)
                        temp.append(name)
                        if not os.path.exists(save_path) or os.path.getsize(save_path) == 0:
                            #print 'download', x, save_path
                            #os.system('wget %s -O %s' % (x, save_path))
                            SHARE_Q.put([x, save_path])
                            #with open(save_path, 'w') as f:
                                #f.write(requests.get(x).content)
                    else:
                        temp.append(x)
                with open(new_m3u8_path, 'w') as f:
                    f.write("\n".join(temp))
                #exit()
    
    processs = []
    print 'Queue length', SHARE_Q.qsize()
    
    def func(q, max_thread_num):
        threads = []
        for i in xrange(max_thread_num) :
            thread = MyThread(worker, q)
            thread.start()
            threads.append(thread)
        for thread in threads :
            thread.join()
    
    for i in xrange(_WORKER_PROCESS_NUM):
        p = multiprocessing.Process(target=func, args=(SHARE_Q, _WORKER_THREAD_NUM))
        p.start()
        processs.append(p)
    
    while True:
        time.sleep(10)
        print 'Queue length', SHARE_Q.qsize()
    
    for p in processs:
        p.join()
    
    
    第 1 条附言  ·  2017-12-05 13:47:07 +08:00
    路由器是水星 mw300R ( 300M ), 笔记本通过 wifi 无线连接 路由器。
    估计下载速度也就这样了。
    13 条回复    2017-12-13 18:31:53 +08:00
    shoaly
        1
    shoaly  
       2017-12-05 09:22:14 +08:00   ❤️ 1
    楼主如果只是为了下载程序的话, 研究一下 aria2c, 他支持文件形式的批量导入下载链接的....你只需要给他一个 1w 文件的链接即可....多线程, 多文件同时下载占满带宽不是问题
    nine99
        2
    nine99  
       2017-12-05 09:25:25 +08:00
    带宽没跑满吗?我也差不多的情况,开 5 个线程跑就满了。
    inmyfree
        3
    inmyfree  
       2017-12-05 09:27:04 +08:00
    磁盘 io,CPU 负载,网络流量都不贴下啊
    wangyongbo
        4
    wangyongbo  
    OP
       2017-12-05 09:29:34 +08:00
    带宽没有跑满。 磁盘 io, cpu, 网络流量 都很低,
    realpg
        5
    realpg  
       2017-12-05 10:06:42 +08:00
    在服务器上用 tar 把十万个小文件压缩成一个大文件
    下载下来
    再解压
    wysnylc
        6
    wysnylc  
       2017-12-05 10:43:09 +08:00
    BBR
    zjsxwc
        7
    zjsxwc  
       2017-12-05 10:50:09 +08:00
    我怎么觉得每秒下载速度 4M 很正常....
    wangyongbo
        8
    wangyongbo  
    OP
       2017-12-05 12:18:44 +08:00
    我是要下载 勤思考研的视频。
    每个视频都是一个 m3u8 的文件,每个视频分割成了 200 多个小文件。 需要先把这些小文件下载下来, 再用 ffmpeg 合成一个新文件。

    这些小文件一共有 10 多万。下载了约 7 个小时才下载完成。
    aru
        9
    aru  
       2017-12-05 12:24:05 +08:00
    @wangyongbo
    是不是你用的无线网络下载?
    如果你的网络带宽没问题,可能是网卡跑满了,你换成有线再试试
    sujin190
        10
    sujin190  
       2017-12-05 13:16:15 +08:00
    如果用无线网络的话,普通路由 4M 已经满带宽了
    guyskk0x0
        11
    guyskk0x0  
       2017-12-08 09:52:26 +08:00 via Android
    电脑路由器都重启一下
    linw1995
        12
    linw1995  
       2017-12-08 10:04:59 +08:00
    可以试下用协程,用 asyncio 和 aiohttp 试一下。我用这两个库写过下载 m3u8 的视频的脚本,下载速度很快。
    120
        13
    120  
       2017-12-13 18:31:53 +08:00
    路由器是水星 mw300R ( 300M ), 笔记本通过 wifi 无线连接 路由器。
    能换个好点的路由器吗?
    能换个支持 QoS 的路由器吗。
    能用有线连接吗?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2557 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 02:37 · PVG 10:37 · LAX 18:37 · JFK 21:37
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.