##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others #
# licensed under MIT, Please consult LICENSE.txt for details #
##################################################################
"""Validator classes are used for ComplexInputs, to validate the content
"""
import logging
from pywps.validator.mode import MODE
from pywps.inout.formats import FORMATS
from lxml.etree import XMLSchema
from pywps import xml_util as etree
from urllib.request import urlopen
import mimetypes
import os
LOGGER = logging.getLogger('PYWPS')
[文档]def validategml(data_input, mode):
"""GML validation function
:param data_input: :class:`ComplexInput`
:param pywps.validator.mode.MODE mode:
This function validates GML input based on given validation mode. Following
happens, if `mode` parameter is given:
`MODE.NONE`
it will return always `True`
`MODE.SIMPLE`
the mimetype will be checked
`MODE.STRICT`
`Fiona` is used for getting the proper format.
`MODE.VERYSTRICT`
the :class:`lxml.etree` is used along with given input `schema` and the
GML file is properly validated against given schema.
"""
LOGGER.info('validating GML; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.GML.mime_type}
if mode >= MODE.STRICT:
try:
import fiona
data_source = fiona.open(data_input.file)
passed = (data_source.driver == "GML")
except (ModuleNotFoundError, ImportError):
passed = False
if mode >= MODE.VERYSTRICT:
try:
schema_url = data_input.data_format.schema
gmlschema_doc = etree.parse(urlopen(schema_url))
gmlschema = XMLSchema(gmlschema_doc)
passed = gmlschema.validate(etree.parse(data_input.stream))
except Exception as e:
LOGGER.warning(e)
passed = False
return passed
def validategpx(data_input, mode):
"""GPX validation function
:param data_input: :class:`ComplexInput`
:param pywps.validator.mode.MODE mode:
This function validates GPX input based on given validation mode. Following
happens, if `mode` parameter is given:
`MODE.NONE`
it will return always `True`
`MODE.SIMPLE`
the mimetype will be checked
`MODE.STRICT`
`Fiona` is used for getting the proper format.
`MODE.VERYSTRICT`
the :class:`lxml.etree` is used along with given input `schema` and the
GPX file is properly validated against given schema.
"""
LOGGER.info('validating GPX; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.GPX.mime_type}
if mode >= MODE.STRICT:
try:
import fiona
data_source = fiona.open(data_input.file)
passed = (data_source.driver == "GPX")
except (ModuleNotFoundError, ImportError):
passed = False
if mode >= MODE.VERYSTRICT:
try:
schema_url = data_input.data_format.schema
gpxschema_doc = etree.parse(urlopen(schema_url))
gpxschema = XMLSchema(gpxschema_doc)
passed = gpxschema.validate(etree.parse(data_input.stream))
except Exception as e:
LOGGER.warning(e)
passed = False
return passed
def validatexml(data_input, mode):
"""XML validation function
:param data_input: :class:`ComplexInput`
:param pywps.validator.mode.MODE mode:
This function validates XML input based on given validation mode. Following
happens, if `mode` parameter is given:
`MODE.NONE`
it will return always `True`
`MODE.SIMPLE`
the mimetype will be checked
`MODE.STRICT` and `MODE.VERYSTRICT`
the :class:`lxml.etree` is used along with given input `schema` and the
XML file is properly validated against given schema.
"""
LOGGER.info('validating XML; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.GML.mime_type}
if mode >= MODE.STRICT:
# TODO: Raise the actual validation exception to make it easier to spot the error.
# xml = etree.parse(data_input.file)
# schema.assertValid(xml)
try:
fn = os.path.join(_get_schemas_home(), data_input.data_format.schema)
schema_doc = etree.parse(fn)
schema = XMLSchema(schema_doc)
passed = schema.validate(etree.parse(data_input.file))
except Exception as e:
LOGGER.warning(e)
passed = False
return passed
def validatejson(data_input, mode):
"""JSON validation function
:param data_input: :class:`ComplexInput`
:param pywps.validator.mode.MODE mode:
This function validates JSON input based on given validation mode. Following
happens, if `mode` parameter is given:
`MODE.NONE`
No validation, returns `True`.
`MODE.SIMPLE`
Returns `True` if the mime type is correct.
`MODE.STRICT`
Returns `True` if the content can be interpreted as a json object.
"""
LOGGER.info('validating JSON; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.JSON.mime_type}
if mode >= MODE.STRICT:
import json
try:
with open(data_input.file) as f:
json.load(f)
passed = True
except ValueError:
passed = False
return passed
def validategeojson(data_input, mode):
"""GeoJSON validation example
>>> import StringIO
>>> class FakeInput(object):
... json = open('point.geojson','w')
... json.write('''{"type":"Feature", "properties":{}, "geometry":{"type":"Point", "coordinates":[8.5781228542328, 22.87500500679]}, "crs":{"type":"name", "properties":{"name":"urn:ogc:def:crs:OGC:1.3:CRS84"}}}''') # noqa
... json.close()
... file = 'point.geojson'
>>> class fake_data_format(object):
... mimetype = 'application/geojson'
>>> fake_input = FakeInput()
>>> fake_input.data_format = fake_data_format()
>>> validategeojson(fake_input, MODE.SIMPLE)
True
"""
LOGGER.info('validating GeoJSON; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.GEOJSON.mime_type}
if mode >= MODE.STRICT:
try:
import fiona
data_source = fiona.open(data_input.file)
passed = (data_source.driver == "GeoJSON")
except (ModuleNotFoundError, ImportError):
passed = False
if mode >= MODE.VERYSTRICT:
import jsonschema
import json
# this code comes from
# https://github.com/om-henners/GeoJSON_Validation/blob/master/geojsonvalidation/geojson_validation.py
schema_home = os.path.join(_get_schemas_home(), "geojson")
base_schema = os.path.join(schema_home, "geojson.json")
with open(base_schema) as fh:
geojson_base = json.load(fh)
with open(os.path.join(schema_home, "crs.json")) as fh:
crs_json = json.load(fh)
with open(os.path.join(schema_home, "bbox.json")) as fh:
bbox_json = json.load(fh)
with open(os.path.join(schema_home, "geometry.json")) as fh:
geometry_json = json.load(fh)
cached_json = {
"http://json-schema.org/geojson/crs.json": crs_json,
"http://json-schema.org/geojson/bbox.json": bbox_json,
"http://json-schema.org/geojson/geometry.json": geometry_json
}
resolver = jsonschema.RefResolver(
"http://json-schema.org/geojson/geojson.json",
geojson_base, store=cached_json)
validator = jsonschema.Draft4Validator(geojson_base, resolver=resolver)
try:
validator.validate(json.loads(data_input.stream.read()))
passed = True
except jsonschema.ValidationError:
passed = False
return passed
def validateshapefile(data_input, mode):
"""ESRI Shapefile validation example
"""
LOGGER.info('validating Shapefile; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.SHP.mime_type}
if mode >= MODE.STRICT:
try:
import fiona
sf = fiona.open(data_input.file)
passed = (sf.driver == "ESRI Shapefile")
except (ModuleNotFoundError, ImportError):
passed = False
return passed
def validategeotiff(data_input, mode):
"""GeoTIFF validation example
"""
LOGGER.info('Validating Shapefile; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.GEOTIFF.mime_type}
if mode >= MODE.STRICT:
try:
from geotiff import GeoTiff
data_source = GeoTiff(data_input.file)
passed = (data_source.crs_code > 0)
except (ModuleNotFoundError, ImportError):
passed = False
return passed
def validatenetcdf(data_input, mode):
"""netCDF validation.
"""
LOGGER.info('Validating netCDF; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.file
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.NETCDF.mime_type}
if mode >= MODE.STRICT:
try:
from pywps.dependencies import netCDF4 as nc
nc.Dataset(data_input.file)
passed = True
except ImportError as e:
passed = False
LOGGER.exception("ImportError while validating netCDF4 file {}:\n {}".format(data_input.file, e))
except IOError as e:
passed = False
LOGGER.exception("IOError while validating netCDF4 file {}:\n {}".format(data_input.file, e))
return passed
def validatedods(data_input, mode):
"""OPeNDAP validation.
"""
LOGGER.info('Validating OPeNDAP; Mode: {}'.format(mode))
passed = False
if mode >= MODE.NONE:
passed = True
if mode >= MODE.SIMPLE:
name = data_input.url
(mtype, encoding) = mimetypes.guess_type(name, strict=False)
passed = data_input.data_format.mime_type in {mtype, FORMATS.DODS.mime_type}
if mode >= MODE.STRICT:
try:
from pywps.dependencies import netCDF4 as nc
nc.Dataset(data_input.url)
passed = True
except ImportError as e:
passed = False
LOGGER.exception("ImportError while validating OPeNDAP link {}:\n {}".format(data_input.url, e))
except IOError as e:
passed = False
LOGGER.exception("IOError while validating OPeNDAP link {}:\n {}".format(data_input.url, e))
return passed
def _get_schemas_home():
"""Get path to schemas directory
"""
schema_dir = os.path.join(
os.path.abspath(
os.path.dirname(__file__)
),
os.path.pardir,
"schemas")
LOGGER.debug('Schemas directory: {}'.format(schema_dir))
return schema_dir