0%

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

// 马拉车算法,O(n)时间处理字符串所有子串的回文问题
// https://oi-wiki.org/string/manacher/
// 例题:https://leetcode.cn/problems/check-if-dfs-strings-are-palindromes/description/
// https://leetcode.cn/problems/longest-palindromic-substring/description/
class Manacher {
vector<int> d1; // 奇数长度子串的半径,d1[i] = r 表示,以下标i为中心的最长回文子串半径是r(长度是2r-1)
vector<int> d2; // 偶数长度子串的半径,d2[i] = r 表示,以下标i为右中心的最长回文子串半径是r(长度是2r)
public:
Manacher(const string& s) {
int n = s.size();
d1.resize(n);
d2.resize(n);

for (int i = 0, l = 0, r = -1; i < n; i++) {
int k = (i > r) ? 1 : min(d1[l + r - i], r - i + 1);
while (0 <= i - k && i + k < n && s[i - k] == s[i + k]) {
k++;
}
d1[i] = k--;
if (i + k > r) {
l = i - k;
r = i + k;
}
}


for (int i = 0, l = 0, r = -1; i < n; i++) {
int k = (i > r) ? 0 : min(d2[l + r - i + 1], r - i + 1);
while (0 <= i - k - 1 && i + k < n && s[i - k - 1] == s[i + k]) {
k++;
}
d2[i] = k--;
if (i + k > r) {
l = i - k - 1;
r = i + k;
}
}
}

// 返回s[b:e]是否回文
bool isPalindrome(int b, int e) {
if((e-b)%2) { // 奇数长度的子串
int c = b + (e-b) / 2;
int r = c - b + 1;
return d1[c] >= r;
} else { // 偶数长度的子串
int c = b + (e-b) / 2;
int r = c - b;
return d2[c] >= r;
}
}

// 返回s的最长回文子串的begin、end
pair<int, int> longestPalindrome() {
int b=0, e=0;
for(int c=0;c<d1.size();c++) {
if(d1[c]*2-1>e-b) {
b = c+1-d1[c];
e = c + d1[c];
}
if(d2[c]*2>e-b) {
b = c - d2[c];
e = c + d2[c];
}
}
return {b, e};
}
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import colorsys

def generate_colors(n):
"""
生成 n 个尽量有明显区别的颜色。

参数:
n (int):要生成的颜色数量,必须是正整数。

返回:
list:包含 n 个 RGB 颜色值的列表,每个颜色值是一个三元组 (r, g, b)。
"""
colors = []
cnt = 0
k = (n + 9) // 10
for i in range(k):
for j in range(10):
h = (360 / 10) * j + 23 * i
s = 1.0 if cnt < 25 else 0.8
l = 0.3 + (j * 19) % 7 / 7 * 0.4
cnt += 1
r, g, b = colorsys.hls_to_rgb(h / 360, l, s)
colors.append((int(r * 255), int(g * 255), int(b * 255)))
if cnt == n:
break
return colors

import matplotlib.pyplot as plt
import numpy as np

colors = generate_colors(30)

fig, ax = plt.subplots(figsize=(6, 6))
for i, color in enumerate(colors):
rect = plt.Rectangle((0, -i), 1, 1, color=np.array(color) / 255)
ax.add_patch(rect)
ax.set_xlim(0, 1)
ax.set_ylim(-len(colors), 0)
ax.axis('off')
plt.show()

用法

导入全局的profiler或创建一个局部的profiler

给函数添加装饰器@profile(profiler)

给代码段增加with profile_scope(profiler, name)

在统计结束后程序退出前调用print_profile(profiler)会在控制台输出各个线程中各个方法或代码段的耗时和调用次数,通过缩进体现调用关系。

如果想输出到日志文件,可以给print_profile传递print_func参数。

参考print_profile遍历节点的方式可以实现自定义输出或功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183

import time
import threading
from collections import deque
from functools import wraps
from contextlib import contextmanager


# 使用孩子兄弟树存储调用关系(对于递归的情况特判)
class Node:
def __init__(self, name, parent):
self.name = name
self.parent = parent
self.sibling = None
self.child = None
self.total_calls = 0 # 调用计数
self.recur_counter = 0 # 递归层数
self.start_ts = 0
self.total_ts = 0
self.is_recursive = False

def get_sub_node(self, name):
p = self.child
if p is None:
q = Node(name, self)
self.child = q
return q
else:
# 找到name节点并返回,如果找不到,退出循环时p是当前节点的最后一个孩子
while True:
if p.name == name:
return p
if p.sibling is None:
break
p = p.sibling
# 创建新节点
q = Node(name, self)
# 把新节点作为p的兄弟,即新的最后一个孩子
p.sibling = q
return q

def call(self):
self.total_calls += 1
self.recur_counter += 1
if self.recur_counter == 1:
self.start_ts = time.perf_counter()

def returns(self):
if self.recur_counter == 1:
self.total_ts += time.perf_counter() - self.start_ts
self.recur_counter -= 1

# 返回名字=name的祖先节点或当前节点自身
# 如果没有名字=name的祖先并且自身名字也不是name,则返回None
# 此方法用来辅助处理递归调用
def ancestor_or_self(self, name):
p = self
while p:
if p.name == name:
return p
p = p.parent
return None


class Profiler:
def __init__(self):
self.node_stack = deque()
self.roots = {}
self.current_node = None
self.main_tid = threading.get_ident()

def begin_range(self, name):
# 每个线程需要创建一个root节点
tid = threading.get_ident()
if tid not in self.roots:
root_name = f"thread:{tid}" if tid != self.main_tid else "main_thread"
self.current_node = self.roots[tid] = Node(root_name, None)

self.node_stack.append(self.current_node)
p = self.current_node.ancestor_or_self(name)
if p: # 说明发生了递归
self.current_node = p
else:
self.current_node = self.current_node.get_sub_node(name)
self.current_node.call()

def end_range(self):
self.current_node.returns()
self.current_node = self.node_stack.pop()


class ProfilerViewer:
def __init__(self, root, view_func):
self.root = root
self.view_func = view_func

def view(self):
self._view(self.root, 0)

def _view(self, p, depth):
self.view_func(p, depth)
if p.child:
self._view(p.child, depth+1)
if p.sibling:
self._view(p.sibling, depth)


def print_profile(profiler, print_func=print):
def view_func(p, depth):
if p.parent and p.parent.total_ts:
pct = p.total_ts / p.parent.total_ts
else:
pct = 1.0
s = r"{}{} {:.2f}pct {:.3f}ms {}calls".format(' '*depth, p.name, pct*100, p.total_ts*1000, p.total_calls)
print_func(s)

for root in profiler.roots.values():
viewer = ProfilerViewer(root, view_func)
viewer.view()


# 通过装饰器的方式统计一个函数的耗时和调用次数
def profile(profiler):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
profiler.begin_range(func.__name__)
ret = func(*args, **kwargs)
profiler.end_range()
return ret
return wrapper
return decorator


# 通过with的方式统计一个代码块的耗时和调用次数
@contextmanager
def profile_scope(profiler, name):
try:
profiler.begin_range(name)
yield
except Exception:
raise
finally:
profiler.end_range()


profiler = Profiler()


if __name__ == '__main__':

@profile(profiler)
def fib(n):
if n == 1 or n == 2:
return 1

with profile_scope(profiler, "sleep"):
time.sleep(0.1) # 模拟函数实现中一处耗时很多的操作

# 递归调用自身
return fib(n-1) + fib(n-2)

@profile(profiler)
def add(a, b):
return a + b

th = threading.Thread(target=add, args=(1, 2))
th.start()

@profile(profiler)
def test():
# 计算 fib(5)
print(fib(5))

# 计算10000次add
for i in range(10000):
add(1,i)

test()

print_profile(profiler)


pycallgraph2是一个Python模块,用于为Python应用程序创建可视化的调用关系图。

pycallgraph2 · PyPI

安装pycallgraph2

1
pip install pycallgraph2

安装graphviz

Linux

1
sudo apt install graphviz

Windows

1
https://graphviz.gitlab.io/_pages/Download/windows/graphviz-2.38.msi

通过

1
dot -V

可以确认安装成功

直接在命令行运行pycallgraph

1
usage: pycallgraph [options] OUTPUT_TYPE [output_options] -- SCRIPT.py [ARG ...]

例如

1
pycallgraph graphviz --output-file=my_callgraph.png -- ./test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

import redis
from dataclasses import dataclass

@dataclass
class Message:
key: str = ""
value: bytes = b""

class RedisMQProducer:
def __init__(self, *args, **kw):
self.client = redis.Redis(*args, **kw)

def send(self, topic: str, key: str, value: bytes):
redis_key = f"MQ_{topic}"
if len(key) >= 32:
raise Exception("key超长")
message = key.encode() + (b'\0' * (32 - len(key))) + value
self.client.lpush(redis_key, message)


def create_redis_mq_producer(server):
if server.find(':') != -1:
host, port = server.split(":", 1)
else:
host = server
port = 6379
return RedisMQProducer(host=host, port=port, db=0)


class RedisMQConsumer:
def __init__(self, *args, **kw):
self.client = redis.Redis(*args, **kw)
self.topics = []


def subscribe(self, topics):
self.topics = topics

def __iter__(self):
self.idx = 0
return self

# 阻塞地获取下一个新消息
def __next__(self):
if not self.topics:
raise StopIteration
while True:
topic = self.topics[self.idx]
self.idx = (self.idx + 1) % len(self.topics)
redis_key = f"MQ_{topic}"
data = self.client.rpop(redis_key)
if data:
return Message(key=data[:32].decode().strip('\0'), value=data[32:])

# 获取新消息并返回,直到所有订阅的topic中都没有新消息或者获取到了数量max_records个消息
def get_messages(self, max_records=20):
ret = []
topics = self.topics
while topics:
topics_bak = topics
topics = []
for topic in topics_bak:
redis_key = f"MQ_{topic}"
data = self.client.rpop(redis_key)
if data:
try:
ret.append(Message(key=data[:32].decode().strip('\0'), value=data[32:]))
except Exception as e:
pass
topics.append(topic)
if len(ret) >= max_records:
return ret
return ret


def create_redis_mq_consumer(server):
if server.find(':') != -1:
host, port = server.split(":", 1)
else:
host = server
port = 6379
return RedisMQConsumer(host=host, port=port, db=0)

对kafka-python进行封装,增加了去重,因为默认情况下Kafka只保证不丢失数据,不保证极端情况不重复,这里增加去重处理后可保证不重不漏,还实现了从指定时间还是订阅的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144

import kafka # pip install kafka-python
import time
import uuid
import datetime
import queue
from dataclasses import dataclass

@dataclass
class Message:
key: str = ""
value: bytes = b""

class KafkaProducer(kafka.KafkaProducer):
def __init__(self, *args, **kw):
super().__init__(*args, **kw)

def send(self, topic: str, key: str, value: bytes):
# 生成一个长32位的id用于去重,同时顺便记录消息生成时间
# 默认情况下Kafka保证消息不丢失,但并不能保证不重复,所以我们通过id来自行去重
id = "{}{}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3], uuid.uuid4().hex[:15])
super().send(topic, key=key.encode(), value=id.encode()+value)


def create_kafka_producer(server):
retry_times = 3
for i in range(1, retry_times+1):
try:
producer = KafkaProducer(
bootstrap_servers=[server],
api_version=(2,3,1),
api_version_auto_timeout_ms=5000
)
return producer
except Exception as e:
if i == retry_times:
raise e
else:
time.sleep(0.1)


class KafkaConsumer(kafka.KafkaConsumer):
def __init__(self, **kw):
super().__init__(**kw)
self.q = queue.Queue()
self.s = set()
self.DUP_DETECT_SIZE = 1000 # 去重检测大小

# 从最新消息开始订阅(subscribe在父类有实现)
# def subscribe(self, topic):
# pass

# 实现从一个历史时间点进行消息订阅(能订阅到的消息取决于Kafka服务器配置的保留策略,基于目前的配置可以保证72小时内的消息可以重复消费)
def subscribe_from_datetime(self, partition, topic, dt):
if type(dt) is int or type(dt) is float:
ts = dt
elif isinstance(dt, datetime.datetime):
ts = dt.timestamp()
else:
ts = time.mktime(time.strptime(f"{dt}", r"%Y-%m-%d %H:%M:%S"))
offset = self._get_offset_for_time(partition, topic, ts)
tp = kafka.TopicPartition(topic, partition)
super().assign([tp])
super().seek(tp, offset)

def __iter__(self):
return self

# 重新封装阻塞取消息方式,增加去重
def __next__(self):
while True:
message = super().__next__()
key = message.key
if not key:
continue
id = message.value[:32]
if id in self.s: # 重复的消息
continue
if len(self.s) >= self.DUP_DETECT_SIZE:
e = self.q.get()
self.s.remove(e)
self.s.add(id)
self.q.put(id)

return Message(key=key.decode(), value=message.value[32:])

# 重新封装非阻塞取消息方式,增加去重
def get_messages(self, max_records=20):
r = super().poll(timeout_ms=max_records*25, max_records=max_records)
ret = []
for messages in r.values():
for message in messages:
key = message.key
if not key:
continue
id = message.value[:32]
if id in self.s: # 重复的消息
continue
if len(self.s) >= self.DUP_DETECT_SIZE:
e = self.q.get()
self.s.remove(e)
self.s.add(uuid)
self.q.put(uuid)

ret.append(Message(key=key.decode(), value=message.value[32:]))
return ret

def _get_latest_offset(self, partition, topic):
tp = kafka.TopicPartition(topic, partition)
super(KafkaConsumer, self).assign([tp])
off_set_dict = super(KafkaConsumer, self).end_offsets([tp])
return list(off_set_dict.values())[0]

def _get_offset_for_time(self, partition, topic, ts):
tp = kafka.TopicPartition(topic, partition)
super(KafkaConsumer, self).assign([tp])
offset_dict = super(KafkaConsumer, self).offsets_for_times({tp: int(ts*1000)})
offset = list(offset_dict.values())[0]
if offset is None:
return self._get_latest_offset(partition, topic)
else:
return offset.offset


def create_kafka_consumer(server, group_id):
retry_times = 3
for i in range(1, retry_times+1):
try:
consumer = KafkaConsumer(
bootstrap_servers=[server],
group_id=group_id,
auto_offset_reset='latest', # earliest, latest
enable_auto_commit=True,
api_version=(2,3,1),
api_version_auto_timeout_ms=5000
)
return consumer
except Exception as e:
if i == retry_times:
raise e
else:
time.sleep(0.1)


mount命令的执行需要sudo权限,其实有一些命令允许普通用户进行挂载操作

fusermount 用户空间挂载NFS共享
smbnetfs 用户空间挂载samba共享
sshfs 用户空间基于SSH挂载共享

卸载可以统一使用 fusermount -u
强制(延迟)卸载使用 fusermount -z

备份压缩

tar_backup.sh 将指定目录压缩备份到指定目录下并自动轮转保留最近5次备份

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/bash

if [ "$#" -lt 2 ]; then
script_name=$(basename "$0")
echo "Usage: ${script_name} <需要备份的路径> <备份到的路径> [--exclude <需要排除的路径> ...]"
exit 1
fi

src_path=$1
dst_path=$2
shift 2
tar_params=$@

if [ ! -d "${dst_path}" ]; then
mkdir -p ${dst_path}
fi

for file in $(find ${dst_path} -name "backup-*" -type f | sort -r | tail -n +5 );
do
rm -f $file;
done;

if [ "${dst_path: -1}" != "/" ]; then
dst_path="${dst_path}/"
fi

tar cJf ${dst_path}/backup-$(date "+%Y%m%d-%H%M%S-%N").tar.xz ${tar_params} ${src_path} 2>&-

解压还原

解压到当前路径

1
tar -xvf backup-20240617-235910-016764400.tar.xz

解压到指定路径

1
tar -xvf backup-20240617-235910-016764400.tar.xz -C /path/to/destination

对于使用绝对路径的备份,解压到原路径

1
tar -xvf backup-20240617-235910-016764400.tar.xz -C /

有时候我们希望函数调用的缓存能够夸进程使用或者在程序重启后仍然有效。

一个显然的想法是缓存到redis,于是就有了下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

import os
import functools
import redis
from redis.connection import ConnectionPool
import msgpack
import msgpack_numpy
import numpy as np
import pandas as pd
import io

class _HashedSeq(list):
__slots__ = 'hashvalue'

def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)

