0%

简单描述kosaraju算法就两个步骤

  1. 在反向图上做后根遍历,按遍历顺序把结点压栈

  2. 以出栈顺序为初始节点去原图上遍历

在第二次遍历时,每次以初始节点遍历到的所有节点都和初始节点在同一个强连通分量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

void dfs1(int u, const vector<vector<int>>& reverse_edges, vector<bool>& visited, stack<int>& s) {
visited[u] = true;
for(int v: reverse_edges[u]) {
if(!visited[v]) {
dfs1(v, reverse_edges, visited, s);
}
}
s.push(u);
}

void dfs2(int u, const vector<vector<int>>& edges, vector<bool>& visited, int source, vector<int>& result) {
visited[u] = true;
result[u] = source;
for(int v: edges[u]) {
if(!visited[v]) {
dfs2(v, edges, visited, source, result);
}
}
}

// 计算图的强联通分量
// params: edges 邻接表
// returns: 返回每个结点所在的强联通分量的代表元
// 相同代表元说明属于同一强联通分量,不同代表元说明属于不同强联通分量
// 返回的不同值的个数就是强连通分量数
vector<int> kosaraju(const vector<vector<int>>& edges) {
int n = edges.size();
vector<int> result(n);

// 反向图
vector<vector<int>> reverse_edges(n);
for(int u=0;u<n;u++) {
for(int v: edges[u]) {
reverse_edges[v].push_back(u);
}
}

stack<int> s;
do {
// 在反向图上做后根遍历,把结点压栈
vector<bool> visited(n, false);
for(int u=0;u<n;u++) {
if(!visited[u]) {
dfs1(u, reverse_edges, visited, s);
}
}
} while(false);

do {
vector<bool> visited(n, false);
// 这时依次按照s中弹出的结点为开始结点去遍历原图(即使用edges),就可以得到强连通分量
while(!s.empty()) {
int u = s.top(); s.pop();
if(!visited[u]) {
dfs2(u, edges, visited, u/*代表元*/, result);
}
}
} while(false);
return result;
}

有了代表元数组就可以把原图简化成一个由强连通分量构成的有向无环图,再去做进一步处理就方便了,比如拓朴排序之类的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

template<int MOD>
class Combination {
vector<vector<int>> _comb;
public:
Combination(int max_n) {
// 打表组合值
_comb.resize(max_n+1, vector<int>());
for(int i=0;i<=max_n;i++) {
_comb[i].resize(i/2+1);
}
_comb[0][0] = 1;
for(int i=1;i<=max_n;i++) {
_comb[i][0] = 1;
for(int j=1;j<=i/2;j++) {
_comb[i][j] = (_comb[i-1][j-1] + _comb[i-1][j<=(i-1)/2?j:i-1-j]) % MOD;
}
}
}

// 组合数 n!/k!/(n-k)!
int operator()(int n, int k) {
if(k < 0 || n < 0 || k > n) return 0;
if(k > n/2) return _comb[n][n-k];
else return _comb[n][k];
}
};


// 不取MOD的特化实现
template<>
class Combination<0> {
vector<vector<int>> _comb;
public:
Combination(int max_n) {
// 打表组合值
_comb.resize(max_n+1, vector<int>());
for(int i=0;i<=max_n;i++) {
_comb[i].resize(i/2+1);
}
_comb[0][0] = 1;
for(int i=1;i<=max_n;i++) {
_comb[i][0] = 1;
for(int j=1;j<=i/2;j++) {
_comb[i][j] = (_comb[i-1][j-1] + _comb[i-1][j<=(i-1)/2?j:i-1-j]);
}
}
}

// 组合数 n!/k!/(n-k)!
int operator()(int n, int k) {
if(k < 0 || n < 0 || k > n) return 0;
if(k > n/2) return _comb[n][n-k];
else return _comb[n][k];
}
};

并查集结构支持3种操作:

  1. 合并两个集合
  2. 查询两个元素是否属于相同集合
  3. 查询元素所在集合的大小

