| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- import bisect
- import difflib
- import sys
- import warnings
- import rope.base.oi.doa
- import rope.base.oi.objectinfo
- import rope.base.oi.soa
- from rope.base import ast, exceptions, taskhandle, utils, stdmods
- from rope.base.exceptions import ModuleNotFoundError
- from rope.base.pyobjectsdef import PyModule, PyPackage, PyClass
- import rope.base.resources
- import rope.base.resourceobserver
- from rope.base import builtins
- class PyCore(object):
- def __init__(self, project):
- self.project = project
- self._init_resource_observer()
- self.cache_observers = []
- self.module_cache = _ModuleCache(self)
- self.extension_cache = _ExtensionCache(self)
- self.object_info = rope.base.oi.objectinfo.ObjectInfoManager(project)
- self._init_python_files()
- self._init_automatic_soa()
- self._init_source_folders()
- def _init_python_files(self):
- self.python_matcher = None
- patterns = self.project.prefs.get('python_files', None)
- if patterns is not None:
- self.python_matcher = rope.base.resources._ResourceMatcher()
- self.python_matcher.set_patterns(patterns)
- def _init_resource_observer(self):
- callback = self._invalidate_resource_cache
- observer = rope.base.resourceobserver.ResourceObserver(
- changed=callback, moved=callback, removed=callback)
- self.observer = rope.base.resourceobserver.FilteredResourceObserver(observer)
- self.project.add_observer(self.observer)
- def _init_source_folders(self):
- self._custom_source_folders = []
- for path in self.project.prefs.get('source_folders', []):
- folder = self.project.get_resource(path)
- self._custom_source_folders.append(folder)
- def _init_automatic_soa(self):
- if not self.automatic_soa:
- return
- callback = self._file_changed_for_soa
- observer = rope.base.resourceobserver.ResourceObserver(
- changed=callback, moved=callback, removed=callback)
- self.project.add_observer(observer)
- @property
- def automatic_soa(self):
- auto_soa = self.project.prefs.get('automatic_soi', None)
- return self.project.prefs.get('automatic_soa', auto_soa)
- def _file_changed_for_soa(self, resource, new_resource=None):
- old_contents = self.project.history.\
- contents_before_current_change(resource)
- if old_contents is not None:
- perform_soa_on_changed_scopes(self.project, resource, old_contents)
- def is_python_file(self, resource):
- if resource.is_folder():
- return False
- if self.python_matcher is None:
- return resource.name.endswith('.py')
- return self.python_matcher.does_match(resource)
- def get_module(self, name, folder=None):
- """Returns a `PyObject` if the module was found."""
- # check if this is a builtin module
- pymod = self._builtin_module(name)
- if pymod is not None:
- return pymod
- module = self.find_module(name, folder)
- if module is None:
- raise ModuleNotFoundError('Module %s not found' % name)
- return self.resource_to_pyobject(module)
- def _builtin_submodules(self, modname):
- result = {}
- for extension in self.extension_modules:
- if extension.startswith(modname + '.'):
- name = extension[len(modname) + 1:]
- if '.' not in name:
- result[name] = self._builtin_module(extension)
- return result
- def _builtin_module(self, name):
- return self.extension_cache.get_pymodule(name)
- def get_relative_module(self, name, folder, level):
- module = self.find_relative_module(name, folder, level)
- if module is None:
- raise ModuleNotFoundError('Module %s not found' % name)
- return self.resource_to_pyobject(module)
- def get_string_module(self, code, resource=None, force_errors=False):
- """Returns a `PyObject` object for the given code
- If `force_errors` is `True`, `exceptions.ModuleSyntaxError` is
- raised if module has syntax errors. This overrides
- ``ignore_syntax_errors`` project config.
- """
- return PyModule(self, code, resource, force_errors=force_errors)
- def get_string_scope(self, code, resource=None):
- """Returns a `Scope` object for the given code"""
- return self.get_string_module(code, resource).get_scope()
- def _invalidate_resource_cache(self, resource, new_resource=None):
- for observer in self.cache_observers:
- observer(resource)
- def _find_module_in_folder(self, folder, modname):
- module = folder
- packages = modname.split('.')
- for pkg in packages[:-1]:
- if module.is_folder() and module.has_child(pkg):
- module = module.get_child(pkg)
- else:
- return None
- if module.is_folder():
- if module.has_child(packages[-1]) and \
- module.get_child(packages[-1]).is_folder():
- return module.get_child(packages[-1])
- elif module.has_child(packages[-1] + '.py') and \
- not module.get_child(packages[-1] + '.py').is_folder():
- return module.get_child(packages[-1] + '.py')
- def get_python_path_folders(self):
- import rope.base.project
- result = []
- for src in self.project.prefs.get('python_path', []) + sys.path:
- try:
- src_folder = rope.base.project.get_no_project().get_resource(src)
- result.append(src_folder)
- except rope.base.exceptions.ResourceNotFoundError:
- pass
- return result
- def find_module(self, modname, folder=None):
- """Returns a resource corresponding to the given module
- returns None if it can not be found
- """
- return self._find_module(modname, folder)
- def find_relative_module(self, modname, folder, level):
- for i in range(level - 1):
- folder = folder.parent
- if modname == '':
- return folder
- else:
- return self._find_module_in_folder(folder, modname)
- def _find_module(self, modname, folder=None):
- """Return `modname` module resource"""
- for src in self.get_source_folders():
- module = self._find_module_in_folder(src, modname)
- if module is not None:
- return module
- for src in self.get_python_path_folders():
- module = self._find_module_in_folder(src, modname)
- if module is not None:
- return module
- if folder is not None:
- module = self._find_module_in_folder(folder, modname)
- if module is not None:
- return module
- return None
- # INFO: It was decided not to cache source folders, since:
- # - Does not take much time when the root folder contains
- # packages, that is most of the time
- # - We need a separate resource observer; `self.observer`
- # does not get notified about module and folder creations
- def get_source_folders(self):
- """Returns project source folders"""
- if self.project.root is None:
- return []
- result = list(self._custom_source_folders)
- result.extend(self._find_source_folders(self.project.root))
- return result
- def resource_to_pyobject(self, resource, force_errors=False):
- return self.module_cache.get_pymodule(resource, force_errors)
- def get_python_files(self):
- """Returns all python files available in the project"""
- return [resource for resource in self.project.get_files()
- if self.is_python_file(resource)]
- def _is_package(self, folder):
- if folder.has_child('__init__.py') and \
- not folder.get_child('__init__.py').is_folder():
- return True
- else:
- return False
- def _find_source_folders(self, folder):
- for resource in folder.get_folders():
- if self._is_package(resource):
- return [folder]
- result = []
- for resource in folder.get_files():
- if resource.name.endswith('.py'):
- result.append(folder)
- break
- for resource in folder.get_folders():
- result.extend(self._find_source_folders(resource))
- return result
- def run_module(self, resource, args=None, stdin=None, stdout=None):
- """Run `resource` module
- Returns a `rope.base.oi.doa.PythonFileRunner` object for
- controlling the process.
- """
- perform_doa = self.project.prefs.get('perform_doi', True)
- perform_doa = self.project.prefs.get('perform_doa', perform_doa)
- receiver = self.object_info.doa_data_received
- if not perform_doa:
- receiver = None
- runner = rope.base.oi.doa.PythonFileRunner(
- self, resource, args, stdin, stdout, receiver)
- runner.add_finishing_observer(self.module_cache.forget_all_data)
- runner.run()
- return runner
- def analyze_module(self, resource, should_analyze=lambda py: True,
- search_subscopes=lambda py: True, followed_calls=None):
- """Analyze `resource` module for static object inference
- This function forces rope to analyze this module to collect
- information about function calls. `should_analyze` is a
- function that is called with a `PyDefinedObject` argument. If
- it returns `True` the element is analyzed. If it is `None` or
- returns `False` the element is not analyzed.
- `search_subscopes` is like `should_analyze`; The difference is
- that if it returns `False` the sub-scopes are all ignored.
- That is it is assumed that `should_analyze` returns `False`
- for all of its subscopes.
- `followed_calls` override the value of ``soa_followed_calls``
- project config.
- """
- if followed_calls is None:
- followed_calls = self.project.prefs.get('soa_followed_calls', 0)
- pymodule = self.resource_to_pyobject(resource)
- self.module_cache.forget_all_data()
- rope.base.oi.soa.analyze_module(
- self, pymodule, should_analyze, search_subscopes, followed_calls)
- def get_classes(self, task_handle=taskhandle.NullTaskHandle()):
- warnings.warn('`PyCore.get_classes()` is deprecated',
- DeprecationWarning, stacklevel=2)
- return []
- def __str__(self):
- return str(self.module_cache) + str(self.object_info)
- def modname(self, resource):
- if resource.is_folder():
- module_name = resource.name
- source_folder = resource.parent
- elif resource.name == '__init__.py':
- module_name = resource.parent.name
- source_folder = resource.parent.parent
- else:
- module_name = resource.name[:-3]
- source_folder = resource.parent
- while source_folder != source_folder.parent and \
- source_folder.has_child('__init__.py'):
- module_name = source_folder.name + '.' + module_name
- source_folder = source_folder.parent
- return module_name
- @property
- @utils.cacheit
- def extension_modules(self):
- result = set(self.project.prefs.get('extension_modules', []))
- if self.project.prefs.get('import_dynload_stdmods', False):
- result.update(stdmods.dynload_modules())
- return result
- class _ModuleCache(object):
- def __init__(self, pycore):
- self.pycore = pycore
- self.module_map = {}
- self.pycore.cache_observers.append(self._invalidate_resource)
- self.observer = self.pycore.observer
- def _invalidate_resource(self, resource):
- if resource in self.module_map:
- self.forget_all_data()
- self.observer.remove_resource(resource)
- del self.module_map[resource]
- def get_pymodule(self, resource, force_errors=False):
- if resource in self.module_map:
- return self.module_map[resource]
- if resource.is_folder():
- result = PyPackage(self.pycore, resource,
- force_errors=force_errors)
- else:
- result = PyModule(self.pycore, resource=resource,
- force_errors=force_errors)
- if result.has_errors:
- return result
- self.module_map[resource] = result
- self.observer.add_resource(resource)
- return result
- def forget_all_data(self):
- for pymodule in self.module_map.values():
- pymodule._forget_concluded_data()
- def __str__(self):
- return 'PyCore caches %d PyModules\n' % len(self.module_map)
- class _ExtensionCache(object):
- def __init__(self, pycore):
- self.pycore = pycore
- self.extensions = {}
- def get_pymodule(self, name):
- if name == '__builtin__':
- return builtins.builtins
- allowed = self.pycore.extension_modules
- if name not in self.extensions and name in allowed:
- self.extensions[name] = builtins.BuiltinModule(name, self.pycore)
- return self.extensions.get(name)
- def perform_soa_on_changed_scopes(project, resource, old_contents):
- pycore = project.pycore
- if resource.exists() and pycore.is_python_file(resource):
- try:
- new_contents = resource.read()
- # detecting changes in new_contents relative to old_contents
- detector = _TextChangeDetector(new_contents, old_contents)
- def search_subscopes(pydefined):
- scope = pydefined.get_scope()
- return detector.is_changed(scope.get_start(), scope.get_end())
- def should_analyze(pydefined):
- scope = pydefined.get_scope()
- start = scope.get_start()
- end = scope.get_end()
- return detector.consume_changes(start, end)
- pycore.analyze_module(resource, should_analyze, search_subscopes)
- except exceptions.ModuleSyntaxError:
- pass
- class _TextChangeDetector(object):
- def __init__(self, old, new):
- self.old = old
- self.new = new
- self._set_diffs()
- def _set_diffs(self):
- differ = difflib.Differ()
- self.lines = []
- lineno = 0
- for line in differ.compare(self.old.splitlines(True),
- self.new.splitlines(True)):
- if line.startswith(' '):
- lineno += 1
- elif line.startswith('-'):
- lineno += 1
- self.lines.append(lineno)
- def is_changed(self, start, end):
- """Tell whether any of start till end lines have changed
- The end points are inclusive and indices start from 1.
- """
- left, right = self._get_changed(start, end)
- if left < right:
- return True
- return False
- def consume_changes(self, start, end):
- """Clear the changed status of lines from start till end"""
- left, right = self._get_changed(start, end)
- if left < right:
- del self.lines[left:right]
- return left < right
- def _get_changed(self, start, end):
- left = bisect.bisect_left(self.lines, start)
- right = bisect.bisect_right(self.lines, end)
- return left, right
|