def __hash__(self):
return self.hashvalue


def _make_key(args, kw, typed=False):
key = args
if kw:
kwd_mark = (object(),)
key += kwd_mark
for item in kw.items():
key += item
if typed:
key += tuple(type(v) for v in args)
if kw:
key += tuple(type(v) for v in kw.values())
elif len(key) == 1 and type(key[0]) in {int, str}:
return key[0]
return _HashedSeq(key)


def dumps_to_feather(df):
columns = df.columns
df.columns = [str(e) for e in df.columns]
buffer = io.BytesIO() # 创建一个内存中的字节流缓冲区
df.to_feather(buffer) # 将 DataFrame 序列化为 Feather 格式并存储到缓冲区
buffer.seek(0) # 重新定位到字节流的开头
serialized_data = buffer.getvalue() # 获取序列化后的二进制数据
return {"data": serialized_data, "columns": list(columns)}


def loads_from_feather(data):
buffer = io.BytesIO(data["data"]) # 创建一个内存中的字节流缓冲区,并加载序列化的数据
df = pd.read_feather(buffer) # 从 Feather 格式的序列化数据中加载 DataFrame
df.columns = data["columns"]
return df


def _custom_encode(obj):
if isinstance(obj, set):
return {b'__set__': list(obj)} # 将 set 对象转换为字典形式
if isinstance(obj, pd.DataFrame):
return {b'__feather__': dumps_to_feather(obj)}
return msgpack_numpy.encode(obj)


