首页 > 技术文章 > python 邮件发送与接收封装

yuhaipeng 2020-04-28 11:34 原文

目录结构

  EmailSendOrRecv/
      recv/
          __init__.py
          emailRecv.py
      send/
          __init__.py
          emailSend.py
      base.py
      imap_utf7.py

  emailRecv.py

      

      # -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: emailRecv.py
# time: 2020/3/28 14:23
""" Write an introduction to the module here """
         import logging

   

         from ..base import EmailBase
from ..imap_utf7 import decode as mail_decode, encode as mail_encode
from imbox import Imbox
         class EmailRecv(EmailBase):
             """Read and manage mail over IMAP through an ``imbox.Imbox`` connection.

             Message objects produced by :meth:`read` expose, per the original notes:

             * ``sent_from`` -- sender
             * ``sent_to`` -- recipients
             * ``subject`` -- subject line
             * ``date`` -- sending date
             * ``body['plain']`` -- plain-text body
             * ``body['html']`` -- HTML body
             * ``attachments`` -- attachments
             * ``parsed_date`` -- the date as a ``datetime``
             """

             def __init__(self, *args, **kwargs):
                 # _server must exist before EmailBase.__init__ runs, because the
                 # base constructor calls _login(), which inspects it.
                 self._server = None
                 super(EmailRecv, self).__init__(*args, **kwargs)

             def _login(self, *args, **kwargs):
                 """Open the IMAP connection if it is not open yet and return it.

                 :param args: extra positional arguments forwarded to ``Imbox``
                 :param kwargs: extra keyword arguments forwarded to ``Imbox``
                 :return: the ``Imbox`` connection
                 """
                 if not isinstance(self._server, Imbox):
                     self._server = Imbox(
                         self._host,
                         *args,
                         username=self._user,
                         password=self._pwd,
                         port=self._port,
                         **kwargs
                     )
                 return self._server

             def logout(self, *args, **kwargs):
                 """Log out of the IMAP server; calling it again is a no-op.

                 The stored connection is dropped before logging out, so an explicit
                 ``logout()`` followed by the context manager's ``__exit__`` does not
                 log out of the same connection twice.
                 """
                 server, self._server = self._server, None
                 if isinstance(server, Imbox):
                     server.logout()

             @staticmethod
             def _folder_name(line):
                 """Extract the mailbox name from one decoded LIST response line.

                 The expected layout is ``(flags) "delimiter" name`` where ``name``
                 may be quoted and may contain spaces. Lines that do not match this
                 layout fall back to the last whitespace-separated token.

                 :param line: decoded LIST response line (text)
                 :return: the mailbox name without surrounding double quotes
                 """
                 import re  # local import: keeps this block self-contained

                 match = re.match(
                     r'\s*\(.*?\)\s+(?:"(?:\\.|[^"\\])*"|NIL)\s+(.*)$', line)
                 name = match.group(1).strip() if match else line.split()[-1]
                 return name.strip('"')

             def folders(self):
                 """Return the names of the mailbox folders on the server.

                 :return: list of folder names; an empty list when the server's
                     status is not ``'OK'``
                 """
                 response = self._server.folders()
                 if response[0] != 'OK':
                     return []

                 names = []
                 for raw_line in response[1]:
                     line = mail_decode(raw_line)
                     # mail_decode returns non-bytes input unchanged, so entries
                     # such as None cannot be parsed as text; skip them.
                     if not isinstance(line, str) or not line.strip():
                         continue
                     names.append(self._folder_name(line))
                 return names

             def mark_seen(self, uuid):
                 """Mark a message as read.

                 :param uuid: unique id of the message
                 """
                 self._server.mark_seen(uuid)

             def mark_flag(self, uuid):
                 """Flag a message (red-flag / starred mail).

                 :param uuid: unique id of the message
                 """
                 self._server.mark_flag(uuid)

             def delete(self, uuid):
                 """Delete a message.

                 :param uuid: unique id of the message
                 """
                 self._server.delete(uuid)

             def read(self, **kwargs):
                 """Query messages; keyword arguments are forwarded to ``Imbox.messages``.

                 Filters listed in the original notes:

                 :param folder: mailbox folder, e.g. ``INBOX`` or a localized name
                     (drafts, sent, deleted, spam, ...); encoded with modified
                     UTF-7 before being passed on
                 :param unread: bool, only unread mail
                 :param unflagged: bool, only mail that is not flagged
                 :param flagged: bool, only flagged mail
                 :param sent_from: str, only mail from this sender
                 :param sent_to: str, only mail to this recipient
                 :param date__gt: only mail after this day
                 :param date__lt: only mail before this day,
                     e.g. ``datetime.date(2019, 10, 28)``
                 :param date__on: only mail on this day
                 :param subject: only mail with this subject
                 :param lookup_error_ignore: bool (default ``False``); when true,
                     messages whose fetch raises ``LookupError`` are logged and
                     skipped instead of aborting the iteration
                 :return: an iterable of ``(uid, message)`` pairs
                 """
                 lookup_error_ignore = kwargs.pop('lookup_error_ignore', False)

                 self.__parser_folders(kwargs)
                 all_messages = self._server.messages(**kwargs)
                 if lookup_error_ignore:
                     return self.iter_all_messages(all_messages)
                 return all_messages

             @staticmethod
             def iter_all_messages(all_messages):
                 """Yield ``(uid, message)`` pairs, skipping messages that fail to fetch.

                 Relies on the private ``_uid_list`` / ``_fetch_email`` members of the
                 object returned by ``Imbox.messages``.

                 :param all_messages: result of ``Imbox.messages(...)``
                 """
                 # Iterate over a snapshot so the set of uids is fixed up front.
                 for uid in list(all_messages._uid_list):  # noqa
                     try:
                         msg = all_messages._fetch_email(uid)  # noqa
                     except LookupError as e:
                         logging.error('uid %s error %s', uid, e)
                         continue
                     yield uid, msg

             @staticmethod
             def __parser_folders(folder_other):
                 """Encode the ``folder`` entry of *folder_other* in place.

                 A falsy or missing ``folder`` is removed from the mapping.

                 :param folder_other: keyword-argument dict destined for ``messages``
                 """
                 folder = folder_other.pop('folder', None)
                 if folder:
                     folder_other['folder'] = mail_encode(folder)

             def copy(self, uid, destination_folder):
                 """Copy a message to another folder.

                 :param uid: unique id of the message
                 :param destination_folder: target folder
                 :return: whatever the underlying ``Imbox.copy`` returns
                 """
                 return self._server.copy(uid, destination_folder)

             def move(self, uid, destination_folder):
                 """Move a message to another folder.

                 :param uid: unique id of the message
                 :param destination_folder: target folder
                 :return: whatever the underlying ``Imbox.move`` returns
                 """
                 return self._server.move(uid, destination_folder)

 

     emailSend.py

         

          # -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: emailSend.py
