| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
- # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
- #
- # This file is part of logilab-common.
- #
- # logilab-common is free software: you can redistribute it and/or modify it under
- # the terms of the GNU Lesser General Public License as published by the Free
- # Software Foundation, either version 2.1 of the License, or (at your option) any
- # later version.
- #
- # logilab-common is distributed in the hope that it will be useful, but WITHOUT
- # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- # details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with logilab-common. If not, see <http://www.gnu.org/licenses/>.
- """shell/term utilities, useful to write some python scripts instead of shell
- scripts.
- """
- __docformat__ = "restructuredtext en"
- import os
- import glob
- import shutil
- import stat
- import sys
- import tempfile
- import time
- import fnmatch
- import errno
- import string
- import random
- from os.path import exists, isdir, islink, basename, join
- from logilab.common import STD_BLACKLIST, _handle_blacklist
- from logilab.common.compat import raw_input
- from logilab.common.compat import str_to_bytes
- try:
- from logilab.common.proc import ProcInfo, NoSuchProcess
- except ImportError:
- # windows platform
- class NoSuchProcess(Exception): pass
- def ProcInfo(pid):
- raise NoSuchProcess()
- class tempdir(object):
- def __enter__(self):
- self.path = tempfile.mkdtemp()
- return self.path
- def __exit__(self, exctype, value, traceback):
- # rmtree in all cases
- shutil.rmtree(self.path)
- return traceback is None
- class pushd(object):
- def __init__(self, directory):
- self.directory = directory
- def __enter__(self):
- self.cwd = os.getcwd()
- os.chdir(self.directory)
- return self.directory
- def __exit__(self, exctype, value, traceback):
- os.chdir(self.cwd)
- def chown(path, login=None, group=None):
- """Same as `os.chown` function but accepting user login or group name as
- argument. If login or group is omitted, it's left unchanged.
- Note: you must own the file to chown it (or be root). Otherwise OSError is raised.
- """
- if login is None:
- uid = -1
- else:
- try:
- uid = int(login)
- except ValueError:
- import pwd # Platforms: Unix
- uid = pwd.getpwnam(login).pw_uid
- if group is None:
- gid = -1
- else:
- try:
- gid = int(group)
- except ValueError:
- import grp
- gid = grp.getgrnam(group).gr_gid
- os.chown(path, uid, gid)
- def mv(source, destination, _action=shutil.move):
- """A shell-like mv, supporting wildcards.
- """
- sources = glob.glob(source)
- if len(sources) > 1:
- assert isdir(destination)
- for filename in sources:
- _action(filename, join(destination, basename(filename)))
- else:
- try:
- source = sources[0]
- except IndexError:
- raise OSError('No file matching %s' % source)
- if isdir(destination) and exists(destination):
- destination = join(destination, basename(source))
- try:
- _action(source, destination)
- except OSError, ex:
- raise OSError('Unable to move %r to %r (%s)' % (
- source, destination, ex))
- def rm(*files):
- """A shell-like rm, supporting wildcards.
- """
- for wfile in files:
- for filename in glob.glob(wfile):
- if islink(filename):
- os.remove(filename)
- elif isdir(filename):
- shutil.rmtree(filename)
- else:
- os.remove(filename)
- def cp(source, destination):
- """A shell-like cp, supporting wildcards.
- """
- mv(source, destination, _action=shutil.copy)
- def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
- """Recursively find files ending with the given extensions from the directory.
- :type directory: str
- :param directory:
- directory where the search should start
- :type exts: basestring or list or tuple
- :param exts:
- extensions or lists or extensions to search
- :type exclude: boolean
- :param exts:
- if this argument is True, returning files NOT ending with the given
- extensions
- :type blacklist: list or tuple
- :param blacklist:
- optional list of files or directory to ignore, default to the value of
- `logilab.common.STD_BLACKLIST`
- :rtype: list
- :return:
- the list of all matching files
- """
- if isinstance(exts, basestring):
- exts = (exts,)
- if exclude:
- def match(filename, exts):
- for ext in exts:
- if filename.endswith(ext):
- return False
- return True
- else:
- def match(filename, exts):
- for ext in exts:
- if filename.endswith(ext):
- return True
- return False
- files = []
- for dirpath, dirnames, filenames in os.walk(directory):
- _handle_blacklist(blacklist, dirnames, filenames)
- # don't append files if the directory is blacklisted
- dirname = basename(dirpath)
- if dirname in blacklist:
- continue
- files.extend([join(dirpath, f) for f in filenames if match(f, exts)])
- return files
- def globfind(directory, pattern, blacklist=STD_BLACKLIST):
- """Recursively finds files matching glob `pattern` under `directory`.
- This is an alternative to `logilab.common.shellutils.find`.
- :type directory: str
- :param directory:
- directory where the search should start
- :type pattern: basestring
- :param pattern:
- the glob pattern (e.g *.py, foo*.py, etc.)
- :type blacklist: list or tuple
- :param blacklist:
- optional list of files or directory to ignore, default to the value of
- `logilab.common.STD_BLACKLIST`
- :rtype: iterator
- :return:
- iterator over the list of all matching files
- """
- for curdir, dirnames, filenames in os.walk(directory):
- _handle_blacklist(blacklist, dirnames, filenames)
- for fname in fnmatch.filter(filenames, pattern):
- yield join(curdir, fname)
- def unzip(archive, destdir):
- import zipfile
- if not exists(destdir):
- os.mkdir(destdir)
- zfobj = zipfile.ZipFile(archive)
- for name in zfobj.namelist():
- if name.endswith('/'):
- os.mkdir(join(destdir, name))
- else:
- outfile = open(join(destdir, name), 'wb')
- outfile.write(zfobj.read(name))
- outfile.close()
- class Execute:
- """This is a deadlock safe version of popen2 (no stdin), that returns
- an object with errorlevel, out and err.
- """
- def __init__(self, command):
- outfile = tempfile.mktemp()
- errfile = tempfile.mktemp()
- self.status = os.system("( %s ) >%s 2>%s" %
- (command, outfile, errfile)) >> 8
- self.out = open(outfile, "r").read()
- self.err = open(errfile, "r").read()
- os.remove(outfile)
- os.remove(errfile)
- def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
- """Acquire a lock represented by a file on the file system
- If the process written in lock file doesn't exist anymore, we remove the
- lock file immediately
- If age of the lock_file is greater than max_delay, then we raise a UserWarning
- """
- count = abs(max_try)
- while count:
- try:
- fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
- os.write(fd, str_to_bytes(str(os.getpid())) )
- os.close(fd)
- return True
- except OSError, e:
- if e.errno == errno.EEXIST:
- try:
- fd = open(lock_file, "r")
- pid = int(fd.readline())
- pi = ProcInfo(pid)
- age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
- if age / max_delay > 1 :
- raise UserWarning("Command '%s' (pid %s) has locked the "
- "file '%s' for %s minutes"
- % (pi.name(), pid, lock_file, age/60))
- except UserWarning:
- raise
- except NoSuchProcess:
- os.remove(lock_file)
- except Exception:
- # The try block is not essential. can be skipped.
- # Note: ProcInfo object is only available for linux
- # process information are not accessible...
- # or lock_file is no more present...
- pass
- else:
- raise
- count -= 1
- time.sleep(delay)
- else:
- raise Exception('Unable to acquire %s' % lock_file)
- def release_lock(lock_file):
- """Release a lock represented by a file on the file system."""
- os.remove(lock_file)
- class ProgressBar(object):
- """A simple text progression bar."""
- def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
- if title:
- self._fstr = '\r%s [%%-%ss]' % (title, int(size))
- else:
- self._fstr = '\r[%%-%ss]' % int(size)
- self._stream = stream
- self._total = nbops
- self._size = size
- self._current = 0
- self._progress = 0
- self._current_text = None
- self._last_text_write_size = 0
- def _get_text(self):
- return self._current_text
- def _set_text(self, text=None):
- if text != self._current_text:
- self._current_text = text
- self.refresh()
- def _del_text(self):
- self.text = None
- text = property(_get_text, _set_text, _del_text)
- def update(self, offset=1, exact=False):
- """Move FORWARD to new cursor position (cursor will never go backward).
- :offset: fraction of ``size``
- :exact:
- - False: offset relative to current cursor position if True
- - True: offset as an asbsolute position
- """
- if exact:
- self._current = offset
- else:
- self._current += offset
- progress = int((float(self._current)/float(self._total))*self._size)
- if progress > self._progress:
- self._progress = progress
- self.refresh()
- def refresh(self):
- """Refresh the progression bar display."""
- self._stream.write(self._fstr % ('.' * min(self._progress, self._size)) )
- if self._last_text_write_size or self._current_text:
- template = ' %%-%is' % (self._last_text_write_size)
- text = self._current_text
- if text is None:
- text = ''
- self._stream.write(template % text)
- self._last_text_write_size = len(text.rstrip())
- self._stream.flush()
- def finish(self):
- self._stream.write('\n')
- self._stream.flush()
- class DummyProgressBar(object):
- __slot__ = ('text',)
- def refresh(self):
- pass
- def update(self):
- pass
- def finish(self):
- pass
- _MARKER = object()
- class progress(object):
- def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKER, enabled=True):
- self.nbops = nbops
- self.size = size
- self.stream = stream
- self.title = title
- self.enabled = enabled
- def __enter__(self):
- if self.enabled:
- kwargs = {}
- for attr in ('nbops', 'size', 'stream', 'title'):
- value = getattr(self, attr)
- if value is not _MARKER:
- kwargs[attr] = value
- self.pb = ProgressBar(**kwargs)
- else:
- self.pb = DummyProgressBar()
- return self.pb
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.pb.finish()
- class RawInput(object):
- def __init__(self, input=None, printer=None):
- self._input = input or raw_input
- self._print = printer
- def ask(self, question, options, default):
- assert default in options
- choices = []
- for option in options:
- if option == default:
- label = option[0].upper()
- else:
- label = option[0].lower()
- if len(option) > 1:
- label += '(%s)' % option[1:].lower()
- choices.append((option, label))
- prompt = "%s [%s]: " % (question,
- '/'.join([opt[1] for opt in choices]))
- tries = 3
- while tries > 0:
- answer = self._input(prompt).strip().lower()
- if not answer:
- return default
- possible = [option for option, label in choices
- if option.lower().startswith(answer)]
- if len(possible) == 1:
- return possible[0]
- elif len(possible) == 0:
- msg = '%s is not an option.' % answer
- else:
- msg = ('%s is an ambiguous answer, do you mean %s ?' % (
- answer, ' or '.join(possible)))
- if self._print:
- self._print(msg)
- else:
- print msg
- tries -= 1
- raise Exception('unable to get a sensible answer')
- def confirm(self, question, default_is_yes=True):
- default = default_is_yes and 'y' or 'n'
- answer = self.ask(question, ('y', 'n'), default)
- return answer == 'y'
- ASK = RawInput()
- def getlogin():
- """avoid using os.getlogin() because of strange tty / stdin problems
- (man 3 getlogin)
- Another solution would be to use $LOGNAME, $USER or $USERNAME
- """
- if sys.platform != 'win32':
- import pwd # Platforms: Unix
- return pwd.getpwuid(os.getuid())[0]
- else:
- return os.environ['USERNAME']
- def generate_password(length=8, vocab=string.ascii_letters + string.digits):
- """dumb password generation function"""
- pwd = ''
- for i in xrange(length):
- pwd += random.choice(vocab)
- return pwd
|