Python函数调用关系和性能分析

用法

导入全局的profiler或创建一个局部的profiler

给函数添加装饰器@profile(profiler)

给代码段增加with profile_scope(profiler, name)

在统计结束后程序退出前调用print_profile(profiler)会在控制台输出各个线程中各个方法或代码段的耗时和调用次数,通过缩进体现调用关系。

如果想输出到日志文件,可以给print_profile传递print_func参数。

参考print_profile遍历节点的方式可以实现自定义输出或功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183

import time
import threading
from collections import deque
from functools import wraps
from contextlib import contextmanager


# 使用孩子兄弟树存储调用关系(对于递归的情况特判)
class Node:
def __init__(self, name, parent):
self.name = name
self.parent = parent
self.sibling = None
self.child = None
self.total_calls = 0 # 调用计数
self.recur_counter = 0 # 递归层数
self.start_ts = 0
self.total_ts = 0
self.is_recursive = False

def get_sub_node(self, name):
p = self.child
if p is None:
q = Node(name, self)
self.child = q
return q
else:
# 找到name节点并返回,如果找不到,退出循环时p是当前节点的最后一个孩子
while True:
if p.name == name:
return p
if p.sibling is None:
break
p = p.sibling
# 创建新节点
q = Node(name, self)
# 把新节点作为p的兄弟,即新的最后一个孩子
p.sibling = q
return q

def call(self):
self.total_calls += 1
self.recur_counter += 1
if self.recur_counter == 1:
self.start_ts = time.perf_counter()

def returns(self):
if self.recur_counter == 1:
self.total_ts += time.perf_counter() - self.start_ts
self.recur_counter -= 1

# 返回名字=name的祖先节点或当前节点自身
# 如果没有名字=name的祖先并且自身名字也不是name,则返回None
# 此方法用来辅助处理递归调用
def ancestor_or_self(self, name):
p = self
while p:
if p.name == name:
return p
p = p.parent
return None


class Profiler:
def __init__(self):
self.node_stack = deque()
self.roots = {}
self.current_node = None
self.main_tid = threading.get_ident()

def begin_range(self, name):
# 每个线程需要创建一个root节点
tid = threading.get_ident()
if tid not in self.roots:
root_name = f"thread:{tid}" if tid != self.main_tid else "main_thread"
self.current_node = self.roots[tid] = Node(root_name, None)

self.node_stack.append(self.current_node)
p = self.current_node.ancestor_or_self(name)
if p: # 说明发生了递归
self.current_node = p
else:
self.current_node = self.current_node.get_sub_node(name)
self.current_node.call()

def end_range(self):
self.current_node.returns()
self.current_node = self.node_stack.pop()


class ProfilerViewer:
def __init__(self, root, view_func):
self.root = root
self.view_func = view_func

def view(self):
self._view(self.root, 0)

def _view(self, p, depth):
self.view_func(p, depth)
if p.child:
self._view(p.child, depth+1)
if p.sibling:
self._view(p.sibling, depth)


def print_profile(profiler, print_func=print):
def view_func(p, depth):
if p.parent and p.parent.total_ts:
pct = p.total_ts / p.parent.total_ts
else:
pct = 1.0
s = r"{}{} {:.2f}pct {:.3f}ms {}calls".format(' '*depth, p.name, pct*100, p.total_ts*1000, p.total_calls)
print_func(s)

for root in profiler.roots.values():
viewer = ProfilerViewer(root, view_func)
viewer.view()


# 通过装饰器的方式统计一个函数的耗时和调用次数
def profile(profiler):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
profiler.begin_range(func.__name__)
ret = func(*args, **kwargs)
profiler.end_range()
return ret
return wrapper
return decorator


# 通过with的方式统计一个代码块的耗时和调用次数
@contextmanager
def profile_scope(profiler, name):
try:
profiler.begin_range(name)
yield
except Exception:
raise
finally:
profiler.end_range()


profiler = Profiler()


if __name__ == '__main__':

@profile(profiler)
def fib(n):
if n == 1 or n == 2:
return 1

with profile_scope(profiler, "sleep"):
time.sleep(0.1) # 模拟函数实现中一处耗时很多的操作

# 递归调用自身
return fib(n-1) + fib(n-2)

@profile(profiler)
def add(a, b):
return a + b

th = threading.Thread(target=add, args=(1, 2))
th.start()

@profile(profiler)
def test():
# 计算 fib(5)
print(fib(5))

# 计算10000次add
for i in range(10000):
add(1,i)

test()

print_profile(profiler)