mirror of
https://github.com/django/django.git
synced 2024-12-22 09:05:43 +00:00
Refs #35581 -- Used modern email parser and helpers in mail tests.
- Used modern email API (policy.default) for tests that reparse generated messages, and switched to modern accessors where helpful. - Split get_raw_attachments() helper out of get_decoded_attachments(), and used modern iter_attachments() to avoid finding nested attachments in attached message/* emails. - Stopped using legacy parseaddr.
This commit is contained in:
parent
bddd35cb1a
commit
5d7001b578
@ -4,10 +4,10 @@ import shutil
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
from email import charset, message_from_binary_file, message_from_bytes
|
||||
from email.header import Header
|
||||
from email import charset, message_from_binary_file
|
||||
from email import message_from_bytes as _message_from_bytes
|
||||
from email import policy
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import parseaddr
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from smtplib import SMTP, SMTPException
|
||||
@ -41,6 +41,14 @@ except ImportError:
|
||||
HAS_AIOSMTPD = False
|
||||
|
||||
|
||||
def message_from_bytes(s):
|
||||
"""
|
||||
email.message_from_bytes() using modern email.policy.default.
|
||||
Returns a modern email.message.EmailMessage.
|
||||
"""
|
||||
return _message_from_bytes(s, policy=policy.default)
|
||||
|
||||
|
||||
class MailTestsMixin:
|
||||
def assertMessageHasHeaders(self, message, headers):
|
||||
"""
|
||||
@ -101,24 +109,35 @@ class MailTestsMixin:
|
||||
"First string doesn't end with the second.",
|
||||
)
|
||||
|
||||
def get_decoded_attachments(self, django_message):
|
||||
def get_raw_attachments(self, django_message):
|
||||
"""
|
||||
Encode the specified django.core.mail.message.EmailMessage, then decode
|
||||
it using Python's email.parser module and, for each attachment of the
|
||||
message, return a list of tuples with (filename, content, mimetype).
|
||||
Return a list of the raw attachment parts in the MIME message generated
|
||||
by serializing django_message and reparsing the result.
|
||||
|
||||
This returns only "top-level" attachments. It will not descend into
|
||||
message/* attached emails to find nested attachments.
|
||||
"""
|
||||
msg_bytes = django_message.message().as_bytes()
|
||||
email_message = message_from_bytes(msg_bytes)
|
||||
message = message_from_bytes(msg_bytes)
|
||||
return list(message.iter_attachments())
|
||||
|
||||
def iter_attachments():
|
||||
for i in email_message.walk():
|
||||
if i.get_content_disposition() == "attachment":
|
||||
filename = i.get_filename()
|
||||
content = i.get_payload(decode=True)
|
||||
mimetype = i.get_content_type()
|
||||
yield filename, content, mimetype
|
||||
def get_decoded_attachments(self, django_message):
|
||||
"""
|
||||
Return a list of decoded attachments resulting from serializing
|
||||
django_message and reparsing the result.
|
||||
|
||||
return list(iter_attachments())
|
||||
Each attachment is returned as an EmailAttachment named tuple with
|
||||
fields filename, content, and mimetype. The content will be decoded
|
||||
to str for mimetype text/*; retained as bytes for other mimetypes.
|
||||
"""
|
||||
return [
|
||||
EmailAttachment(
|
||||
attachment.get_filename(),
|
||||
attachment.get_content(),
|
||||
attachment.get_content_type(),
|
||||
)
|
||||
for attachment in self.get_raw_attachments(django_message)
|
||||
]
|
||||
|
||||
|
||||
class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
@ -684,7 +703,7 @@ class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
self.assertEqual(msg.attachments[0].mimetype, mime_type)
|
||||
|
||||
attachments = self.get_decoded_attachments(msg)
|
||||
self.assertEqual(attachments[0], (file_name, file_content.encode(), mime_type))
|
||||
self.assertEqual(attachments[0], (file_name, file_content, mime_type))
|
||||
|
||||
def test_attachments_constructor(self):
|
||||
file_name = "example.txt"
|
||||
@ -706,7 +725,7 @@ class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
self.assertEqual(msg.attachments[0].mimetype, mime_type)
|
||||
|
||||
attachments = self.get_decoded_attachments(msg)
|
||||
self.assertEqual(attachments[0], (file_name, file_content.encode(), mime_type))
|
||||
self.assertEqual(attachments[0], (file_name, file_content, mime_type))
|
||||
|
||||
def test_attachments_constructor_from_tuple(self):
|
||||
file_name = "example.txt"
|
||||
@ -726,7 +745,7 @@ class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
self.assertEqual(msg.attachments[0].mimetype, mime_type)
|
||||
|
||||
attachments = self.get_decoded_attachments(msg)
|
||||
self.assertEqual(attachments[0], (file_name, file_content.encode(), mime_type))
|
||||
self.assertEqual(attachments[0], (file_name, file_content, mime_type))
|
||||
|
||||
def test_attachments_constructor_omit_mimetype(self):
|
||||
"""
|
||||
@ -767,10 +786,8 @@ class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
msg = EmailMessage(body="Content")
|
||||
# Unicode in file name
|
||||
msg.attach("une pièce jointe.pdf", b"%PDF-1.4.%...", mimetype="application/pdf")
|
||||
msg_bytes = msg.message().as_bytes()
|
||||
message = message_from_bytes(msg_bytes)
|
||||
payload = message.get_payload()
|
||||
self.assertEqual(payload[1].get_filename(), "une pièce jointe.pdf")
|
||||
attachment = self.get_decoded_attachments(msg)[0]
|
||||
self.assertEqual(attachment.filename, "une pièce jointe.pdf")
|
||||
|
||||
def test_attach_file(self):
|
||||
"""
|
||||
@ -830,7 +847,7 @@ class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
msg.attach("file.txt", b"file content")
|
||||
filename, content, mimetype = self.get_decoded_attachments(msg)[0]
|
||||
self.assertEqual(filename, "file.txt")
|
||||
self.assertEqual(content, b"file content")
|
||||
self.assertEqual(content, "file content") # (decoded)
|
||||
self.assertEqual(mimetype, "text/plain")
|
||||
|
||||
def test_attach_utf8_text_as_bytes(self):
|
||||
@ -842,7 +859,7 @@ class MailTests(MailTestsMixin, SimpleTestCase):
|
||||
msg.attach("file.txt", b"\xc3\xa4") # UTF-8 encoded a umlaut.
|
||||
filename, content, mimetype = self.get_decoded_attachments(msg)[0]
|
||||
self.assertEqual(filename, "file.txt")
|
||||
self.assertEqual(content, b"\xc3\xa4")
|
||||
self.assertEqual(content, "ä") # (decoded)
|
||||
self.assertEqual(mimetype, "text/plain")
|
||||
|
||||
def test_attach_non_utf8_text_as_bytes(self):
|
||||
@ -1341,10 +1358,9 @@ class BaseEmailBackendTests(MailTestsMixin):
|
||||
num_sent = mail.get_connection().send_messages([email])
|
||||
self.assertEqual(num_sent, 1)
|
||||
message = self.get_the_message()
|
||||
self.assertEqual(message["subject"], "=?utf-8?q?Ch=C3=A8re_maman?=")
|
||||
self.assertEqual(
|
||||
message.get_payload(decode=True).decode(), "Je t'aime très fort"
|
||||
)
|
||||
self.assertEqual(message["subject"], "Chère maman")
|
||||
self.assertIn(b"Subject: =?utf-8?q?Ch=C3=A8re_maman?=", message.as_bytes())
|
||||
self.assertEqual(message.get_content(), "Je t'aime très fort")
|
||||
|
||||
def test_send_long_lines(self):
|
||||
"""
|
||||
@ -1390,8 +1406,10 @@ class BaseEmailBackendTests(MailTestsMixin):
|
||||
)
|
||||
email.send()
|
||||
message = self.get_the_message()
|
||||
self.assertEqual(
|
||||
message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>"
|
||||
self.assertEqual(message["from"], "Firstname Sürname <from@example.com>")
|
||||
self.assertIn(
|
||||
b"From: =?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>",
|
||||
message.as_bytes(),
|
||||
)
|
||||
|
||||
def test_plaintext_send_mail(self):
|
||||
@ -1623,7 +1641,10 @@ class LocmemBackendTests(BaseEmailBackendTests, SimpleTestCase):
|
||||
email_backend = "django.core.mail.backends.locmem.EmailBackend"
|
||||
|
||||
def get_mailbox_content(self):
|
||||
return [m.message() for m in mail.outbox]
|
||||
# Reparse as modern messages to work with shared BaseEmailBackendTests.
|
||||
# (Once EmailMessage.message() uses Python's modern email API, this
|
||||
# can be changed back to `[m.message() for m in mail.outbox]`.)
|
||||
return [message_from_bytes(m.message().as_bytes()) for m in mail.outbox]
|
||||
|
||||
def flush_mailbox(self):
|
||||
mail.outbox = []
|
||||
@ -1702,7 +1723,7 @@ class FileBackendTests(BaseEmailBackendTests, SimpleTestCase):
|
||||
|
||||
self.assertEqual(len(os.listdir(self.tmp_dir)), 1)
|
||||
with open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0]), "rb") as fp:
|
||||
message = message_from_binary_file(fp)
|
||||
message = message_from_binary_file(fp, policy=policy.default)
|
||||
self.assertEqual(message.get_content_type(), "text/plain")
|
||||
self.assertEqual(message.get("subject"), "Subject")
|
||||
self.assertEqual(message.get("from"), "from@example.com")
|
||||
@ -1791,17 +1812,13 @@ class SMTPHandler:
|
||||
mail_from = envelope.mail_from
|
||||
|
||||
message = message_from_bytes(data.rstrip())
|
||||
message_addr = parseaddr(message.get("from"))[1]
|
||||
if mail_from != message_addr:
|
||||
# According to the spec, mail_from does not necessarily match the
|
||||
# From header - this is the case where the local part isn't
|
||||
# encoded, so try to correct that.
|
||||
lp, domain = mail_from.split("@", 1)
|
||||
lp = Header(lp, "utf-8").encode()
|
||||
mail_from = "@".join([lp, domain])
|
||||
try:
|
||||
header_from = message["from"].addresses[0].addr_spec
|
||||
except (KeyError, IndexError):
|
||||
header_from = None
|
||||
|
||||
if mail_from != message_addr:
|
||||
return f"553 '{mail_from}' != '{message_addr}'"
|
||||
if mail_from != header_from:
|
||||
return f"553 '{mail_from}' != '{header_from}'"
|
||||
self.mailbox.append(message)
|
||||
return "250 OK"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user