1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """shell/term utilities, useful to write some python scripts instead of shell
19 scripts.
20 """
21 __docformat__ = "restructuredtext en"
22
23 import os
24 import glob
25 import shutil
26 import stat
27 import sys
28 import tempfile
29 import time
30 import fnmatch
31 import errno
32 from os.path import exists, isdir, islink, basename, join, walk
33
34 from logilab.common import STD_BLACKLIST
35 try:
36 from logilab.common.proc import ProcInfo, NoSuchProcess
37 except ImportError:
40
43
44
45 -def chown(path, login=None, group=None):
46 """Same as `os.chown` function but accepting user login or group name as
47 argument. If login or group is omitted, it's left unchanged.
48
49 Note: you must own the file to chown it (or be root). Otherwise OSError is raised.
50 """
51 if login is None:
52 uid = -1
53 else:
54 try:
55 uid = int(login)
56 except ValueError:
57 import pwd
58 uid = pwd.getpwnam(login).pw_uid
59 if group is None:
60 gid = -1
61 else:
62 try:
63 gid = int(group)
64 except ValueError:
65 import grp
66 gid = grp.getgrnam(group).gr_gid
67 os.chown(path, uid, gid)
68
69 -def mv(source, destination, _action=shutil.move):
70 """A shell-like mv, supporting wildcards.
71 """
72 sources = glob.glob(source)
73 if len(sources) > 1:
74 assert isdir(destination)
75 for filename in sources:
76 _action(filename, join(destination, basename(filename)))
77 else:
78 try:
79 source = sources[0]
80 except IndexError:
81 raise OSError('No file matching %s' % source)
82 if isdir(destination) and exists(destination):
83 destination = join(destination, basename(source))
84 try:
85 _action(source, destination)
86 except OSError, ex:
87 raise OSError('Unable to move %r to %r (%s)' % (
88 source, destination, ex))
89
91 """A shell-like rm, supporting wildcards.
92 """
93 for wfile in files:
94 for filename in glob.glob(wfile):
95 if islink(filename):
96 os.remove(filename)
97 elif isdir(filename):
98 shutil.rmtree(filename)
99 else:
100 os.remove(filename)
101
102 -def cp(source, destination):
103 """A shell-like cp, supporting wildcards.
104 """
105 mv(source, destination, _action=shutil.copy)
106
108 """Recursively find files ending with the given extensions from the directory.
109
110 :type directory: str
111 :param directory:
112 directory where the search should start
113
114 :type exts: basestring or list or tuple
115 :param exts:
116 extensions or lists or extensions to search
117
118 :type exclude: boolean
119 :param exts:
120 if this argument is True, returning files NOT ending with the given
121 extensions
122
123 :type blacklist: list or tuple
124 :param blacklist:
125 optional list of files or directory to ignore, default to the value of
126 `logilab.common.STD_BLACKLIST`
127
128 :rtype: list
129 :return:
130 the list of all matching files
131 """
132 if isinstance(exts, basestring):
133 exts = (exts,)
134 if exclude:
135 def match(filename, exts):
136 for ext in exts:
137 if filename.endswith(ext):
138 return False
139 return True
140 else:
141 def match(filename, exts):
142 for ext in exts:
143 if filename.endswith(ext):
144 return True
145 return False
146 def func(files, directory, fnames):
147 """walk handler"""
148
149 for norecurs in blacklist:
150 try:
151 fnames.remove(norecurs)
152 except ValueError:
153 continue
154 for filename in fnames:
155 src = join(directory, filename)
156 if isdir(src):
157 continue
158 if match(filename, exts):
159 files.append(src)
160 files = []
161 walk(directory, func, files)
162 return files
163
166 """Recursively finds files matching glob `pattern` under `directory`.
167
168 This is an alternative to `logilab.common.shellutils.find`.
169
170 :type directory: str
171 :param directory:
172 directory where the search should start
173
174 :type pattern: basestring
175 :param pattern:
176 the glob pattern (e.g *.py, foo*.py, etc.)
177
178 :type blacklist: list or tuple
179 :param blacklist:
180 optional list of files or directory to ignore, default to the value of
181 `logilab.common.STD_BLACKLIST`
182
183 :rtype: iterator
184 :return:
185 iterator over the list of all matching files
186 """
187 for curdir, dirnames, filenames in os.walk(directory):
188 for fname in fnmatch.filter(filenames, pattern):
189 yield join(curdir, fname)
190 for skipped in blacklist:
191 if skipped in dirnames:
192 dirnames.remove(skipped)
193
194 -def unzip(archive, destdir):
195 import zipfile
196 if not exists(destdir):
197 os.mkdir(destdir)
198 zfobj = zipfile.ZipFile(archive)
199 for name in zfobj.namelist():
200 if name.endswith('/'):
201 os.mkdir(join(destdir, name))
202 else:
203 outfile = open(join(destdir, name), 'wb')
204 outfile.write(zfobj.read(name))
205 outfile.close()
206
208 """This is a deadlock safe version of popen2 (no stdin), that returns
209 an object with errorlevel, out and err.
210 """
211
213 outfile = tempfile.mktemp()
214 errfile = tempfile.mktemp()
215 self.status = os.system("( %s ) >%s 2>%s" %
216 (command, outfile, errfile)) >> 8
217 self.out = open(outfile,"r").read()
218 self.err = open(errfile,"r").read()
219 os.remove(outfile)
220 os.remove(errfile)
221
222 -def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
223 """Acquire a lock represented by a file on the file system
224
225 If the process written in lock file doesn't exist anymore, we remove the
226 lock file immediately
227 If age of the lock_file is greater than max_delay, then we raise a UserWarning
228 """
229 count = abs(max_try)
230 while count:
231 try:
232 fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
233 os.write(fd, str(os.getpid()))
234 os.close(fd)
235 return True
236 except OSError, e:
237 if e.errno == errno.EEXIST:
238 try:
239 fd = open(lock_file, "r")
240 pid = int(fd.readline())
241 pi = ProcInfo(pid)
242 age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
243 if age / max_delay > 1 :
244 raise UserWarning("Command '%s' (pid %s) has locked the "
245 "file '%s' for %s minutes"
246 % (pi.name(), pid, lock_file, age/60))
247 except UserWarning:
248 raise
249 except NoSuchProcess:
250 os.remove(lock_file)
251 except Exception:
252
253
254
255
256 pass
257 else:
258 raise
259 count -= 1
260 time.sleep(delay)
261 else:
262 raise Exception('Unable to acquire %s' % lock_file)
263
265 """Release a lock represented by a file on the file system."""
266 os.remove(lock_file)
267
270 """A simple text progression bar."""
271
272 - def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
273 if title:
274 self._fstr = '\r%s [%%-%ss]' % (title, int(size))
275 else:
276 self._fstr = '\r[%%-%ss]' % int(size)
277 self._stream = stream
278 self._total = nbops
279 self._size = size
280 self._current = 0
281 self._progress = 0
282 self._current_text = None
283 self._last_text_write_size = 0
284
285 - def _get_text(self):
286 return self._current_text
287
288 - def _set_text(self, text=None):
289 self._current_text = text
290 self.refresh()
291
292 - def _del_text(self):
294
295 text = property(_get_text, _set_text, _del_text)
296
298 """Update the progression bar."""
299 self._current += 1
300 progress = int((float(self._current)/float(self._total))*self._size)
301 if progress > self._progress:
302 self._progress = progress
303 self.refresh()
304
306 """Refresh the progression bar display."""
307 self._stream.write(self._fstr % ('.' * min(self._progress, self._size)) )
308 if self._last_text_write_size or self._current_text:
309 template = ' %%-%is' % (self._last_text_write_size)
310 text = self._current_text
311 if text is None:
312 text = ''
313 self._stream.write(template % text)
314 self._last_text_write_size = len(text.rstrip())
315 self._stream.flush()
316
317 from logilab.common.deprecation import deprecated
318
319 @deprecated('confirm() is deprecated, use RawInput.confirm() instead')
320 -def confirm(question, default_is_yes=True):
321 """ask for confirmation and return true on positive answer"""
322 return RawInput().confirm(question, default_is_yes)
323
369
370 ASK = RawInput()
374 """avoid using os.getlogin() because of strange tty / stdin problems
375 (man 3 getlogin)
376 Another solution would be to use $LOGNAME, $USER or $USERNAME
377 """
378 if sys.platform != 'win32':
379 import pwd
380 return pwd.getpwuid(os.getuid())[0]
381 else:
382 return os.environ['USERNAME']
383