##################################################################
# Copyright 2018 Open Source Geospatial Foundation and others #
# licensed under MIT, Please consult LICENSE.txt for details #
##################################################################
import logging
import lxml
from pywps import xml_util as etree
from werkzeug.exceptions import MethodNotAllowed
from pywps import get_ElementMakerForVersion
import base64
import datetime
from pywps.app.basic import get_xpath_ns, parse_http_url
from pywps.inout.inputs import input_from_json
from pywps.exceptions import NoApplicableCode, OperationNotSupported, MissingParameterValue, VersionNegotiationFailed, \
InvalidParameterValue, FileSizeExceeded
from pywps import configuration
from pywps.configuration import wps_strict
from pywps import get_version_from_ns
import json
from urllib.parse import unquote
# Logger used throughout this module; registered under the name "PYWPS".
LOGGER = logging.getLogger("PYWPS")
# WPS version that WPSRequest.check_and_set_version falls back to when the
# request does not name one and defaults are allowed.
default_version = '1.0.0'
class WPSRequest(object):
    """Parsed view of a WPS request.

    When constructed with an HTTP request object the request is parsed
    immediately (key-value pairs for GET; XML or JSON body for POST) and the
    result is stored on the instance attributes (``operation``, ``version``,
    ``identifier``, ``inputs``, ``outputs``, ...).
    """

    def __init__(self, http_request=None, preprocessors=None):
        """Set the defaults and, if given, parse ``http_request``.

        :param http_request: incoming HTTP request; must expose ``method``
            plus whatever the selected parser reads (``args`` for GET;
            ``content_length``, ``content_type`` and ``get_data()`` for
            POST). When falsy only the defaults are set.
        :param preprocessors: optional mapping of process identifier to a
            3-item sequence ``(identifier, preprocess_request,
            preprocess_response)``; consulted for JSON POST requests only.
        :raises werkzeug.exceptions.MethodNotAllowed: for HTTP methods other
            than GET and POST.
        """
        self.http_request = http_request

        self.operation = None
        self.version = None
        self.api = None
        self.default_mimetype = None
        self.language = None
        self.identifier = None
        self.identifiers = None
        self.store_execute = None
        self.status = None
        self.lineage = None
        self.inputs = {}
        self.output_ids = None
        self.outputs = {}
        self.raw = None
        self.WPS = None
        self.OWS = None
        self.xpath_ns = None
        self.preprocessors = preprocessors or dict()
        self.preprocess_request = None
        self.preprocess_response = None

        if http_request:
            # Values derived from the URL act as defaults; the parsers below
            # may override them with request parameters.
            d = parse_http_url(http_request)
            self.operation = d.get('operation')
            self.identifier = d.get('identifier')
            self.output_ids = d.get('output_ids')
            self.api = d.get('api')
            self.default_mimetype = d.get('default_mimetype')
            request_parser = self._get_request_parser_method(http_request.method)
            request_parser()

    def _get_request_parser_method(self, method):
        """Return the bound parser for the given HTTP method.

        :param method: HTTP method name, ``'GET'`` or ``'POST'``
        :raises werkzeug.exceptions.MethodNotAllowed: for any other method
        """
        if method == 'GET':
            return self._get_request
        if method == 'POST':
            return self._post_request
        raise MethodNotAllowed()

    def _get_request(self):
        """HTTP GET request parser

        :raises MissingParameterValue: if no ``service`` parameter is
            available (in strict mode there is no default)
        :raises InvalidParameterValue: if ``service`` is not WPS
        """
        # service shall be WPS; outside strict mode a missing SERVICE
        # parameter defaults to 'wps'
        service = _get_get_param(self.http_request, 'service', None if wps_strict else 'wps')
        if not service:
            raise MissingParameterValue('service', 'service')
        if str(service).lower() != 'wps':
            raise InvalidParameterValue(
                'parameter SERVICE [{}] not supported'.format(service), 'service')

        self.operation = _get_get_param(self.http_request, 'request', self.operation)

        language = _get_get_param(self.http_request, 'language')
        self.check_and_set_language(language)

        request_parser = self._get_request_parser(self.operation)
        request_parser(self.http_request)

    def _post_request(self):
        """HTTP POST request parser

        Enforces the configured maximum request size, then parses the body
        as JSON when the content type mentions ``json`` and as XML otherwise.

        :raises FileSizeExceeded: if the declared content length exceeds the
            configured ``server/maxrequestsize``
        """
        # check if input file size was not exceeded
        maxsize = configuration.get_config_value('server', 'maxrequestsize')
        maxsize = configuration.get_size_mb(maxsize) * 1024 * 1024
        # content_length is None when the client sent no Content-Length
        # header; comparing None with an int would raise TypeError.
        content_length = self.http_request.content_length
        if content_length is not None and content_length > maxsize:
            raise FileSizeExceeded('File size for input exceeded.'
                                   ' Maximum request size allowed: {} megabytes'.format(maxsize / 1024 / 1024))

        content_type = self.http_request.content_type or ''
        if 'json' in content_type:
            self._post_json_request()
        else:
            self._post_xml_request()

    def _post_xml_request(self):
        """Parse an XML POST body and dispatch on its root element.

        :raises NoApplicableCode: if the body cannot be parsed as XML
        """
        try:
            doc = etree.fromstring(self.http_request.get_data())
        except Exception as e:
            raise NoApplicableCode(str(e)) from e

        operation = doc.tag
        version = get_version_from_ns(doc.nsmap[doc.prefix])
        self.set_version(version)

        language = doc.attrib.get('language')
        self.check_and_set_language(language)

        request_parser = self._post_request_parser(operation)
        request_parser(doc)

    def _post_json_request(self):
        """Parse a JSON POST body and dispatch on its operation.

        When the process identifier is already known from the URL, the whole
        body is taken as the inputs; otherwise the body must be an object
        that may carry ``identifier``, ``operation``, ``inputs``, ...

        :raises NoApplicableCode: if the body cannot be parsed as JSON
        :raises InvalidParameterValue: if the body is not a JSON object and
            no identifier is known from the URL
        """
        try:
            jdoc = json.loads(self.http_request.get_data())
        except Exception as e:
            raise NoApplicableCode(str(e)) from e

        if self.identifier is not None:
            jdoc = {'inputs': jdoc}
        elif isinstance(jdoc, dict):
            self.identifier = jdoc.get('identifier', None)
        else:
            raise InvalidParameterValue('JSON request body must be an object', 'request')

        self.operation = jdoc.get('operation', self.operation)

        # A registered preprocessor may redirect the request to another
        # process and hook into request/response handling.
        preprocessor_tuple = self.preprocessors.get(self.identifier, None)
        if preprocessor_tuple:
            self.identifier = preprocessor_tuple[0]
            self.preprocess_request = preprocessor_tuple[1]
            self.preprocess_response = preprocessor_tuple[2]

        jdoc['operation'] = self.operation
        jdoc['identifier'] = self.identifier
        jdoc['api'] = self.api
        jdoc['default_mimetype'] = self.default_mimetype

        if self.preprocess_request is not None:
            jdoc = self.preprocess_request(jdoc, http_request=self.http_request)
        self.json = jdoc

        version = jdoc.get('version')
        self.set_version(version)

        language = jdoc.get('language')
        self.check_and_set_language(language)

        request_parser = self._post_json_request_parser()
        request_parser(jdoc)

    def _get_request_parser(self, operation):
        """Factory function returning the proper GET parsing function

        Sets ``self.operation`` to the lower-cased operation; outside strict
        mode a missing operation defaults to ``'execute'``.

        :param operation: value of the ``request`` parameter (may be None)
        :raises MissingParameterValue: in strict mode, if no operation is given
        :raises OperationNotSupported: for an unknown operation
        """

        def parse_get_getcapabilities(http_request):
            """Parse GET GetCapabilities request
            """
            acceptedversions = _get_get_param(http_request, 'acceptversions')
            self.check_accepted_versions(acceptedversions)
            self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype)

        def parse_get_describeprocess(http_request):
            """Parse GET DescribeProcess request
            """
            version = _get_get_param(http_request, 'version')
            self.check_and_set_version(version)

            self.identifiers = _get_get_param(
                http_request, 'identifier', self.identifiers, aslist=True)
            # fall back to the identifier taken from the URL
            if self.identifiers is None and self.identifier is not None:
                self.identifiers = [self.identifier]
            self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype)

        def parse_get_execute(http_request):
            """Parse GET Execute request
            """
            version = _get_get_param(http_request, 'version')
            self.check_and_set_version(version)

            self.identifier = _get_get_param(http_request, 'identifier', self.identifier)
            self.store_execute = _get_get_param(
                http_request, 'storeExecuteResponse', 'false')
            self.status = _get_get_param(http_request, 'status', 'false')
            self.lineage = _get_get_param(
                http_request, 'lineage', 'false')
            self.inputs = get_data_from_kvp(
                _get_get_param(http_request, 'DataInputs'), 'DataInputs')
            if self.inputs is None:
                self.inputs = {}

            # take responseDocument preferably
            raw, output_ids = False, _get_get_param(http_request, 'ResponseDocument')
            if output_ids is None:
                raw, output_ids = True, _get_get_param(http_request, 'RawDataOutput')
            if output_ids is not None:
                self.raw, self.output_ids = raw, output_ids
            elif self.raw is None:
                self.raw = self.output_ids is not None

            self.default_mimetype = _get_get_param(http_request, 'f', self.default_mimetype)
            self.outputs = get_data_from_kvp(self.output_ids) or {}
            if self.raw:
                # executeResponse XML will not be stored and no updating of
                # status
                self.store_execute = 'false'
                self.status = 'false'

        if operation:
            self.operation = operation.lower()
        else:
            if wps_strict:
                raise MissingParameterValue('Missing request value', 'request')
            self.operation = 'execute'

        if self.operation == 'getcapabilities':
            return parse_get_getcapabilities
        elif self.operation == 'describeprocess':
            return parse_get_describeprocess
        elif self.operation == 'execute':
            return parse_get_execute
        else:
            raise OperationNotSupported(
                'Unknown request {}'.format(self.operation), operation)

    def _post_request_parser(self, tagname):
        """Factory function returning a proper parsing function
        according to tagname and sets self.operation to the correct operation

        :param tagname: tag of the root element of the XML request
        :raises InvalidParameterValue: if the tag matches no known operation
        """

        def parse_post_getcapabilities(doc):
            """Parse POST GetCapabilities request
            """
            acceptedversions = self.xpath_ns(
                doc, '/wps:GetCapabilities/ows:AcceptVersions/ows:Version')
            acceptedversions = ','.join(
                [v.text for v in acceptedversions])
            self.check_accepted_versions(acceptedversions)

        def parse_post_describeprocess(doc):
            """Parse POST DescribeProcess request
            """
            version = doc.attrib.get('version')
            self.check_and_set_version(version)

            self.operation = 'describeprocess'
            self.identifiers = [identifier_el.text for identifier_el in
                                self.xpath_ns(doc, './ows:Identifier')]

        def parse_post_execute(doc):
            """Parse POST Execute request
            """
            version = doc.attrib.get('version')
            self.check_and_set_version(version)

            self.operation = 'execute'

            identifier = self.xpath_ns(doc, './ows:Identifier')

            if not identifier:
                raise MissingParameterValue(
                    'Process identifier not set', 'Identifier')

            self.identifier = identifier[0].text
            self.lineage = 'false'
            self.store_execute = 'false'
            self.status = 'false'
            self.inputs = get_inputs_from_xml(doc)
            self.outputs = get_output_from_xml(doc)
            self.raw = False
            if self.xpath_ns(doc, '/wps:Execute/wps:ResponseForm/wps:RawDataOutput'):
                self.raw = True
                # executeResponse XML will not be stored
                self.store_execute = 'false'

            # check if response document tag has been set then retrieve
            response_document = self.xpath_ns(
                doc, './wps:ResponseForm/wps:ResponseDocument')
            if len(response_document) > 0:
                self.lineage = response_document[
                    0].attrib.get('lineage', 'false')
                self.store_execute = response_document[
                    0].attrib.get('storeExecuteResponse', 'false')
                self.status = response_document[
                    0].attrib.get('status', 'false')

        if tagname == self.WPS.GetCapabilities().tag:
            self.operation = 'getcapabilities'
            return parse_post_getcapabilities
        elif tagname == self.WPS.DescribeProcess().tag:
            self.operation = 'describeprocess'
            return parse_post_describeprocess
        elif tagname == self.WPS.Execute().tag:
            self.operation = 'execute'
            return parse_post_execute
        else:
            raise InvalidParameterValue(
                'Unknown request {}'.format(tagname), 'request')

    @staticmethod
    def _identifiers_from_json(jdoc):
        """Return the list of process identifiers named by a JSON request.

        Reads ``identifiers`` (a list, or a comma separated string) and falls
        back to the single ``identifier`` entry; returns an empty list when
        neither is present.
        """
        identifiers = jdoc.get('identifiers')
        if identifiers is None:
            identifier = jdoc.get('identifier')
            return [] if identifier is None else [identifier]
        if isinstance(identifiers, str):
            return identifiers.split(',')
        return list(identifiers)

    def _post_json_request_parser(self):
        """
        Factory function returning a proper parsing function
        according to self.operation.
        self.operation is modified to be lowercase
        or the default 'execute' operation if self.operation is None

        :raises InvalidParameterValue: for an unknown operation
        """

        def parse_json_post_getcapabilities(jdoc):
            """Parse POST GetCapabilities request
            """
            acceptedversions = jdoc.get('acceptedversions')
            self.check_accepted_versions(acceptedversions)

        def parse_json_post_describeprocess(jdoc):
            """Parse POST DescribeProcess request
            """
            version = jdoc.get('version')
            self.check_and_set_version(version)
            # jdoc is a plain dict, so identifiers are read from its keys
            # rather than through an XPath query.
            self.identifiers = self._identifiers_from_json(jdoc)

        def parse_json_post_execute(jdoc):
            """Parse POST Execute request
            """
            version = jdoc.get('version')
            self.check_and_set_version(version)

            self.identifier = jdoc.get('identifier')
            if self.identifier is None:
                raise MissingParameterValue(
                    'Process identifier not set', 'Identifier')

            self.lineage = 'false'
            self.store_execute = 'false'
            self.status = 'false'
            self.inputs = get_inputs_from_json(jdoc)

            # output ids taken from the URL win over the ones in the body
            if self.output_ids is None:
                self.output_ids = jdoc.get('outputs', {})
            self.raw = jdoc.get('raw', False)
            self.raw, self.outputs = get_output_from_dict(self.output_ids, self.raw)

            if self.raw:
                # executeResponse XML will not be stored
                self.store_execute = 'false'

            # todo: parse response_document like in the xml version?

        self.operation = 'execute' if self.operation is None else self.operation.lower()
        if self.operation == 'getcapabilities':
            return parse_json_post_getcapabilities
        elif self.operation == 'describeprocess':
            return parse_json_post_describeprocess
        elif self.operation == 'execute':
            return parse_json_post_execute
        else:
            raise InvalidParameterValue(
                'Unknown request {}'.format(self.operation), 'request')

    def set_version(self, version):
        """Store ``version`` and refresh the version dependent helpers
        (``xpath_ns``, ``WPS`` and ``OWS``) without validating it.
        """
        self.version = version
        self.xpath_ns = get_xpath_ns(version)
        self.WPS, self.OWS = get_ElementMakerForVersion(self.version)

    def check_accepted_versions(self, acceptedversions):
        """Negotiate the version from a comma separated list.

        Defaults to ``'1.0.0'`` when nothing is given.

        :param acceptedversions: string
        :raises VersionNegotiationFailed: if none of the listed versions is
            supported
        """
        version = None

        if acceptedversions:
            # NOTE(review): without a break the LAST supported entry wins;
            # OWS Common appears to list preferred versions first - confirm
            # whether the first supported entry should be chosen instead.
            for aversion in acceptedversions.split(','):
                if _check_version(aversion):
                    version = aversion
        else:
            version = '1.0.0'

        if version:
            self.check_and_set_version(version)
        else:
            raise VersionNegotiationFailed(
                'The requested version "{}" is not supported by this server'.format(acceptedversions), 'version')

    def check_and_set_version(self, version, allow_default=True):
        """Validate ``version`` and set self.version

        :param version: requested version string (may be empty or None)
        :param allow_default: use ``default_version`` when no version is given
        :raises MissingParameterValue: if no version is given and defaults
            are not allowed
        :raises VersionNegotiationFailed: if the version is not supported
        """
        if not version:
            if not allow_default:
                raise MissingParameterValue('Missing version', 'version')
            version = default_version

        if not _check_version(version):
            raise VersionNegotiationFailed(
                'The requested version "{}" is not supported by this server'.format(version), 'version')
        self.set_version(version)

    def check_and_set_language(self, language):
        """Validate ``language`` and set self.language

        :param language: requested language (may be empty or None, in which
            case the first configured language is used)
        :raises InvalidParameterValue: if the language is not configured
        """
        supported_languages = configuration.get_config_value('server', 'language').split(',')
        supported_languages = [lang.strip() for lang in supported_languages]

        if not language:
            # default to the first supported language
            language = supported_languages[0]

        if language not in supported_languages:
            raise InvalidParameterValue(
                'The requested language "{}" is not supported by this server'.format(language),
                'language',
            )

        self.language = language

    @property
    def json(self):
        """Return JSON encoded representation of the request
        """

        class ExtendedJSONEncoder(json.JSONEncoder):
            """JSON encoder that also serialises dates and times (ISO format)."""

            def default(self, obj):
                if isinstance(obj, (datetime.date, datetime.time)):
                    return obj.isoformat()
                return json.JSONEncoder.default(self, obj)

        obj = {
            'operation': self.operation,
            'version': self.version,
            'api': self.api,
            'default_mimetype': self.default_mimetype,
            'language': self.language,
            'identifier': self.identifier,
            'identifiers': self.identifiers,
            'store_execute': self.store_execute,
            'status': self.status,
            'lineage': self.lineage,
            'inputs': {i: [inpt.json for inpt in self.inputs[i]] for i in self.inputs},
            'outputs': self.outputs,
            'raw': self.raw
        }

        return json.dumps(obj, allow_nan=False, cls=ExtendedJSONEncoder)

    @json.setter
    def json(self, value):
        """init this request from json back again

        Inputs that cannot be rebuilt are logged and skipped.

        :param value: the json (not string) representation
        """
        self.operation = value.get('operation')
        self.version = value.get('version')
        self.api = value.get('api')
        self.default_mimetype = value.get('default_mimetype')
        self.language = value.get('language')
        self.identifier = value.get('identifier')
        self.identifiers = value.get('identifiers')
        self.store_execute = value.get('store_execute')
        self.status = value.get('status', False)
        self.lineage = value.get('lineage', False)
        self.outputs = value.get('outputs')
        self.raw = value.get('raw', False)
        self.inputs = {}

        # tolerate a missing or null 'inputs' entry
        inputs_def = value.get('inputs') or {}
        for identifier, inpt_defs in inputs_def.items():
            if not isinstance(inpt_defs, (list, tuple)):
                inpt_defs = [inpt_defs]
            self.inputs[identifier] = []
            for inpt_def in inpt_defs:
                # a bare value is shorthand for {"data": value}
                if not isinstance(inpt_def, dict):
                    inpt_def = {"data": inpt_def}
                if 'identifier' not in inpt_def:
                    inpt_def['identifier'] = identifier
                try:
                    inpt = input_from_json(inpt_def)
                    self.inputs[identifier].append(inpt)
                except Exception as e:
                    # best effort: a broken input must not abort the rest
                    LOGGER.warning(e)
                    LOGGER.warning(f'skipping input: {identifier}')
