
A wrapper around kafka-python that adds deduplication. By default Kafka only guarantees that data is not lost, not that messages are never duplicated in edge cases; with the deduplication added here we get both no loss and no duplicates. It also implements subscribing from a specified point in time.


import kafka  # pip install kafka-python
import time
import uuid
import datetime
import queue
from dataclasses import dataclass

@dataclass
class Message:
    key: str = ""
    value: bytes = b""

class KafkaProducer(kafka.KafkaProducer):
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def send(self, topic: str, key: str, value: bytes):
        # Generate a 32-character id for deduplication; it also records the message creation time.
        # By default Kafka guarantees that messages are not lost, but not that they are never
        # duplicated, so we deduplicate ourselves using this id.
        id = "{}{}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3], uuid.uuid4().hex[:15])
        super().send(topic, key=key.encode(), value=id.encode()+value)


def create_kafka_producer(server):
    retry_times = 3
    for i in range(1, retry_times+1):
        try:
            producer = KafkaProducer(
                bootstrap_servers=[server],
                api_version=(2,3,1),
                api_version_auto_timeout_ms=5000
            )
            return producer
        except Exception as e:
            if i == retry_times:
                raise e
            else:
                time.sleep(0.1)


class KafkaConsumer(kafka.KafkaConsumer):
    def __init__(self, **kw):
        super().__init__(**kw)
        self.q = queue.Queue()
        self.s = set()
        self.DUP_DETECT_SIZE = 1000  # size of the deduplication window

    # Subscribing from the latest messages onward: subscribe() is already implemented in the parent class
    # def subscribe(self, topic):
    #     pass

    # Subscribe starting from a historical point in time (which messages are still available depends on
    # the retention policy configured on the Kafka server; with the current configuration, messages from
    # the last 72 hours can be re-consumed)
    def subscribe_from_datetime(self, partition, topic, dt):
        if type(dt) is int or type(dt) is float:
            ts = dt
        elif isinstance(dt, datetime.datetime):
            ts = dt.timestamp()
        else:
            ts = time.mktime(time.strptime(f"{dt}", r"%Y-%m-%d %H:%M:%S"))
        offset = self._get_offset_for_time(partition, topic, ts)
        tp = kafka.TopicPartition(topic, partition)
        super().assign([tp])
        super().seek(tp, offset)

    def __iter__(self):
        return self

    # Re-wrap the blocking consumption path, adding deduplication
    def __next__(self):
        while True:
            message = super().__next__()
            key = message.key
            if not key:
                continue
            id = message.value[:32]
            if id in self.s:  # duplicate message
                continue
            if len(self.s) >= self.DUP_DETECT_SIZE:
                e = self.q.get()
                self.s.remove(e)
            self.s.add(id)
            self.q.put(id)

            return Message(key=key.decode(), value=message.value[32:])

    # Re-wrap the non-blocking consumption path, adding deduplication
    def get_messages(self, max_records=20):
        r = super().poll(timeout_ms=max_records*25, max_records=max_records)
        ret = []
        for messages in r.values():
            for message in messages:
                key = message.key
                if not key:
                    continue
                id = message.value[:32]
                if id in self.s:  # duplicate message
                    continue
                if len(self.s) >= self.DUP_DETECT_SIZE:
                    e = self.q.get()
                    self.s.remove(e)
                self.s.add(id)
                self.q.put(id)

                ret.append(Message(key=key.decode(), value=message.value[32:]))
        return ret

    def _get_latest_offset(self, partition, topic):
        tp = kafka.TopicPartition(topic, partition)
        super(KafkaConsumer, self).assign([tp])
        off_set_dict = super(KafkaConsumer, self).end_offsets([tp])
        return list(off_set_dict.values())[0]

    def _get_offset_for_time(self, partition, topic, ts):
        tp = kafka.TopicPartition(topic, partition)
        super(KafkaConsumer, self).assign([tp])
        offset_dict = super(KafkaConsumer, self).offsets_for_times({tp: int(ts*1000)})
        offset = list(offset_dict.values())[0]
        if offset is None:
            return self._get_latest_offset(partition, topic)
        else:
            return offset.offset


def create_kafka_consumer(server, group_id):
    retry_times = 3
    for i in range(1, retry_times+1):
        try:
            consumer = KafkaConsumer(
                bootstrap_servers=[server],
                group_id=group_id,
                auto_offset_reset='latest',  # earliest, latest
                enable_auto_commit=True,
                api_version=(2,3,1),
                api_version_auto_timeout_ms=5000
            )
            return consumer
        except Exception as e:
            if i == retry_times:
                raise e
            else:
                time.sleep(0.1)
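
A usage sketch of the wrappers above; the broker address "localhost:9092", the topic name, the group id and the partition number are placeholders, not values from the original setup:

producer = create_kafka_producer("localhost:9092")
producer.send("my_topic", key="user_1", value=b"hello")   # a 32-char id is prepended automatically
producer.flush()

consumer = create_kafka_consumer("localhost:9092", group_id="my_group")
# replay partition 0 of the topic starting from a point in the past
consumer.subscribe_from_datetime(0, "my_topic", "2024-06-17 00:00:00")
for msg in consumer:            # blocking, deduplicated iteration
    print(msg.key, msg.value)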


Running the mount command requires sudo privileges, but there are some commands that let ordinary users perform mounts:

fusermount  user-space mounting of NFS shares
smbnetfs    user-space mounting of Samba shares
sshfs       user-space mounting of shares over SSH

Unmounting can be done uniformly with fusermount -u
For a forced (lazy) unmount, use fusermount -z
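
For example, with sshfs (the host and paths below are placeholders):

sshfs user@host:/remote/dir ~/mnt/remote   # mount a remote directory over SSH as an ordinary user
fusermount -u ~/mnt/remote                 # unmount
fusermount -z ~/mnt/remote                 # forced (lazy) unmount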

Backup and compression

tar_backup.sh compresses a given directory into a backup under a target directory and rotates automatically, keeping the 5 most recent backups.

#!/bin/bash

if [ "$#" -lt 2 ]; then
    script_name=$(basename "$0")
    echo "Usage: ${script_name} <source path> <backup destination path> [--exclude <path to exclude> ...]"
    exit 1
fi

src_path=$1
dst_path=$2
shift 2
tar_params=$@

if [ ! -d "${dst_path}" ]; then
    mkdir -p "${dst_path}"
fi

# delete all but the 4 most recent backups; together with the new backup created below, 5 are kept
for file in $(find ${dst_path} -name "backup-*" -type f | sort -r | tail -n +5 );
do
    rm -f $file;
done;

if [ "${dst_path: -1}" != "/" ]; then
    dst_path="${dst_path}/"
fi

tar cJf ${dst_path}backup-$(date "+%Y%m%d-%H%M%S-%N").tar.xz ${tar_params} ${src_path} 2>&-
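
A usage sketch (all paths are placeholders); it can also be scheduled via cron:

./tar_backup.sh /data/project /backups/project --exclude /data/project/tmp
# e.g. run it every day at 02:00 from crontab -e:
# 0 2 * * * /path/to/tar_backup.sh /data/project /backups/project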

Extract and restore

Extract to the current directory

tar -xvf backup-20240617-235910-016764400.tar.xz

Extract to a specified directory

tar -xvf backup-20240617-235910-016764400.tar.xz -C /path/to/destination

For a backup made with absolute paths, extract back to the original location

tar -xvf backup-20240617-235910-016764400.tar.xz -C /

Sometimes we want the cache of a function call to be usable across processes, or to remain valid after the program restarts.

An obvious idea is to cache the results in Redis, which leads to the code below.


import os
import functools
import redis
from redis.connection import ConnectionPool
import msgpack
import msgpack_numpy
import numpy as np
import pandas as pd
import io

class _HashedSeq(list):
    __slots__ = 'hashvalue'

    def __init__(self, tup, hash=hash):
        self[:] = tup
        self.hashvalue = hash(tup)

    def __hash__(self):
        return self.hashvalue


def _make_key(args, kw, typed=False):
    key = args
    if kw:
        # a stable marker (rather than object()) so that the stringified key stays identical
        # across calls and across processes
        kwd_mark = ("__kwargs__",)
        key += kwd_mark
        for item in kw.items():
            key += item
    if typed:
        key += tuple(type(v) for v in args)
        if kw:
            key += tuple(type(v) for v in kw.values())
    elif len(key) == 1 and type(key[0]) in {int, str}:
        return key[0]
    return _HashedSeq(key)


def dumps_to_feather(df):
    columns = df.columns
    df.columns = [str(e) for e in df.columns]
    buffer = io.BytesIO()                 # create an in-memory byte-stream buffer
    df.to_feather(buffer)                 # serialize the DataFrame to Feather format into the buffer
    buffer.seek(0)                        # rewind to the beginning of the stream
    serialized_data = buffer.getvalue()   # get the serialized binary data
    return {"data": serialized_data, "columns": list(columns)}


def loads_from_feather(data):
    buffer = io.BytesIO(data["data"])  # wrap the serialized data in an in-memory byte-stream buffer
    df = pd.read_feather(buffer)       # load the DataFrame back from the Feather-serialized data
    df.columns = data["columns"]
    return df


def _custom_encode(obj):
    if isinstance(obj, set):
        return {b'__set__': list(obj)}  # convert a set into a dict representation
    if isinstance(obj, pd.DataFrame):
        return {b'__feather__': dumps_to_feather(obj)}
    return msgpack_numpy.encode(obj)


def _custom_decode(obj):
    if b'__set__' in obj:  # is this a set we converted earlier?
        return set(obj[b'__set__'])  # convert the dict representation back into the original set
    if b'__feather__' in obj:
        return loads_from_feather(obj[b'__feather__'])
    return msgpack_numpy.decode(obj)  # nothing to convert, return the object as-is


class RedisTTLCache:
    def __init__(self, host='localhost', port=6379):
        self.pool = ConnectionPool(host=host, port=port)

    def cache(self, ttl, scope=os.path.basename(__file__)):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                # build a unique cache key
                cache_key = f"FUNC_CACHE_{(scope, func.__name__, _make_key(args, kwargs))}"

                # try to fetch the cached value from Redis
                redis_client = redis.Redis(connection_pool=self.pool)
                result = redis_client.get(cache_key)
                if result is not None:
                    # cache hit: return the stored result directly
                    result = msgpack.loads(result, object_hook=_custom_decode)
                    return result

                # cache miss: compute the result
                result = func(*args, **kwargs)

                # store the computed result in the Redis cache
                redis_client.set(cache_key, msgpack.dumps(result, default=_custom_encode), ex=ttl)

                return result
            return wrapper
        return decorator


if __name__ == '__main__':
    redis_ttl_cache = RedisTTLCache(host="localhost")
    ttl_cached = redis_ttl_cache.cache

    @ttl_cached(ttl=5)
    def func(a, b):
        print("Hello world")
        return {
            'result': a + b,
            'colors': {'red', 'green', 'blue'},
            'arr': np.array([[0, 1], [2, 3]]),
            'df': pd.DataFrame([[0, 1], [2, 3]]),
        }

    print(func(1, 2))
    print(func(1, 2))

The main question here is what to use as the Redis key so that truly identical calls hit the same cache entry.

If we mixed the process ID into the key, that would be very conservative: different processes could never share a cache entry.

By default I use script file name + function name + arguments as the key. However, two scripts with the same file name might define functions that share a name but behave differently, and those should not share a cache entry.

In short, this is a trade-off. Since my goal is precisely to share the cache across processes, I did not go for the conservative option; instead a scope parameter is provided, a custom string mixed into the key, so the user can decide when entries may be shared.
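
For illustration, a sketch of how the scope parameter might be used to keep two same-named functions apart (the scope strings and function bodies are made up):

cache = RedisTTLCache(host="localhost")

# in script_a.py
@cache.cache(ttl=60, scope="script_a.load_data")
def load_data(day):
    ...

# in script_b.py: same function name but different behavior;
# a different scope keeps its cache entries separate
@cache.cache(ttl=60, scope="script_b.load_data")
def load_data(day):
    ...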

Sometimes we want to use data structures that restrict the size or type of the data, for example a binary indexed tree (Fenwick tree), which requires integers and, for memory reasons, usually limits the value range to around 1e6.

But if we have no more than about 1e6 data points and only care about their relative order, we can build a mapping between non-negative integers and our data set, and then use the Fenwick tree (or any other restricted structure).

So what we want here is a helper function that builds such a mapping.

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
using namespace std;

// Returns an integer array containing the numbers [0, arr.size()) that has the same order as arr.
template<typename T, typename CMP=less<T>>
vector<int> get_rank(const vector<T>& arr, CMP cmp=CMP()){
    if(arr.empty()) return {};  // guard against an empty input
    vector<int> idx(arr.size());
    iota(idx.begin(), idx.end(), 0);
    sort(idx.begin(), idx.end(), [&](auto i, auto j){
        return cmp(arr[i], arr[j]);
    });
    // We sort the index array without modifying arr itself.
    // After sorting, for any i < j we have arr[idx[i]] <= arr[idx[j]]   (1)
    // If we build a rank with rank[idx[i]] = i for 0 <= i < n,
    // then clearly for any i < j we have rank[idx[i]] = i < j = rank[idx[j]].
    // Ignoring the equality case in (1), rank already has the same order as arr,
    // so below we follow this idea and additionally handle ties.
    vector<int> rank(arr.size());
    rank[idx[0]] = 0;
    for(int i=1; i<arr.size(); i++){
        if(arr[idx[i]] == arr[idx[i-1]]){
            rank[idx[i]] = rank[idx[i-1]];
        } else {
            rank[idx[i]] = i;
        }
    }
    return rank;
}

The rank obtained above is in fact a mapping from arr to a set of non-negative integers with the same order.
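
A small usage sketch (the input values are made up):

int main(){
    vector<double> arr = {3.14, -2.5, 10.0, 3.14};
    vector<int> r = get_rank(arr);
    for(int x : r) cout << x << " ";  // prints: 1 0 3 1 -- ties share a rank, order matches arr
    cout << endl;
    return 0;
}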

# -*- coding: utf-8 -*-
import openpyxl
import xlrd
import os

class Sheet:
    def cell_value(self, row, col):
        pass


class XlsxSheet(Sheet):
    def __init__(self, sheet):
        self.sheet = sheet

    def cell_value(self, row, col):
        return self.sheet.cell(row=row+1, column=col+1).value


class XlsSheet(Sheet):
    def __init__(self, sheet):
        self.sheet = sheet

    def cell_value(self, row, col):
        return self.sheet.cell_value(row, col)


class _Workbook:
    def sheet_by_index(self, index):
        pass


class XlsxFile(_Workbook):
    def __init__(self, path):
        self.workbook = openpyxl.load_workbook(path)

    def sheet_by_index(self, index):
        sheet = self.workbook[self.workbook.sheetnames[index]]
        return XlsxSheet(sheet)


class XlsFile(_Workbook):
    def __init__(self, path):
        self.workbook = xlrd.open_workbook(path)

    def sheet_by_index(self, index):
        sheet = self.workbook.sheet_by_index(index)
        return XlsSheet(sheet)


class Workbook:
    def __init__(self, path):
        filedir, filename = os.path.split(path)
        _, extname = os.path.splitext(filename)
        if extname == ".xlsx":
            self.impl = XlsxFile(path)
        elif extname == ".xls":
            self.impl = XlsFile(path)
        else:
            raise IOError("unknown file type")

    def sheet_by_index(self, index):
        return self.impl.sheet_by_index(index)
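
The classes above wrap openpyxl and xlrd behind one interface, so .xls and .xlsx files can be read through the same 0-based cell_value(row, col) call. A usage sketch (the file name and cell position are placeholders):

wb = Workbook("report.xlsx")    # or "report.xls"; the backend is picked by the extension
sheet = wb.sheet_by_index(0)    # first sheet
print(sheet.cell_value(0, 0))   # top-left cell, 0-based row and column for both formats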

Make Sublime Text run .py scripts with the Python inside WSL

Locate C:\Program Files\Sublime Text\Packages\Python.sublime-package

Make a copy, add the .zip suffix, and open it with an archive tool.

Add a file that defines a new build system: create WSL Python.sublime-build with the following content

{
    "cmd": ["python.bat", "${file}"],
    "file_regex": "^[ ]*File \"(...*?)\", line ([0-9]*)",
    "path": "C:/Program Files/Sublime Text/helper",
    "selector": "source.python",
}

Roughly, this means that files selected by source.python (think of it as all *.py files) are executed by C:/Program Files/Sublime Text/helper/python.bat.

Create the file C:/Program Files/Sublime Text/helper/python.bat with the following content

@echo off
set str=%1
set str=%str:\=/%
set str=%str:C:=/mnt/c%
set str=%str:D:=/mnt/d%
C:\windows\system32\wsl.exe python -u %str%

Restart Sublime Text; when building a *.py file, choose WSL Python as the build system.

Make Sublime Text compile and run .cpp files with the g++ inside WSL

Locate C:\Program Files\Sublime Text\Packages\C++.sublime-package

Make a copy, add the .zip suffix, and open it with an archive tool.

Add a file that defines a new build system: create WSL C++ Single File.sublime-build with the following content

{
    "cmd": ["cpp.bat", "${file}", "${file_path}", "${file_base_name}"],
    "file_regex": "^[ ]*File \"(...*?)\", line ([0-9]*)",
    "path": "C:/Program Files/Sublime Text/helper",
    "selector": "source.c++",
}

Roughly, this means that files selected by source.c++ (think of it as all *.cpp files) are handled by C:/Program Files/Sublime Text/helper/cpp.bat.

Create the file C:/Program Files/Sublime Text/helper/cpp.bat with the following content

@echo off
set file=%1
set file_path=%2
set file_base_name=%3
set file=%file:\=/%
set file=%file:C:=/mnt/c%
set file=%file:D:=/mnt/d%
set file_path=%file_path:\=/%
set file_path=%file_path:C:=/mnt/c%
set file_path=%file_path:D:=/mnt/d%

C:\windows\system32\wsl.exe g++ -std=c++17 %file% -o %file_path%/%file_base_name% && C:\windows\system32\wsl.exe %file_path%/%file_base_name%

Restart Sublime Text; when building a *.cpp file, choose WSL C++ Single File as the build system.

The code below implements three useful things:

  • KillableThread: a thread that can be killed; its join returns the value returned by the thread's target function and re-raises any exception raised inside the thread in the thread that calls join.
  • timeout decorator: raises TimeoutError automatically when a call exceeds the time limit.
  • retry decorator: retries automatically on exception; the number of retries can be specified (0 means no retry), and once the retries are exhausted the last exception is re-raised.

func_utils.py

# -*- coding: utf-8 -*-
import threading
import time
import inspect
import ctypes
import traceback
import sys, os
from functools import wraps

class KillableThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return_value = None
        self._exception = None

    @staticmethod
    def _async_raise(tid, exctype):
        """raises the exception, performs cleanup if needed"""
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # """if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"""
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def kill(self):
        KillableThread._async_raise(self.ident, SystemExit)

    def run(self):
        try:
            self._return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self._exception = e

    def join(self, timeout=None):
        super().join(timeout)
        if self._exception is not None:
            raise self._exception
        return self._return_value


def _get_thread(tid):
    for t in threading.enumerate():
        if t.ident == tid:
            return t
    return None


def _get_frame_stack(tid):
    for thread_id, stack in sys._current_frames().items():
        if thread_id == tid:
            return stack
    return None


def _get_formated_frame_stack(tid):
    info = []
    th = _get_thread(tid)
    stack = _get_frame_stack(tid)
    info.append('%s thread_id=%d' % (th.name, tid))
    for filename, lineno, _, line in traceback.extract_stack(stack):
        info.append('  at %s(%s:%d)' % (line, filename[filename.rfind(os.path.sep) + 1:], lineno))
    return '\n'.join(info)


def timeout(seconds):
    """
    Decorator to execute a function with a specified timeout.

    Args:
    - seconds (int): The time limit in seconds for the function to complete.

    Returns:
    - function: The decorated function.

    Raises:
    - TimeoutError: If the function does not complete within the specified time limit.

    Usage:
    @timeout(seconds=10)
    def my_function():
        # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            th = KillableThread(target=func, args=args, kwargs=kwargs)
            th.daemon = True
            th.start()
            ret = th.join(seconds)
            if th.is_alive():
                formated_frame_stack = _get_formated_frame_stack(th.ident)
                th.kill()
                raise TimeoutError(f"{repr(func)} timeout. Frame stack:\n{formated_frame_stack}")
            return ret
        return wrapper
    return decorator


def retry(retries=1, retry_interval=0):
    """
    Decorator to retry a function a specified number of times with a given interval between retries.

    Args:
    - retries (int): The number of times the function should be retried if it raises an exception. If set to 1, the function will be attempted initially and retried once.
    - retry_interval (int): The time interval in seconds to wait between retries.

    Returns:
    - function: The decorated function.

    Raises:
    - The original exception raised by the function if all retries fail.

    Usage:
    @retry(retries=2, retry_interval=2)
    def my_function():
        # Function body
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(retries+1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if i < retries:
                        time.sleep(retry_interval)
                    else:
                        raise e
        return wrapper
    return decorator
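
A usage sketch combining the two decorators (the module name func_utils follows the file name given above; the sleep time is made up for illustration):

from func_utils import timeout, retry
import time

@retry(retries=2, retry_interval=1)
@timeout(seconds=3)
def flaky_task():
    time.sleep(5)       # exceeds the 3-second limit, so each attempt raises TimeoutError
    return "done"

try:
    flaky_task()
except TimeoutError as e:
    print(e)            # after the first attempt plus 2 retries, the last TimeoutError propagates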

A wrapper around the logging module. It fixes an issue in the standard library's TimedRotatingFileHandler where, with multiple processes, the previous day's log file could be deleted by mistake during rollover.

For typical use you only need to import init_logger and logger.

Source

# -*- coding: utf-8 -*-
import logging, os, sys, time, re
from stat import ST_MTIME
from logging.handlers import BaseRotatingHandler, _MIDNIGHT
# logging.basicConfig(format='%(asctime)s %(levelname)s [%(filename)s->%(funcName)s:%(lineno)d]\t%(message)s',
#                     level=logging.DEBUG)


class TimedRotatingFileHandler(BaseRotatingHandler):
    """
    Handler for logging to a file, rotating the log file at certain timed
    intervals.

    If backupCount is > 0, when rollover is done, no more than backupCount
    files are kept - the oldest ones are deleted.
    """
    def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):
        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
        self.when = when.upper()
        self.backupCount = backupCount
        self.utc = utc
        self.atTime = atTime
        # Calculate the real rollover interval, which is just the number of
        # seconds between rollovers.  Also set the filename suffix used when
        # a rollover occurs.  Current 'when' events supported:
        # S - Seconds
        # M - Minutes
        # H - Hours
        # D - Days
        # midnight - roll over at midnight
        # W{0-6} - roll over on a certain day; 0 - Monday
        #
        # Case of the 'when' specifier is not important; lower or upper case
        # will work.
        if self.when == 'S':
            self.interval = 1 # one second
            self.suffix = "%Y-%m-%d_%H-%M-%S"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
        elif self.when == 'M':
            self.interval = 60 # one minute
            self.suffix = "%Y-%m-%d_%H-%M"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}(\.\w+)?$"
        elif self.when == 'H':
            self.interval = 60 * 60 # one hour
            self.suffix = "%Y-%m-%d_%H"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}(\.\w+)?$"
        elif self.when == 'D' or self.when == 'MIDNIGHT':
            self.interval = 60 * 60 * 24 # one day
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}(\.\w+)?$"
        elif self.when.startswith('W'):
            self.interval = 60 * 60 * 24 * 7 # one week
            if len(self.when) != 2:
                raise ValueError("You must specify a day for weekly rollover from 0 to 6 (0 is Monday): %s" % self.when)
            if self.when[1] < '0' or self.when[1] > '6':
                raise ValueError("Invalid day specified for weekly rollover: %s" % self.when)
            self.dayOfWeek = int(self.when[1])
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}(\.\w+)?$"
        else:
            raise ValueError("Invalid rollover interval specified: %s" % self.when)

        self.extMatch = re.compile(self.extMatch, re.ASCII)
        self.interval = self.interval * interval # multiply by units requested
        # The following line added because the filename passed in could be a
        # path object (see Issue #27493), but self.baseFilename will be a string
        filename = self.baseFilename
        if os.path.exists(filename):
            t = os.stat(filename)[ST_MTIME]
        else:
            t = int(time.time())
        self.rolloverAt = self.computeRollover(t)

    def computeRollover(self, currentTime):
        """
        Work out the rollover time based on the specified time.
        """
        result = currentTime + self.interval
        # If we are rolling over at midnight or weekly, then the interval is already known.
        # What we need to figure out is WHEN the next interval is.  In other words,
        # if you are rolling over at midnight, then your base interval is 1 day,
        # but you want to start that one day clock at midnight, not now.  So, we
        # have to fudge the rolloverAt value in order to trigger the first rollover
        # at the right time.  After that, the regular interval will take care of
        # the rest.  Note that this code doesn't care about leap seconds. :)
        if self.when == 'MIDNIGHT' or self.when.startswith('W'):
            # This could be done with less code, but I wanted it to be clear
            if self.utc:
                t = time.gmtime(currentTime)
            else:
                t = time.localtime(currentTime)
            currentHour = t[3]
            currentMinute = t[4]
            currentSecond = t[5]
            currentDay = t[6]
            # r is the number of seconds left between now and the next rotation
            if self.atTime is None:
                rotate_ts = _MIDNIGHT
            else:
                rotate_ts = ((self.atTime.hour * 60 + self.atTime.minute)*60 +
                    self.atTime.second)

            r = rotate_ts - ((currentHour * 60 + currentMinute) * 60 +
                currentSecond)
            if r < 0:
                # Rotate time is before the current time (for example when
                # self.rotateAt is 13:45 and it now 14:15), rotation is
                # tomorrow.
                r += _MIDNIGHT
                currentDay = (currentDay + 1) % 7
            result = currentTime + r
            # If we are rolling over on a certain day, add in the number of days until
            # the next rollover, but offset by 1 since we just calculated the time
            # until the next day starts.  There are three cases:
            # Case 1) The day to rollover is today; in this case, do nothing
            # Case 2) The day to rollover is further in the interval (i.e., today is
            #         day 2 (Wednesday) and rollover is on day 6 (Sunday).  Days to
            #         next rollover is simply 6 - 2 - 1, or 3.
            # Case 3) The day to rollover is behind us in the interval (i.e., today
            #         is day 5 (Saturday) and rollover is on day 3 (Thursday).
            #         Days to rollover is 6 - 5 + 3, or 4.  In this case, it's the
            #         number of days left in the current week (1) plus the number
            #         of days in the next week until the rollover day (3).
            # The calculations described in 2) and 3) above need to have a day added.
            # This is because the above time calculation takes us to midnight on this
            # day, i.e. the start of the next day.
            if self.when.startswith('W'):
                day = currentDay # 0 is Monday
                if day != self.dayOfWeek:
                    if day < self.dayOfWeek:
                        daysToWait = self.dayOfWeek - day
                    else:
                        daysToWait = 6 - day + self.dayOfWeek + 1
                    newRolloverAt = result + (daysToWait * (60 * 60 * 24))
                    if not self.utc:
                        dstNow = t[-1]
                        dstAtRollover = time.localtime(newRolloverAt)[-1]
                        if dstNow != dstAtRollover:
                            if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                                addend = -3600
                            else:           # DST bows out before next rollover, so we need to add an hour
                                addend = 3600
                            newRolloverAt += addend
                    result = newRolloverAt
        return result

    def shouldRollover(self, record):
        """
        Determine if rollover should occur.

        record is not used, as we are just comparing times, but it is needed so
        the method signatures are the same
        """
        t = int(time.time())
        if t >= self.rolloverAt:
            return 1
        return 0

    def getFilesToDelete(self):
        """
        Determine the files to delete when rolling over.

        More specific than the earlier method, which just used glob.glob().
        """
        dirName, baseName = os.path.split(self.baseFilename)
        fileNames = os.listdir(dirName)
        result = []
        prefix = baseName + "."
        plen = len(prefix)
        for fileName in fileNames:
            if fileName[:plen] == prefix:
                suffix = fileName[plen:]
                if self.extMatch.match(suffix):
                    result.append(os.path.join(dirName, fileName))
        if len(result) < self.backupCount:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backupCount]
        return result

    def doRollover(self):
        """
        do a rollover; in this case, a date/time stamp is appended to the filename
        when the rollover happens.  However, you want the file to be named for the
        start of the interval, not the current time.  If there is a backup count,
        then we have to get a list of matching filenames, sort them and remove
        the one with the oldest suffix.
        """
        if self.stream:
            self.stream.close()
            self.stream = None
        # get the time that this sequence started at and make it a TimeTuple
        currentTime = int(time.time())
        dstNow = time.localtime(currentTime)[-1]
        t = self.rolloverAt - self.interval
        if self.utc:
            timeTuple = time.gmtime(t)
        else:
            timeTuple = time.localtime(t)
            dstThen = timeTuple[-1]
            if dstNow != dstThen:
                if dstNow:
                    addend = 3600
                else:
                    addend = -3600
                timeTuple = time.localtime(t + addend)
        dfn = self.rotation_filename(self.baseFilename + "." +
                                     time.strftime(self.suffix, timeTuple))

        # Fix: with multiple processes, the previous day's log could be deleted by mistake during rollover
        if os.path.exists(dfn):
            ori_dfn = dfn
            i = 1
            while os.path.exists(dfn):
                i += 1
                dfn = f"{ori_dfn}.{i}"

        self.rotate(self.baseFilename, dfn)
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)
        if not self.delay:
            self.stream = self._open()
        newRolloverAt = self.computeRollover(currentTime)
        while newRolloverAt <= currentTime:
            newRolloverAt = newRolloverAt + self.interval
        #If DST changes and midnight or weekly rollover, adjust for this.
        if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
            dstAtRollover = time.localtime(newRolloverAt)[-1]
            if dstNow != dstAtRollover:
                if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                    addend = -3600
                else:           # DST bows out before next rollover, so we need to add an hour
                    addend = 3600
                newRolloverAt += addend
        self.rolloverAt = newRolloverAt


# name        supports writing several log files at once; if you only need one file, ignore it and keep 'main'
# echo        whether to also echo to the console
# filedir     custom log file directory, default is the current directory
# filename    custom log file name; None or False means do not write to a file, True means derive the name from the script name
# level       minimum level that will be emitted; with INFO, DEBUG messages are not output
# fmt         format of each log line; None means use the default format, see the logging module docs
# backupCount defaults to 7: logs are rotated daily and the last 7 days are kept; None means no daily rotation
def init_logger(name="main", echo=True, filedir=None, filename=None, level=None, fmt=None, backupCount=7):
    fmt = fmt or '%(asctime)s %(levelname)s [%(filename)s->%(funcName)s:%(lineno)d]\t%(message)s'
    formatter = logging.Formatter(fmt)  # set the log format
    logger = logging.getLogger(name)
    if level:
        if level == "DEBUG" or level == logging.DEBUG:
            logger.setLevel(level = logging.DEBUG)
        elif level == "INFO" or level == logging.INFO:
            logger.setLevel(level = logging.INFO)
        elif level == "WARN" or level == "WARNING" or level == logging.WARN:
            logger.setLevel(level = logging.WARN)
        elif level == "FATAL" or level == logging.FATAL:
            logger.setLevel(level = logging.FATAL)
        else:
            logger.setLevel(level = level)
    if not logger.hasHandlers():
        if echo:
            console_handler = logging.StreamHandler()  # console handler
            console_handler.setFormatter(formatter)
            console_handler.setLevel(logging.DEBUG)
            # if logger.hasHandlers()
            logger.addHandler(console_handler)
        if filename:
            if type(filename) == bool and filename:
                filename = get_logfile_name()
                if filedir is not None:
                    _, filename = os.path.split(filename)
                    filename = os.path.join(filedir, filename)
            else:
                if filedir is not None:
                    filename = os.path.join(filedir, filename)

            filedir, _ = os.path.split(filename)
            if filedir and not os.path.exists(filedir):
                os.makedirs(filedir)

            if backupCount:
                file_handler = TimedRotatingFileHandler(filename=filename, when='MIDNIGHT', interval=1, backupCount=backupCount, encoding='utf-8')
            else:
                file_handler = logging.FileHandler(filename=filename, encoding='utf-8')
            file_handler.setFormatter(formatter)
            file_handler.setLevel(logging.DEBUG)
            logger.addHandler(file_handler)
    logger.info(f"logger({name or ''}) initialized. [level] {level} [format] {fmt} [filename] {filename}")
    return logger


def get_logger(name=None):
    return logging.getLogger(name)


def get_logfile_name():
    path, filename = os.path.split(sys.argv[0])
    name, ext = os.path.splitext(filename)
    return os.path.join(path, f"{name}.log")


logger = get_logger(name="main")

Usage


from utils.logger import logger, init_logger

if __name__ == '__main__':
    init_logger(level="INFO", echo=True, filename=True, filedir="logs")

    logger.debug("test")
    logger.info("test")
    logger.warning("test")

Create or edit ~/.screenrc

and add

termcapinfo xterm|xterms|xs|rxvt ti@:te@

This only takes effect for the current user. To make it apply to all users, edit /etc/screenrc instead (vim /etc/screenrc),

then find the line above and uncomment it, or add it if it is not there.