def _custom_decode(obj):
if b'__set__' in obj: # 判断是否是我们之前转换过的 set 对象
return set(obj[b'__set__']) # 将字典形式的 set 转换回原始的 set 对象
if b'__feather__' in obj:
return loads_from_feather(obj[b'__feather__'])
return msgpack_numpy.decode(obj) # 不需要转换的情况下直接返回原始对象


class RedisTTLCache:
def __init__(self, host='localhost', port=6379):
self.pool = ConnectionPool(host=host, port=port)

def cache(self, ttl, scope=os.path.basename(__file__)):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 构建唯一的缓存键
cache_key = f"FUNC_CACHE_{(scope, func.__name__, _make_key(args, kwargs))}"

# 尝试从Redis中获取缓存
redis_client = redis.Redis(connection_pool=self.pool)
result = redis_client.get(cache_key)
if result is not None:
# 如果缓存存在,直接返回结果
result = msgpack.loads(result, object_hook=_custom_decode)
return result

# 如果缓存中不存在该结果,则重新计算函数结果
result = func(*args, **kwargs)

# 将计算结果存入Redis缓存
redis_client.set(cache_key, msgpack.dumps(result, default=_custom_encode), ex=ttl)

return result
return wrapper
return decorator


if __name__ == '__main__':
redis_ttl_cache = RedisTTLCache(host="localhost")
ttl_cached = redis_ttl_cache.cache