# time: 2020/3/28 14:24
""" Write an introduction to the module here """
import yagmail
from ..base import EmailBase

 

          class EmailSend(EmailBase):
              """Send mail through a ``yagmail.SMTP`` connection.

              Original author's note on this class: map.vip.126.com
              """

              def __init__(self, *args, **kwargs):
                  # The attribute has to exist before the base constructor runs,
                  # since that constructor calls _login().
                  self._server = None
                  super().__init__(*args, **kwargs)

              def _login(self, *args, **kwargs):
                  """Create the SMTP client on first use; later calls do nothing.

                  :param args: extra positional arguments for ``yagmail.SMTP``
                  :param kwargs: extra keyword arguments for ``yagmail.SMTP``
                  """
                  if self._server is not None:
                      return
                  self._server = yagmail.SMTP(
                      *args,
                      user=self._user,
                      password=self._pwd,
                      host=self._host,
                      port=self._port,
                      **kwargs
                  )

              def logout(self, *args, **kwargs):
                  """Close the SMTP client if one was created."""
                  server = self._server
                  if server is None:
                      return
                  server.close()

              def send_email(self, **kwargs):
                  """Send a message; all keyword arguments go to ``yagmail.SMTP.send``.

                  Keyword arguments listed in the original notes:

                  :param to: list of recipients, e.g. ['xxx@126.com', 'xxx@163.com']
                  :param subject: mail subject
                  :param contents: mail contents, e.g. plain text, an html file
                      name or an HTML snippet; use ``yagmail.inline('demo.png')``
                      for inline content
                  :param attachments: list of attachment paths
                  :param cc: list of carbon-copy recipients
                  :param bcc: list of blind-carbon-copy recipients
                  :param preview_only: only build/preview the message, do not send
                  :param headers: dict of extra headers (the original example
                      uses the key "Form" -- presumably "From"; confirm)
                  :param newline_to_break: bool; when true, newlines in the
                      content are replaced with ``<br>``
                  :return: the result of ``yagmail.SMTP.send``
                  """
                  sender = self._server.send
                  return sender(**kwargs)

 

      base.py

 

        # -*- coding: utf-8 -*-
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: base.py
# time: 2020/3/28 14:07
""" Write an introduction to the module here """

from abc import abstractmethod, ABCMeta


