| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- """Project file system commands.
- This modules implements file system operations used by rope. Different
- version control systems can be supported by implementing the interface
- provided by `FileSystemCommands` class. See `SubversionCommands` and
- `MercurialCommands` for example.
- """
- import os
- import shutil
- import subprocess
- def create_fscommands(root):
- dirlist = os.listdir(root)
- commands = {'.hg': MercurialCommands,
- '.svn': SubversionCommands,
- '.git': GITCommands,
- '_svn': SubversionCommands,
- '_darcs': DarcsCommands}
- for key in commands:
- if key in dirlist:
- try:
- return commands[key](root)
- except (ImportError, OSError):
- pass
- return FileSystemCommands()
- class FileSystemCommands(object):
- def create_file(self, path):
- open(path, 'w').close()
- def create_folder(self, path):
- os.mkdir(path)
- def move(self, path, new_location):
- shutil.move(path, new_location)
- def remove(self, path):
- if os.path.isfile(path):
- os.remove(path)
- else:
- shutil.rmtree(path)
- def write(self, path, data):
- file_ = open(path, 'wb')
- try:
- file_.write(data)
- finally:
- file_.close()
- class SubversionCommands(object):
- def __init__(self, *args):
- self.normal_actions = FileSystemCommands()
- import pysvn
- self.client = pysvn.Client()
- def create_file(self, path):
- self.normal_actions.create_file(path)
- self.client.add(path, force=True)
- def create_folder(self, path):
- self.normal_actions.create_folder(path)
- self.client.add(path, force=True)
- def move(self, path, new_location):
- self.client.move(path, new_location, force=True)
- def remove(self, path):
- self.client.remove(path, force=True)
- def write(self, path, data):
- self.normal_actions.write(path, data)
- class MercurialCommands(object):
- def __init__(self, root):
- self.hg = self._import_mercurial()
- self.normal_actions = FileSystemCommands()
- try:
- self.ui = self.hg.ui.ui(
- verbose=False, debug=False, quiet=True,
- interactive=False, traceback=False, report_untrusted=False)
- except:
- self.ui = self.hg.ui.ui()
- self.ui.setconfig('ui', 'interactive', 'no')
- self.ui.setconfig('ui', 'debug', 'no')
- self.ui.setconfig('ui', 'traceback', 'no')
- self.ui.setconfig('ui', 'verbose', 'no')
- self.ui.setconfig('ui', 'report_untrusted', 'no')
- self.ui.setconfig('ui', 'quiet', 'yes')
- self.repo = self.hg.hg.repository(self.ui, root)
- def _import_mercurial(self):
- import mercurial.commands
- import mercurial.hg
- import mercurial.ui
- return mercurial
- def create_file(self, path):
- self.normal_actions.create_file(path)
- self.hg.commands.add(self.ui, self.repo, path)
- def create_folder(self, path):
- self.normal_actions.create_folder(path)
- def move(self, path, new_location):
- self.hg.commands.rename(self.ui, self.repo, path,
- new_location, after=False)
- def remove(self, path):
- self.hg.commands.remove(self.ui, self.repo, path)
- def write(self, path, data):
- self.normal_actions.write(path, data)
- class GITCommands(object):
- def __init__(self, root):
- self.root = root
- self._do(['version'])
- self.normal_actions = FileSystemCommands()
- def create_file(self, path):
- self.normal_actions.create_file(path)
- self._do(['add', self._in_dir(path)])
- def create_folder(self, path):
- self.normal_actions.create_folder(path)
- def move(self, path, new_location):
- self._do(['mv', self._in_dir(path), self._in_dir(new_location)])
- def remove(self, path):
- self._do(['rm', self._in_dir(path)])
- def write(self, path, data):
- # XXX: should we use ``git add``?
- self.normal_actions.write(path, data)
- def _do(self, args):
- _execute(['git'] + args, cwd=self.root)
- def _in_dir(self, path):
- if path.startswith(self.root):
- return path[len(self.root) + 1:]
- return self.root
- class DarcsCommands(object):
- def __init__(self, root):
- self.root = root
- self.normal_actions = FileSystemCommands()
- def create_file(self, path):
- self.normal_actions.create_file(path)
- self._do(['add', path])
- def create_folder(self, path):
- self.normal_actions.create_folder(path)
- self._do(['add', path])
- def move(self, path, new_location):
- self._do(['mv', path, new_location])
- def remove(self, path):
- self.normal_actions.remove(path)
- def write(self, path, data):
- self.normal_actions.write(path, data)
- def _do(self, args):
- _execute(['darcs'] + args, cwd=self.root)
- def _execute(args, cwd=None):
- process = subprocess.Popen(args, cwd=cwd, stdout=subprocess.PIPE)
- process.wait()
- return process.returncode
- def unicode_to_file_data(contents, encoding=None):
- if not isinstance(contents, unicode):
- return contents
- if encoding is None:
- encoding = read_str_coding(contents)
- if encoding is not None:
- return contents.encode(encoding)
- try:
- return contents.encode()
- except UnicodeEncodeError:
- return contents.encode('utf-8')
- def file_data_to_unicode(data, encoding=None):
- result = _decode_data(data, encoding)
- if '\r' in result:
- result = result.replace('\r\n', '\n').replace('\r', '\n')
- return result
- def _decode_data(data, encoding):
- if isinstance(data, unicode):
- return data
- if encoding is None:
- encoding = read_str_coding(data)
- if encoding is None:
- # there is no encoding tip, we need to guess.
- # PEP263 says that "encoding not explicitly defined" means it is ascii,
- # but we will use utf8 instead since utf8 fully covers ascii and btw is
- # the only non-latin sane encoding.
- encoding = 'utf-8'
- try:
- return data.decode(encoding)
- except (UnicodeError, LookupError):
- # fallback to latin1: it should never fail
- return data.decode('latin1')
- def read_file_coding(path):
- file = open(path, 'b')
- count = 0
- result = []
- buffsize = 10
- while True:
- current = file.read(10)
- if not current:
- break
- count += current.count('\n')
- result.append(current)
- file.close()
- return _find_coding(''.join(result))
- def read_str_coding(source):
- try:
- first = source.index('\n') + 1
- second = source.index('\n', first) + 1
- except ValueError:
- second = len(source)
- return _find_coding(source[:second])
- def _find_coding(text):
- coding = 'coding'
- try:
- start = text.index(coding) + len(coding)
- if text[start] not in '=:':
- return
- start += 1
- while start < len(text) and text[start].isspace():
- start += 1
- end = start
- while end < len(text):
- c = text[end]
- if not c.isalnum() and c not in '-_':
- break
- end += 1
- return text[start:end]
- except ValueError:
- pass
|