@ttl_cached(ttl=5)
def func(a, b):
print("Hello world")
return {
'result': a + b,
'colors': {'red', 'green', 'blue'},
'arr': np.array([[0, 1], [2,3]]),
'df': pd.DataFrame([[0, 1], [2,3]]),
}

print(func(1, 2))
print(func(1, 2))

这里的主要问题是使用什么作为redis的key来确保真正相同的调用使用到相同的缓存。

如果我们在key中拼入进程ID,那么则是非常保守的,不同进程将无法使用到相同的缓存。

我这里默认使用 脚本文件名+函数名+参数 作为key,但是或许有两个相同脚本名的脚本中定义了同名的但功能不同的函数,这时候不应该使用相同的缓存。

总之我们只能折中处理,因为我的目的就是跨进程使用缓存,所以没有特别保守,同时提供了scope参数,可以自定义一个拼入key的标识,在必要时由用户决定能否使用相同的缓存。

有时我们希望使用一些对数据大小或类型有限制的数据结构,例如树状数组(要求整型并且因为内存的关系最大数据范围通常为1e6量级)。

但是如果我们的数据个数在1e6之内,并且我们只关心数据的序,我们就可以在非负整数和我们的数据集之间建立映射来使用树状数组(或其他有限制的数据结构)。

那么这里我们就期望一个帮助建立映射的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
using namespace std;

