``` python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import copy
import json
import logging
import pymysql
import itertools
import pandas as pd
import multiprocessing as mp
PWD = os.path.dirname(os.path.realpath(__file__))
LOGPATH = os.path.join(PWD, './info.log')
logging.basicConfig(level=
logging.INFO,
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
filename=LOGPATH)
class FuncTactic:
def __init__(self, task_id):
self.task_id = task_id
def tactic(self):
case_id_path = "/Users/xxxx/Downloads/" + str(self.task_id) + ".log"
with open(case_id_path, 'r', encoding='utf-8') as content:
result_list = json.load(content)
content.close()
case_id_func = {} # 这个是 id 对应的方法变更
for i in result_list:
case_id_func[str(i.get('c_id'))] = i.get('c_c_func_origin_list')
conn = self.connect_db("localhost", 3306, "root", "root1234", "code_trees")
table_name = "android_code_tree"
sql = "SELECT case_id, trees FROM %s where case_id in (%s)" % (table_name, str(list(case_id_func.keys()))[
1:-1])
covs_pd = pd.read_sql(sql, conn)
funcs_dict = {} # 这个是 id 对应的关系树
for row in covs_pd.itertuples():
id = int(getattr(row, "case_id"))
funcs = getattr(row, "trees")
if funcs is None or len(funcs) < 1:
continue
else:
funcs_dict[id] = funcs
conn.close()
# 读取文件获取方法变更和对应的 case
func_case_id = {} # 方法对应的变更 id
conn = self.connect_db("x.x.x.x", 3306, "test", "test", "code_ing")
table_name = "task_diff_case_relation"
sql = "SELECT relation FROM %s where taskid = %s" % (table_name, self.task_id)
relation = pd.read_sql(sql, conn)
functions = ""
for row in relation.itertuples():
functions = getattr(row, "relation")
conn.close()
for i in json.loads(functions):
func_case_id[i.get('diff_code')] = i.get('caseid_list')
# 计算基类方法
pool = mp.Pool(mp.cpu_count())
jobs = []
for _dict in self.split_dict(func_case_id, mp.cpu_count()):
jobs.append(pool.apply(self.get_dict_common_func, _dict))
res = [job.get() for job in jobs]
pool.close()
common_funcs = list(itertools.chain(*map(eval, res)))
# 过滤
result_case_id_func = copy.deepcopy(case_id_func)
flitur_result = {}
for id, fun_list in case_id_func.items():
if set(fun_list) < set(common_funcs):
in_list = []
for fun in fun_list:
func_case_id[fun].remove(id)
if len(func_case_id[fun]) > 0:
if id in result_case_id_func:
del result_case_id_func[id]
in_list.append(fun)
flitur_result[id] = in_list
print(result_case_id_func)
print(flitur_result)
logging.info(str(result_case_id_func.keys()))
logging.info(str(flitur_result.keys()))
print(
len(list(flitur_result.keys())) / (len(list(flitur_result.keys())) + len(list(result_case_id_func.keys()))))
def connect_db(self, host, port, user, passwd, db):
try:
conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db)
return conn
except Exception as e:
logging.error("connect db error : " + str(e))
return None
def get_dict_common_func(self, _dict):
common_funcs = []
for fun, ids in _dict.items():
if len(_dict[fun]) >= 40:
common_funcs.append(fun)
continue
else:
in_list = [self.get_funcs_lines_from_tree(_dict[int(x)], fun) for x in ids]
if len(in_list) > 100:
common_funcs.append(fun)
continue
return common_funcs
def get_funcs_lines_from_tree(self, tree_string, func):
data = tree_string.split("\n")
index_list = [data.index(i) for i in data if func in i]
result_list = []
for i in index_list:
list_in = [data[i].split(" ")[-1].lstrip("L")]
prefix = self.rreplace(data[i].split("L")[0], "| ", "")
for j in data[0:i][::-1]:
if prefix in j:
# print("old new_prefix: ", prefix)
prefix = self.rreplace(prefix, "| ", "")
# print("新建 new_prefix: ", prefix)
list_in.append(j.split(" ")[-1].lstrip("L"))
continue
else:
pass
# 查找完成所有的前向调用,之后查找后向调用,先还原 prefxi
prefix = self.rreplace(data[i].split("L")[0], "| ", "| | ")
list_in = list_in[::-1]
for j in data[i:]:
if prefix in j:
prefix = self.rreplace(prefix, "| ", "| | ")
list_in.append(j.split(" ")[-1].lstrip("L"))
continue
else:
pass
if len(list_in) > 0:
result_list.append(list_in)
if len(result_list) > 0:
return result_list
else:
return
def rreplace(self, s, old, new):
li = s.rsplit(old, 1)
return new.join(li)
def split_dict(self, x, chunks):
i = itertools.cycle(range(chunks))
split = [dict() for _ in range(chunks)]
for k, v in x.items():
split[next(i)][k] = v
return split
if __name__ == "__main__":
fun = FuncTactic(993)
fun.tactic()
```