[实用 Py] 从源码文本层面给 Python 模块打“补丁”——记一次对 you-get 进行 Bug 修正、增添功能的“过程”

2020-06-26 13:49:13 +08:00
 ungrown

you-get 单单作为 B 站(哔哩哔哩、bilibili.com )视频下载器还凑合,但是大小问题却不少:

想等正式版修 Bug 加功能遥遥无期,只能着手魔改。虽然还有很多其他的同类工具,然而……

(合着我看中 you-get 的点主要就是 HEVC ?)

好烦,不过好在还有 you-get 。you-get 的毛病最少,而且 you-get 是 python 写的。“这个我能改,而且这个工作量最小(大概)”,我心里这样嘀咕道。

然而 you-get 的软件框架实在谈不上合理,比较乱,模块间耦合太重,负责实际工作的方法函数的代码普遍太长(动不动三四百行一个方法函数,明明还可以细分的)。

这就导致对象层面的 Monkey Patch 实现起来很不划算——我总不能为了改一个三四百行代码的函数,就另外写一个三四百行的函数,然后这俩的区别无非只有那么几行?

最好既保持原版模块的文件不变,同时将其中的源码导入内存,修改之,再将修改后的源码转换成新的模块,并按照原版模块的路径替换掉原版模块。

即,在 main.py 中调用 xxx,并且改变 xxx 中的部分代码(可能是一段,可能是一行,可能只是几个词),但是 xxx.py 这个文件并没有被动手脚。

这其实也是一种 Monkey Patch,因为不会改变原版文件。

实现这个功能的函数很简单(这个不是我自己写的,引用出处在下面代码注释中):

def modify_and_import(module_path: str, code_modifier: str or Callable, package_path: str = None):
    # How to modify imported source code on-the-fly?
    #     https://stackoverflow.com/a/41863728/7966259  (answered by Martin Valgur)
    # Modules and Packages: Live and Let Die!  (by David Beazley)
    #     http://www.dabeaz.com/modulepackage/ModulePackage.pdf
    #     https://www.youtube.com/watch?v=0oTh1CXRaQ0
    spec = importlib.util.find_spec(module_path, package_path)
    if isinstance(code_modifier, str):
        source = code_modifier
    else:
        source = code_modifier(spec.loader.get_source(module_path))
    module = importlib.util.module_from_spec(spec)
    code_obj = compile(source, module.__spec__.origin, 'exec')
    exec(code_obj, module.__dict__)
    sys.modules[module_path] = module
    return module

正如之前所言,这个函数很简单,几个步骤很清楚,我很懒,所以不细说了。

但是用起来,还是有一些诀窍的。这主要取决于场景,如果只是单纯替换一个源代码不长的小模块,直接调用函数就好了;但如果原模块代码很长,需要精确定位替换,或者模块间相互调用的关系比较复杂,或者模块路径很深却又被浅层模块调用……遇到等等这类情况的时候,还是需要多多思考多多尝试,具体问题具体分析的。

个中过程一概不表,只分享最终方案,我懒。

算了,方案也省略,直接上代码,我好懒,代码里额外加了注释作为概述:

