1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """Unicode email support (extends email from stdlib).
19
20
21
22
23 """
24 __docformat__ = "restructuredtext en"
25
26 import email
27 from encodings import search_function
28 from email.Utils import parseaddr, parsedate
29 from email.Header import decode_header
30 from datetime import datetime
31
32 try:
33 from mx.DateTime import DateTime
34 except ImportError:
35 DateTime = datetime
36
37 import logilab.common as lgc
38
39
41 parts = []
42 for decoded, charset in decode_header(string):
43 if not charset :
44 charset = 'iso-8859-15'
45 parts.append(unicode(decoded, charset, 'replace'))
46
47 return u' '.join(parts)
48
54
60
62 """Encapsulates an email.Message instance and returns only unicode objects.
63 """
64
66 self.message = message
67
68
69
def get(self, header, default=None):
    """Return the value of `header`, decoded through `decode_QP`.

    The lookup is delegated to the wrapped message's own ``get``. A false
    result (missing header, empty string or a false `default`) is handed
    back untouched; only a true value goes through `decode_QP`.
    """
    raw = self.message.get(header, default)
    if not raw:
        # nothing to decode: None, '' or the caller's false default
        return raw
    return decode_QP(raw)
75
def get_all(self, header, default=()):
    """Return a list with every value of `header`, each decoded through
    `decode_QP`.

    Values are fetched with the wrapped message's ``get_all``; `default` is
    what that call yields when the header is absent. None entries are
    dropped from the result.
    """
    decoded = []
    for raw in self.message.get_all(header, default):
        if raw is None:
            continue
        decoded.append(decode_QP(raw))
    return decoded
79
81 message = self.message
82 if index is None:
83 payload = message.get_payload(index, decode)
84 if isinstance(payload, list):
85 return [UMessage(msg) for msg in payload]
86 if message.get_content_maintype() != 'text':
87 return payload
88
89 charset = message.get_content_charset() or 'iso-8859-1'
90 if search_function(charset) is None:
91 charset = 'iso-8859-1'
92 return unicode(payload or '', charset, "replace")
93 else:
94 payload = UMessage(message.get_payload(index, decode))
95 return payload
96
99
102
104 for part in self.message.walk():
105 yield UMessage(part)
106
108 return unicode(self.message.get_content_maintype())
109
111 return unicode(self.message.get_content_type())
112
114 value = self.message.get_filename(failobj)
115 if value is failobj:
116 return value
117 try:
118 return unicode(value)
119 except UnicodeDecodeError:
120 return u'error decoding filename'
121
122
123
125 """return a unicode string containing all the message's headers"""
126 values = []
127 for header in self.message.keys():
128 values.append(u'%s: %s' % (header, self.get(header)))
129 return '\n'.join(values)
130
132 """return a list of 2-tuples (name, address) for the given address (which
133 is expected to be a header containing addresses such as from, to, cc...)
134 """
135 persons = []
136 for person in self.get_all(header, ()):
137 name, mail = parseaddr(person)
138 persons.append((name, mail))
139 return persons
140
def date(self, alternative_source=False, return_str=False):
    """Return the email's date as a datetime-like object.

    The ``date`` header is read through ``self.get`` and parsed with
    ``parsedate``.

    :param alternative_source:
      when true and the message has no ``date`` header, fall back on the
      unix "From " line of the wrapped message: whatever follows its second
      space is used as the date string (ignored if the line has fewer than
      three space-separated fields)
    :param return_str:
      when true, a date string which can't be turned into a date object is
      returned as is instead of None

    :return:
      a ``DateTime`` instance if ``lgc.USE_MX_DATETIME`` is true, else a
      ``datetime`` instance; None if no date is available, or if it can't
      be parsed and `return_str` is false
    """
    value = self.get('date')
    if value is None and alternative_source:
        unix_from = self.message.get_unixfrom()
        if unix_from is not None:
            fields = unix_from.split(" ", 2)
            if len(fields) > 2:
                value = fields[2]
    if value is None:
        return None
    datetuple = parsedate(value)
    if datetuple:
        if lgc.USE_MX_DATETIME:
            factory = DateTime
        else:
            factory = datetime
        try:
            return factory(*datetuple[:6])
        except ValueError:
            # parsedate doesn't range-check its fields (e.g. day 32 or
            # hour 25 come back as is) and the constructor then refuses
            # them: handle that like any other unparseable date.
            # NOTE(review): confirm that mx.DateTime's range error derives
            # from ValueError, otherwise it still propagates.
            pass
    if return_str:
        return value
    return None
162