def get_inputs_from_xml(doc):
    """Collect the inputs of a WPS Execute XML document.

    :param doc: root element of the Execute request
    :return: dict mapping each input identifier to a list of dicts, one per
        ``wps:Input`` carrying literal, complex, reference or bounding-box
        data (an input of any other kind leaves the list untouched)
    """
    xpath_ns = get_xpath_ns(get_version_from_ns(doc.nsmap[doc.prefix]))
    collected = {}

    for input_el in xpath_ns(doc, '/wps:Execute/wps:DataInputs/wps:Input'):
        # exactly one ows:Identifier is expected; unpacking fails otherwise
        [identifier_el] = xpath_ns(input_el, './ows:Identifier')
        identifier = identifier_el.text
        entries = collected.setdefault(identifier, [])

        literal_nodes = xpath_ns(input_el, './wps:Data/wps:LiteralData')
        if literal_nodes:
            literal_el = literal_nodes[0]
            entries.append({
                'identifier': identifier,
                'data': str(literal_el.text),
                'uom': literal_el.attrib.get('uom', ''),
                'datatype': literal_el.attrib.get('datatype', ''),
            })
            continue

        complex_nodes = xpath_ns(input_el, './wps:Data/wps:ComplexData')
        if complex_nodes:
            complex_el = complex_nodes[0]
            entry = {
                'identifier': identifier,
                'mimeType': complex_el.attrib.get('mimeType', None),
                'encoding': complex_el.attrib.get('encoding', '').lower(),
                'schema': complex_el.attrib.get('schema', ''),
                'method': complex_el.attrib.get('method', 'GET'),
            }
            # embedded XML is taken from the first child element, anything
            # else from the (possibly encoded) text content
            if len(complex_el.getchildren()) > 0:
                entry['data'] = _get_dataelement_value(complex_el[0])
            else:
                entry['data'] = _get_rawvalue_value(
                    complex_el.text, entry['encoding'])
            entries.append(entry)
            continue

        reference_nodes = xpath_ns(input_el, './wps:Reference')
        if reference_nodes:
            reference_el = reference_nodes[0]
            entry = {
                'identifier': identifier,
                identifier: reference_el.text,
                'href': reference_el.attrib.get(
                    '{http://www.w3.org/1999/xlink}href', ''),
                'mimeType': reference_el.attrib.get('mimeType', None),
                'method': reference_el.attrib.get('method', 'GET'),
            }
            header_nodes = xpath_ns(reference_el, './wps:Header')
            if header_nodes:
                entry['header'] = _get_reference_header(header_nodes)
            body_nodes = xpath_ns(reference_el, './wps:Body')
            if body_nodes:
                entry['body'] = _get_reference_body(body_nodes[0])
            bodyreference_nodes = xpath_ns(reference_el,
                                           './wps:BodyReference')
            if bodyreference_nodes:
                entry['bodyreference'] = _get_reference_bodyreference(
                    bodyreference_nodes[0])
            entries.append(entry)
            continue

        # Using OWSlib BoundingBox
        from owslib.ows import BoundingBox

        for bbox_el in xpath_ns(input_el, './wps:Data/wps:BoundingBoxData'):
            bbox = BoundingBox(bbox_el)
            LOGGER.debug("parse bbox: minx={}, miny={}, maxx={},maxy={}".format(
                bbox.minx, bbox.miny, bbox.maxx, bbox.maxy))
            entries.append({
                'identifier': identifier,
                'data': [bbox.minx, bbox.miny, bbox.maxx, bbox.maxy],
                'crs': bbox.crs.getcodeurn() if bbox.crs else None,
                'dimensions': bbox.dimensions,
            })

    return collected
