| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- import warnings
- from rope.base import exceptions, resourceobserver
- from rope.base.oi import objectdb, memorydb, transform
class ObjectInfoManager(object):
    """Stores object information

    It uses an instance of `objectdb.ObjectDB` for storing
    information.

    Objects are converted to and from their "textual" form (tuples whose
    first item is a kind tag such as ``'defined'``, ``'unknown'`` or
    ``'none'``) before being written to or after being read from the
    database.

    """

    def __init__(self, project):
        """Create the converters and the object database for `project`.

        File validation and resource observation are only set up when
        the ``validate_objectdb`` project preference is true.
        """
        self.project = project
        self.to_textual = transform.PyObjectToTextual(project)
        self.to_pyobject = transform.TextualToPyObject(project)
        self.doi_to_pyobject = transform.DOITextualToPyObject(project)
        self._init_objectdb()
        if project.prefs.get('validate_objectdb', False):
            self._init_validation()

    def _init_objectdb(self):
        """Create ``self.validation`` and ``self.objectdb``.

        The deprecated ``objectdb_type`` preference is still honoured:
        any value other than ``'memory'`` requests persistence, provided
        the project has a rope folder.  Otherwise ``persist`` is passed
        as ``None`` and the decision is left to `memorydb.MemoryDB`.
        """
        dbtype = self.project.get_prefs().get('objectdb_type', None)
        persist = None
        if dbtype is not None:
            warnings.warn(
                '"objectdb_type" project config is deprecated;\n'
                'Use "save_objectdb" instead in your project '
                'config file.\n(".ropeproject/config.py" by default)\n',
                DeprecationWarning)
            if dbtype != 'memory' and self.project.ropefolder is not None:
                persist = True
        self.validation = TextualValidation(self.to_pyobject)
        db = memorydb.MemoryDB(self.project, persist=persist)
        self.objectdb = objectdb.ObjectDB(db, self.validation)

    def _init_validation(self):
        """Validate stored files and start observing their resources.

        Only resources that resolve and belong to this project are
        observed.  The file-list observer registered on the database
        keeps the observed set in step with the database's file list.
        """
        self.objectdb.validate_files()
        observer = resourceobserver.ResourceObserver(
            changed=self._resource_changed, moved=self._resource_moved,
            removed=self._resource_moved)
        files = []
        for path in self.objectdb.get_files():
            resource = self.to_pyobject.path_to_resource(path)
            if resource is not None and resource.project == self.project:
                files.append(resource)
        # `self.observer` must exist before `_FileListObserver(self)` is
        # built, because that class reads it in its constructor.
        self.observer = resourceobserver.FilteredResourceObserver(observer,
                                                                  files)
        self.objectdb.add_file_list_observer(_FileListObserver(self))
        self.project.add_observer(self.observer)

    def _resource_changed(self, resource):
        """Re-validate the database entry of a changed resource.

        Syntax errors in the changed module are deliberately ignored;
        the stored information is simply left as it is.
        """
        try:
            self.objectdb.validate_file(
                self.to_textual.resource_to_path(resource))
        except exceptions.ModuleSyntaxError:
            pass

    def _resource_moved(self, resource, new_resource=None):
        """Handle a moved or removed resource.

        Also used as the ``removed`` callback, in which case
        `new_resource` is ``None`` and the resource is only dropped from
        the observer.
        """
        self.observer.remove_resource(resource)
        if new_resource is not None:
            old = self.to_textual.resource_to_path(resource)
            new = self.to_textual.resource_to_path(new_resource)
            self.objectdb.file_moved(old, new)
            self.observer.add_resource(new_resource)

    def get_returned(self, pyobject, args):
        """Return the object recorded as returned by `pyobject`.

        The exact match for `args` is preferred.  Failing that, the
        first recorded return value that is neither ``'unknown'`` nor
        ``'none'`` is used; if there is no such value, the first recorded
        return value is used.  Returns ``None`` when `pyobject` has no
        scope in the database or nothing was recorded.
        """
        result = self.get_exact_returned(pyobject, args)
        if result is not None:
            return result
        path, key = self._get_scope(pyobject)
        if path is None:
            return None
        for call_info in self.objectdb.get_callinfos(path, key):
            returned = call_info.get_returned()
            if returned and returned[0] not in ('unknown', 'none'):
                result = returned
                break
            if result is None:
                # Remember the first record as a last-resort answer.
                result = returned
        if result is not None:
            return self.to_pyobject(result)

    def get_exact_returned(self, pyobject, args):
        """Return the object recorded for exactly these `args`, or None."""
        path, key = self._get_scope(pyobject)
        if path is not None:
            returned = self.objectdb.get_returned(
                path, key, self._args_to_textual(pyobject, args))
            if returned is not None:
                return self.to_pyobject(returned)

    def _args_to_textual(self, pyfunction, args):
        """Return the textual forms of `args` as a tuple.

        At most one argument per non-special parameter of `pyfunction`
        is kept.
        """
        parameters = list(pyfunction.get_param_names(special_args=False))
        arguments = args.get_arguments(parameters)[:len(parameters)]
        return tuple(self.to_textual(arg) for arg in arguments)

    def get_parameter_objects(self, pyobject):
        """Return the objects recorded for each parameter of `pyobject`.

        Call records are merged slot by slot, a more valid value
        replacing a less valid one, until every parameter has been given
        a valid value or the records are exhausted.  Returns ``None``
        when `pyobject` has no scope in the database or when no valid
        value was found for any parameter.
        """
        path, key = self._get_scope(pyobject)
        if path is None:
            return None
        arg_count = len(pyobject.get_param_names(special_args=False))
        parameters = [None] * arg_count
        # Indices that have received a valid value.  A set is used so a
        # slot that is overwritten by another valid value is counted only
        # once; a plain counter would hit zero (or skip past it) while
        # other slots were still unknown.
        resolved = set()
        for call_info in self.objectdb.get_callinfos(path, key):
            args = call_info.get_parameters()
            for index, arg in enumerate(args[:arg_count]):
                if self.validation.is_more_valid(arg, parameters[index]):
                    parameters[index] = arg
                    if self.validation.is_value_valid(arg):
                        resolved.add(index)
            if len(resolved) == arg_count:
                break
        if resolved:
            return [self.to_pyobject(parameter)
                    for parameter in parameters]

    def get_passed_objects(self, pyfunction, parameter_index):
        """Return every object recorded at `parameter_index`.

        Records that are too short, and values that cannot be converted
        back to an object, are skipped.  Returns an empty list when
        `pyfunction` has no scope in the database.
        """
        path, key = self._get_scope(pyfunction)
        if path is None:
            return []
        result = []
        for call_info in self.objectdb.get_callinfos(path, key):
            args = call_info.get_parameters()
            if len(args) > parameter_index:
                parameter = self.to_pyobject(args[parameter_index])
                if parameter is not None:
                    result.append(parameter)
        return result

    def doa_data_received(self, data):
        """Store one ``(function, args, returned)`` record.

        Each item of `data` is first converted with `doi_to_pyobject`
        and then back to the normal textual form.  The record is saved
        only when the function is a ``'defined'`` textual of length 3.
        """
        def doi_to_normal(textual):
            pyobject = self.doi_to_pyobject(textual)
            return self.to_textual(pyobject)
        function = doi_to_normal(data[0])
        args = tuple(doi_to_normal(textual) for textual in data[1])
        returned = doi_to_normal(data[2])
        if function[0] == 'defined' and len(function) == 3:
            self._save_data(function, args, returned)

    def function_called(self, pyfunction, params, returned=None):
        """Record a call of `pyfunction` with `params`.

        When `returned` is ``None`` the return value is stored as
        ``('unknown',)``.
        """
        function_text = self.to_textual(pyfunction)
        params_text = tuple(self.to_textual(param) for param in params)
        returned_text = ('unknown',)
        if returned is not None:
            returned_text = self.to_textual(returned)
        self._save_data(function_text, params_text, returned_text)

    def save_per_name(self, scope, name, data):
        """Store `data` for `name` in `scope`, if the scope is known."""
        path, key = self._get_scope(scope.pyobject)
        if path is not None:
            self.objectdb.add_pername(path, key, name, self.to_textual(data))

    def get_per_name(self, scope, name):
        """Return the object stored for `name` in `scope`, or None."""
        path, key = self._get_scope(scope.pyobject)
        if path is not None:
            result = self.objectdb.get_pername(path, key, name)
            if result is not None:
                return self.to_pyobject(result)

    def _save_data(self, function, args, returned=('unknown',)):
        """Add a call record; `function` is ``(tag, path, key)``."""
        self.objectdb.add_callinfo(function[1], function[2], args, returned)

    def _get_scope(self, pyobject):
        """Return the ``(path, key)`` database address of `pyobject`.

        ``(None, None)`` is returned when the object's module has no
        resource or its textual form is not ``'defined'``.  The key is
        ``''`` when the textual form has no third item.
        """
        resource = pyobject.get_module().get_resource()
        if resource is None:
            return None, None
        textual = self.to_textual(pyobject)
        if textual[0] == 'defined':
            path = textual[1]
            if len(textual) == 3:
                key = textual[2]
            else:
                key = ''
            return path, key
        return None, None

    def sync(self):
        """Ask the object database to sync."""
        self.objectdb.sync()

    def __str__(self):
        return str(self.objectdb)
