pywps.exceptions 源代码

##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others    #
# licensed under MIT, Please consult LICENSE.txt for details     #
##################################################################

"""
OGC OWS and WPS Exceptions

Based on OGC OWS, WPS and

http://lists.opengeospatial.org/pipermail/wps-dev/2013-October/000335.html
"""

import json

from werkzeug.datastructures import MIMEAccept
from werkzeug.http import parse_accept_header
from werkzeug.wrappers import Response
from werkzeug.exceptions import HTTPException
from markupsafe import escape

import logging

from pywps import __version__
from pywps.app.basic import get_json_indent, get_response_type, parse_http_url

__author__ = "Alex Morega & Calin Ciociu"

LOGGER = logging.getLogger('PYWPS')


[文档]class NoApplicableCode(HTTPException): """No applicable code exception implementation also Base exception class """ code = 500 locator = "" def __init__(self, description, locator="", code=400): self.code = code self.description = description self.locator = locator msg = 'Exception: code: {}, description: {}, locator: {}'.format(self.code, self.description, self.locator) LOGGER.exception(msg) HTTPException.__init__(self) @property def name(self): """The status name.""" return self.__class__.__name__ def get_description(self, environ=None): """Get the description.""" if self.description: return escape(self.description) else: return '' def get_response(self, environ=None): args = { 'version': __version__, 'code': self.code, 'locator': escape(self.locator), 'name': escape(self.name), 'description': self.get_description(environ) } accept_mimetypes = parse_accept_header(environ.get("HTTP_ACCEPT"), MIMEAccept) request = environ.get('werkzeug.request', None) default_mimetype = None if not request else request.args.get('f', None) if default_mimetype is None: default_mimetype = parse_http_url(request).get('default_mimetype') json_response, mimetype = get_response_type(accept_mimetypes, default_mimetype) if json_response: doc = json.dumps(args, indent=get_json_indent()) else: doc = str(( '<?xml version="1.0" encoding="UTF-8"?>\n' '<!-- PyWPS {version} -->\n' '<ows:ExceptionReport xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/ows/1.1 http://schemas.opengis.net/ows/1.1.0/owsExceptionReport.xsd" version="1.0.0">\n' # noqa ' <ows:Exception exceptionCode="{name}" locator="{locator}" >\n' ' <ows:ExceptionText>{description}</ows:ExceptionText>\n' ' </ows:Exception>\n' '</ows:ExceptionReport>' ).format(**args)) return Response(doc, self.code, mimetype=mimetype)
[文档]class InvalidParameterValue(NoApplicableCode): """Invalid parameter value exception implementation """ code = 400
[文档]class MissingParameterValue(NoApplicableCode): """Missing parameter value exception implementation """ code = 400
[文档]class FileSizeExceeded(NoApplicableCode): """File size exceeded exception implementation """ code = 400
[文档]class VersionNegotiationFailed(NoApplicableCode): """Version negotiation exception implementation """ code = 400
[文档]class OperationNotSupported(NoApplicableCode): """Operation not supported exception implementation """ code = 501
[文档]class StorageNotSupported(NoApplicableCode): """Storage not supported exception implementation """ code = 400
[文档]class NotEnoughStorage(NoApplicableCode): """Not enough storage exception implementation """ code = 400
class FileStorageError(NoApplicableCode): """File storage exception implementation """ code = 400 class ServerBusy(NoApplicableCode): """Max number of operations exceeded """ code = 400 description = 'Maximum number of processes exceeded' def get_body(self, environ=None): """Get the XML body.""" args = { 'name': escape(self.name), 'description': self.get_description(environ) } return str(( '<?xml version="1.0" encoding="UTF-8"?>\n' '<ows:ExceptionReport xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/ows/1.1 ../../../ows/1.1.0/owsExceptionReport.xsd" version="1.0.0">' # noqa '<ows:Exception exceptionCode="{name}">' '{description}' '</ows:Exception>' '</ows:ExceptionReport>' ).format(**args)) class FileURLNotSupported(NoApplicableCode): """File URL not supported exception implementation """ code = 400 description = 'File URL not supported as input.' def __init__(self, description="", locator="", code=400): description = description or self.description NoApplicableCode.__init__(self, description=description, locator=locator, code=code) class SchedulerNotAvailable(NoApplicableCode): """Job scheduler not available exception implementation """ code = 400