def get_output_from_xml(doc):
    """Collect the requested outputs of a WPS Execute XML document.

    A ``wps:ResponseDocument`` takes precedence over ``wps:RawDataOutput``;
    only response-document outputs carry the ``asReference`` entry.

    :param doc: root element of the Execute request
    :return: dict mapping output identifier to a dict of its attributes
    """
    xpath_ns = get_xpath_ns(get_version_from_ns(doc.nsmap[doc.prefix]))

    if xpath_ns(doc, '/wps:Execute/wps:ResponseForm/wps:ResponseDocument'):
        output_els = xpath_ns(doc, '/wps:Execute/wps:ResponseForm/wps:ResponseDocument/wps:Output')
        with_reference = True
    elif xpath_ns(doc, '/wps:Execute/wps:ResponseForm/wps:RawDataOutput'):
        output_els = xpath_ns(doc, '/wps:Execute/wps:ResponseForm/wps:RawDataOutput')
        with_reference = False
    else:
        return {}

    requested = {}
    for output_el in output_els:
        # exactly one ows:Identifier is expected; unpacking fails otherwise
        [identifier_el] = xpath_ns(output_el, './ows:Identifier')
        attrs = output_el.attrib
        entry = {
            identifier_el.text: '',
            'mimetype': attrs.get('mimeType', None),
            'encoding': attrs.get('encoding', ''),
            'schema': attrs.get('schema', ''),
            'uom': attrs.get('uom', ''),
        }
        if with_reference:
            entry['asReference'] = attrs.get('asReference', 'false')
        requested[identifier_el.text] = entry

    return requested