# 下面这个函数用于修改 you-get 的 B 站下载模块`you_get.extractors.bilibili`的源码
def code_modify_you_get_bilibili(x: str):
    # `x`是输入的源代码字符串,就是原模块的代码,直接对其进行替换,以此修改源代码的文本
    # 这里用的方法是在要替换的代码的上下文找到特征明显的一段,然后直接 replace

    # 下面是给尚未支持 B 站 4K 的当前版本,加上 4K 相关的视频流信息
    # 其实就是在原来的 dict 对象中加了一项
    x = x.replace('''
    stream_types = [
        {'id': 'flv_p60', 'quality': 116, 'audio_quality': 30280,
         'container': 'FLV', 'video_resolution': '1080p', 'desc': '高清 1080P60'},
''', '''
    stream_types = [
        {'id': 'hdflv2_4k', 'quality': 120, 'audio_quality': 30280,
         'container': 'FLV', 'video_resolution': '2160p', 'desc': '超清 4K'},
        {'id': 'flv_p60', 'quality': 116, 'audio_quality': 30280,
         'container': 'FLV', 'video_resolution': '1080p', 'desc': '高清 1080P60'},
''')
    # 下面也是跟 4K 相关的判断条件,B 站视频流用不同数字 ID 标定不同的格式码率
    # 120 是 4K,112 则是大会员的 1080P+(即较高码率的 1080P30 )
    # ( 160 是大会员 1080P60,这里的 160 不需要判断,所以没有)
    x = x.replace('''
        elif height <= 1080 and qn <= 80:
            return 80
        else:
            return 112
''', '''
        elif height <= 1080 and qn <= 80:
            return 80
        elif height <= 1080 and qn <= 112:
            return 112
        else:
            return 120
''')
    # 下面这段修改了原本代码中的一行打印提示文本,是为了和自己写的命令行工具的选项一致
    x = x.replace('''
                log.w('This is a multipart video. (use --playlist to download all parts.)')
''', r'''
                sys.stderr.write('# multi-part video: use -p to download other part(s)\n')
''')
    # 下面这段修改了下载文件名的格式,原版是视频标题+分 p 子标题
    # 我则是在视频标题+分 p 子标题的基础上,插入了一些有用的元信息:[av 号][BV 号][上传者用户名]
    x = x.replace('''
            # set video title
            self.title = initial_state['videoData']['title']
            # refine title for a specific part, if it is a multi-part video
            p = int(match1(self.url, r'[\?&]p=(\d+)') or match1(self.url, r'/index_(\d+)') or
                    '1')  # use URL to decide p-number, not initial_state['p']
            if pn > 1:
                part = initial_state['videoData']['pages'][p - 1]['part']
                self.title = '%s (P%s. %s)' % (self.title, p, part)
''', '''
            # set video title
            self.title = initial_state['videoData']['title']
            self.title += ' ' + self.get_vid_label() + self.get_author_label()
            # refine title for a specific part, if it is a multi-part video
            p = int(match1(self.url, r'[\?&]p=(\d+)') or match1(self.url, r'/index_(\d+)') or
                    '1')  # use URL to decide p-number, not initial_state['p']
            if pn > 1:
                part = initial_state['videoData']['pages'][p - 1]['part']
                self.title = '%s P%s. %s' % (self.title, p, part)
''')
    # 下面这段是个重点,修改的是原版中`you_get.extractors.bilibili.Bilibili.prepare_by_url`这个方法函数
    # 原版 you-get 对相当多的 B 站视频无法获取大会员的 1080P+、1080P60 等格式
    # 原版这里的逻辑有问题,按下面这样修改后,用到现在没发现异常
    # 究竟是什么原因?我没有彻底搞明白,但肯定与原版代码中`current_quality`和`best_quality`有关
    x = x.replace('''
            # get alternative formats from API
            for qn in [112, 80, 64, 32, 16]:
                # automatic format for durl: qn=0
                # for dash, qn does not matter
                if current_quality is None or qn < current_quality:
''', '''
            # get alternative formats from API
            for qn in [116, 112, 80, 64, 32, 16]:
                # automatic format for durl: qn=0
                # for dash, qn does not matter
                # if current_quality is None or qn < current_quality:
                if True:
''')
    # 下面这段,修改的是原版`you_get.extractors.bilibili.Bilibili.prepare_by_url`的结尾部分
    # 新加一个流程,从已经获取的所有视频流格式中,删除一部分不需要的格式
    # you-get 默认下载最佳画质,虽然可以选择画质,但用的格式名称比较长,不如数字 ID 本身来得方便
    # 所以加了一个`del_unwanted_dash_streams()`,用数字来指定最高画质和需要下载的画质
    # (其实加这个方法,还是因为 Bug 修得不彻底,权宜之计罢了)
    # 顺便一提,这个`del_unwanted_dash_streams`方法不是原版 you-get 代码里自带的
    # 而是在新的`YouGetBilibiliX`类里自定义的方法
    # 而`YouGetBilibiliX`则是继承的魔改版`bilibili.Bilibili`
    # 替换原版代码 -> 调用一个原版没有的方法 -> 魔改版 -> 继承魔改版的新类 -> 在新类中补上这个缺失的方法
    # 回溯套娃,左右横跳!
    x = x.replace('''
    def prepare_by_cid(self,avid,cid,title,html_content,playinfo,playinfo_,url):
''', '''
        self.del_unwanted_dash_streams()

    def prepare_by_cid(self, avid, cid, title, html_content, playinfo, playinfo_, url):
''')
    return x


