| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
- # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
- #
- # This file is part of logilab-common.
- #
- # logilab-common is free software: you can redistribute it and/or modify it under
- # the terms of the GNU Lesser General Public License as published by the Free
- # Software Foundation, either version 2.1 of the License, or (at your option) any
- # later version.
- #
- # logilab-common is distributed in the hope that it will be useful, but WITHOUT
- # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- # details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with logilab-common. If not, see <http://www.gnu.org/licenses/>.
- """Unicode email support (extends email from stdlib).
- """
- __docformat__ = "restructuredtext en"
- import email
- from encodings import search_function
- import sys
- if sys.version_info >= (2, 5):
- from email.utils import parseaddr, parsedate
- from email.header import decode_header
- else:
- from email.Utils import parseaddr, parsedate
- from email.Header import decode_header
- from datetime import datetime
- try:
- from mx.DateTime import DateTime
- except ImportError:
- DateTime = datetime
- import logilab.common as lgc
- def decode_QP(string):
- parts = []
- for decoded, charset in decode_header(string):
- if not charset :
- charset = 'iso-8859-15'
- parts.append(unicode(decoded, charset, 'replace'))
- return u' '.join(parts)
- def message_from_file(fd):
- try:
- return UMessage(email.message_from_file(fd))
- except email.Errors.MessageParseError:
- return ''
- def message_from_string(string):
- try:
- return UMessage(email.message_from_string(string))
- except email.Errors.MessageParseError:
- return ''
- class UMessage:
- """Encapsulates an email.Message instance and returns only unicode objects.
- """
- def __init__(self, message):
- self.message = message
- # email.Message interface #################################################
- def get(self, header, default=None):
- value = self.message.get(header, default)
- if value:
- return decode_QP(value)
- return value
- def get_all(self, header, default=()):
- return [decode_QP(val) for val in self.message.get_all(header, default)
- if val is not None]
- def get_payload(self, index=None, decode=False):
- message = self.message
- if index is None:
- payload = message.get_payload(index, decode)
- if isinstance(payload, list):
- return [UMessage(msg) for msg in payload]
- if message.get_content_maintype() != 'text':
- return payload
- charset = message.get_content_charset() or 'iso-8859-1'
- if search_function(charset) is None:
- charset = 'iso-8859-1'
- return unicode(payload or '', charset, "replace")
- else:
- payload = UMessage(message.get_payload(index, decode))
- return payload
- def is_multipart(self):
- return self.message.is_multipart()
- def get_boundary(self):
- return self.message.get_boundary()
- def walk(self):
- for part in self.message.walk():
- yield UMessage(part)
- def get_content_maintype(self):
- return unicode(self.message.get_content_maintype())
- def get_content_type(self):
- return unicode(self.message.get_content_type())
- def get_filename(self, failobj=None):
- value = self.message.get_filename(failobj)
- if value is failobj:
- return value
- try:
- return unicode(value)
- except UnicodeDecodeError:
- return u'error decoding filename'
- # other convenience methods ###############################################
- def headers(self):
- """return an unicode string containing all the message's headers"""
- values = []
- for header in self.message.keys():
- values.append(u'%s: %s' % (header, self.get(header)))
- return '\n'.join(values)
- def multi_addrs(self, header):
- """return a list of 2-uple (name, address) for the given address (which
- is expected to be an header containing address such as from, to, cc...)
- """
- persons = []
- for person in self.get_all(header, ()):
- name, mail = parseaddr(person)
- persons.append((name, mail))
- return persons
- def date(self, alternative_source=False, return_str=False):
- """return a datetime object for the email's date or None if no date is
- set or if it can't be parsed
- """
- value = self.get('date')
- if value is None and alternative_source:
- unix_from = self.message.get_unixfrom()
- if unix_from is not None:
- try:
- value = unix_from.split(" ", 2)[2]
- except IndexError:
- pass
- if value is not None:
- datetuple = parsedate(value)
- if datetuple:
- if lgc.USE_MX_DATETIME:
- return DateTime(*datetuple[:6])
- return datetime(*datetuple[:6])
- elif not return_str:
- return None
- return value
|