class TextualValidation(object):
    """Validity checks on textual object descriptions.

    `to_pyobject` is the converter used to decide whether a textual
    value, a file path or a scope still resolves to something.
    """

    def __init__(self, to_pyobject):
        self.to_pyobject = to_pyobject

    def is_value_valid(self, value):
        """Tell whether `value` is known and still resolves to an object."""
        # ???: Should none and unknown be considered valid?
        if value is not None and value[0] not in ('none', 'unknown'):
            return self.to_pyobject(value) is not None
        return False

    def is_more_valid(self, new, old):
        """Tell whether `new` should replace `old`.

        Anything replaces a missing `old`; otherwise `new` must not be
        tagged ``'unknown'`` or ``'none'``.
        """
        return old is None or new[0] not in ('unknown', 'none')

    def is_file_valid(self, path):
        """Tell whether `path` still maps to a resource."""
        resource = self.to_pyobject.path_to_resource(path)
        return resource is not None

    def is_scope_valid(self, path, key):
        """Tell whether the scope at ``(path, key)`` still resolves.

        An empty `key` is looked up as a two-item ``'defined'`` textual.
        """
        textual = ('defined', path) if key == '' else ('defined', path, key)
        return self.to_pyobject(textual) is not None
class _FileListObserver(object):
    """Forward file-list changes to an object-info manager's observer.

    Paths are resolved with the manager's `to_pyobject` converter; paths
    that do not resolve to a resource are ignored.
    """

    def __init__(self, object_info):
        self.object_info = object_info
        self.observer = object_info.observer
        self.to_pyobject = object_info.to_pyobject

    def removed(self, path):
        """Stop observing the resource at `path`, if it resolves."""
        target = self.to_pyobject.path_to_resource(path)
        if target is None:
            return
        self.observer.remove_resource(target)

    def added(self, path):
        """Start observing the resource at `path`, if it resolves."""
        target = self.to_pyobject.path_to_resource(path)
        if target is None:
            return
        self.observer.add_resource(target)
|