# 下面这个函数用于修改 you-get 的文件系统模块`you_get.util.fs`的源码
def code_modify_you_get_fs(x: str):
    # 原版为了兼容 VFAT 文件系统,会把文件名里面的方括号替换成圆括号
    # 魔改版不需要这个特性,所以将这两行代码加#注释掉了
    x = x.replace("ord('['): '(',", "#ord('['): '(',")
    x = x.replace("ord(']'): ')',", "#ord(']'): ')',")
    # 下面把最大文件名字符串长度从 80 延长到 200,原版的 80 有点小,200 会不会太大尚不清楚
    x = x.replace('''
    text = text[:80] # Trim to 82 Unicode characters long
''', '''
    text = text[:200] # Trim to 82 Unicode characters long
''')
    return x


# 上面已经导入了原版的`you_get.util.strings`
# 对 python,此时原版的`you_get`及其下级子路径都已经注册在模块名空间中了
# 在此基础上,下面一行代码将模块名空间中的原版的`you_get.util.fs`替换成经过修改的新模块
you_get.util.fs = modify_and_import('you_get.util.fs', code_modify_you_get_fs)
# 接着把`you_get.util.strings.legitimize`这个原版的函数替换成修改后的模块`you_get.util.fs`中的魔改版函数
# 顺便一提,上面的`code_modify_you_get_fs`修改的源码就是`legitimize`这个函数的源码
# 在原版的 you-get 中,`.util.string`从`.util.fs`中导入了`legitimize`这个函数
# `.util.string`又利用已经导入的`legitimize`和其他几个函数,构建了一个`get_filename`函数
# 而`you_get.extractor`和`you_get.common`又都用到了`get_filename`,当然是各自分别从`you_get.util.strings`导入的
# 因此,所以,故而,然则,
# 只要将原版`you_get.util.strings`中的`legitimize`替换成魔改版的`you_get.util.fs`中的`legitimize`函数即可
# 其他从``you_get.util.strings`二次导入这个函数的模块会自动导入已经被替换成魔改版的函数
you_get.util.strings.legitimize = you_get.util.fs.legitimize
# 综上所述,下面这行可以注释掉了
# you_get.extractor.get_filename = you_get.common.get_filename = you_get.util.strings.get_filename
# 下面则是将 B 站下载模块替换成魔改版本,所用的源码替换函数是上面之前提到的`code_modify_you_get_bilibili`
you_get.extractors.bilibili = modify_and_import('you_get.extractors.bilibili', code_modify_you_get_bilibili)


# 搜寻 av 、BV 、AV 、bv 开头的字符串或者整形数,将之变成 B 站视频的 av 嗯号或者 BV 号
def get_vid(x: str or int) -> str or None:
    if isinstance(x, int):
        vid = 'av{}'.format(x)
    elif isinstance(x, str):
        for p in (r'(av\d+)', r'(BV[\da-zA-Z]{10})'):
            m = re.search(p, x, flags=re.I)
            if m:
                vid = m.group(1)
                if vid.startswith('bv'):
                    vid = 'BV' + vid[2:]
                elif vid.startswith('AV'):
                    vid = 'av' + vid[2:]
                break
        else:
            vid = None
    else:
        raise TypeError("'{}' is not str or int".format(x))
    return vid