def get_inputs_from_json(jdoc):
    """Collect the inputs of a JSON Execute request.

    Each value under ``jdoc['inputs']`` may be a single definition or a list
    of them; a definition that is not a dict is shorthand for
    ``{"data": value}``. The ``type`` entry (default ``'literal'``) selects
    how a definition is read; definitions of an unknown type are dropped.

    :param jdoc: the request as a dict
    :return: dict mapping input identifier to a list of input dicts
    """
    collected = {}

    for identifier, definitions in jdoc.get('inputs', {}).items():
        if not isinstance(definitions, (list, tuple)):
            definitions = [definitions]

        entries = collected[identifier] = []
        for definition in definitions:
            if not isinstance(definition, dict):
                definition = {"data": definition}

            kind = definition.get('type', 'literal')
            entry = {'identifier': identifier}

            if kind == 'literal':
                entry['data'] = definition.get('data')
                entry['uom'] = definition.get('uom', '')
                entry['datatype'] = definition.get('datatype', '')
            elif kind == 'complex':
                entry['mimeType'] = definition.get('mimeType', None)
                entry['encoding'] = definition.get('encoding', '').lower()
                entry['schema'] = definition.get('schema', '')
                entry['method'] = definition.get('method', 'GET')
                entry['data'] = _get_rawvalue_value(definition.get('data', ''), entry['encoding'])
            elif kind == 'reference':
                entry[identifier] = definition
                entry['href'] = definition.get('href', '')
                entry['mimeType'] = definition.get('mimeType', None)
                entry['method'] = definition.get('method', 'GET')
                entry['header'] = definition.get('header', '')
                entry['body'] = definition.get('body', '')
                entry['bodyreference'] = definition.get('bodyreference', '')
            elif kind == 'bbox':
                entry['data'] = definition['bbox']
                entry['crs'] = definition.get('crs', 'urn:ogc:def:crs:EPSG::4326')
                entry['dimensions'] = definition.get('dimensions', 2)
            else:
                # unknown type: nothing is recorded for this definition
                continue

            entries.append(entry)

    return collected