// 返回一个整型数组包含 [0, arr.size()) 的数字,且和arr具有一致的序
template<typename T, typename CMP=less<T>>
vector<int> get_rank(const vector<T>& arr, CMP cmp=CMP()){
vector<int> idx(arr.size());
iota(idx.begin(), idx.end(), 0);
sort(idx.begin(), idx.end(), [&](auto i, auto j){
return cmp(arr[i], arr[j]);
});
// 我们在不改变arr的情况下对其下标数组进行排序
// 排序好后,对于任意i < j有arr[idx[i]] <= arr[idx[j]] (1)
// 如果我们构造一个rank,令rank[idx[i]] = i, 0<=i<n,
// 那么显然对于任意i < j有rank[idx[i]]=i < j=rank[idx[j]]
// 可以看到如果忽略(1)式中的等号,rank就已经和arr具有一致的序
// 所以下面我们就按这个思路,再顺便处理一下并列的情况就行了
vector<int> rank(arr.size());
rank[idx[0]] = 0;
for(int i=1; i<arr.size(); i++){
if(arr[idx[i]] == arr[idx[i-1]]){
rank[idx[i]] = rank[idx[i-1]];
} else {
rank[idx[i]] = i;
}
}
return rank;
}

上面的得到rank实际上是 arr -> 同序非负整数集 的映射。