Package logilab :: Package common :: Module umessage
[frames] | no frames]

Source Code for Module logilab.common.umessage

  1  # copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """Unicode email support (extends email from stdlib). 
 19   
 20   
 21   
 22   
 23  """ 
 24  __docformat__ = "restructuredtext en" 
 25   
 26  import email 
 27  from encodings import search_function 
 28  from email.Utils import parseaddr, parsedate 
 29  from email.Header import decode_header 
 30  from datetime import datetime 
 31   
 32  try: 
 33      from mx.DateTime import DateTime 
 34  except ImportError: 
 35      DateTime = datetime 
 36   
 37  import logilab.common as lgc 
 38   
 39   
def decode_QP(string):
    """Decode a (possibly RFC 2047 encoded) header value into unicode.

    The value is split into chunks by `decode_header`; every chunk is
    decoded with the charset it declares and the resulting pieces are joined
    with a single space.

    A chunk that declares no charset, or a charset for which no codec can be
    found, is decoded as iso-8859-15 so that a bogus charset label in a
    header does not raise `LookupError`.  Bytes that cannot be decoded are
    replaced instead of raising.
    """
    text_type = type(u'')
    parts = []
    for decoded, charset in decode_header(string):
        # same codec availability check as the one applied to payload charsets
        if not charset or search_function(charset) is None:
            charset = 'iso-8859-15'
        # a chunk that is already text needs no decoding
        if not isinstance(decoded, text_type):
            decoded = decoded.decode(charset, 'replace')
        parts.append(decoded)
    return u' '.join(parts)
48
def message_from_file(fd):
    """Parse the email read from the file object `fd` into a `UMessage`.

    An empty string is returned instead when the parser raises
    `email.Errors.MessageParseError`.
    """
    try:
        parsed = email.message_from_file(fd)
    except email.Errors.MessageParseError:
        return ''
    else:
        return UMessage(parsed)
54
def message_from_string(string):
    """Parse the email held in `string` into a `UMessage`.

    An empty string is returned instead when the parser raises
    `email.Errors.MessageParseError`.
    """
    try:
        parsed = email.message_from_string(string)
    except email.Errors.MessageParseError:
        return ''
    else:
        return UMessage(parsed)
60
class UMessage:
    """Encapsulates an email.Message instance and returns only unicode objects.
    """

    def __init__(self, message):
        """Wrap `message`, the underlying email message object."""
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """Return the decoded value of `header`.

        A falsy value (`default` for a missing header, or an empty header
        value) is returned unchanged, without decoding.
        """
        value = self.message.get(header, default)
        if value:
            return decode_QP(value)
        return value

    def get_all(self, header, default=()):
        """Return the list of decoded values of every `header` occurrence.

        None values are skipped; `default` is what the wrapped message
        yields when the header is missing.
        """
        return [decode_QP(val) for val in self.message.get_all(header, default)
                if val is not None]

    def get_payload(self, index=None, decode=False):
        """Return the message payload.

        Without `index`:

        * a list payload is returned as a list of `UMessage`
        * the payload of a message whose main type is not 'text' is returned
          as given by the wrapped message
        * a text payload is returned as unicode, decoded with the declared
          content charset, or iso-8859-1 when none is declared or when no
          codec is found for it

        With `index`, the matching sub-part is returned wrapped in a
        `UMessage`.
        """
        message = self.message
        if index is None:
            payload = message.get_payload(index, decode)
            if isinstance(payload, list):
                return [UMessage(msg) for msg in payload]
            if message.get_content_maintype() != 'text':
                return payload

            charset = message.get_content_charset() or 'iso-8859-1'
            # an unknown charset would make the decoding below fail
            if search_function(charset) is None:
                charset = 'iso-8859-1'
            return unicode(payload or '', charset, "replace")
        else:
            payload = UMessage(message.get_payload(index, decode))
        return payload

    def is_multipart(self):
        """Tell whether the wrapped message is multipart."""
        return self.message.is_multipart()

    def get_boundary(self):
        """Return the boundary of the wrapped message."""
        return self.message.get_boundary()

    def walk(self):
        """Yield every part walked by the wrapped message as a `UMessage`."""
        for part in self.message.walk():
            yield UMessage(part)

    def get_content_maintype(self):
        """Return the main content type as unicode."""
        return unicode(self.message.get_content_maintype())

    def get_content_type(self):
        """Return the content type as unicode."""
        return unicode(self.message.get_content_type())

    def get_filename(self, failobj=None):
        """Return the filename as unicode.

        `failobj` is returned untouched when the wrapped message gives it
        back, and a placeholder text is returned when the filename can't be
        converted to unicode.
        """
        value = self.message.get_filename(failobj)
        if value is failobj:
            return value
        try:
            return unicode(value)
        except UnicodeDecodeError:
            return u'error decoding filename'

    # other convenience methods ###############################################

    def headers(self):
        """return an unicode string containing all the message's headers

        One 'name: value' line is produced per header field, in message
        order; a header appearing several times gives one line per
        occurrence, each with its own value.
        """
        lines = []
        # items() pairs each occurrence with its own value, whereas looking
        # a repeated name up with get() always gives the first value
        for name, value in self.message.items():
            if value:
                value = decode_QP(value)
            lines.append(u'%s: %s' % (name, value))
        # unicode separator so that the result is unicode even without header
        return u'\n'.join(lines)

    def multi_addrs(self, header):
        """return a list of 2-uple (name, address) for the given address (which
        is expected to be an header containing address such as from, to, cc...)
        """
        persons = []
        for person in self.get_all(header, ()):
            name, mail = parseaddr(person)
            persons.append((name, mail))
        return persons

    def date(self, alternative_source=False, return_str=False):
        """return a datetime object for the email's date or None if no date is
        set or if it can't be parsed

        :param alternative_source:
          when true and the message has no date header, try to use the
          trailing part of the unix 'From ' line instead
        :param return_str:
          when true, return the raw date string instead of None if a value
          was found but can't be turned into a date object
        """
        value = self.get('date')
        if value is None and alternative_source:
            unix_from = self.message.get_unixfrom()
            if unix_from is not None:
                try:
                    # keep what follows the second space of the line
                    value = unix_from.split(" ", 2)[2]
                except IndexError:
                    pass
        if value is not None:
            datetuple = parsedate(value)
            if datetuple:
                if lgc.USE_MX_DATETIME:
                    factory = DateTime
                else:
                    factory = datetime
                try:
                    return factory(*datetuple[:6])
                except ValueError:
                    # parsedate doesn't check field ranges (e.g. "32 Feb"):
                    # handle such a value as an unparsable date.
                    # NOTE(review): covers the stdlib datetime; confirm which
                    # exception mx.DateTime raises in that case
                    pass
            if not return_str:
                return None
        return value
162