"""
Main SMTP client class.
Implementation is split into the following parent classes:
* :class:`.auth.SMTPAuth` - login and authentication methods
* :class:`.esmtp.ESMTP` - ESMTP command support
* :class:`.connection.SMTPConnection` - connection handling
"""
import asyncio
import email.message
from typing import Dict, Iterable, Optional, Sequence, Tuple, Union
from .auth import SMTPAuth
from .connection import SMTPConnection
from .default import Default, _default
from .email import extract_recipients, extract_sender, flatten_message
from .errors import (
SMTPNotSupported,
SMTPRecipientRefused,
SMTPRecipientsRefused,
SMTPResponseException,
)
from .response import SMTPResponse
from .sync import async_to_sync
__all__ = ("SMTP",)
class SMTP(SMTPAuth):
    """
    Main SMTP client class.

    Adds whole-transaction helpers (:meth:`sendmail`, :meth:`send_message`
    and their ``*_sync`` variants) on top of the authentication, ESMTP and
    connection handling provided by the parent classes.

    Basic usage:

        >>> loop = asyncio.get_event_loop()
        >>> smtp = aiosmtplib.SMTP(hostname="127.0.0.1", port=1025)
        >>> loop.run_until_complete(smtp.connect())
        (220, ...)
        >>> sender = "root@localhost"
        >>> recipients = ["somebody@localhost"]
        >>> message = "Hello World"
        >>> send = smtp.sendmail(sender, recipients, "Hello World")
        >>> loop.run_until_complete(send)
        ({}, 'OK')
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Created lazily by the first sendmail() call.
        self._sendmail_lock = None  # type: Optional[asyncio.Lock]

    # Hack to make Sphinx find the SMTPConnection docstring
    __init__.__doc__ = SMTPConnection.__init__.__doc__

    async def sendmail(
        self,
        sender: str,
        recipients: Union[str, Sequence[str]],
        message: Union[str, bytes],
        mail_options: Optional[Iterable[str]] = None,
        rcpt_options: Optional[Iterable[str]] = None,
        timeout: Optional[Union[float, Default]] = _default,
    ) -> Tuple[Dict[str, SMTPResponse], str]:
        r"""
        Perform an entire mail transaction: MAIL, one RCPT per recipient,
        then DATA.

        Arguments:

            - sender: The address sending this mail.
            - recipients: The addresses to send this mail to. A bare string
              is treated as a list containing that single address.
            - message: The message to send.
            - mail_options: Options (such as ESMTP 8bitmime) for the MAIL
              command. Passing ``smtputf8`` (any case) switches the mailbox
              encoding from ascii to utf-8.
            - rcpt_options: Options (such as DSN commands) for all the RCPT
              commands.
            - timeout: Passed through to every command issued here.

        message must be a string containing characters in the ASCII range.
        The string is encoded to bytes using the ascii codec, and lone
        ``\r`` and ``\n`` characters are converted to ``\r\n`` characters.

        If there has been no previous HELO or EHLO command this session,
        this method tries EHLO first. When the server supports the SIZE
        extension, a ``size=<len(message)>`` option is placed at the front
        of the MAIL options.

        The method returns normally if the mail is accepted for at least
        one recipient. The return value is a tuple of:

            - an error dictionary with one entry for each recipient that
              was refused, holding the SMTP error code and the accompanying
              error message sent by the server.
            - the message sent by the server in response to the DATA
              command (often containing a message id)

        Example:

             >>> loop = asyncio.get_event_loop()
             >>> smtp = aiosmtplib.SMTP(hostname="127.0.0.1", port=1025)
             >>> loop.run_until_complete(smtp.connect())
             (220, ...)
             >>> recipients = ["one@one.org", "two@two.org", "3@three.org"]
             >>> message = "From: Me@my.org\nSubject: testing\nHello World"
             >>> send_coro = smtp.sendmail("me@my.org", recipients, message)
             >>> loop.run_until_complete(send_coro)
             ({}, 'OK')
             >>> loop.run_until_complete(smtp.quit())
             (221, Bye)

        In the above example, the message was accepted for delivery for all
        three addresses. If delivery had been only successful to two of the
        three addresses, and one was rejected, the response would look
        something like::

            (
                {"nobody@three.org": (550, "User unknown")},
                "Written safely to disk. #902487694.289148.12219.",
            )

        If delivery is not successful to any addresses,
        :exc:`.SMTPRecipientsRefused` is raised.

        If :exc:`.SMTPResponseException` is raised by this method, we try
        to send an RSET command to reset the server envelope automatically
        for the next attempt.

        :raises SMTPNotSupported: SMTPUTF8 was requested but the server
            does not support it
        :raises SMTPRecipientsRefused: delivery to all recipients failed
        :raises SMTPResponseException: on invalid response
        """
        recipient_list = [recipients] if isinstance(recipients, str) else recipients
        # Work on private copies so the caller's option iterables are untouched.
        mail_opts = [] if mail_options is None else list(mail_options)
        rcpt_opts = [] if rcpt_options is None else list(rcpt_options)

        wants_utf8 = "smtputf8" in (opt.lower() for opt in mail_opts)
        mailbox_encoding = "utf-8" if wants_utf8 else "ascii"

        lock = self._sendmail_lock
        if lock is None:
            lock = self._sendmail_lock = asyncio.Lock()

        # Only one transaction at a time may run on this client.
        async with lock:
            # Make sure we've done an EHLO for extension checks
            await self._ehlo_or_helo_if_needed()

            if wants_utf8 and not self.supports_extension("smtputf8"):
                raise SMTPNotSupported("SMTPUTF8 is not supported by this server")

            if self.supports_extension("size"):
                mail_opts.insert(0, "size={}".format(len(message)))

            try:
                await self.mail(
                    sender,
                    options=mail_opts,
                    encoding=mailbox_encoding,
                    timeout=timeout,
                )
                refused = await self._send_recipients(
                    recipient_list,
                    rcpt_opts,
                    encoding=mailbox_encoding,
                    timeout=timeout,
                )
                data_response = await self.data(message, timeout=timeout)
            except (SMTPResponseException, SMTPRecipientsRefused) as exc:
                # The transaction failed part-way: reset the envelope.
                try:
                    await self.rset(timeout=timeout)
                except (ConnectionError, SMTPResponseException):
                    # If we're disconnected on the reset, or we get a bad
                    # status, don't raise that as it's confusing
                    pass
                # Surface the original failure, not anything from the RSET.
                raise exc

        return refused, data_response.message

    async def _send_recipients(
        self,
        recipients: Sequence[str],
        options: Iterable[str],
        encoding: str = "ascii",
        timeout: Optional[Union[float, Default]] = _default,
    ) -> Dict[str, SMTPResponse]:
        """
        Issue one RCPT command per address. Used as part of
        :meth:`.sendmail`.

        Individual refusals are collected instead of aborting the loop.
        Returns a dict mapping each refused recipient to the response it
        was refused with.

        :raises SMTPRecipientsRefused: the number of refusals equals the
            number of recipients
        """
        refusals = []
        for address in recipients:
            try:
                await self.rcpt(
                    address, options=options, encoding=encoding, timeout=timeout
                )
            except SMTPRecipientRefused as refusal:
                refusals.append(refusal)

        if len(refusals) == len(recipients):
            raise SMTPRecipientsRefused(refusals)

        errors_by_recipient = {}
        for refusal in refusals:
            errors_by_recipient[refusal.recipient] = SMTPResponse(
                refusal.code, refusal.message
            )
        return errors_by_recipient

    async def send_message(
        self,
        message: Union[email.message.EmailMessage, email.message.Message],
        sender: Optional[str] = None,
        recipients: Optional[Union[str, Sequence[str]]] = None,
        mail_options: Optional[Iterable[str]] = None,
        rcpt_options: Optional[Iterable[str]] = None,
        timeout: Optional[Union[float, Default]] = _default,
    ) -> Tuple[Dict[str, SMTPResponse], str]:
        r"""
        Sends an :py:class:`email.message.EmailMessage` object.

        Arguments are as for :meth:`.sendmail`, except that message is an
        :py:class:`email.message.EmailMessage` object. If sender is None or
        recipients is None, these arguments are taken from the headers of
        the EmailMessage as described in RFC 2822. Regardless of the values
        of sender and recipients, any Bcc field (or Resent-Bcc field, when
        the message is a resent) of the EmailMessage object will not be
        transmitted. The EmailMessage object is then serialized using
        :py:class:`email.generator.Generator` and :meth:`.sendmail` is
        called to transmit the message.

        If the sender or any recipient contains non-ASCII characters, the
        ``SMTPUTF8`` MAIL option is added (the server must support it). If
        the server supports 8BITMIME, ``BODY=8BITMIME`` is added and the
        message is flattened as 8bit; otherwise it is flattened as 7bit.

        'Resent-Date' is a mandatory field if the message is resent (RFC
        2822 Section 3.6.6). In such a case, we use the 'Resent-\*' fields.
        However, if there is more than one 'Resent-' block there's no way
        to unambiguously determine which one is the most recent in all
        cases, so rather than guess we raise a ``ValueError`` in that case.

        :raises ValueError:
            on more than one Resent header block
            on no sender kwarg or From header in message
            on no recipients kwarg or To, Cc or Bcc header in message
        :raises SMTPNotSupported: a non-ASCII address was given but the
            server does not support SMTPUTF8
        :raises SMTPRecipientsRefused: delivery to all recipients failed
        :raises SMTPResponseException: on invalid response
        """
        mail_opts = list(mail_options) if mail_options is not None else []

        if sender is None:
            sender = extract_sender(message)
            if sender is None:
                raise ValueError("No From header provided in message")

        if recipients is None:
            recipients = extract_recipients(message)
        elif isinstance(recipients, str):
            recipients = [recipients]
        if not recipients:
            raise ValueError("No recipient headers provided in message")

        # Make sure we've done an EHLO for extension checks
        await self._ehlo_or_helo_if_needed()

        # Any address that cannot be encoded as ASCII forces SMTPUTF8.
        utf8_required = False
        try:
            sender.encode("ascii")
            "".join(recipients).encode("ascii")
        except UnicodeEncodeError:
            utf8_required = True

        if utf8_required:
            if not self.supports_extension("smtputf8"):
                raise SMTPNotSupported(
                    "An address containing non-ASCII characters was provided, but "
                    "SMTPUTF8 is not supported by this server"
                )
            if "smtputf8" not in [opt.lower() for opt in mail_opts]:
                mail_opts.append("SMTPUTF8")

        cte_type = "7bit"
        if self.supports_extension("8BITMIME"):
            cte_type = "8bit"
            if "body=8bitmime" not in [opt.lower() for opt in mail_opts]:
                mail_opts.append("BODY=8BITMIME")

        flat_message = flatten_message(message, utf8=utf8_required, cte_type=cte_type)

        return await self.sendmail(
            sender,
            recipients,
            flat_message,
            mail_options=mail_opts,
            rcpt_options=rcpt_options,
            timeout=timeout,
        )

    def sendmail_sync(self, *args, **kwargs) -> Tuple[Dict[str, SMTPResponse], str]:
        """
        Synchronous version of :meth:`.sendmail`. This method starts
        the event loop to connect, send the message, and disconnect.

        All arguments are forwarded unchanged to :meth:`.sendmail`.
        """

        async def sendmail_coroutine():
            # The client is used as an async context manager around the send.
            async with self:
                return await self.sendmail(*args, **kwargs)

        return async_to_sync(sendmail_coroutine(), loop=self.loop)

    def send_message_sync(
        self, *args, **kwargs
    ) -> Tuple[Dict[str, SMTPResponse], str]:
        """
        Synchronous version of :meth:`.send_message`. This method
        starts the event loop to connect, send the message, and disconnect.

        All arguments are forwarded unchanged to :meth:`.send_message`.
        """

        async def send_message_coroutine():
            # The client is used as an async context manager around the send.
            async with self:
                return await self.send_message(*args, **kwargs)

        return async_to_sync(send_message_coroutine(), loop=self.loop)