用Python通过IMAP读取邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# -*- coding: utf-8 -*-
import imaplib
imaplib._MAXLINE = 10000000
import email
import email.utils
import re
import os
import datetime
import pytz
import traceback
from dataclasses import dataclass


def decode(s, charset):
if type(s) is str:
return s
try:
return s.decode(charset)
except Exception:
pass
try:
return s.decode('utf-8')
except Exception:
pass
try:
return s.decode('latin1')
except Exception as e:
pass
return s.decode('gbk')


class Attachment:
def __init__(self, part):
self.content_type = part.get_content_type()
raw_filename = part.get_filename() # .strip()
# print(dir(part), raw_filename)
if raw_filename.startswith("=?") and raw_filename.endswith("?="):
dh = email.header.decode_header(raw_filename)
self.filename = decode(dh[0][0], dh[0][1])
else:
h = email.header.Header(raw_filename)
dh = email.header.decode_header(h)
self.filename = decode(dh[0][0], dh[0][1])
self.data = part.get_payload(decode=True) #下载附件

def __repr__(self):
return f"Attachment(content_type='{self.content_type}', filename='{self.filename}', size={len(self.data)})"

def save_to(self, path):
if os.path.exists(path):
if os.path.isdir(path): # 已附件原文件名保存到目录下
path = os.path.join(path, self.filename)
with open(path, 'wb') as fp:
fp.write(self.data)
else: # 覆盖已存在文件
with open(path, 'wb') as fp:
fp.write(self.data)
else: # 新建文件
with open(path, 'wb') as fp:
fp.write(self.data)


class Mail:
def __init__(self, num, msg):
# 这些字段是在读取邮件列表时就解析的
self.num = num
self.subject: str = self._decode_value(msg.get("subject"))
date = email.utils.parsedate_to_datetime(msg.get("date"))
if date:
timezone = pytz.timezone('Asia/Shanghai')
date = date.astimezone(timezone) # 设置时区为+8区
date = date.replace(tzinfo=None) # 移除时区信息
self.date: str = str(date) if date else msg.get("date")
from_name, self.from_addr = email.utils.parseaddr(msg.get("from"))
self.from_name = self._decode_value(from_name)
to_name, self.to_addr = email.utils.parseaddr(msg.get("to"))
self.to_name = self._decode_value(to_name)

# 这些字段是延迟到需要访问时才解析的
self._plain: str = ""
self._html: str = ""
self._attachments: list = []

self._msg: str = msg
self._parsed: bool = False

def _decode_value(self, value):
try:
header = email.header.decode_header(value)
raw_value = email.header.decode_header(value)[0][0]
charset = email.header.decode_header(value)[0][1]
return decode(raw_value, charset)
except Exception as e:
print(f"decode failed [value] {value}")
traceback.print_exc()
return None

@property
def plain(self):
# 为了延迟解析邮件内容
if not self._parsed:
self.parse_content()
return self._plain

@property
def html(self):
if not self._parsed:
self.parse_content()
return self._html

@property
def attachments(self):
if not self._parsed:
self.parse_content()
return self._attachments

# 解析mail的内容
def parse_content(self):
self._attachments = []
for part in self._msg.walk():
if part.is_multipart():
continue
if part.get_content_type() == "text/plain":
charset = part.get_content_charset()
content = decode(part.get_payload(decode=True), charset)
self._plain = content
if part.get_content_type() == "text/html":
charset = part.get_content_charset()
content = decode(part.get_payload(decode=True), charset)
self._html = content

if part.get_content_disposition():
if part.get_content_disposition() == "inline":
# HTML内容引用的图片之类的
pass
elif part.get_content_disposition() == "attachment":
# 附件
self._attachments.append(Attachment(part))
if self._plain:
self._html = ""
self._parsed = True


class ImapMailBox:
def __init__(self, host, username, password, port=None, ssl=None):
self.host = host
self.port = port
self.ssl = ssl
self.username = username
self.password = password
if ssl is None and port == 993:
ssl = True
if ssl:
# connecting to host via SSL
self.conn = imaplib.IMAP4_SSL(host=host, port=port or 993)
else:
self.conn = imaplib.IMAP4(host=host, port=port or 143)
# logging in to servers
self.conn.login(username, password)

def get_mail_count(self):
# Selecting the inbox of the logged in account
self.conn.select('Inbox')
state, data = self.conn.search(None, 'ALL')
mail_list = []
mails = data[0].split()
return len(mails)

def get_mail_list(self, page=1, page_size=50):
# Selecting the inbox of the logged in account
self.conn.select('Inbox')
state, data = self.conn.search(None, 'ALL')
mail_list = []
mails = data[0].split()[::-1]
if page_size:
mails = mails[(page-1)*page_size: page*page_size]
for num in mails:
state, data = self.conn.fetch(num, '(RFC822)')
raw_email = data[0][1]
try:
msg = email.message_from_bytes(raw_email)
mail = Mail(num, msg)
mail_list.append(mail)
except Exception as e:
print(f"Parse raw data failed. [raw_data] '{raw_email}'")
traceback.print_exc()
return mail_list

def mark_as_seen(self, mail):
self.conn.store(mail.num, '+FLAGS', '\\seen')


if __name__ == '__main__':
mailbox = ImapMailBox(
host='imap.aliyun.com', port=993,
username="******", password="******"
)
count = mailbox.get_mail_count()
# 收件箱里的邮件数
print(count)
# 分页获取邮件
for mail in mailbox.get_mail_list(page=1, page_size=25):
# 打印 日期、发件人、标题、纯文本内容
print(mail.date, mail.from_addr, mail.subject, mail.plain)

# 如果有附件,就下载保存到本地
if mail.attachments:
for attachment in mail.attachments:
attachment.save_to("./")