| """ |
| Find modules used by a script, using bytecode analysis. |
| |
| Based on the stdlib modulefinder by Thomas Heller and Just van Rossum, |
| but uses a graph data structure and 2.3 features |
| |
| XXX: Verify all calls to import_hook (and variants) to ensure that |
| imports are done in the right way. |
| """ |
| from __future__ import absolute_import, print_function |
| |
| import pkg_resources |
| |
| import dis |
| import imp |
| import marshal |
| import os |
| import sys |
| import struct |
| import zipimport |
| import re |
| from collections import deque, namedtuple |
| import ast |
| |
| from altgraph.ObjectGraph import ObjectGraph |
| from altgraph import GraphError |
| |
| from itertools import count |
| |
| from modulegraph import util |
| from modulegraph import zipio |
| |
if sys.version_info[0] == 2:
    # Python 2: plain 'str' is the bytes type, so StringIO serves as both
    # the text buffer and the bytes buffer.
    from StringIO import StringIO as BytesIO
    from StringIO import StringIO
    from urllib import pathname2url
    def _Bchr(value):
        # Indexing a byte string yields a length-1 string on Python 2;
        # convert an integer byte value back to that form.
        return chr(value)

else:
    from urllib.request import pathname2url
    from io import BytesIO, StringIO

    def _Bchr(value):
        # Indexing a bytes object already yields an int on Python 3;
        # return the value unchanged.
        return value
| |
| |
# File open mode for reading (universal newlines).  Python 3 opens text
# files with universal newline handling by default, so the 'U' flag is
# only needed (and only meaningful) on Python 2.
if sys.version_info[0] == 2:
    _READ_MODE = "rU"
else:
    _READ_MODE = "r"
| |
| |
| |
| |
# Modulegraph does a good job at simulating Python's import machinery,
# but it can not handle packagepath modifications packages make at runtime.
# Therefore there is a mechanism whereby you can register extra paths in
# this map for a package, and it will be honored.
#
# Note: this is a mapping from package names to lists of paths.
_packagePathMap = {}

# Prefix used in magic .pth files used by setuptools to create namespace
# packages without an __init__.py file.
#
# The value is a list of such prefixes as the prefix varies with versions of
# setuptools.
_SETUPTOOLS_NAMESPACEPKG_PTHs=(
    "import sys,types,os; p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('",
    "import sys,new,os; p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('",
    "import sys, types, os;p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('",
)
| |
| |
def _namespace_package_path(fqname, pathnames, path=None):
    """
    Return the __path__ for the python package in *fqname*.

    Starts from *pathnames* and extends it with the locations that
    installed eggs declare for this namespace package, using setuptools
    metadata ('namespace_packages.txt') from the working set on *path*.
    """
    result = list(pathnames)

    for dist in pkg_resources.WorkingSet(path):
        if not dist.has_metadata('namespace_packages.txt'):
            continue

        declared = dist.get_metadata('namespace_packages.txt').splitlines()
        if fqname not in declared:
            continue

        nspath = os.path.join(dist.location, *fqname.split('.'))
        if nspath not in result:
            result.append(nspath)

    return result
| |
| _strs = re.compile(r'''^\s*["']([A-Za-z0-9_]+)["'],?\s*''') # "<- emacs happy |
| |
| def _eval_str_tuple(value): |
| """ |
| Input is the repr of a tuple of strings, output |
| is that tuple. |
| |
| This only works with a tuple where the members are |
| python identifiers. |
| """ |
| if not (value.startswith('(') and value.endswith(')')): |
| raise ValueError(value) |
| |
| orig_value = value |
| value = value[1:-1] |
| |
| result = [] |
| while value: |
| m = _strs.match(value) |
| if m is None: |
| raise ValueError(orig_value) |
| |
| result.append(m.group(1)) |
| value = value[len(m.group(0)):] |
| |
| return tuple(result) |
| |
| def _path_from_importerror(exc, default): |
| # This is a hack, but sadly enough the necessary information |
| # isn't available otherwise. |
| m = re.match('^No module named (\S+)$', str(exc)) |
| if m is not None: |
| return m.group(1) |
| |
| return default |
| |
def os_listdir(path):
    """
    Deprecated name; use zipio.listdir instead.

    List the contents of *path*, which may be a real directory or a
    directory inside a zip archive.
    """
    # NOTE: 'warnings' is imported at module level further down in this
    # file, so it is in scope by the time this function can be called.
    warnings.warn("Use zipio.listdir instead of os_listdir",
            DeprecationWarning)
    return zipio.listdir(path)
| |
| |
| def _code_to_file(co): |
| """ Convert code object to a .pyc pseudo-file """ |
| return BytesIO( |
| imp.get_magic() + b'\0\0\0\0' + marshal.dumps(co)) |
| |
| |
def find_module(name, path=None):
    """
    A version of imp.find_module that works with zipped packages.

    Returns a ``(file, pathname, description)`` triple compatible with
    ``imp.find_module``; *file* may be None (packages, extensions) or an
    in-memory pseudo-file (zip members).  Raises ImportError when *name*
    cannot be found on *path* (defaults to sys.path).
    """
    if path is None:
        path = sys.path

    # Support for the PEP302 importer for normal imports:
    # - Python 2.5 has pkgutil.ImpImporter
    # - In setuptools 0.7 and later there's _pkgutil.ImpImporter
    # - In earlier setuptools versions you pkg_resources.ImpWrapper
    #
    # XXX: This is a bit of a hack, should check if we can just rely on
    # PEP302's get_code() method with all recent versions of pkgutil and/or
    # setuptools (setuptools 0.6.latest, setuptools trunk and python2.[45])
    #
    # For python 3.3 this code should be replaced by code using importlib,
    # for python 3.2 and 2.7 this should be cleaned up a lot.
    try:
        from pkgutil import ImpImporter
    except ImportError:
        try:
            from _pkgutil import ImpImporter
        except ImportError:
            ImpImporter = pkg_resources.ImpWrapper

    # Portions of a namespace package collected along the way (PEP 420
    # style 'find_loader' results).
    namespace_path =[]
    fp = None
    for entry in path:
        importer = pkg_resources.get_importer(entry)
        if importer is None:
            continue

        if sys.version_info[:2] >= (3,3) and hasattr(importer, 'find_loader'):
            loader, portions = importer.find_loader(name)

        else:
            loader = importer.find_module(name)
            portions = []

        namespace_path.extend(portions)

        if loader is None: continue

        # Classic filesystem importer: inspect the found file directly.
        if isinstance(importer, ImpImporter):
            filename = loader.filename
            if filename.endswith('.pyc') or filename.endswith('.pyo'):
                fp = open(filename, 'rb')
                description = ('.pyc', 'rb', imp.PY_COMPILED)
                return (fp, filename, description)

            elif filename.endswith('.py'):
                if sys.version_info[0] == 2:
                    fp = open(filename, _READ_MODE)
                else:
                    # Honor a PEP 263 coding declaration when reopening
                    # the source as text.
                    with open(filename, 'rb') as fp:
                        encoding = util.guess_encoding(fp)

                    fp = open(filename, _READ_MODE, encoding=encoding)
                description = ('.py', _READ_MODE, imp.PY_SOURCE)
                return (fp, filename, description)

            else:
                # Neither source nor bytecode: a C extension if the
                # suffix matches, otherwise assume a package directory.
                for _sfx, _mode, _type in imp.get_suffixes():
                    if _type == imp.C_EXTENSION and filename.endswith(_sfx):
                        description = (_sfx, 'rb', imp.C_EXTENSION)
                        break
                else:
                    description = ('', '', imp.PKG_DIRECTORY)

                return (None, filename, description)

        # Generic PEP 302 loader with a concrete bytecode path.
        if hasattr(loader, 'path'):
            if loader.path.endswith('.pyc') or loader.path.endswith('.pyo'):
                fp = open(loader.path, 'rb')
                description = ('.pyc', 'rb', imp.PY_COMPILED)
                return (fp, loader.path, description)


        if hasattr(loader, 'get_source'):
            source = loader.get_source(name)
            fp = StringIO(source)
            co = None

        else:
            source = None

        if source is None:
            # No source available; fall back to the code object and wrap
            # it as a pseudo .pyc file.
            if hasattr(loader, 'get_code'):
                co = loader.get_code(name)
                fp = _code_to_file(co)

            else:
                fp = None
                co = None

        pathname = os.path.join(entry, *name.split('.'))

        if isinstance(loader, zipimport.zipimporter):
            # Check if this happens to be a wrapper module introduced by
            # setuptools, if it is we return the actual extension.
            zn = '/'.join(name.split('.'))
            for _sfx, _mode, _type in imp.get_suffixes():
                if _type == imp.C_EXTENSION:
                    p = loader.prefix + zn + _sfx
                    if loader._files is None:
                        # Private zipimport state; the directory cache is
                        # keyed by archive path.
                        loader_files = zipimport._zip_directory_cache[loader.archive]
                    else:
                        loader_files = loader._files

                    if p in loader_files:
                        description = (_sfx, 'rb', imp.C_EXTENSION)
                        return (None, pathname + _sfx, description)

        if hasattr(loader, 'is_package') and loader.is_package(name):
            return (None, pathname, ('', '', imp.PKG_DIRECTORY))

        if co is None:
            if hasattr(loader, 'path'):
                filename = loader.path
            elif hasattr(loader, 'get_filename'):
                filename = loader.get_filename(name)
                if source is not None:
                    # We have source but the loader reports the bytecode
                    # path; strip the trailing 'c'/'o' to get the .py name.
                    if filename.endswith(".pyc") or filename.endswith(".pyo"):
                        filename = filename[:-1]
            else:
                filename = None

            if filename is not None and (filename.endswith('.py') or filename.endswith('.pyw')):
                return (fp, filename, ('.py', 'rU', imp.PY_SOURCE))
            else:
                if fp is not None:
                    fp.close()
                return (None, filename, (os.path.splitext(filename)[-1], 'rb', imp.C_EXTENSION))

        else:
            if hasattr(loader, 'path'):
                return (fp, loader.path, ('.pyc', 'rb', imp.PY_COMPILED))
            else:
                return (fp, pathname + '.pyc', ('.pyc', 'rb', imp.PY_COMPILED))

    # Nothing concrete found, but namespace portions were collected:
    # report a namespace package (the full portion list rides in the
    # 'mode' slot of the description tuple).
    if namespace_path:
        if fp is not None:
            fp.close()
        return (None, namespace_path[0], ('', namespace_path, imp.PKG_DIRECTORY))

    raise ImportError(name)
| |
def moduleInfoForPath(path):
    """
    Return ``(module_name, read_mode, module_type)`` for the file at
    *path* based on the suffixes known to ``imp``, or None when the
    path does not look like a module.
    """
    basename = os.path.basename(path)
    for suffix, readmode, typ in imp.get_suffixes():
        if path.endswith(suffix):
            return basename[:-len(suffix)], readmode, typ
    return None
| |
# A Public interface
import warnings
def AddPackagePath(packagename, path):
    """Deprecated alias for :func:`addPackagePath`."""
    warnings.warn("Use addPackagePath instead of AddPackagePath",
            DeprecationWarning)

    addPackagePath(packagename, path)
| |
def addPackagePath(packagename, path):
    """
    Register an extra __path__ entry for *packagename*, to be honored
    when the package is found (see _packagePathMap).
    """
    _packagePathMap.setdefault(packagename, []).append(path)
| |
# Mapping of package names to the name they should be reported under,
# filled in by replacePackage().
_replacePackageMap = {}

# This ReplacePackage mechanism allows modulefinder to work around the
# way the _xmlplus package injects itself under the name "xml" into
# sys.modules at runtime by calling ReplacePackage("_xmlplus", "xml")
# before running ModuleGraph.
def ReplacePackage(oldname, newname):
    """Deprecated alias for :func:`replacePackage`."""
    warnings.warn("use replacePackage instead of ReplacePackage",
            DeprecationWarning)
    replacePackage(oldname, newname)

def replacePackage(oldname, newname):
    """Report modules found in package *oldname* as belonging to *newname*."""
    _replacePackageMap[oldname] = newname
| |
| |
class DependencyInfo (namedtuple("DependencyInfo", ["conditional", "function", "tryexcept", "fromlist"])):
    """
    Edge annotation describing how an import was made: conditionally,
    inside a function, inside a try/except, and/or via a fromlist.
    """
    __slots__ = ()

    def _merged(self, other):
        """
        Combine two annotations for the same edge.  If either side is an
        unguarded top-level import, the merged edge is unguarded too;
        'fromlist' survives only if both sides had it set.
        """
        merged_fromlist = self.fromlist and other.fromlist

        self_guarded = self.conditional or self.function or self.tryexcept
        other_guarded = other.conditional or other.function or other.tryexcept
        if not self_guarded or not other_guarded:
            return DependencyInfo(conditional=False, function=False, tryexcept=False, fromlist=merged_fromlist)

        return DependencyInfo(
            conditional=self.conditional or other.conditional,
            function=self.function or other.function,
            tryexcept=self.tryexcept or other.tryexcept,
            fromlist=merged_fromlist)
| |
| |
class Node(object):
    """
    A node in the module dependency graph.

    Nodes compare, order and hash by their *graphident*.  Each node also
    carries a small dict-like namespace mapping names defined in the
    module to the nodes that provide them.
    """
    def __init__(self, identifier):
        self.debug = 0
        self.graphident = identifier
        self.identifier = identifier
        self._namespace = {}
        self.filename = None
        self.packagepath = None
        self.code = None
        # The set of global names that are assigned to in the module.
        # This includes those names imported through starimports of
        # Python modules.
        self.globalnames = set()
        # The set of starimports this module did that could not be
        # resolved, ie. a starimport from a non-Python module.
        self.starimports = set()

    def __contains__(self, name):
        return name in self._namespace

    def __getitem__(self, name):
        return self._namespace[name]

    def __setitem__(self, name, value):
        self._namespace[name] = value

    def get(self, *args):
        return self._namespace.get(*args)

    def __cmp__(self, other):
        # Python 2 only; Python 3 uses the rich comparisons below.
        try:
            other_key = other.graphident
        except AttributeError:
            return NotImplemented

        return cmp(self.graphident, other_key)

    def __eq__(self, other):
        try:
            other_key = other.graphident
        except AttributeError:
            return False

        return self.graphident == other_key

    def __ne__(self, other):
        try:
            other_key = other.graphident
        except AttributeError:
            return True

        return self.graphident != other_key

    def __lt__(self, other):
        try:
            other_key = other.graphident
        except AttributeError:
            return NotImplemented

        return self.graphident < other_key

    def __le__(self, other):
        try:
            other_key = other.graphident
        except AttributeError:
            return NotImplemented

        return self.graphident <= other_key

    def __gt__(self, other):
        try:
            other_key = other.graphident
        except AttributeError:
            return NotImplemented

        return self.graphident > other_key

    def __ge__(self, other):
        try:
            other_key = other.graphident
        except AttributeError:
            return NotImplemented

        return self.graphident >= other_key

    def __hash__(self):
        return hash(self.graphident)

    def infoTuple(self):
        # Subclasses override this to add fields to the repr.
        return (self.identifier,)

    def __repr__(self):
        return '%s%r' % (type(self).__name__, self.infoTuple())
| |
class Alias(str):
    """Marker string used in 'implies': the value names another module
    that should be imported in place of the key."""
    pass
| |
class AliasNode(Node):
    """
    A node that stands in for another node under a different graph
    identifier, mirroring the target node's bookkeeping attributes.
    """
    def __init__(self, name, node):
        super(AliasNode, self).__init__(name)
        # Mirror the aliased node's attributes; missing ones become None.
        self.identifier = getattr(node, 'identifier', None)
        self.packagepath = getattr(node, 'packagepath', None)
        self._namespace = getattr(node, '_namespace', None)
        self.globalnames = getattr(node, 'globalnames', None)
        self.starimports = getattr(node, 'starimports', None)

    def infoTuple(self):
        return (self.graphident, self.identifier)
| |
class BadModule(Node):
    """Base class for modules that could not be imported cleanly."""
    pass

class ExcludedModule(BadModule):
    """A module that was explicitly excluded from the graph."""
    pass

class MissingModule(BadModule):
    """A module that was imported somewhere but could not be found."""
    pass
| |
class Script(Node):
    """A node for a top-level script, identified by file path rather
    than by module name."""
    def __init__(self, filename):
        # The script's path doubles as its graph identifier.
        super(Script, self).__init__(filename)
        self.filename = filename

    def infoTuple(self):
        return (self.filename,)
| |
class BaseModule(Node):
    """Common base class for all module-like graph nodes."""
    def __init__(self, name, filename=None, path=None):
        super(BaseModule, self).__init__(name)
        self.filename = filename
        # For packages: the list of directories making up __path__.
        self.packagepath = path

    def infoTuple(self):
        # Drop unset (falsy) fields from the repr tuple.
        return tuple(filter(None, (self.identifier, self.filename, self.packagepath)))
| |
class BuiltinModule(BaseModule):
    """A module compiled into the interpreter."""
    pass

class SourceModule(BaseModule):
    """A module loaded from a .py source file."""
    pass

class InvalidSourceModule(SourceModule):
    """A source module whose contents could not be compiled."""
    pass

class CompiledModule(BaseModule):
    """A module loaded from a bytecode (.pyc/.pyo) file."""
    pass

class InvalidCompiledModule(BaseModule):
    """A bytecode module whose contents could not be loaded."""
    pass

class Package(BaseModule):
    """A regular package (a directory with an __init__ module)."""
    pass

class NamespacePackage(Package):
    """A namespace package (setuptools-style or PEP 420)."""
    pass

class Extension(BaseModule):
    """A C extension module."""
    pass
| |
class FlatPackage(BaseModule): # nocoverage
    """Deprecated node type, kept only for backward compatibility."""
    def __init__(self, *args, **kwds):
        warnings.warn("This class will be removed in a future version of modulegraph",
            DeprecationWarning)
        # BUG FIX: the previous code was 'super(FlatPackage, *args, **kwds)',
        # which merely constructs a super object and never runs the base
        # class __init__, leaving the node uninitialized.
        super(FlatPackage, self).__init__(*args, **kwds)
| |
class ArchiveModule(BaseModule): # nocoverage
    """Deprecated node type, kept only for backward compatibility."""
    def __init__(self, *args, **kwds):
        warnings.warn("This class will be removed in a future version of modulegraph",
            DeprecationWarning)
        # BUG FIX: the previous code was 'super(FlatPackage, *args, **kwds)' -
        # wrong class (copy/paste from FlatPackage) and never actually
        # calling the base class __init__.
        super(ArchiveModule, self).__init__(*args, **kwds)
| |
# HTML templates for ModuleGraph generator

# Page skeleton; expects TITLE.
header = """\
<html>
<head>
<title>%(TITLE)s</title>
<style>
.node { margin:1em 0; }
</style>
</head>
<body>
<h1>%(TITLE)s</h1>"""
# One anchored block per graph node; expects NAME and CONTENT.
entry = """
<div class="node">
<a name="%(NAME)s" />
%(CONTENT)s
</div>"""
# Node description without a source link; expects NAME and TYPE.
contpl = """<tt>%(NAME)s</tt> %(TYPE)s"""
# Node description linked to its source file; expects URL and NAME.
contpl_linked = """\
<a target="code" href="%(URL)s" type="text/plain"><tt>%(NAME)s</tt></a>"""
# Import list for one node; expects HEAD and LINKS.
imports = """\
<div class="import">
%(HEAD)s:
%(LINKS)s
</div>
"""
# Closes the tags opened by 'header'.
footer = """
</body>
</html>"""
| |
| def _ast_names(names): |
| result = [] |
| for nm in names: |
| if isinstance(nm, ast.alias): |
| result.append(nm.name) |
| else: |
| result.append(nm) |
| return result |
| |
| |
# Default 'level' argument for __import__: Python 2 defaults to implicit
# relative imports (-1), Python 3 to absolute imports (0).
if sys.version_info[0] == 2:
    DEFAULT_IMPORT_LEVEL= -1
else:
    DEFAULT_IMPORT_LEVEL= 0
| |
| class _Visitor (ast.NodeVisitor): |
| def __init__(self, graph, module): |
| self._graph = graph |
| self._module = module |
| self._level = DEFAULT_IMPORT_LEVEL |
| self._in_if = [False] |
| self._in_def = [False] |
| self._in_tryexcept = [False] |
| |
| @property |
| def in_if(self): |
| return self._in_if[-1] |
| |
| @property |
| def in_def(self): |
| return self._in_def[-1] |
| |
| @property |
| def in_tryexcept(self): |
| return self._in_tryexcept[-1] |
| |
| def _process_import(self, name, fromlist, level): |
| |
| if sys.version_info[0] == 2: |
| if name == '__future__' and 'absolute_import' in (fromlist or ()): |
| self._level = 0 |
| |
| have_star = False |
| if fromlist is not None: |
| fromlist = set(fromlist) |
| if '*' in fromlist: |
| fromlist.remove('*') |
| have_star = True |
| |
| imported_module = self._graph._safe_import_hook(name, |
| self._module, fromlist, level, attr=DependencyInfo( |
| conditional=self.in_if, |
| tryexcept=self.in_tryexcept, |
| function=self.in_def, |
| fromlist=False, |
| ))[0] |
| if have_star: |
| self._module.globalnames.update(imported_module.globalnames) |
| self._module.starimports.update(imported_module.starimports) |
| if imported_module.code is None: |
| self._module.starimports.add(name) |
| |
| |
| def visit_Import(self, node): |
| for nm in _ast_names(node.names): |
| self._process_import(nm, None, self._level) |
| |
| def visit_ImportFrom(self, node): |
| level = node.level if node.level != 0 else self._level |
| self._process_import(node.module or '', _ast_names(node.names), level) |
| |
| def visit_If(self, node): |
| self._in_if.append(True) |
| self.generic_visit(node) |
| self._in_if.pop() |
| |
| def visit_FunctionDef(self, node): |
| self._in_def.append(True) |
| self.generic_visit(node) |
| self._in_def.pop() |
| |
| def visit_Try(self, node): |
| self._in_tryexcept.append(True) |
| self.generic_visit(node) |
| self._in_tryexcept.pop() |
| |
| def visit_ExceptHandler(self, node): |
| self._in_tryexcept.append(True) |
| self.generic_visit(node) |
| self._in_tryexcept.pop() |
| |
| def visit_TryExcept(self, node): |
| self._in_tryexcept.append(True) |
| self.generic_visit(node) |
| self._in_tryexcept.pop() |
| |
| def visit_ExceptHandler(self, node): |
| self._in_tryexcept.append(True) |
| self.generic_visit(node) |
| self._in_tryexcept.pop() |
| |
| def visit_Expression(self, node): |
| # Expression node's cannot contain import statements or |
| # other nodes that are relevant for us. |
| pass |
| |
| # Expression isn't actually used as such in AST trees, |
| # therefore define visitors for all kinds of expression nodes. |
| visit_BoolOp = visit_Expression |
| visit_BinOp = visit_Expression |
| visit_UnaryOp = visit_Expression |
| visit_Lambda = visit_Expression |
| visit_IfExp = visit_Expression |
| visit_Dict = visit_Expression |
| visit_Set = visit_Expression |
| visit_ListComp = visit_Expression |
| visit_SetComp = visit_Expression |
| visit_ListComp = visit_Expression |
| visit_GeneratorExp = visit_Expression |
| visit_Compare = visit_Expression |
| visit_Yield = visit_Expression |
| visit_YieldFrom = visit_Expression |
| visit_Await = visit_Expression |
| visit_Call = visit_Expression |
| |
| |
| |
| class ModuleGraph(ObjectGraph): |
| def __init__(self, path=None, excludes=(), replace_paths=(), implies=(), graph=None, debug=0): |
| super(ModuleGraph, self).__init__(graph=graph, debug=debug) |
| if path is None: |
| path = sys.path |
| self.path = path |
| self.lazynodes = {} |
| # excludes is stronger than implies |
| self.lazynodes.update(dict(implies)) |
| for m in excludes: |
| self.lazynodes[m] = None |
| self.replace_paths = replace_paths |
| |
| self.nspackages = self._calc_setuptools_nspackages() |
| |
    def _calc_setuptools_nspackages(self):
        """
        Scan sys.path-style entries for setuptools '*-nspkg.pth' files and
        return a mapping from namespace-package name to the list of
        directories making up that package.
        """
        # Setuptools has some magic handling for namespace
        # packages when using 'install --single-version-externally-managed'
        # (used by system packagers and also by pip)
        #
        # When this option is used namespace packages are written to
        # disk *without* an __init__.py file, which means the regular
        # import machinery will not find them.
        #
        # We therefore explicitly look for the hack used by
        # setuptools to get this kind of namespace packages to work.

        pkgmap = {}

        try:
            from pkgutil import ImpImporter
        except ImportError:
            try:
                from _pkgutil import ImpImporter
            except ImportError:
                ImpImporter = pkg_resources.ImpWrapper

        if sys.version_info[:2] >= (3,3):
            import importlib.machinery
            ImpImporter = importlib.machinery.FileFinder

        for entry in self.path:
            importer = pkg_resources.get_importer(entry)

            # Only plain filesystem entries can contain .pth files.
            if isinstance(importer, ImpImporter):
                try:
                    ldir = os.listdir(entry)
                except os.error:
                    continue

                for fn in ldir:
                    if fn.endswith('-nspkg.pth'):
                        fp = open(os.path.join(entry, fn), 'rU')
                        try:
                            for ln in fp:
                                for pfx in _SETUPTOOLS_NAMESPACEPKG_PTHs:
                                    if ln.startswith(pfx):
                                        try:
                                            # The package name tuple sits
                                            # between "*('" and the closing
                                            # parenthesis.
                                            start = len(pfx)-2
                                            stop = ln.index(')', start)+1
                                        except ValueError:
                                            continue

                                        pkg = _eval_str_tuple(ln[start:stop])
                                        identifier = ".".join(pkg)
                                        subdir = os.path.join(entry, *pkg)
                                        if os.path.exists(os.path.join(subdir, '__init__.py')):
                                            # There is a real __init__.py, ignore the setuptools hack
                                            continue

                                        if identifier in pkgmap:
                                            pkgmap[identifier].append(subdir)
                                        else:
                                            pkgmap[identifier] = [subdir]
                                        break
                        finally:
                            fp.close()

        return pkgmap
| |
| def implyNodeReference(self, node, other, edge_data=None): |
| """ |
| Imply that one node depends on another. |
| other may be a module name or another node. |
| |
| For use by extension modules and tricky import code |
| """ |
| if isinstance(other, Node): |
| self._updateReference(node, other, edge_data) |
| |
| else: |
| if isinstance(other, tuple): |
| raise ValueError(other) |
| |
| others = self._safe_import_hook(other, node, None) |
| for other in others: |
| self._updateReference(node, other, edge_data) |
| |
| |
| def getReferences(self, fromnode): |
| """ |
| Yield all nodes that 'fromnode' dependes on (that is, |
| all modules that 'fromnode' imports. |
| """ |
| node = self.findNode(fromnode) |
| out_edges, _ = self.get_edges(node) |
| return out_edges |
| |
| def getReferers(self, tonode, collapse_missing_modules=True): |
| node = self.findNode(tonode) |
| _, in_edges = self.get_edges(node) |
| |
| if collapse_missing_modules: |
| for n in in_edges: |
| if isinstance(n, MissingModule): |
| for n in self.getReferers(n, False): |
| yield n |
| |
| else: |
| yield n |
| |
| else: |
| for n in in_edges: |
| yield n |
| |
| def hasEdge(self, fromnode, tonode): |
| """ Return True iff there is an edge from 'fromnode' to 'tonode' """ |
| fromnode = self.findNode(fromnode) |
| tonode = self.findNode(tonode) |
| |
| return self.graph.edge_by_node(fromnode, tonode) is not None |
| |
| |
| def foldReferences(self, packagenode): |
| """ |
| Create edges to/from 'packagenode' based on the |
| edges to/from modules in package. The module nodes |
| are then hidden. |
| """ |
| pkg = self.findNode(packagenode) |
| |
| for n in self.nodes(): |
| if not n.identifier.startswith(pkg.identifier + '.'): |
| continue |
| |
| iter_out, iter_inc = n.get_edges() |
| for other in iter_out: |
| if other.identifier.startswith(pkg.identifier + '.'): |
| continue |
| |
| if not self.hasEdge(pkg, other): |
| # Ignore circular dependencies |
| self._updateReference(pkg, other, 'pkg-internal-import') |
| |
| for other in iter_in: |
| if other.identifier.startswith(pkg.identifier + '.'): |
| # Ignore circular dependencies |
| continue |
| |
| if not self.hasEdge(other, pkg): |
| self._updateReference(other, pkg, 'pkg-import') |
| |
| self.graph.hide_node(n) |
| |
| # TODO: unfoldReferences(pkg) that restore the submodule nodes and |
| # removes 'pkg-import' and 'pkg-internal-import' edges. Care should |
| # be taken to ensure that references are correct if multiple packages |
| # are folded and then one of them in unfolded |
| |
| |
| def _updateReference(self, fromnode, tonode, edge_data): |
| try: |
| ed = self.edgeData(fromnode, tonode) |
| except (KeyError, GraphError): # XXX: Why 'GraphError' |
| return self.createReference(fromnode, tonode, edge_data) |
| |
| if not (isinstance(ed, DependencyInfo) and isinstance(edge_data, DependencyInfo)): |
| self.updateEdgeData(fromnode, tonode, edge_data) |
| else: |
| self.updateEdgeData(fromnode, tonode, ed._merged(edge_data)) |
| |
| |
    def createReference(self, fromnode, tonode, edge_data='direct'):
        """
        Create a reference from fromnode to tonode, annotated with
        *edge_data* (defaults to the plain 'direct' marker).
        """
        return super(ModuleGraph, self).createReference(fromnode, tonode, edge_data=edge_data)
| |
    def findNode(self, name):
        """
        Find a node by identifier. If a node by that identifier exists,
        it will be returned.

        If a lazy node exists by that identifier with no dependencies (excluded),
        it will be instantiated and returned.

        If a lazy node exists by that identifier with dependencies, it and its
        dependencies will be instantiated and scanned for additional dependencies.
        """
        data = super(ModuleGraph, self).findNode(name)
        if data is not None:
            return data
        if name in self.lazynodes:
            # pop() ensures each lazy node is materialized at most once.
            deps = self.lazynodes.pop(name)
            if deps is None:
                # excluded module
                m = self.createNode(ExcludedModule, name)
            elif isinstance(deps, Alias):
                # The name is an alias for some other module.
                other = self._safe_import_hook(deps, None, None).pop()
                m = self.createNode(AliasNode, name, other)
                self.implyNodeReference(m, other)

            else:
                # A real module with extra implied dependencies.
                m = self._safe_import_hook(name, None, None).pop()
                for dep in deps:
                    self.implyNodeReference(m, dep)
            return m

        if name in self.nspackages:
            # name is a --single-version-externally-managed
            # namespace package (setuptools/distribute)
            pathnames = self.nspackages.pop(name)
            m = self.createNode(NamespacePackage, name)

            # FIXME: The filename must be set to a string to ensure that py2app
            # works, it is not clear yet why that is. Setting to None would be
            # cleaner.
            m.filename = '-'
            m.packagepath = _namespace_package_path(name, pathnames, self.path)

            # As per comment at top of file, simulate runtime packagepath additions.
            m.packagepath = m.packagepath + _packagePathMap.get(name, [])
            return m

        return None
| |
    def run_script(self, pathname, caller=None):
        """
        Create a node by path (not module name). It is expected to be a Python
        source file, and will be scanned for dependencies.
        """
        self.msg(2, "run_script", pathname)
        pathname = os.path.realpath(pathname)
        m = self.findNode(pathname)
        if m is not None:
            return m

        if sys.version_info[0] != 2:
            # Honor a PEP 263 coding declaration when reading the source.
            with open(pathname, 'rb') as fp:
                encoding = util.guess_encoding(fp)

            with open(pathname, _READ_MODE, encoding=encoding) as fp:
                contents = fp.read() + '\n'

        else:
            with open(pathname, _READ_MODE) as fp:
                contents = fp.read() + '\n'

        # Compile to an AST first so imports can be scanned, then compile
        # that AST down to the code object stored on the node.
        co = compile(contents, pathname, 'exec', ast.PyCF_ONLY_AST, True)
        m = self.createNode(Script, pathname)
        self._updateReference(caller, m, None)
        self._scan_code(co, m)
        m.code = compile(co, pathname, 'exec', 0, True)
        if self.replace_paths:
            m.code = self._replace_paths_in_code(m.code)
        return m
| |
    def import_hook(self, name, caller=None, fromlist=None, level=DEFAULT_IMPORT_LEVEL, attr=None):
        """
        Import a module

        Mirrors __import__: resolves *name* relative to *caller* at the
        given *level*, loads the dotted tail, and — for packages — the
        submodules listed in *fromlist*.  Edges from *caller* to each
        imported module are annotated with *attr*.

        Return the set of modules that are imported
        """
        self.msg(3, "import_hook", name, caller, fromlist, level)
        parent = self._determine_parent(caller)
        q, tail = self._find_head_package(parent, name, level)
        m = self._load_tail(q, tail)
        modules = [m]
        # 'fromlist' submodules only exist when m is a package.
        if fromlist and m.packagepath:
            for s in self._ensure_fromlist(m, fromlist):
                if s not in modules:
                    modules.append(s)
        for m in modules:
            self._updateReference(caller, m, edge_data=attr)
        return modules
| |
    def _determine_parent(self, caller):
        """
        Determine the package containing a node, for resolving
        (possibly relative) imports made by *caller*.
        """
        self.msgin(4, "determine_parent", caller)
        parent = None
        if caller:
            pname = caller.identifier

            if isinstance(caller, Package):
                # A package resolves imports against itself.
                parent = caller

            elif '.' in pname:
                # A submodule: its package is everything before the last dot.
                pname = pname[:pname.rfind('.')]
                parent = self.findNode(pname)

            elif caller.packagepath:
                # XXX: I have no idea why this line
                # is necessary.
                parent = self.findNode(pname)


        self.msgout(4, "determine_parent ->", parent)
        return parent
| |
    def _find_head_package(self, parent, name, level=DEFAULT_IMPORT_LEVEL):
        """
        Given a calling parent package and an import name determine the containing
        package for the name.

        Returns ``(node, tail)`` where *node* is the imported head package
        and *tail* the remaining dotted components still to be loaded.
        Raises ImportError when the head cannot be imported.
        """
        self.msgin(4, "find_head_package", parent, name, level)
        if '.' in name:
            head, tail = name.split('.', 1)
        else:
            head, tail = name, ''

        if level == -1:
            # Python 2 implicit-relative semantics: try relative to the
            # parent first (the absolute fallback happens below).
            if parent:
                qname = parent.identifier + '.' + head
            else:
                qname = head

        elif level == 0:
            qname = head

            # Absolute import, ignore the parent
            parent = None

        else:
            # Explicit relative import: walk 'level - 1' packages up from
            # the parent before resolving 'head'.
            if parent is None:
                self.msg(2, "Relative import outside of package")
                raise ImportError("Relative import outside of package (name=%r, parent=%r, level=%r)"%(name, parent, level))

            for i in range(level-1):
                if '.' not in parent.identifier:
                    self.msg(2, "Relative import outside of package")
                    raise ImportError("Relative import outside of package (name=%r, parent=%r, level=%r)"%(name, parent, level))

                p_fqdn = parent.identifier.rsplit('.', 1)[0]
                new_parent = self.findNode(p_fqdn)
                if new_parent is None:
                    self.msg(2, "Relative import outside of package")
                    raise ImportError("Relative import outside of package (name=%r, parent=%r, level=%r)"%(name, parent, level))

                assert new_parent is not parent, (new_parent, parent)
                parent = new_parent

            if head:
                qname = parent.identifier + '.' + head
            else:
                # 'from . import x' style: the parent itself is the target.
                qname = parent.identifier


        q = self._import_module(head, qname, parent)
        if q:
            self.msgout(4, "find_head_package ->", (q, tail))
            return q, tail
        if parent:
            # Fall back to an absolute import of 'head'.
            qname = head
            parent = None
            q = self._import_module(head, qname, parent)
            if q:
                self.msgout(4, "find_head_package ->", (q, tail))
                return q, tail
        self.msgout(4, "raise ImportError: No module named", qname)
        raise ImportError("No module named " + qname)
| |
| def _load_tail(self, mod, tail): |
| self.msgin(4, "load_tail", mod, tail) |
| result = mod |
| while tail: |
| i = tail.find('.') |
| if i < 0: i = len(tail) |
| head, tail = tail[:i], tail[i+1:] |
| mname = "%s.%s" % (result.identifier, head) |
| result = self._import_module(head, mname, result) |
| if result is None: |
| result = self.createNode(MissingModule, mname) |
| #self.msgout(4, "raise ImportError: No module named", mname) |
| #raise ImportError("No module named " + mname) |
| self.msgout(4, "load_tail ->", result) |
| return result |
| |
    def _ensure_fromlist(self, m, fromlist):
        """
        Yield the submodule nodes of package *m* named in *fromlist*.
        A '*' entry expands to all submodules found on disk.  Names that
        are plain globals of *m* are skipped; names that are neither
        globals nor importable raise ImportError.
        """
        fromlist = set(fromlist)
        self.msg(4, "ensure_fromlist", m, fromlist)
        if '*' in fromlist:
            fromlist.update(self._find_all_submodules(m))
            fromlist.remove('*')
        for sub in fromlist:
            submod = m.get(sub)
            if submod is None:
                if sub in m.globalnames:
                    # Name is a global in the module
                    continue
                # XXX: ^^^ need something simular for names imported
                # by 'm'.

                fullname = m.identifier + '.' + sub
                submod = self._import_module(sub, fullname, m)
                if submod is None:
                    raise ImportError("No module named " + fullname)
            yield submod
| |
| def _find_all_submodules(self, m): |
| if not m.packagepath: |
| return |
| # 'suffixes' used to be a list hardcoded to [".py", ".pyc", ".pyo"]. |
| # But we must also collect Python extension modules - although |
| # we cannot separate normal dlls from Python extensions. |
| suffixes = [triple[0] for triple in imp.get_suffixes()] |
| for path in m.packagepath: |
| try: |
| names = zipio.listdir(path) |
| except (os.error, IOError): |
| self.msg(2, "can't list directory", path) |
| continue |
| for info in (moduleInfoForPath(p) for p in names): |
| if info is None: continue |
| if info[0] != '__init__': |
| yield info[0] |
| |
    def _import_module(self, partname, fqname, parent):
        """
        Import the single component *partname* as the fully-qualified
        *fqname* inside *parent* (or at top level when parent is None).
        Returns the node, or None when the module cannot be found.
        """
        # XXX: Review me for use with absolute imports.
        self.msgin(3, "import_module", partname, fqname, parent)
        m = self.findNode(fqname)
        if m is not None:
            # Already in the graph; just record the dependency edge.
            self.msgout(3, "import_module ->", m)
            if parent:
                self._updateReference(m, parent, edge_data=DependencyInfo(
                    conditional=False, fromlist=False, function=False, tryexcept=False
                ))
            return m

        if parent and parent.packagepath is None:
            # The parent is not a package, so it cannot contain submodules.
            self.msgout(3, "import_module -> None")
            return None

        try:
            searchpath = None
            if parent is not None and parent.packagepath:
                searchpath = parent.packagepath

            fp, pathname, stuff = self._find_module(partname,
                searchpath, parent)

        except ImportError:
            self.msgout(3, "import_module ->", None)
            return None

        try:
            m = self._load_module(fqname, fp, pathname, stuff)

        finally:
            if fp is not None:
                fp.close()

        if parent:
            # The submodule depends on its package, and becomes an
            # attribute in the package's namespace.
            self.msgout(4, "create reference", m, "->", parent)
            self._updateReference(m, parent, edge_data=DependencyInfo(
                conditional=False, fromlist=False, function=False, tryexcept=False
            ))
            parent[partname] = m

        self.msgout(3, "import_module ->", m)
        return m
| |
    def _load_module(self, fqname, fp, pathname, info):
        """
        Create a graph node of the appropriate class for module *fqname*
        found at *pathname*, parsing or unmarshalling its code when
        available.  *info* is a (suffix, mode, type) triple as returned
        by _find_module.  The caller is responsible for closing *fp*.
        """
        suffix, mode, typ = info
        self.msgin(2, "load_module", fqname, fp and "fp", pathname)

        if typ == imp.PKG_DIRECTORY:
            # For package directories the 'mode' slot carries the
            # package path (when it is a sequence).
            if isinstance(mode, (list, tuple)):
                packagepath = mode
            else:
                packagepath = []

            m = self._load_package(fqname, pathname, packagepath)
            self.msgout(2, "load_module ->", m)
            return m

        if typ == imp.PY_SOURCE:
            contents = fp.read()
            # Ensure a trailing newline so 'compile' accepts the source.
            if isinstance(contents, bytes):
                contents += b'\n'
            else:
                contents += '\n'

            try:
                # Parse to an AST; _scan_code walks it and it is only
                # compiled to bytecode afterwards.
                co = compile(contents, pathname, 'exec', ast.PyCF_ONLY_AST, True)
                #co = compile(contents, pathname, 'exec', 0, True)
            except SyntaxError:
                co = None
                cls = InvalidSourceModule

            else:
                cls = SourceModule

        elif typ == imp.PY_COMPILED:
            if fp.read(4) != imp.get_magic():
                self.msgout(2, "raise ImportError: Bad magic number", pathname)
                co = None
                cls = InvalidCompiledModule

            else:
                # Discard the next 4 header bytes (timestamp).
                # NOTE(review): newer .pyc formats carry a larger header;
                # confirm against the Python versions this must support.
                fp.read(4)
                try:
                    co = marshal.loads(fp.read())
                    cls = CompiledModule
                except Exception:
                    # Truncated or corrupt marshal data.
                    co = None
                    cls = InvalidCompiledModule

        elif typ == imp.C_BUILTIN:
            cls = BuiltinModule
            co = None

        else:
            cls = Extension
            co = None

        m = self.createNode(cls, fqname)
        m.filename = pathname
        if co is not None:
            # Record imports and global names found in the code.
            self._scan_code(co, m)

            if isinstance(co, ast.AST):
                co = compile(co, pathname, 'exec', 0, True)
            if self.replace_paths:
                co = self._replace_paths_in_code(co)
            m.code = co


        self.msgout(2, "load_module ->", m)
        return m
| |
    def _safe_import_hook(self, name, caller, fromlist, level=DEFAULT_IMPORT_LEVEL, attr=None):
        """
        Wrapper for self.import_hook() that records a MissingModule node
        instead of raising ImportError.

        Returns a list containing the node for *name* followed by the
        nodes for every entry of *fromlist* that could be resolved.
        """
        try:
            mods = self.import_hook(name, caller, level=level, attr=attr)
        except ImportError as msg:
            self.msg(2, "ImportError:", str(msg))
            m = self.createNode(MissingModule, _path_from_importerror(msg, name))
            self._updateReference(caller, m, edge_data=attr)

        else:
            assert len(mods) == 1
            m = list(mods)[0]

        subs = [m]
        if isinstance(attr, DependencyInfo):
            # Edges created for fromlist entries are marked as such.
            attr = attr._replace(fromlist=True)
        for sub in (fromlist or ()):
            # If this name is in the module namespace already,
            # then add the entry to the list of substitutions
            if sub in m:
                sm = m[sub]
                if sm is not None:
                    if sm not in subs:
                        self._updateReference(caller, sm, edge_data=attr)
                        subs.append(sm)
                    continue

            elif sub in m.globalnames:
                # Global variable in the module, ignore
                continue


            # See if we can load it
            # fullname = name + '.' + sub
            fullname = m.identifier + '.' + sub
            #else:
            #    print("XXX", repr(name), repr(sub), repr(caller), repr(m))
            sm = self.findNode(fullname)
            if sm is None:
                try:
                    sm = self.import_hook(name, caller, fromlist=[sub], level=level, attr=attr)
                except ImportError as msg:
                    self.msg(2, "ImportError:", str(msg))
                    #sm = self.createNode(MissingModule, _path_from_importerror(msg, fullname))
                    sm = self.createNode(MissingModule, fullname)
                else:
                    # import_hook returns the root node(s); look up the
                    # actual submodule node from the graph instead.
                    sm = self.findNode(fullname)
                    if sm is None:
                        sm = self.createNode(MissingModule, fullname)

            m[sub] = sm
            if sm is not None:
                self._updateReference(m, sm, edge_data=attr)
                self._updateReference(caller, sm, edge_data=attr)
                if sm not in subs:
                    subs.append(sm)
        return subs