实现的思路也很简单,最初每个元素都在一个只包含自身的集合中,之后通过Union操作建立关联,把一个元素的父亲指向另一个元素,那么判断两个元素是否属于同一集合只需要判断两个元素的根是否相同,这里顺便用parent记录下元素的个数,当parent[i]>=0时表示元素的父元素的下标,当parent[i]<0时(意味着i是某个集合的根元素),parent[i]则表示该集合的大小。
优化查询:在查询元素所属集合的根元素时顺便把自己的父元素直接指向根元素,也就是把树压扁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

class UFSets {
private:
vector<int> parent;
int size;
public:
UFSets(int s): parent(s){
size = s;
memset(&parent[0], -1, sizeof(int)*s);
}

int Find(int x){
if (parent[x] < 0) {
return x;
} else {
return parent[x] = Find(parent[x]);
}
}

void Union(int v1, int v2){
int s1 = Find(v1), s2 = Find(v2);
if(s1==s2)return;
int t = parent[s1] + parent[s2];
if ( parent[s2] < parent[s1] ) {
parent[s1] = s2;
parent[s2] = t;
}
else {
parent[s2] = s1;
parent[s1] = t;
}
}

int Count(int x){
return -parent[Find(x)];
}
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

// 马拉车算法,O(n)时间处理字符串所有子串的回文问题
// https://oi-wiki.org/string/manacher/
// 例题:https://leetcode.cn/problems/check-if-dfs-strings-are-palindromes/description/
// https://leetcode.cn/problems/longest-palindromic-substring/description/
class Manacher {
vector<int> d1; // 奇数长度子串的半径,d1[i] = r 表示,以下标i为中心的最长回文子串半径是r(长度是2r-1)
vector<int> d2; // 偶数长度子串的半径,d2[i] = r 表示,以下标i为右中心的最长回文子串半径是r(长度是2r)
public:
Manacher(const string& s) {
int n = s.size();
d1.resize(n);
d2.resize(n);

for (int i = 0, l = 0, r = -1; i < n; i++) {
int k = (i > r) ? 1 : min(d1[l + r - i], r - i + 1);
while (0 <= i - k && i + k < n && s[i - k] == s[i + k]) {
k++;
}
d1[i] = k--;
if (i + k > r) {
l = i - k;
r = i + k;
}
}


for (int i = 0, l = 0, r = -1; i < n; i++) {
int k = (i > r) ? 0 : min(d2[l + r - i + 1], r - i + 1);
while (0 <= i - k - 1 && i + k < n && s[i - k - 1] == s[i + k]) {
k++;
}
d2[i] = k--;
if (i + k > r) {
l = i - k - 1;
r = i + k;
}
}
}

// 返回s[b:e]是否回文
bool isPalindrome(int b, int e) {
if((e-b)%2) { // 奇数长度的子串
int c = b + (e-b) / 2;
int r = c - b + 1;
return d1[c] >= r;
} else { // 偶数长度的子串
int c = b + (e-b) / 2;
int r = c - b;
return d2[c] >= r;
}
}

// 返回s的最长回文子串的begin、end
pair<int, int> longestPalindrome() {
int b=0, e=0;
for(int c=0;c<d1.size();c++) {
if(d1[c]*2-1>e-b) {
b = c+1-d1[c];
e = c + d1[c];
}
if(d2[c]*2>e-b) {
b = c - d2[c];
e = c + d2[c];
}
}
return {b, e};
}
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import colorsys

def generate_colors(n):
"""
生成 n 个尽量有明显区别的颜色。

参数:
n (int):要生成的颜色数量,必须是正整数。

返回:
list:包含 n 个 RGB 颜色值的列表,每个颜色值是一个三元组 (r, g, b)。
"""
colors = []
cnt = 0
k = (n + 9) // 10
for i in range(k):
for j in range(10):
h = (360 / 10) * j + 23 * i
s = 1.0 if cnt < 25 else 0.8
l = 0.3 + (j * 19) % 7 / 7 * 0.4
cnt += 1
r, g, b = colorsys.hls_to_rgb(h / 360, l, s)
colors.append((int(r * 255), int(g * 255), int(b * 255)))
if cnt == n:
break
return colors

import matplotlib.pyplot as plt
import numpy as np

colors = generate_colors(30)

fig, ax = plt.subplots(figsize=(6, 6))
for i, color in enumerate(colors):
rect = plt.Rectangle((0, -i), 1, 1, color=np.array(color) / 255)
ax.add_patch(rect)
ax.set_xlim(0, 1)
ax.set_ylim(-len(colors), 0)
ax.axis('off')
plt.show()

用法

导入全局的profiler或创建一个局部的profiler

给函数添加装饰器@profile(profiler)

给代码段增加with profile_scope(profiler, name)

在统计结束后程序退出前调用print_profile(profiler)会在控制台输出各个线程中各个方法或代码段的耗时和调用次数,通过缩进体现调用关系。

如果想输出到日志文件,可以给print_profile传递print_func参数。

参考print_profile遍历节点的方式可以实现自定义输出或功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183

import time
import threading
from collections import deque
from functools import wraps
from contextlib import contextmanager


# 使用孩子兄弟树存储调用关系(对于递归的情况特判)
class Node:
def __init__(self, name, parent):
self.name = name
self.parent = parent
self.sibling = None
self.child = None
self.total_calls = 0 # 调用计数
self.recur_counter = 0 # 递归层数
self.start_ts = 0
self.total_ts = 0
self.is_recursive = False

def get_sub_node(self, name):
p = self.child
if p is None:
q = Node(name, self)
self.child = q
return q
else:
# 找到name节点并返回,如果找不到,退出循环时p是当前节点的最后一个孩子
while True:
if p.name == name:
return p
if p.sibling is None:
break
p = p.sibling
# 创建新节点
q = Node(name, self)
# 把新节点作为p的兄弟,即新的最后一个孩子
p.sibling = q
return q

def call(self):
self.total_calls += 1
self.recur_counter += 1
if self.recur_counter == 1:
self.start_ts = time.perf_counter()

def returns(self):
if self.recur_counter == 1:
self.total_ts += time.perf_counter() - self.start_ts
self.recur_counter -= 1

# 返回名字=name的祖先节点或当前节点自身
# 如果没有名字=name的祖先并且自身名字也不是name,则返回None
# 此方法用来辅助处理递归调用
def ancestor_or_self(self, name):
p = self
while p:
if p.name == name:
return p
p = p.parent
return None


class Profiler:
def __init__(self):
self.node_stack = deque()
self.roots = {}
self.current_node = None
self.main_tid = threading.get_ident()

def begin_range(self, name):
# 每个线程需要创建一个root节点
tid = threading.get_ident()
if tid not in self.roots:
root_name = f"thread:{tid}" if tid != self.main_tid else "main_thread"
self.current_node = self.roots[tid] = Node(root_name, None)

self.node_stack.append(self.current_node)
p = self.current_node.ancestor_or_self(name)
if p: # 说明发生了递归
self.current_node = p
else:
self.current_node = self.current_node.get_sub_node(name)
self.current_node.call()

def end_range(self):
self.current_node.returns()
self.current_node = self.node_stack.pop()


class ProfilerViewer:
def __init__(self, root, view_func):
self.root = root
self.view_func = view_func

def view(self):
self._view(self.root, 0)

def _view(self, p, depth):
self.view_func(p, depth)
if p.child:
self._view(p.child, depth+1)
if p.sibling:
self._view(p.sibling, depth)


def print_profile(profiler, print_func=print):
def view_func(p, depth):
if p.parent and p.parent.total_ts:
pct = p.total_ts / p.parent.total_ts
else:
pct = 1.0
s = r"{}{} {:.2f}pct {:.3f}ms {}calls".format(' '*depth, p.name, pct*100, p.total_ts*1000, p.total_calls)
print_func(s)

for root in profiler.roots.values():
viewer = ProfilerViewer(root, view_func)
viewer.view()


# 通过装饰器的方式统计一个函数的耗时和调用次数
def profile(profiler):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
profiler.begin_range(func.__name__)
ret = func(*args, **kwargs)
profiler.end_range()
return ret
return wrapper
return decorator


# 通过with的方式统计一个代码块的耗时和调用次数
@contextmanager
def profile_scope(profiler, name):
try:
profiler.begin_range(name)
yield
except Exception:
raise
finally:
profiler.end_range()


profiler = Profiler()


if __name__ == '__main__':

@profile(profiler)
def fib(n):
if n == 1 or n == 2:
return 1

with profile_scope(profiler, "sleep"):
time.sleep(0.1) # 模拟函数实现中一处耗时很多的操作

# 递归调用自身
return fib(n-1) + fib(n-2)

@profile(profiler)
def add(a, b):
return a + b

th = threading.Thread(target=add, args=(1, 2))
th.start()

@profile(profiler)
def test():
# 计算 fib(5)
print(fib(5))

# 计算10000次add
for i in range(10000):
add(1,i)

test()

print_profile(profiler)


pycallgraph2是一个Python模块,用于为Python应用程序创建可视化的调用关系图。

pycallgraph2 · PyPI

安装pycallgraph2

1
pip install pycallgraph2

安装graphviz

Linux

1
sudo apt install graphviz

Windows

1
https://graphviz.gitlab.io/_pages/Download/windows/graphviz-2.38.msi

通过

1
dot -V

可以确认安装成功

直接在命令行运行pycallgraph

1
usage: pycallgraph [options] OUTPUT_TYPE [output_options] -- SCRIPT.py [ARG ...]

例如

1
pycallgraph graphviz --output-file=my_callgraph.png -- ./test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

import redis
from dataclasses import dataclass

@dataclass
class Message:
key: str = ""
value: bytes = b""

class RedisMQProducer:
def __init__(self, *args, **kw):
self.client = redis.Redis(*args, **kw)

def send(self, topic: str, key: str, value: bytes):
redis_key = f"MQ_{topic}"
if len(key) >= 32:
raise Exception("key超长")
message = key.encode() + (b'\0' * (32 - len(key))) + value
self.client.lpush(redis_key, message)


def create_redis_mq_producer(server):
if server.find(':') != -1:
host, port = server.split(":", 1)
else:
host = server
port = 6379
return RedisMQProducer(host=host, port=port, db=0)


class RedisMQConsumer:
def __init__(self, *args, **kw):
self.client = redis.Redis(*args, **kw)
self.topics = []


def subscribe(self, topics):
self.topics = topics

def __iter__(self):
self.idx = 0
return self

# 阻塞地获取下一个新消息
def __next__(self):
if not self.topics:
raise StopIteration
while True:
topic = self.topics[self.idx]
self.idx = (self.idx + 1) % len(self.topics)
redis_key = f"MQ_{topic}"
data = self.client.rpop(redis_key)
if data:
return Message(key=data[:32].decode().strip('\0'), value=data[32:])

# 获取新消息并返回,直到所有订阅的topic中都没有新消息或者获取到了数量max_records个消息
def get_messages(self, max_records=20):
ret = []
topics = self.topics
while topics:
topics_bak = topics
topics = []
for topic in topics_bak:
redis_key = f"MQ_{topic}"
data = self.client.rpop(redis_key)
if data:
try:
ret.append(Message(key=data[:32].decode().strip('\0'), value=data[32:]))
except Exception as e:
pass
topics.append(topic)
if len(ret) >= max_records:
return ret
return ret


def create_redis_mq_consumer(server):
if server.find(':') != -1:
host, port = server.split(":", 1)
else:
host = server
port = 6379
return RedisMQConsumer(host=host, port=port, db=0)

对kafka-python进行封装,增加了去重,因为默认情况下Kafka只保证不丢失数据,不保证极端情况不重复,这里增加去重处理后可保证不重不漏,还实现了从指定时间还是订阅的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144

import kafka # pip install kafka-python
import time
import uuid
import datetime
import queue
from dataclasses import dataclass

@dataclass
class Message:
key: str = ""
value: bytes = b""

class KafkaProducer(kafka.KafkaProducer):
def __init__(self, *args, **kw):
super().__init__(*args, **kw)

def send(self, topic: str, key: str, value: bytes):
# 生成一个长32位的id用于去重,同时顺便记录消息生成时间
# 默认情况下Kafka保证消息不丢失,但并不能保证不重复,所以我们通过id来自行去重
id = "{}{}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3], uuid.uuid4().hex[:15])
super().send(topic, key=key.encode(), value=id.encode()+value)


def create_kafka_producer(server):
retry_times = 3
for i in range(1, retry_times+1):
try:
producer = KafkaProducer(
bootstrap_servers=[server],
api_version=(2,3,1),
api_version_auto_timeout_ms=5000
)
return producer
except Exception as e:
if i == retry_times:
raise e
else:
time.sleep(0.1)


class KafkaConsumer(kafka.KafkaConsumer):
def __init__(self, **kw):
super().__init__(**kw)
self.q = queue.Queue()
self.s = set()
self.DUP_DETECT_SIZE = 1000 # 去重检测大小

# 从最新消息开始订阅(subscribe在父类有实现)
# def subscribe(self, topic):
# pass

# 实现从一个历史时间点进行消息订阅(能订阅到的消息取决于Kafka服务器配置的保留策略,基于目前的配置可以保证72小时内的消息可以重复消费)
def subscribe_from_datetime(self, partition, topic, dt):
if type(dt) is int or type(dt) is float:
ts = dt
elif isinstance(dt, datetime.datetime):
ts = dt.timestamp()
else:
ts = time.mktime(time.strptime(f"{dt}", r"%Y-%m-%d %H:%M:%S"))
offset = self._get_offset_for_time(partition, topic, ts)
tp = kafka.TopicPartition(topic, partition)
super().assign([tp])
super().seek(tp, offset)

def __iter__(self):
return self

# 重新封装阻塞取消息方式,增加去重
def __next__(self):
while True:
message = super().__next__()
key = message.key
if not key:
continue
id = message.value[:32]
if id in self.s: # 重复的消息
continue
if len(self.s) >= self.DUP_DETECT_SIZE:
e = self.q.get()
self.s.remove(e)
self.s.add(id)
self.q.put(id)

return Message(key=key.decode(), value=message.value[32:])

# 重新封装非阻塞取消息方式,增加去重
def get_messages(self, max_records=20):
r = super().poll(timeout_ms=max_records*25, max_records=max_records)
ret = []
for messages in r.values():
for message in messages:
key = message.key
if not key:
continue
id = message.value[:32]
if id in self.s: # 重复的消息
continue
if len(self.s) >= self.DUP_DETECT_SIZE:
e = self.q.get()
self.s.remove(e)
self.s.add(uuid)
self.q.put(uuid)

ret.append(Message(key=key.decode(), value=message.value[32:]))
return ret

def _get_latest_offset(self, partition, topic):
tp = kafka.TopicPartition(topic, partition)
super(KafkaConsumer, self).assign([tp])
off_set_dict = super(KafkaConsumer, self).end_offsets([tp])
return list(off_set_dict.values())[0]

def _get_offset_for_time(self, partition, topic, ts):
tp = kafka.TopicPartition(topic, partition)
super(KafkaConsumer, self).assign([tp])
offset_dict = super(KafkaConsumer, self).offsets_for_times({tp: int(ts*1000)})
offset = list(offset_dict.values())[0]
if offset is None:
return self._get_latest_offset(partition, topic)
else:
return offset.offset


def create_kafka_consumer(server, group_id):
retry_times = 3
for i in range(1, retry_times+1):
try:
consumer = KafkaConsumer(
bootstrap_servers=[server],
group_id=group_id,
auto_offset_reset='latest', # earliest, latest
enable_auto_commit=True,
api_version=(2,3,1),
api_version_auto_timeout_ms=5000
)
return consumer
except Exception as e:
if i == retry_times:
raise e
else:
time.sleep(0.1)


mount命令的执行需要sudo权限,其实有一些命令允许普通用户进行挂载操作

fusermount 用户空间挂载NFS共享
smbnetfs 用户空间挂载samba共享
sshfs 用户空间基于SSH挂载共享

卸载可以统一使用 fusermount -u
强制(延迟)卸载使用 fusermount -z