class EmailBase(metaclass=ABCMeta):
    """Abstract base for the mail clients.

    Stores the account settings, logs in during construction and works as a
    context manager that logs out when the ``with`` block ends.
    """

    def __init__(self, user, pwd, host, port=None, *args, **kwargs):
        """Remember the credentials, then log in.

        :param user: account name
        :param pwd: account password
        :param host: mail server host
        :param port: mail server port (``None`` by default)
        :param args: extra positional arguments handed to ``_login``
        :param kwargs: extra keyword arguments handed to ``_login``
        """
        self._user, self._pwd, self._host, self._port = user, pwd, host, port
        self._login(*args, **kwargs)

    @abstractmethod
    def _login(self, *args, **kwargs):
        """Establish the server connection; subclasses must implement this."""

    @abstractmethod
    def logout(self, *args, **kwargs):
        """Tear down the server connection; subclasses must implement this."""

    def __enter__(self):
        """Return the client itself for use inside a ``with`` block."""
        return self

    def __exit__(self, types, value, traceback):
        """Log out on leaving the ``with`` block; exceptions are not suppressed."""
        self.logout()

 

imap_utf7.py

 

# -*- coding: utf-8 -*-
#
# This file contains two main methods used to encode and decode UTF-7
# string, described in the RFC 3501. There are some variations specific
# to IMAP4rev1, so the built-in Python UTF-7 codec can't be used instead.
#
# The main difference is the shift character (used to switch from ASCII to
# base64 encoding context), which is & in this modified UTF-7 convention,
# since + is considered as mainly used in mailbox names.
# Other variations and examples can be found in the RFC 3501, section 5.1.3.

""" Write an introduction to the module here """
from __future__ import unicode_literals

import binascii
from six import binary_type, text_type, byte2int, iterbytes, unichr


def encode(s):
    """Encode a folder name with IMAP modified UTF-7.

    Text input is converted to bytes; anything that is not text is handed
    back untouched.
    """
    if not isinstance(s, text_type):
        return s

    out = bytearray()
    pending = []  # non-ASCII characters waiting to be base64-encoded together

    def flush():
        """Emit the pending characters as one ``&...-`` section and clear them."""
        if pending:
            out.extend(b'&' + base64_utf7_encode(pending) + b'-')
            del pending[:]

    for ch in s:
        code = ord(ch)
        if not 0x20 <= code <= 0x7e:
            # Outside printable ASCII: collect it for the next base64 section.
            pending.append(ch)
            continue

        # Printable ASCII ends any base64 section that is being collected.
        flush()
        if code == 0x26:
            # '&' is the shift character, so a literal '&' is written as '&-'.
            out.extend(b'&-')
        else:
            out.append(code)

    # The string may end while a base64 section is still open.
    flush()

    return bytes(out)


# Integer byte values of '&' (shift into base64) and '-' (shift back to ASCII).
AMPERSAND_ORD, DASH_ORD = byte2int(b'&'), byte2int(b'-')


def decode(s):
    """Decode an IMAP modified UTF-7 folder name into text.

    Bytes input is converted to text; anything that is not bytes is handed
    back untouched.
    """
    if not isinstance(s, binary_type):
        return s

    pieces = []
    # Holds the '&' shift byte plus the base64 bytes that follow it; empty
    # while reading plain ASCII.
    pending = bytearray()
    for byte in iterbytes(s):
        if not pending:
            if byte == AMPERSAND_ORD:
                # Shift character: start collecting a base64 section.
                pending.append(byte)
            else:
                # Plain ASCII byte outside any base64 section.
                pieces.append(unichr(byte))
            continue

        if byte != DASH_ORD:
            # Still inside the base64 section.
            pending.append(byte)
            continue

        # '-' closes the section; a bare '&-' stands for a literal '&'.
        if len(pending) == 1:
            pieces.append('&')
        else:
            pieces.append(base64_utf7_decode(pending[1:]))
        pending = bytearray()

    # Input ended inside a base64 section: decode what was collected.
    if pending:
        pieces.append(base64_utf7_decode(pending[1:]))

    return ''.join(pieces)


def base64_utf7_encode(buffer):
    """Encode characters as the base64 payload of a modified UTF-7 section.

    :param buffer: iterable of text characters to join and encode
    :return: bytes -- base64 of the UTF-16BE text, without padding or
        newline, and with ``/`` replaced by ``,``
    """
    utf16_bytes = ''.join(buffer).encode('utf-16be')
    b64 = binascii.b2a_base64(utf16_bytes)
    # Drop the trailing newline and '=' padding added by b2a_base64.
    trimmed = b64.rstrip(b'\n=')
    return trimmed.replace(b'/', b',')


def base64_utf7_decode(s):
    """Decode the base64 payload of a modified UTF-7 section into text.

    :param s: bytes-like payload (without the ``&`` and ``-`` markers)
    :return: the decoded text
    """
    # Undo the ',' substitution, then wrap the payload in standard UTF-7
    # shift markers so the built-in codec can decode it.
    standard_b64 = s.replace(b',', b'/')
    framed = b''.join((b'+', standard_b64, b'-'))
    return framed.decode('utf-7')

 

推荐阅读