mitmproxy 解密插件(其实没有解密算法)

2017-09-01 18:36:14 +08:00
 yivanus
"""
mitmproxy 解密插件(其实没有写具体的解密算法),具体解密算法需要单独实现,
alldecrypt 函数中根据 host 和 what 来判断需要使用的具体的解密方法
-q 选项参数会开启自动解密,当满足过滤条件时自动解密,结果显示在 eventlog 中,指令 e 显示 eventlog
根据 host 设置过滤条件,例子 ~d 127.0.0.1
usage: mitmproxy -s mydecrypt.py -q 开启自动解密

"""
import binascii
import json

import mitmproxy
from mitmproxy import contentviews
from mitmproxy import ctx
from mitmproxy import flowfilter
from mitmproxy.contentviews import base
from mitmproxy.http import HTTPFlow
from mitmproxy.tools.console import flowview


# ============举个例子=============
def decrespone(data):
    # 解密 response
    return "decres<< " + "需要实现解密算法" + data


def decrequest(data):
    # 解密 request
    return "decreq>> " + "需要实现解密算法" + data


# ==============结束===============


def alldecrypt(host="127.0.0.1", data=None, what=0):
    """
    统一解密入口
    :param host: 接口的 host 地址
    :param data: 需要解密的数据
    :param what: 0 客户端数据 1 服务端数据
    :return: 返回解密后的数据
    """
    # TODO 根据 host 判断需要使用的解密算法,根据 what 判断解密的是客户端数据或者服务端数据
    # 下面一组数据是为测试用的
    data = "这是假的数据为了测试"
    # 存放需要匹配的 host 与对应的解密方法名称
    sks = {("127.0.0.1", 1, "decrespone(data)"),
           ("127.0.0.1", 0, "decrequest(data)")}
    for s in sks:
        # host 与 what 同时匹配时得到 解密方法,可能解密客户端和服务端使用不同的算法
        if s[0] == host and s[1] == what:
            func = s[2]
    if func is not None:
        return eval(func)
    else:
        return "没有匹配的 host 数据"


class Mydecrypt(base.View):
    """
    插件 ui,用于显示解密的请求数据或者服务器返回数据
    """
    name = "MyDecrypt"
    prompt = ("decrypt", "d")

    def __call__(self, data, **kwargs):
        # 获取当前 http 的信息传给 统一解密函数
        what = curflow.getwhat()
        host = curflow.gethost()
        descypt = alldecrypt(host, data, what)
        title = "解密"
        if what == 0:
            title += "请求数据"
        else:
            title += "返回数据"
        return title, base.format_text(descypt)


class Currentflow:
    """
    用于保存当前的 http 数据,每次通过 ui 切换时都会更新最新的 http 数据
    what = 0 表示当前是 request
    what = 1 表示当前是 response
    host 当前请求的 host 地址
    """

    def __init__(self):
        self.what = 0
        self.host = ""

    def updatewhat(self, what):
        self.what = what

    def updatehost(self, host):
        self.host = host

    def gethost(self):
        return self.host

    def getwhat(self):
        return self.what


curflow = Currentflow()
mydecrypt = Mydecrypt()


def mylog_tier(level):
    # 修改 warn 类型的值小于 error,只有 warn 类型的日志会显示
    return dict(error=0, warn=-1, info=2, debug=3).get(level)


def myview_request(self):
    # 更新当前显示的 host 以及 data 类型为 request
    curflow.updatewhat(0)
    curflow.updatehost(self.flow.request.host)
    return self.conn_text(self.flow.request)


def myview_response(self):
    # 更新当前显示的 host 以及 data 类型为 response
    curflow.updatewhat(1)
    curflow.updatehost(self.flow.request.host)
    return self.conn_text(self.flow.response)


class Myaddon:
    """
    插件主体,实现对插件 ui 的管理,处理请求的实时解密
    """

    def __init__(self):
        self.filter = None

    def configure(self, options, updated):
        # 保存最新的 过滤条件
        self.filter = options.filter

    def response(self, f: HTTPFlow):
        """
        全部 response 都会经过这里的处理
        :param f: 当前正在处理的 http
        :return:
        """
        if self.filter is None or self.filter == "":
            pass
        else:
            # 判断当前过滤条件是否合法
            flt = flowfilter.parse(self.filter)
            if not flt:
                pass
            else:
                if flowfilter.match(self.filter, f):
                    # 显示当前匹配的数据
                    ctx.log.warn("返回数据  Raw:<< " + str(binascii.b2a_hex(f.response.content)))
                    if ctx.master.options.verbosity == 0:
                        # 自动解密
                        ctx.log.warn("自动解密返回数据 >> ")
                        ctx.log.warn(alldecrypt(f.request.host, f.response.content, 1))
                else:
                    pass

    def request(self, f: HTTPFlow):
        """
        全部 request 都会经过这里的处理
        :param f: 当前正在处理的 http
        :return:
        """
        if self.filter is None or self.filter == "":
            pass
        else:
            # 判断当前过滤条件是否合法
            flt = flowfilter.parse(self.filter)
            if not flt:
                pass
            else:
                if flowfilter.match(self.filter, f):
                    # 显示当前匹配的数据
                    ctx.log.warn("请求数据 Raw:>> " + str(binascii.b2a_hex(f.request.content)))
                    if ctx.master.options.verbosity == 0:
                        # 自动解密
                        ctx.log.warn("自动解密请求数据 >> ")
                        ctx.log.warn(alldecrypt(f.request.host, f.request.content, 0))
                else:
                    pass

    def start(self):
        """
        处理插件 ui,以及函数修改
        :return:
        """

        oldview = contentviews.get(mydecrypt.name)
        if oldview is not None:
            contentviews.remove(oldview)
        contentviews.add(mydecrypt)

        # 修改原方法,增加保存当前 flow 的功能
        mitmproxy.tools.console.flowview.FlowView.view_response = myview_response
        mitmproxy.tools.console.flowview.FlowView.view_request = myview_request
        # ctx.master.options.verbosity 此变量控制自动解密,0 开启自动解密,默认关闭
        if ctx.master.options.verbosity == 0:
            # 处理解密模式时日志显示,只显示重要信息
            mitmproxy.log.log_tier = mylog_tier
        ctx.log.warn("解密插件加载完成")


def start():
    ctx.log.warn("解密插件开始加载")
    return Myaddon()


def done():
    contentviews.remove(mydecrypt)
3205 次点击
所在节点    分享创造
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/387531

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX