pyramid.config.assets 源代码

import os
import pkg_resources
import sys

from zope.interface import implementer

from pyramid.interfaces import IPackageOverrides, PHASE1_CONFIG

from pyramid.exceptions import ConfigurationError
from pyramid.threadlocal import get_current_registry

from pyramid.config.actions import action_method


class OverrideProvider(pkg_resources.DefaultProvider):
    def __init__(self, module):
        pkg_resources.DefaultProvider.__init__(self, module)
        self.module_name = module.__name__

    def _get_overrides(self):
        reg = get_current_registry()
        overrides = reg.queryUtility(IPackageOverrides, self.module_name)
        return overrides

    def get_resource_filename(self, manager, resource_name):
        """Return a true filesystem path for resource_name,
        co-ordinating the extraction with manager, if the resource
        must be unpacked to the filesystem.
        """
        overrides = self._get_overrides()
        if overrides is not None:
            filename = overrides.get_filename(resource_name)
            if filename is not None:
                return filename
        return pkg_resources.DefaultProvider.get_resource_filename(
            self, manager, resource_name
        )

    def get_resource_stream(self, manager, resource_name):
        """ Return a readable file-like object for resource_name."""
        overrides = self._get_overrides()
        if overrides is not None:
            stream = overrides.get_stream(resource_name)
            if stream is not None:
                return stream
        return pkg_resources.DefaultProvider.get_resource_stream(
            self, manager, resource_name
        )

    def get_resource_string(self, manager, resource_name):
        """ Return a string containing the contents of resource_name."""
        overrides = self._get_overrides()
        if overrides is not None:
            string = overrides.get_string(resource_name)
            if string is not None:
                return string
        return pkg_resources.DefaultProvider.get_resource_string(
            self, manager, resource_name
        )

    def has_resource(self, resource_name):
        overrides = self._get_overrides()
        if overrides is not None:
            result = overrides.has_resource(resource_name)
            if result is not None:
                return result
        return pkg_resources.DefaultProvider.has_resource(self, resource_name)

    def resource_isdir(self, resource_name):
        overrides = self._get_overrides()
        if overrides is not None:
            result = overrides.isdir(resource_name)
            if result is not None:
                return result
        return pkg_resources.DefaultProvider.resource_isdir(
            self, resource_name
        )

    def resource_listdir(self, resource_name):
        overrides = self._get_overrides()
        if overrides is not None:
            result = overrides.listdir(resource_name)
            if result is not None:
                return result
        return pkg_resources.DefaultProvider.resource_listdir(
            self, resource_name
        )


@implementer(IPackageOverrides)
class PackageOverrides(object):
    # pkg_resources arg in kw args below for testing
    def __init__(self, package, pkg_resources=pkg_resources):
        loader = self._real_loader = getattr(package, '__loader__', None)
        if isinstance(loader, self.__class__):
            self._real_loader = None
        # We register ourselves as a __loader__ *only* to support the
        # setuptools _find_adapter adapter lookup; this class doesn't
        # actually support the PEP 302 loader "API".  This is
        # excusable due to the following statement in the spec:
        # ... Loader objects are not
        # required to offer any useful functionality (any such functionality,
        # such as the zipimport get_data() method mentioned above, is
        # optional)...
        # A __loader__ attribute is basically metadata, and setuptools
        # uses it as such.
        package.__loader__ = self
        # we call register_loader_type for every instantiation of this
        # class; that's OK, it's idempotent to do it more than once.
        pkg_resources.register_loader_type(self.__class__, OverrideProvider)
        self.overrides = []
        self.overridden_package_name = package.__name__

    def insert(self, path, source):
        if not path or path.endswith('/'):
            override = DirectoryOverride(path, source)
        else:
            override = FileOverride(path, source)
        self.overrides.insert(0, override)
        return override

    def filtered_sources(self, resource_name):
        for override in self.overrides:
            o = override(resource_name)
            if o is not None:
                yield o

    def get_filename(self, resource_name):
        for source, path in self.filtered_sources(resource_name):
            result = source.get_filename(path)
            if result is not None:
                return result

    def get_stream(self, resource_name):
        for source, path in self.filtered_sources(resource_name):
            result = source.get_stream(path)
            if result is not None:
                return result

    def get_string(self, resource_name):
        for source, path in self.filtered_sources(resource_name):
            result = source.get_string(path)
            if result is not None:
                return result

    def has_resource(self, resource_name):
        for source, path in self.filtered_sources(resource_name):
            if source.exists(path):
                return True

    def isdir(self, resource_name):
        for source, path in self.filtered_sources(resource_name):
            result = source.isdir(path)
            if result is not None:
                return result

    def listdir(self, resource_name):
        for source, path in self.filtered_sources(resource_name):
            result = source.listdir(path)
            if result is not None:
                return result

    @property
    def real_loader(self):
        if self._real_loader is None:
            raise NotImplementedError()
        return self._real_loader

    def get_data(self, path):
        """See IPEP302Loader."""
        return self.real_loader.get_data(path)

    def is_package(self, fullname):
        """See IPEP302Loader."""
        return self.real_loader.is_package(fullname)

    def get_code(self, fullname):
        """See IPEP302Loader."""
        return self.real_loader.get_code(fullname)

    def get_source(self, fullname):
        """See IPEP302Loader."""
        return self.real_loader.get_source(fullname)


class DirectoryOverride:
    def __init__(self, path, source):
        self.path = path
        self.pathlen = len(self.path)
        self.source = source

    def __call__(self, resource_name):
        if resource_name.startswith(self.path):
            new_path = resource_name[self.pathlen :]
            return self.source, new_path


class FileOverride:
    def __init__(self, path, source):
        self.path = path
        self.source = source

    def __call__(self, resource_name):
        if resource_name == self.path:
            return self.source, ''


class PackageAssetSource(object):
    """
    An asset source relative to a package.

    If this asset source is a file, then we expect the ``prefix`` to point
    to the new name of the file, and the incoming ``resource_name`` will be
    the empty string, as returned by the ``FileOverride``.

    """

    def __init__(self, package, prefix):
        self.package = package
        if hasattr(package, '__name__'):
            self.pkg_name = package.__name__
        else:
            self.pkg_name = package
        self.prefix = prefix

    def get_path(self, resource_name):
        return '%s%s' % (self.prefix, resource_name)

    def get_filename(self, resource_name):
        path = self.get_path(resource_name)
        if pkg_resources.resource_exists(self.pkg_name, path):
            return pkg_resources.resource_filename(self.pkg_name, path)

    def get_stream(self, resource_name):
        path = self.get_path(resource_name)
        if pkg_resources.resource_exists(self.pkg_name, path):
            return pkg_resources.resource_stream(self.pkg_name, path)

    def get_string(self, resource_name):
        path = self.get_path(resource_name)
        if pkg_resources.resource_exists(self.pkg_name, path):
            return pkg_resources.resource_string(self.pkg_name, path)

    def exists(self, resource_name):
        path = self.get_path(resource_name)
        if pkg_resources.resource_exists(self.pkg_name, path):
            return True

    def isdir(self, resource_name):
        path = self.get_path(resource_name)
        if pkg_resources.resource_exists(self.pkg_name, path):
            return pkg_resources.resource_isdir(self.pkg_name, path)

    def listdir(self, resource_name):
        path = self.get_path(resource_name)
        if pkg_resources.resource_exists(self.pkg_name, path):
            return pkg_resources.resource_listdir(self.pkg_name, path)


class FSAssetSource(object):
    """
    An asset source relative to a path in the filesystem.

    """

    def __init__(self, prefix):
        self.prefix = prefix

    def get_path(self, resource_name):
        if resource_name:
            path = os.path.join(self.prefix, resource_name)
        else:
            path = self.prefix
        return path

    def get_filename(self, resource_name):
        path = self.get_path(resource_name)
        if os.path.exists(path):
            return path

    def get_stream(self, resource_name):
        path = self.get_filename(resource_name)
        if path is not None:
            return open(path, 'rb')

    def get_string(self, resource_name):
        stream = self.get_stream(resource_name)
        if stream is not None:
            with stream:
                return stream.read()

    def exists(self, resource_name):
        path = self.get_filename(resource_name)
        if path is not None:
            return True

    def isdir(self, resource_name):
        path = self.get_filename(resource_name)
        if path is not None:
            return os.path.isdir(path)

    def listdir(self, resource_name):
        path = self.get_filename(resource_name)
        if path is not None:
            return os.listdir(path)


class AssetsConfiguratorMixin(object):
    def _override(
        self, package, path, override_source, PackageOverrides=PackageOverrides
    ):
        pkg_name = package.__name__
        override = self.registry.queryUtility(IPackageOverrides, name=pkg_name)
        if override is None:
            override = PackageOverrides(package)
            self.registry.registerUtility(
                override, IPackageOverrides, name=pkg_name
            )
        override.insert(path, override_source)

    @action_method
    def override_asset(self, to_override, override_with, _override=None):
        """Add a :app:`Pyramid` asset override to the current
        configuration state.

        ``to_override`` is an :term:`asset specification` to the
        asset being overridden.

        ``override_with`` is an :term:`asset specification` to the
        asset that is performing the override. This may also be an absolute
        path.

        See :ref:`assets_chapter` for more
        information about asset overrides."""
        if to_override == override_with:
            raise ConfigurationError(
                'You cannot override an asset with itself'
            )

        package = to_override
        path = ''
        if ':' in to_override:
            package, path = to_override.split(':', 1)

        # *_isdir = override is package or directory
        overridden_isdir = path == '' or path.endswith('/')

        if os.path.isabs(override_with):
            override_source = FSAssetSource(override_with)
            if not os.path.exists(override_with):
                raise ConfigurationError(
                    'Cannot override asset with an absolute path that does '
                    'not exist'
                )
            override_isdir = os.path.isdir(override_with)
            override_package = None
            override_prefix = override_with
        else:
            override_package = override_with
            override_prefix = ''
            if ':' in override_with:
                override_package, override_prefix = override_with.split(':', 1)

            __import__(override_package)
            to_package = sys.modules[override_package]
            override_source = PackageAssetSource(to_package, override_prefix)

            override_isdir = override_prefix == '' or override_with.endswith(
                '/'
            )

        if overridden_isdir and (not override_isdir):
            raise ConfigurationError(
                'A directory cannot be overridden with a file (put a '
                'slash at the end of override_with if necessary)'
            )

        if (not overridden_isdir) and override_isdir:
            raise ConfigurationError(
                'A file cannot be overridden with a directory (put a '
                'slash at the end of to_override if necessary)'
            )

        override = _override or self._override  # test jig

        def register():
            __import__(package)
            from_package = sys.modules[package]
            override(from_package, path, override_source)

        intr = self.introspectable(
            'asset overrides',
            (package, override_package, path, override_prefix),
            '%s -> %s' % (to_override, override_with),
            'asset override',
        )
        intr['to_override'] = to_override
        intr['override_with'] = override_with
        self.action(
            None, register, introspectables=(intr,), order=PHASE1_CONFIG
        )

    override_resource = override_asset  # bw compat