Package logilab :: Package common :: Module shellutils
[frames | no frames]

Source Code for Module logilab.common.shellutils

  1  # copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """shell/term utilities, useful to write some python scripts instead of shell 
 19  scripts. 
 20  """ 
 21  __docformat__ = "restructuredtext en" 
 22   
 23  import os 
 24  import glob 
 25  import shutil 
 26  import stat 
 27  import sys 
 28  import tempfile 
 29  import time 
 30  import fnmatch 
 31  import errno 
 32  from os.path import exists, isdir, islink, basename, join, walk 
 33   
 34  from logilab.common import STD_BLACKLIST 
 35  try: 
 36      from logilab.common.proc import ProcInfo, NoSuchProcess 
 37  except ImportError: 
38 # windows platform 39 - class NoSuchProcess(Exception): pass
40
41 - def ProcInfo(pid):
42 raise NoSuchProcess()
43
def chown(path, login=None, group=None):
    """Same as the `os.chown` function, but accepting a user login or a group
    name as argument in addition to numeric ids.

    :param path: path of the file whose ownership is changed
    :param login: user login or numeric uid; owner is left unchanged if None
    :param group: group name or numeric gid; group is left unchanged if None

    Note: you must own the file to chown it (or be root). Otherwise OSError
    is raised.
    """
    # -1 is what gets passed to os.chown for an omitted login / group
    uid = gid = -1
    if login is not None:
        try:
            uid = int(login)
        except ValueError:
            # not a number: look the login up in the password database
            import pwd # Platforms: Unix
            uid = pwd.getpwnam(login).pw_uid
    if group is not None:
        try:
            gid = int(group)
        except ValueError:
            # not a number: look the name up in the group database
            import grp
            gid = grp.getgrnam(group).gr_gid
    os.chown(path, uid, gid)
68
def mv(source, destination, _action=shutil.move):
    """A shell-like mv, supporting wildcards.

    :param source: path or glob pattern of the file(s) to move
    :param destination:
      target path; when `source` matches several files it must be an
      existing directory (checked with an assertion)
    :param _action:
      callable taking (source, destination) and doing the actual work;
      defaults to `shutil.move`

    :raise OSError:
      if nothing matches `source`, or if `_action` raised OSError while
      handling a single match (the original error is put in the message)
    """
    sources = glob.glob(source)
    if len(sources) > 1:
        assert isdir(destination)
        for filename in sources:
            _action(filename, join(destination, basename(filename)))
        return
    if not sources:
        raise OSError('No file matching %s' % source)
    source = sources[0]
    # isdir() is already False for a missing path: no exists() check needed
    if isdir(destination):
        destination = join(destination, basename(source))
    try:
        _action(source, destination)
    except OSError:
        # sys.exc_info() rather than "except OSError, ex" / "as ex": this
        # form is valid on every Python 2 and Python 3 version
        ex = sys.exc_info()[1]
        raise OSError('Unable to move %r to %r (%s)' % (
            source, destination, ex))
89
def rm(*files):
    """A shell-like rm, supporting wildcards.

    Every argument is expanded with `glob.glob`; real directories are removed
    recursively, anything else (symbolic links included) is unlinked.
    """
    for pattern in files:
        for path in glob.glob(pattern):
            # a link is removed itself, even when it points to a directory
            if isdir(path) and not islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
101
def cp(source, destination):
    """A shell-like cp, supporting wildcards.

    Delegates to `mv`, with `shutil.copy` as the action applied to each
    matching file instead of a move.
    """
    mv(source, destination, _action=shutil.copy)
106
def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
    """Recursively find files ending with the given extensions from the directory.

    :type directory: str
    :param directory:
      directory where the search should start

    :type exts: basestring or list or tuple
    :param exts:
      extension or list of extensions to search

    :type exclude: boolean
    :param exclude:
      if this argument is True, returning files NOT ending with the given
      extensions

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: list
    :return:
      the list of all matching files
    """
    try:
        string_types = basestring
    except NameError: # Python 3 has no basestring
        string_types = str
    if isinstance(exts, string_types):
        exts = (exts,)

    def has_ext(filename):
        """tell if filename ends with one of the searched extensions"""
        for ext in exts:
            if filename.endswith(ext):
                return True
        return False

    files = []
    # os.walk replaces os.path.walk, which is deprecated and no longer exists
    # on Python 3
    for dirpath, dirnames, filenames in os.walk(directory):
        # remove directories in the black list, in place, so that os.walk
        # doesn't recurse into them
        for norecurs in blacklist:
            try:
                dirnames.remove(norecurs)
            except ValueError:
                continue
        for filename in filenames:
            # files in the black list are ignored as well
            if filename in blacklist:
                continue
            matched = has_ext(filename)
            if exclude:
                matched = not matched
            if matched:
                files.append(join(dirpath, filename))
    return files
def globfind(directory, pattern, blacklist=STD_BLACKLIST):
    """Recursively finds files matching glob `pattern` under `directory`.

    This is an alternative to `logilab.common.shellutils.find`.

    :type directory: str
    :param directory:
      directory where the search should start

    :type pattern: basestring
    :param pattern:
      the glob pattern (e.g *.py, foo*.py, etc.)

    :type blacklist: list or tuple
    :param blacklist:
      optional list of names to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`. Only sub-directories bearing such a
      name are skipped: a file with a blacklisted name is still returned if
      it matches `pattern`.

    :rtype: iterator
    :return:
      iterator over the list of all matching files
    """
    for root, subdirs, names in os.walk(directory):
        for matching in fnmatch.filter(names, pattern):
            yield join(root, matching)
        # prune the directory list in place, which keeps os.walk from
        # descending into blacklisted directories
        for ignored in blacklist:
            try:
                subdirs.remove(ignored)
            except ValueError:
                pass
193
def unzip(archive, destdir):
    """Extract every member of the zip file `archive` below `destdir`.

    :param archive: path of the zip file to read
    :param destdir:
      directory receiving the content; created (one level only) when it
      doesn't exist yet

    Missing intermediate directories are created on the fly and directories
    that already exist are reused.

    NOTE: member names are joined to `destdir` as found in the archive, so an
    archive coming from an untrusted source could write outside of `destdir`.
    """
    import zipfile
    if not exists(destdir):
        os.mkdir(destdir)
    zfobj = zipfile.ZipFile(archive)
    try:
        for name in zfobj.namelist():
            target = join(destdir, name)
            if name.endswith('/'):
                # directory entry: may already be there (extracting twice, or
                # created earlier as the parent of another member)
                if not isdir(target):
                    os.makedirs(target)
                continue
            # archives don't necessarily hold an entry for each directory
            parent = os.path.dirname(target)
            if not isdir(parent):
                os.makedirs(parent)
            outfile = open(target, 'wb')
            try:
                outfile.write(zfobj.read(name))
            finally:
                outfile.close()
    finally:
        zfobj.close()
206
class Execute:
    """This is a deadlock safe version of popen2 (no stdin), that returns
    an object with errorlevel, out and err.

    The command is run through `os.system`, with its standard output and
    standard error redirected to temporary files which are read back and
    removed afterwards.

    :ivar status: value returned by `os.system`, shifted right by 8 bits
    :ivar out: what the command wrote on its standard output
    :ivar err: what the command wrote on its standard error
    """

    def __init__(self, command):
        paths = []
        try:
            # mkstemp() actually creates the files, unlike mktemp() which
            # only picks names that somebody else may grab in the meantime
            for _ in range(2):
                fd, path = tempfile.mkstemp()
                os.close(fd)
                paths.append(path)
            outfile, errfile = paths
            self.status = os.system("( %s ) >%s 2>%s" %
                                    (command, outfile, errfile)) >> 8
            captured = []
            for path in paths:
                stream = open(path, "r")
                try:
                    captured.append(stream.read())
                finally:
                    stream.close()
            self.out, self.err = captured
        finally:
            # temporary files are removed even if something went wrong
            for path in paths:
                os.remove(path)
221
def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
    """Acquire a lock represented by a file on the file system

    If the process written in lock file doesn't exist anymore, we remove the
    lock file immediately
    If age of the lock_file is greater than max_delay, then we raise a UserWarning

    :param lock_file: path of the file used as lock; our pid is written in it
    :param max_try: number of attempts (its absolute value is used)
    :param delay: seconds slept after each failed attempt
    :param max_delay: age of the lock file, in seconds, above which
      UserWarning is raised

    :return: True once the lock has been acquired
    :raise UserWarning: the lock is held for more than `max_delay` seconds
    :raise OSError: the lock file can't be created, for any other reason
      than being already there
    :raise Exception: the lock is still held after `max_try` attempts
    """
    count = abs(max_try)
    while count:
        try:
            # O_EXCL | O_CREAT: fails with EEXIST if the file is already there
            fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
            try:
                # os.write() wants bytes on Python 3; on Python 2 encoding
                # a str made of digits gives back the same str
                os.write(fd, str(os.getpid()).encode('ascii'))
            finally:
                os.close(fd)
            return True
        except OSError:
            # sys.exc_info() instead of "except OSError, e" / "as e": valid
            # on every Python 2 and Python 3 version
            if sys.exc_info()[1].errno != errno.EEXIST:
                raise
            try:
                stream = open(lock_file, "r")
                try:
                    pid = int(stream.readline())
                finally:
                    stream.close()
                pi = ProcInfo(pid)
                age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
                if age / max_delay > 1 :
                    raise UserWarning("Command '%s' (pid %s) has locked the "
                                      "file '%s' for %s minutes"
                                      % (pi.name(), pid, lock_file, age/60))
            except UserWarning:
                raise
            except NoSuchProcess:
                # stale lock: its owner is gone
                os.remove(lock_file)
            except Exception:
                # The try block is not essential. can be skipped.
                # Note: ProcInfo object is only available for linux
                # process information are not accessible...
                # or lock_file is no more present...
                pass
        count -= 1
        time.sleep(delay)
    raise Exception('Unable to acquire %s' % lock_file)
263
def release_lock(lock_file):
    """Release a lock represented by a file on the file system, by deleting
    `lock_file`.
    """
    os.unlink(lock_file)
267
class ProgressBar(object):
    """A simple text progression bar.

    The bar is redrawn in place (each write starts with a carriage return)
    and may be followed by a free text, set through the `text` property.

    :param nbops: total number of operations, i.e. of expected `update` calls
    :param size: width of the bar, in characters
    :param stream: file-like object the bar is written to
    :param title: optional text displayed before the bar
    """

    def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
        if title:
            # the title ends up inside a format string: escape its '%' so
            # that they are displayed as is instead of breaking refresh()
            self._fstr = '\r%s [%%-%ss]' % (title.replace('%', '%%'),
                                            int(size))
        else:
            self._fstr = '\r[%%-%ss]' % int(size)
        self._stream = stream
        self._total = nbops
        self._size = size
        self._current = 0
        self._progress = 0
        self._current_text = None
        self._last_text_write_size = 0

    def _get_text(self):
        return self._current_text

    def _set_text(self, text=None):
        self._current_text = text
        self.refresh()

    def _del_text(self):
        # goes through the setter, hence redraws the bar without text
        self.text = None

    text = property(_get_text, _set_text, _del_text)

    def update(self):
        """Update the progression bar."""
        self._current += 1
        if self._total:
            progress = int((float(self._current)/float(self._total))*self._size)
        else:
            # no operation expected at all: show a full bar rather than
            # failing with ZeroDivisionError
            progress = int(self._size)
        if progress > self._progress:
            self._progress = progress
            self.refresh()

    def refresh(self):
        """Refresh the progression bar display."""
        self._stream.write(self._fstr % ('.' * min(self._progress, self._size)) )
        if self._last_text_write_size or self._current_text:
            # pad to the size of the previously written text, so that a
            # shorter text fully overwrites it
            template = ' %%-%is' % (self._last_text_write_size)
            text = self._current_text
            if text is None:
                text = ''
            self._stream.write(template % text)
            self._last_text_write_size = len(text.rstrip())
        self._stream.flush()
316 317 from logilab.common.deprecation import deprecated
@deprecated('confirm() is deprecated, use RawInput.confirm() instead')
def confirm(question, default_is_yes=True):
    """ask for confirmation and return true on positive answer

    Deprecated: builds a `RawInput` with its default settings and forwards
    both arguments to its `confirm` method.
    """
    asker = RawInput()
    return asker.confirm(question, default_is_yes)
323
class RawInput(object):
    """Ask questions to the user and check the answers.

    :param input:
      callable taking the prompt and returning the user's answer; defaults
      to `raw_input` (named `input` on Python 3)
    :param printer:
      callable taking a message, used to report invalid answers; messages
      are written on the standard output when omitted
    """

    def __init__(self, input=None, printer=None):
        if not input:
            try:
                input = raw_input
            except NameError: # Python 3, where raw_input is named input
                # the builtin is shadowed by the parameter name
                import builtins
                input = builtins.input
        self._input = input
        self._print = printer

    def ask(self, question, options, default):
        """Ask `question` until one of `options` is selected, and return it.

        An answer selects an option when it's a case insensitive prefix of
        this option and of no other one. An empty answer selects `default`,
        which must be one of `options`.

        :raise Exception: if no sensible answer is given within 3 tries
        """
        assert default in options
        choices = []
        for option in options:
            # the default option is the one displayed with a capital letter
            if option == default:
                label = option[0].upper()
            else:
                label = option[0].lower()
            if len(option) > 1:
                label += '(%s)' % option[1:].lower()
            choices.append((option, label))
        prompt = "%s [%s]: " % (question,
                                '/'.join([label for _, label in choices]))
        for _ in range(3):
            answer = self._input(prompt).strip().lower()
            if not answer:
                return default
            possible = [option for option, label in choices
                        if option.lower().startswith(answer)]
            if len(possible) == 1:
                return possible[0]
            elif len(possible) == 0:
                msg = '%s is not an option.' % answer
            else:
                msg = ('%s is an ambiguous answer, do you mean %s ?' % (
                    answer, ' or '.join(possible)))
            if self._print:
                self._print(msg)
            else:
                # same output as the print statement, minus its Python 2
                # only syntax
                sys.stdout.write(msg + '\n')
        raise Exception('unable to get a sensible answer')

    def confirm(self, question, default_is_yes=True):
        """Ask a y/n `question` and return True if the answer is 'y'."""
        if default_is_yes:
            default = 'y'
        else:
            default = 'n'
        return self.ask(question, ('y','n'), default) == 'y'

ASK = RawInput()
def getlogin():
    """avoid using os.getlogin() because of strange tty / stdin problems
    (man 3 getlogin)
    Another solution would be to use $LOGNAME, $USER or $USERNAME

    :return:
      on win32, the value of the USERNAME environment variable; elsewhere,
      the first field of the password database entry of the current uid
    """
    if sys.platform == 'win32':
        return os.environ['USERNAME']
    import pwd # Platforms: Unix
    return pwd.getpwuid(os.getuid())[0]
383