def get_output_from_dict(output_ids, raw):
    """Normalise the requested outputs of a JSON Execute request.

    :param output_ids: dict of ``identifier -> attributes`` (the attributes
        may be wrapped in a list, of which the first item is used), a
        list/tuple of identifiers, or a single identifier
    :param raw: whether raw output was requested; forced to True when
        ``output_ids`` is a single identifier
    :return: tuple ``(raw, outputs)`` where ``outputs`` maps each identifier
        to a dict of its attributes
    """
    if isinstance(output_ids, (tuple, list)):
        output_ids = {name: {} for name in output_ids}
    elif not isinstance(output_ids, dict):
        output_ids = {output_ids: {}}
        raw = True  # single non-dict output means raw output

    requested = {}
    for name, spec in output_ids.items():
        if isinstance(spec, list):
            spec = spec[0]
        entry = {
            name: '',
            'mimetype': spec.get('mimeType', None),
            'encoding': spec.get('encoding', ''),
            'schema': spec.get('schema', ''),
            'uom': spec.get('uom', ''),
        }
        # asReference only makes sense for a response document
        if not raw:
            entry['asReference'] = spec.get('asReference', 'false')
        requested[name] = entry

    return raw, requested
def get_data_from_kvp(data, part=None):
    """Get execute DataInputs and ResponseDocument from URL (key-value-pairs) encoding

    Entries are separated by ``;``. Each entry has the form
    ``identifier=value@attribute=value@...``; values are URL-unquoted and the
    ``xlink:href`` attribute is stored under the key ``href``. An entry that
    cannot be parsed this way (for example a bare identifier) is stored under
    its full text with empty data.

    :param data: key:value pair list of the datainputs and responseDocument parameter
    :param part: DataInputs or similar part of input url; for ``'DataInputs'``
        every identifier maps to a list of entries, otherwise to one entry
    :return: dict of parsed entries, or None when ``data`` is None
    """
    if data is None:
        return None

    the_data = {}

    for d in data.split(";"):
        # an empty parameter or a trailing ';' carries no entry
        if not d:
            continue
        try:
            fields = d.split('@')

            # First field is identifier and its value; split only once so
            # that values containing '=' (base64 padding, URLs) survive
            identifier, val = fields[0].split("=", 1)
            io = {'identifier': identifier, 'data': unquote(val)}

            # Get the attributes of the data
            for attr in fields[1:]:
                attribute, attr_val = attr.split('=', 1)
                if attribute == 'xlink:href':
                    io['href'] = unquote(attr_val)
                else:
                    io[attribute] = unquote(attr_val)

            # Add the input/output with all its attributes and values to the
            # dictionary
            if part == 'DataInputs':
                if identifier not in the_data:
                    the_data[identifier] = []
                the_data[identifier].append(io)
            else:
                the_data[identifier] = io
        except Exception as e:
            # best effort: keep the raw entry as an identifier without data
            LOGGER.warning(e)
            the_data[d] = {'identifier': d, 'data': ''}
    return the_data