# YouGetBilibiliX 继承了`you_get.extractors.bilibili.Bilibili`,添加了一些新的功能
# 虽然原版`Bilibili`是被继承的类,但它也是可以调用继承后新加的属性的
class YouGetBilibiliX(you_get.extractors.bilibili.Bilibili):
    def __init__(self, *args, cookies: str or dict = None, qn_max=116, qn_single=None):
        super(YouGetBilibiliX, self).__init__(*args)
        self.cookie = None
        if cookies:
            self.set_cookie(cookies)
        self.qn_max = qn_max
        self.qn_single = qn_single
        self.html = None, None

    # B 站视频的音频流分不同档次,默认选择中档 128kbps 的(音质足够了),也可以强制选择最高音质
    # 低档 30216 码率偏低,30232 约 128kbps,30280 可能是 320kbps 也可能是 128kbps,貌似跟是否 4K 有关,不是特别清楚
    def set_audio_qn(self, qn):
        for d in self.stream_types:
            d['audio_quality'] = qn

    # 更新视频页面的 HTML 文档(超长字符串)
    def update_html_doc(self):
        url, doc = self.html
        if url != self.url:
            url = self.url
            headers = self.bilibili_headers()
            r = requests.get(url, headers=headers)
            doc = html.document_fromstring(r.text)
            self.html = url, doc

    # 设置 cookies,大会员用得
    # `cookie_str_from_dict`和`cookie_str_from_dict`这两个函数另有定义
    # 前者将 cookies 字典变成单字符串,后者负责读取 cookies 文件
    def set_cookie(self, cookies: str or dict):
        if isinstance(cookies, dict):
            c = cookie_str_from_dict(cookies)
        elif isinstance(cookies, str):
            if os.path.isfile(cookies):
                c = cookie_str_from_dict(cookies_dict_from_file(cookies))
            else:
                c = cookies
        else:
            raise TypeError("'{}' is not cookies file path str or joined cookie str or dict".format(cookies))
        self.cookie = c

    def bilibili_headers(self, referer=None, cookie=None):
        if not cookie:
            cookie = self.cookie
        headers = super(YouGetBilibiliX, self).bilibili_headers(referer=referer, cookie=cookie)
        return headers

    # 从 URL 和 HTML 获取 av 号 BV 号
    def get_vid(self):
        url = self.url
        for m in [re.search(r'/(av\d+)', url), re.search(r'/(bv\w{10})', url, flags=re.I)]:
            if m:
                vid = m.group(1)
                if vid.startswith('bv'):
                    vid = 'BV' + vid[2:]
                break
        else:
            vid = None
        return vid

    # [av 号][BV 号]
    def get_vid_label(self, fmt='[{}]'):
        the_vid = self.get_vid()
        label = fmt.format(the_vid)
        if the_vid.startswith('BV'):
            self.update_html_doc()
            _, h = self.html
            canonical = h.xpath('//link[@rel="canonical"]')[0].attrib['href']
            avid = re.search(r'/(av\d+)/', canonical).group(1)
            label += fmt.format(avid)
        return label

    # 上传者( UP 主)用户名
    def get_author(self):
        self.update_html_doc()
        _, h = self.html
        return h.xpath('//meta[@name="author"]')[0].attrib['content']

    def get_author_label(self, fmt='[{}]'):
        return fmt.format(self.get_author())

    # 删除不需要的视频流,限定最高画质,选择下载画质
    def del_unwanted_dash_streams(self):
        format_to_qn_id = {t['id']: t['quality'] for t in self.stream_types}
        for f in list(self.dash_streams):
            q = format_to_qn_id[f.split('-', maxsplit=1)[-1]]
            if q > self.qn_max or self.qn_single and self.qn_single == q:
                del self.dash_streams[f]


# 这是一个任务函数,包装了魔改版的 you-get 的 B 站下载功能,供另外编写的命令行工具调用
def download_bilibili_video(url: str or int,
                            cookies: str or dict = None, output: str = None, parts: list = None,
                            qn_max: int = None, qn_single: int = None, moderate_audio: bool = True, fmt=None,
                            info: bool = False, playlist: bool = False, caption: bool = True,
                            **kwargs):
    ensure_sigint_signal()
    dr = SimpleDrawer(sys.stderr.write, '\n')

    if not output:
        output = '.'
    if not qn_max:
        qn_max = 116
    url = BILIBILI_VIDEO_URL_PREFIX + get_vid(url)

    dr.hl()
    dr.print('{} -> {}'.format(url, output))
    dr.hl()
    bd = YouGetBilibiliX(cookies=cookies, qn_max=qn_max, qn_single=qn_single)

    if info:
        dl_kwargs = {'info_only': True}
    else:
        dl_kwargs = {'output_dir': output, 'merge': True, 'caption': caption}
    if fmt:
        dl_kwargs['format'] = fmt
    if moderate_audio:
        bd.set_audio_qn(30232)

    if playlist:
        bd.download_playlist_by_url(url, **dl_kwargs)
    else:
        if parts:
            base_url = url
            for p in parts:
                url = base_url + '?p={}'.format(p)
                dr.print(url)
                dr.hl()
                bd.download_by_url(url, **dl_kwargs)
        else:
            bd.download_by_url(url, **dl_kwargs)