| |
| def _scan_code(self, co, m): |
| if isinstance(co, ast.AST): |
| #return self._scan_bytecode(compile(co, '-', 'exec', 0, True), m) |
| self._scan_ast(co, m) |
| self._scan_bytecode_stores( |
| compile(co, '-', 'exec', 0, True), m) |
| |
| else: |
| self._scan_bytecode(co, m) |
| |
| def _scan_ast(self, co, m): |
| visitor = _Visitor(self, m) |
| visitor.visit(co) |
| |
| def _scan_bytecode_stores(self, co, m, |
| STORE_NAME=_Bchr(dis.opname.index('STORE_NAME')), |
| STORE_GLOBAL=_Bchr(dis.opname.index('STORE_GLOBAL')), |
| HAVE_ARGUMENT=_Bchr(dis.HAVE_ARGUMENT), |
| unpack=struct.unpack): |
| |
| extended_import = bool(sys.version_info[:2] >= (2,5)) |
| |
| code = co.co_code |
| constants = co.co_consts |
| n = len(code) |
| i = 0 |
| |
| while i < n: |
| c = code[i] |
| i += 1 |
| if c >= HAVE_ARGUMENT: |
| i = i+2 |
| |
| if c == STORE_NAME or c == STORE_GLOBAL: |
| # keep track of all global names that are assigned to |
| oparg = unpack('<H', code[i - 2:i])[0] |
| name = co.co_names[oparg] |
| m.globalnames.add(name) |
| |
| cotype = type(co) |
| for c in constants: |
| if isinstance(c, cotype): |
| self._scan_bytecode_stores(c, m) |
| |
| def _scan_bytecode(self, co, m, |
| HAVE_ARGUMENT=_Bchr(dis.HAVE_ARGUMENT), |
| LOAD_CONST=_Bchr(dis.opname.index('LOAD_CONST')), |
| IMPORT_NAME=_Bchr(dis.opname.index('IMPORT_NAME')), |
| IMPORT_FROM=_Bchr(dis.opname.index('IMPORT_FROM')), |
| STORE_NAME=_Bchr(dis.opname.index('STORE_NAME')), |
| STORE_GLOBAL=_Bchr(dis.opname.index('STORE_GLOBAL')), |
| unpack=struct.unpack): |
| |
| # Python >=2.5: LOAD_CONST flags, LOAD_CONST names, IMPORT_NAME name |
| # Python < 2.5: LOAD_CONST names, IMPORT_NAME name |
| extended_import = bool(sys.version_info[:2] >= (2,5)) |
| |
| code = co.co_code |
| constants = co.co_consts |
| n = len(code) |
| i = 0 |
| |
| level = None |
| fromlist = None |
| |
| while i < n: |
| c = code[i] |
| i += 1 |
| if c >= HAVE_ARGUMENT: |
| i = i+2 |
| |
| if c == IMPORT_NAME: |
| if extended_import: |
| assert code[i-9] == LOAD_CONST |
| assert code[i-6] == LOAD_CONST |
| arg1, arg2 = unpack('<xHxH', code[i-9:i-3]) |
| level = co.co_consts[arg1] |
| fromlist = co.co_consts[arg2] |
| else: |
| assert code[-6] == LOAD_CONST |
| arg1, = unpack('<xH', code[i-6:i-3]) |
| level = -1 |
| fromlist = co.co_consts[arg1] |
| |
| assert fromlist is None or type(fromlist) is tuple |
| oparg, = unpack('<H', code[i - 2:i]) |
| name = co.co_names[oparg] |
| have_star = False |
| if fromlist is not None: |
| fromlist = set(fromlist) |
| if '*' in fromlist: |
| fromlist.remove('*') |
| have_star = True |
| |
| #self.msgin(2, "Before import hook", repr(name), repr(m), repr(fromlist), repr(level)) |
| |
| imported_module = self._safe_import_hook(name, m, fromlist, level)[0] |
| |
| if have_star: |
| m.globalnames.update(imported_module.globalnames) |
| m.starimports.update(imported_module.starimports) |
| if imported_module.code is None: |
| m.starimports.add(name) |
| |
| elif c == STORE_NAME or c == STORE_GLOBAL: |
| # keep track of all global names that are assigned to |
| oparg = unpack('<H', code[i - 2:i])[0] |
| name = co.co_names[oparg] |
| m.globalnames.add(name) |
| |
| cotype = type(co) |
| for c in constants: |
| if isinstance(c, cotype): |
| self._scan_bytecode(c, m) |
| |
    def _load_package(self, fqname, pathname, pkgpath):
        """
        Create and populate the graph node for package *fqname* located
        at directory *pathname*.

        Called only when an imp.PKG_DIRECTORY is found.  The package's
        __init__ module, when present, is loaded into the node.
        """
        self.msgin(2, "load_package", fqname, pathname, pkgpath)
        # Honour registered package renames.
        newname = _replacePackageMap.get(fqname)
        if newname:
            fqname = newname

        ns_pkgpath = _namespace_package_path(fqname, pkgpath or [], self.path)
        if ns_pkgpath or pkgpath:
            # this is a namespace package
            m = self.createNode(NamespacePackage, fqname)
            m.filename = '-'
            m.packagepath = ns_pkgpath
        else:
            m = self.createNode(Package, fqname)
            m.filename = pathname
            m.packagepath = [pathname] + ns_pkgpath

        # As per comment at top of file, simulate runtime packagepath additions.
        m.packagepath = m.packagepath + _packagePathMap.get(fqname, [])



        try:
            self.msg(2, "find __init__ for %s"%(m.packagepath,))
            fp, buf, stuff = self._find_module("__init__", m.packagepath, parent=m)
        except ImportError:
            # A missing __init__ is tolerated (e.g. namespace packages).
            pass

        else:
            try:
                self.msg(2, "load __init__ for %s"%(m.packagepath,))
                self._load_module(fqname, fp, buf, stuff)
            finally:
                # _load_module does not close the file; do it here.
                if fp is not None:
                    fp.close()
        self.msgout(2, "load_package ->", m)
        return m
| |
| def _find_module(self, name, path, parent=None): |
| if parent is not None: |
| # assert path is not None |
| fullname = parent.identifier + '.' + name |
| else: |
| fullname = name |
| |
| node = self.findNode(fullname) |
| if node is not None: |
| self.msgout(3, "find_module -> already included?", node) |
| raise ImportError(name) |
| |
| if path is None: |
| if name in sys.builtin_module_names: |
| return (None, None, ("", "", imp.C_BUILTIN)) |
| |
| path = self.path |
| |
| fp, buf, stuff = find_module(name, path) |
| try: |
| if buf: |
| buf = os.path.realpath(buf) |
| |
| return (fp, buf, stuff) |
| except: |
| fp.close() |
| raise |
| |
    def create_xref(self, out=None):
        """
        Write an HTML cross-reference of the graph to *out* (default:
        sys.stdout), listing scripts first and then modules, each with
        links to the modules it imports and is imported by.  The page is
        assembled from the module-level template strings (header,
        footer, entry, ...).
        """
        global header, footer, entry, contpl, contpl_linked, imports
        if out is None:
            out = sys.stdout
        scripts = []
        mods = []
        for mod in self.flatten():
            name = os.path.basename(mod.identifier)
            if isinstance(mod, Script):
                scripts.append((name, mod))
            else:
                mods.append((name, mod))
        # Scripts (sorted) are listed before modules (sorted).
        scripts.sort()
        mods.sort()
        scriptnames = [name for name, m in scripts]
        scripts.extend(mods)
        mods = scripts

        title = "modulegraph cross reference for " + ', '.join(scriptnames)
        print(header % {"TITLE": title}, file=out)

        def sorted_namelist(mods):
            # Basenames of the given nodes, sorted; None entries skipped.
            lst = [os.path.basename(mod.identifier) for mod in mods if mod]
            lst.sort()
            return lst
        for name, m in mods:
            content = ""
            if isinstance(m, BuiltinModule):
                content = contpl % {"NAME": name,
                                    "TYPE": "<i>(builtin module)</i>"}
            elif isinstance(m, Extension):
                content = contpl % {"NAME": name,\
                                    "TYPE": "<tt>%s</tt>" % m.filename}
            else:
                url = pathname2url(m.filename or "")
                content = contpl_linked % {"NAME": name, "URL": url}
            # Outgoing edges -> "imports", incoming -> "imported by".
            oute, ince = map(sorted_namelist, self.get_edges(m))
            if oute:
                links = ""
                for n in oute:
                    links += """  <a href="#%s">%s</a>\n""" % (n, n)
                content += imports % {"HEAD": "imports", "LINKS": links}
            if ince:
                links = ""
                for n in ince:
                    links += """  <a href="#%s">%s</a>\n""" % (n, n)
                content += imports % {"HEAD": "imported by", "LINKS": links}
            print(entry % {"NAME": name,"CONTENT": content}, file=out)
        print(footer, file=out)
| |
| |
| def itergraphreport(self, name='G', flatpackages=()): |
| # XXX: Can this be implemented using Dot()? |
| nodes = map(self.graph.describe_node, self.graph.iterdfs(self)) |
| describe_edge = self.graph.describe_edge |
| edges = deque() |
| packagenodes = set() |
| packageidents = {} |
| nodetoident = {} |
| inpackages = {} |
| mainedges = set() |
| |
| # XXX - implement |
| flatpackages = dict(flatpackages) |
| |
| def nodevisitor(node, data, outgoing, incoming): |
| if not isinstance(data, Node): |
| return {'label': str(node)} |
| #if isinstance(d, (ExcludedModule, MissingModule, BadModule)): |
| # return None |
| s = '<f0> ' + type(data).__name__ |
| for i,v in enumerate(data.infoTuple()[:1], 1): |
| s += '| <f%d> %s' % (i,v) |
| return {'label':s, 'shape':'record'} |
| |
| |
| def edgevisitor(edge, data, head, tail): |
| # XXX: This method nonsense, the edge |
| # data is never initialized. |
| if data == 'orphan': |
| return {'style':'dashed'} |
| elif data == 'pkgref': |
| return {'style':'dotted'} |
| return {} |
| |
| yield 'digraph %s {\n' % (name,) |
| attr = dict(rankdir='LR', concentrate='true') |
| cpatt = '%s="%s"' |
| for item in attr.items(): |
| yield '\t%s;\n' % (cpatt % item,) |
| |
| # find all packages (subgraphs) |
| for (node, data, outgoing, incoming) in nodes: |
| nodetoident[node] = getattr(data, 'identifier', None) |
| if isinstance(data, Package): |
| packageidents[data.identifier] = node |
| inpackages[node] = set([node]) |
| packagenodes.add(node) |
| |
| |
| # create sets for subgraph, write out descriptions |
| for (node, data, outgoing, incoming) in nodes: |
| # update edges |
| for edge in (describe_edge(e) for e in outgoing): |
| edges.append(edge) |
| |
| # describe node |
| yield '\t"%s" [%s];\n' % ( |
| node, |
| ','.join([ |
| (cpatt % item) for item in |
| nodevisitor(node, data, outgoing, incoming).items() |
| ]), |
| ) |
| |
| inside = inpackages.get(node) |
| if inside is None: |
| inside = inpackages[node] = set() |
| ident = nodetoident[node] |
| if ident is None: |
| continue |
| pkgnode = packageidents.get(ident[:ident.rfind('.')]) |
| if pkgnode is not None: |
| inside.add(pkgnode) |
| |
| |
| graph = [] |
| subgraphs = {} |
| for key in packagenodes: |
| subgraphs[key] = [] |
| |
| while edges: |
| edge, data, head, tail = edges.popleft() |
| if ((head, tail)) in mainedges: |
| continue |
| mainedges.add((head, tail)) |
| tailpkgs = inpackages[tail] |
| common = inpackages[head] & tailpkgs |
| if not common and tailpkgs: |
| usepkgs = sorted(tailpkgs) |
| if len(usepkgs) != 1 or usepkgs[0] != tail: |
| edges.append((edge, data, head, usepkgs[0])) |
| edges.append((edge, 'pkgref', usepkgs[-1], tail)) |
| continue |
| if common: |
| common = common.pop() |
| if tail == common: |
| edges.append((edge, data, tail, head)) |
| elif head == common: |
| subgraphs[common].append((edge, 'pkgref', head, tail)) |
| else: |
| edges.append((edge, data, common, head)) |
| edges.append((edge, data, common, tail)) |
| |
| else: |
| graph.append((edge, data, head, tail)) |
| |
| def do_graph(edges, tabs): |
| edgestr = tabs + '"%s" -> "%s" [%s];\n' |
| # describe edge |
| for (edge, data, head, tail) in edges: |
| attribs = edgevisitor(edge, data, head, tail) |
| yield edgestr % ( |
| head, |
| tail, |
| ','.join([(cpatt % item) for item in attribs.items()]), |
| ) |
| |
| for g, edges in subgraphs.items(): |
| yield '\tsubgraph "cluster_%s" {\n' % (g,) |
| yield '\t\tlabel="%s";\n' % (nodetoident[g],) |
| for s in do_graph(edges, '\t\t'): |
| yield s |
| yield '\t}\n' |
| |
| for s in do_graph(graph, '\t'): |
| yield s |
| |
| yield '}\n' |
| |
| |
| def graphreport(self, fileobj=None, flatpackages=()): |
| if fileobj is None: |
| fileobj = sys.stdout |
| fileobj.writelines(self.itergraphreport(flatpackages=flatpackages)) |
| |
| def report(self): |
| """Print a report to stdout, listing the found modules with their |
| paths, as well as modules that are missing, or seem to be missing. |
| """ |
| print() |
| print("%-15s %-25s %s" % ("Class", "Name", "File")) |
| print("%-15s %-25s %s" % ("-----", "----", "----")) |
| # Print modules found |
| sorted = [(os.path.basename(mod.identifier), mod) for mod in self.flatten()] |
| sorted.sort() |
| for (name, m) in sorted: |
| print("%-15s %-25s %s" % (type(m).__name__, name, m.filename or "")) |
| |
    def _replace_paths_in_code(self, co):
        """
        Return a copy of code object *co* whose co_filename is rewritten
        according to self.replace_paths, a sequence of
        (from_prefix, to_prefix) pairs.  Nested code objects in
        co_consts are rewritten recursively.  Returns *co* unchanged
        when no prefix matches.
        """
        new_filename = original_filename = os.path.normpath(co.co_filename)
        for f, r in self.replace_paths:
            # Append a path separator so 'foo' does not match 'foobar'.
            f = os.path.join(f, '')
            r = os.path.join(r, '')
            if original_filename.startswith(f):
                new_filename = r + original_filename[len(f):]
                break

        else:
            # No replacement applied: return the original object.
            return co

        consts = list(co.co_consts)
        for i in range(len(consts)):
            if isinstance(consts[i], type(co)):
                consts[i] = self._replace_paths_in_code(consts[i])

        code_func = type(co)

        # Rebuild the code object; co_kwonlyargcount only exists on
        # Python 3, hence the two constructor call shapes.
        # NOTE(review): Python 3.8+ inserts co_posonlyargcount into the
        # constructor signature — confirm supported Python versions.
        if hasattr(co, 'co_kwonlyargcount'):
            return code_func(co.co_argcount, co.co_kwonlyargcount, co.co_nlocals, co.co_stacksize,
                         co.co_flags, co.co_code, tuple(consts), co.co_names,
                         co.co_varnames, new_filename, co.co_name,
                         co.co_firstlineno, co.co_lnotab,
                         co.co_freevars, co.co_cellvars)
        else:
            return code_func(co.co_argcount, co.co_nlocals, co.co_stacksize,
                         co.co_flags, co.co_code, tuple(consts), co.co_names,
                         co.co_varnames, new_filename, co.co_name,
                         co.co_firstlineno, co.co_lnotab,
                         co.co_freevars, co.co_cellvars)