def _check_version(version):
""" check given version
"""
if version not in ['1.0.0', '2.0.0']:
return False
else:
return True
def _get_get_param(http_request, key, default=None, aslist=False):
"""Returns value from the key:value pair, of the HTTP GET request, for
example 'service' or 'request'
:param http_request: http_request object
:param key: key value you need to dig out of the HTTP GET request
"""
key = key.lower()
value = default
# http_request.args.keys will make + sign disappear in GET url if not
# urlencoded
for k in http_request.args.keys():
if k.lower() == key:
value = http_request.args.get(k)
if aslist:
value = value.split(",")
return value
def _get_dataelement_value(value_el):
    """Return the real value of an XML element: an lxml element (e.g. a
    FeatureCollection) is serialised to a string, anything else is returned
    unchanged.
    """
    if not isinstance(value_el, lxml.etree._Element):
        return value_el
    return etree.tostring(value_el, encoding=str)
def _get_rawvalue_value(data, encoding=None):
    """Return real value of CDATA section

    Data without an encoding, or declared as ``utf-8``, is returned as is;
    for any other encoding a base64 decode is attempted and the undecoded
    data is returned if that fails.
    """
    try:
        LOGGER.debug("encoding={}".format(encoding))
        if encoding in (None, "", "utf-8"):
            return data
        # 'base64' and every unrecognised encoding are treated as base64
        return base64.b64decode(data)
    except Exception:
        LOGGER.warning("failed to decode base64")
    return data
def _get_reference_header(header_element):
"""Parses ReferenceInput Header element
"""
header = {}
header['key'] = header_element.attrib('key')
header['value'] = header_element.attrib('value')
return header
def _get_reference_body(body_element):
    """Parses ReferenceInput Body element

    The first child element is used when there is one, otherwise the text
    content of the element.
    """
    if len(body_element.getchildren()) > 0:
        return _get_dataelement_value(body_element[0])
    return _get_rawvalue_value(body_element.text)
def _get_reference_bodyreference(referencebody_element):
"""Parse ReferenceInput BodyReference element
"""
return referencebody_element.attrib.get(
'{http://www.w3.org/1999/xlink}href', '')