3074 次点击
所在节点    Python
13 条回复
leafleave
2020-06-26 15:25:32 +08:00
支持
Ljcbaby
2020-06-26 16:06:34 +08:00
厉害!
SingeeKing
2020-06-26 16:18:46 +08:00
为什么不 clone 出来源码然后直接本地安装呢…
ungrown
2020-06-26 16:32:20 +08:00
@SingeeKing #3
因为 fork 副本并不好用:
1. 无法从公共源获取更新
2. 大大增加了自维护的代码量(虽然副本中的绝大部分代码没变,但是总量很大)
3. 与原版和谐共存的成本飙升
darer
2020-06-26 16:35:17 +08:00
好评
ClericPy
2020-06-26 17:39:51 +08:00
想起当年解析 B 站的逆天 CDN 支配的时候了... 老被用户反馈怎么播放这么卡啊~
cheese
2020-06-27 00:31:38 +08:00
@ClericPy #6 不是当年,现在我也时不时抽风,曾经一度以为是移动垃圾宽带的问题,后来才发现的是 b 站的自建垃圾 CDN
ClericPy
2020-06-27 00:40:08 +08:00
@cheese 我今天也抽了... 准备油猴强制改 cdn 了, 不过暂时通过挂野生的代理发现速度还挺快, 撑几天看看再搞

PS: 在老家山东的时候一路秒播, 到了北京各种转圈圈...
ysc3839
2020-06-27 09:29:42 +08:00
@ungrown 那改完了再提交 pull request 不好吗?
ungrown
2020-06-27 10:47:33 +08:00
@ysc3839 #9 那边几百个 PR 等着等着合并,我排到猴年马月?
ysc3839
2020-06-27 23:33:15 +08:00
@ungrown 首先 pull requests 多不代表你提交的就一定很晚才能被合并。
我之前也向 OpenWrt 的软件包仓库提交过 pr,并没有等待很久。目前这个仓库有 76 个 pr https://github.com/openwrt/packages/pulls
再者,即使提交了 pr 要等待很久,个人认为也比你这种方法简单、直观得多。
ungrown
2020-06-28 01:06:54 +08:00
@ysc3839 #11
你的意思是,如果那边合并慢了,你会帮我催他?
或者 OpenWrt 的维护者会秉持着与你一致的信念与理想和你一起帮我催他?
或者因为你用“首先……不代表……”造了一个句,所以世界线因此而收束,因果律因此而变动,神秘的东方力量扭转了 github 服务器的内存比特,从而使我提交的 PR 被自动合并?
再简单直观,那也不是从我的需求角度出发所看到的景象。
简单直观不能修复 bug,简单直观不能增加功能,简单直观不能让我马上就有工具用。
再说你所谓的简单直观其实从一开始就是站在你自己的角度讲的吧?充其量会稍~~~微~~~考虑一下那个项目的作者?
没有考虑我的难处吧?
没有考虑我的感受吧?
没有在乎我的心情吧?
没有在乎我的需要吧?
呐,你自己说,像你这样自私傲慢的评论,我该怎么回复才好呢?
Mayuri
2020-07-01 14:37:30 +08:00
作为一个也为 you-get 提过 PR 的 monkey patcher 能够理解楼主的感受,开源软件 patch 本来就是为了满足自己的需要,提 PR 也不是每个人都应尽的义务。感谢楼主的代码,学习了。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/684849

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX