Source code for pygeoapi.provider.postgresql

# =================================================================
#
# Authors: Jorge Samuel Mendes de Jesus <jorge.dejesus@protonmail.com>
#          Tom Kralidis <tomkralidis@gmail.com>
#          Mary Bucknell <mbucknell@usgs.gov>
#          John A Stevenson <jostev@bgs.ac.uk>
#          Colin Blackburn <colb@bgs.ac.uk>
#          Francesco Bartoli <xbartolone@gmail.com>
#
# Copyright (c) 2018 Jorge Samuel Mendes de Jesus
# Copyright (c) 2023 Tom Kralidis
# Copyright (c) 2022 John A Stevenson and Colin Blackburn
# Copyright (c) 2023 Francesco Bartoli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

# Testing local docker:
# docker run --name "postgis" \
# -v postgres_data:/var/lib/postgresql -p 5432:5432 \
# -e ALLOW_IP_RANGE=0.0.0.0/0 \
# -e POSTGRES_USER=postgres \
# -e POSTGRES_PASS=postgres \
# -e POSTGRES_DBNAME=test \
# -d -t kartoza/postgis

# Import dump:
# gunzip < tests/data/hotosm_bdi_waterways.sql.gz |
#  psql -U postgres -h 127.0.0.1 -p 5432 test
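
# Optional sanity check that the import worked (the "osm" schema is an
# assumption based on the test dump; adjust to match your data):
# psql -U postgres -h 127.0.0.1 -p 5432 test -c "\dt osm.*"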

import logging

from copy import deepcopy
from geoalchemy2 import Geometry  # noqa - this isn't used explicitly but is needed to process Geometry columns
from geoalchemy2.functions import ST_MakeEnvelope
from geoalchemy2.shape import to_shape
from pygeofilter.backends.sqlalchemy.evaluate import to_filter
import pyproj
import shapely
from sqlalchemy import create_engine, MetaData, PrimaryKeyConstraint, asc, desc
from sqlalchemy.engine import URL
from sqlalchemy.exc import InvalidRequestError, OperationalError
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session, load_only
from sqlalchemy.sql.expression import and_

from pygeoapi.provider.base import BaseProvider, \
    ProviderConnectionError, ProviderQueryError, ProviderItemNotFoundError
from pygeoapi.util import get_transform_from_crs


_ENGINE_STORE = {}
_TABLE_MODEL_STORE = {}
LOGGER = logging.getLogger(__name__)


class PostgreSQLProvider(BaseProvider):
    """Generic provider for PostgreSQL/PostGIS, based on SQLAlchemy with
    the psycopg2 driver and GeoAlchemy2 for geometry handling
    """

    def __init__(self, provider_def):
        """
        PostgreSQLProvider Class constructor

        :param provider_def: provider definitions from yml pygeoapi-config.
                             data, id_field, name set in parent class
                             data contains the connection information
                             for the database

        :returns: pygeoapi.provider.base.PostgreSQLProvider
        """
        LOGGER.debug('Initialising PostgreSQL provider.')
        super().__init__(provider_def)

        self.table = provider_def['table']
        self.id_field = provider_def['id_field']
        self.geom = provider_def.get('geom_field', 'geom')

        LOGGER.debug(f'Name: {self.name}')
        LOGGER.debug(f'Table: {self.table}')
        LOGGER.debug(f'ID field: {self.id_field}')
        LOGGER.debug(f'Geometry field: {self.geom}')

        # Read table information from database
        options = None
        if provider_def.get('options'):
            options = provider_def['options']
        self._store_db_parameters(provider_def['data'], options)
        self._engine, self.table_model = self._get_engine_and_table_model()
        LOGGER.debug(f'DB connection: {repr(self._engine.url)}')
        self.fields = self.get_fields()
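
    # A minimal sketch of a matching provider block in the pygeoapi YAML
    # config.  Values are illustrative only; the keys shown are the ones
    # read by __init__ and _store_db_parameters:
    #
    #   providers:
    #       - type: feature
    #         name: PostgreSQL
    #         data:
    #             host: 127.0.0.1
    #             port: 5432
    #             dbname: test
    #             user: postgres
    #             password: postgres
    #             search_path: [osm, public]
    #         id_field: osm_id
    #         table: hotosm_bdi_waterways
    #         geom_field: geom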

    def query(self, offset=0, limit=10, resulttype='results',
              bbox=[], datetime_=None, properties=[], sortby=[],
              select_properties=[], skip_geometry=False, q=None,
              filterq=None, crs_transform_spec=None, **kwargs):
        """
        Query PostGIS for all the content.
        e.g.: http://localhost:5000/collections/hotosm_bdi_waterways/items?
        limit=1&resulttype=results

        :param offset: starting record to return (default 0)
        :param limit: number of records to return (default 10)
        :param resulttype: return results or hit limit (default results)
        :param bbox: bounding box [minx,miny,maxx,maxy]
        :param datetime_: temporal (datestamp or extent)
        :param properties: list of tuples (name, value)
        :param sortby: list of dicts (property, order)
        :param select_properties: list of property names
        :param skip_geometry: bool of whether to skip geometry (default False)
        :param q: full-text search term(s)
        :param filterq: CQL query as text string
        :param crs_transform_spec: `CrsTransformSpec` instance, optional

        :returns: GeoJSON FeatureCollection
        """
        LOGGER.debug('Preparing filters')
        property_filters = self._get_property_filters(properties)
        cql_filters = self._get_cql_filters(filterq)
        bbox_filter = self._get_bbox_filter(bbox)
        order_by_clauses = self._get_order_by_clauses(sortby, self.table_model)
        selected_properties = self._select_properties_clause(
            select_properties, skip_geometry)

        LOGGER.debug('Querying PostGIS')
        # Execute query within self-closing database Session context
        with Session(self._engine) as session:
            results = (session.query(self.table_model)
                       .filter(property_filters)
                       .filter(cql_filters)
                       .filter(bbox_filter)
                       .order_by(*order_by_clauses)
                       .options(selected_properties)
                       .offset(offset))

            matched = results.count()
            returned = min(limit, matched)
            LOGGER.debug(f'Found {matched} result(s)')

            LOGGER.debug('Preparing response')
            response = {
                'type': 'FeatureCollection',
                'features': [],
                'numberMatched': matched,
                'numberReturned': returned
            }

            if resulttype == 'hits' or not results:
                response['numberReturned'] = 0
                return response

            crs_transform_out = self._get_crs_transform(crs_transform_spec)

            for item in results.limit(limit):
                response['features'].append(
                    self._sqlalchemy_to_feature(item, crs_transform_out)
                )

        return response
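
    # Illustrative direct call, e.g. from a Python shell or a test; the
    # provider_def dict is assumed to mirror the config sketch above and
    # the bbox/sort values are made up:
    #
    #   provider = PostgreSQLProvider(provider_def)
    #   fc = provider.query(bbox=[29.0, -3.5, 29.5, -3.0], limit=5,
    #                       sortby=[{'property': 'osm_id', 'order': '+'}])
    #   len(fc['features'])  # <= 5, ordered by osm_id ascending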

    def get_fields(self):
        """
        Return fields (columns) from PostgreSQL table

        :returns: dict of fields
        """
        LOGGER.debug('Get available fields/properties')

        # JSON Schema only allows these types, so we need to map from the
        # Python types that SQLAlchemy reports:
        # string, number, integer, object, array, boolean, null
        # https://json-schema.org/understanding-json-schema/reference/type.html
        column_type_map = {
            str: 'string',
            float: 'number',
            int: 'integer',
            bool: 'boolean',
        }
        default_value = 'string'

        def _column_type_to_json_schema_type(column_type):
            try:
                python_type = column_type.python_type
            except NotImplementedError:
                LOGGER.warning(f'Unsupported column type {column_type}')
                return default_value
            else:
                try:
                    return column_type_map[python_type]
                except KeyError:
                    LOGGER.warning(f'Unsupported column type {column_type}')
                    return default_value

        return {
            str(column.name): {
                'type': _column_type_to_json_schema_type(column.type)
            }
            for column in self.table_model.__table__.columns
            if column.name != self.geom  # Exclude geometry column
        }
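
    # For example, a table with columns (osm_id integer, name text,
    # geom geometry) would yield, with the geometry column excluded:
    #
    #   {'osm_id': {'type': 'integer'}, 'name': {'type': 'string'}}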

    def get(self, identifier, crs_transform_spec=None, **kwargs):
        """
        Query the provider for a specific
        feature id e.g.: /collections/hotosm_bdi_waterways/items/13990765

        :param identifier: feature id
        :param crs_transform_spec: `CrsTransformSpec` instance, optional

        :returns: GeoJSON Feature
        """
        LOGGER.debug(f'Get item by ID: {identifier}')

        # Execute query within self-closing database Session context
        with Session(self._engine) as session:
            # Retrieve data from database as feature
            query = session.query(self.table_model)
            item = query.get(identifier)
            if item is None:
                msg = f'No such item: {self.id_field}={identifier}.'
                raise ProviderItemNotFoundError(msg)
            crs_transform_out = self._get_crs_transform(crs_transform_spec)
            feature = self._sqlalchemy_to_feature(item, crs_transform_out)

            # Drop non-defined properties
            if self.properties:
                props = feature['properties']
                dropping_keys = deepcopy(props).keys()
                for key in dropping_keys:
                    if key not in self.properties:
                        props.pop(key)

            # Add fields for previous and next items
            id_field = getattr(self.table_model, self.id_field)
            prev_item = (session.query(self.table_model)
                         .order_by(id_field.desc())
                         .filter(id_field < identifier)
                         .first())
            next_item = (session.query(self.table_model)
                         .order_by(id_field.asc())
                         .filter(id_field > identifier)
                         .first())
            feature['prev'] = (getattr(prev_item, self.id_field)
                               if prev_item is not None else identifier)
            feature['next'] = (getattr(next_item, self.id_field)
                               if next_item is not None else identifier)

        return feature
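
    # Sketch of a returned feature (values illustrative).  Note that 'prev'
    # and 'next' fall back to the requested identifier at either end of the
    # table:
    #
    #   {'type': 'Feature', 'id': 13990765, 'geometry': {...},
    #    'properties': {...}, 'prev': 13990764, 'next': 13990766}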

    def _store_db_parameters(self, parameters, options):
        self.db_user = parameters.get('user')
        self.db_host = parameters.get('host')
        self.db_port = parameters.get('port', 5432)
        self.db_name = parameters.get('dbname')
        self.db_search_path = parameters.get('search_path', ['public'])
        self._db_password = parameters.get('password')
        self.db_options = options

    def _get_engine_and_table_model(self):
        """
        Create a SQL Alchemy engine for the database and reflect the table
        model.  Use existing versions from stores if available to allow
        reuse of Engine connection pool and save expensive table reflection.
        """
        # One long-lived engine is used per database URL:
        # https://docs.sqlalchemy.org/en/14/core/connections.html#basic-usage
        engine_store_key = (self.db_user, self.db_host, self.db_port,
                            self.db_name)
        try:
            engine = _ENGINE_STORE[engine_store_key]
        except KeyError:
            conn_str = URL.create(
                'postgresql+psycopg2',
                username=self.db_user,
                password=self._db_password,
                host=self.db_host,
                port=self.db_port,
                database=self.db_name
            )
            conn_args = {
                'client_encoding': 'utf8',
                'application_name': 'pygeoapi'
            }
            if self.db_options:
                conn_args.update(self.db_options)
            engine = create_engine(
                conn_str,
                connect_args=conn_args,
                pool_pre_ping=True)
            _ENGINE_STORE[engine_store_key] = engine

        # Reuse table model if one exists
        table_model_store_key = (self.db_host, self.db_port, self.db_name,
                                 self.table)
        try:
            table_model = _TABLE_MODEL_STORE[table_model_store_key]
        except KeyError:
            table_model = self._reflect_table_model(engine)
            _TABLE_MODEL_STORE[table_model_store_key] = table_model

        return engine, table_model
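
    # Because both stores are keyed on the connection details, two providers
    # pointing at the same database (e.g. two collections serving different
    # tables) share a single pooled engine.  Hypothetical example:
    #
    #   p1 = PostgreSQLProvider(provider_def_rivers)  # creates the engine
    #   p2 = PostgreSQLProvider(provider_def_lakes)   # same DB: reuses it
    #   assert p1._engine is p2._engine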

    def _reflect_table_model(self, engine):
        """
        Reflect database metadata to create a SQL Alchemy model corresponding
        to target table.  This requires a database query and is expensive to
        perform.
        """
        metadata = MetaData(engine)

        # Look for table in the first schema in the search path
        try:
            schema = self.db_search_path[0]
            metadata.reflect(schema=schema, only=[self.table], views=True)
        except OperationalError:
            msg = (f"Could not connect to {repr(engine.url)} "
                   "(password hidden).")
            raise ProviderConnectionError(msg)
        except InvalidRequestError:
            msg = (f"Table '{self.table}' not found in schema '{schema}' "
                   f"on {repr(engine.url)}.")
            raise ProviderQueryError(msg)

        # Create SQLAlchemy model from reflected table
        # It is necessary to add the primary key constraint because SQLAlchemy
        # requires it to reflect the table, but a view in a PostgreSQL
        # database does not have a primary key defined.
        sqlalchemy_table_def = metadata.tables[f'{schema}.{self.table}']
        try:
            sqlalchemy_table_def.append_constraint(
                PrimaryKeyConstraint(self.id_field)
            )
        except KeyError:
            msg = (f"No such id_field column ({self.id_field}) on "
                   f"{schema}.{self.table}.")
            raise ProviderQueryError(msg)

        Base = automap_base(metadata=metadata)
        Base.prepare(
            name_for_scalar_relationship=self._name_for_scalar_relationship,
        )
        TableModel = getattr(Base.classes, self.table)

        return TableModel

    @staticmethod
    def _name_for_scalar_relationship(base, local_cls, referred_cls,
                                      constraint):
        """
        Function used when automapping classes and relationships from
        database schema and fixes potential naming conflicts.
        """
        name = referred_cls.__name__.lower()
        local_table = local_cls.__table__
        if name in local_table.columns:
            newname = name + '_'
            LOGGER.debug(
                f'Already detected column name {name!r} in table '
                f'{local_table!r}. Using {newname!r} for relationship name.'
            )
            return newname
        return name

    def _sqlalchemy_to_feature(self, item, crs_transform_out=None):
        feature = {
            'type': 'Feature'
        }

        # Add properties from item
        item_dict = item.__dict__
        item_dict.pop('_sa_instance_state')  # Internal SQLAlchemy metadata
        feature['properties'] = item_dict
        feature['id'] = item_dict.pop(self.id_field)

        # Convert geometry to GeoJSON style
        if feature['properties'].get(self.geom):
            wkb_geom = feature['properties'].pop(self.geom)
            shapely_geom = to_shape(wkb_geom)
            if crs_transform_out is not None:
                shapely_geom = crs_transform_out(shapely_geom)
            geojson_geom = shapely.geometry.mapping(shapely_geom)
            feature['geometry'] = geojson_geom
        else:
            feature['geometry'] = None

        return feature

    def _get_order_by_clauses(self, sort_by, table_model):
        # Build sort_by clauses if provided
        clauses = []
        for sort_by_dict in sort_by:
            model_column = getattr(table_model, sort_by_dict['property'])
            order_function = asc if sort_by_dict['order'] == '+' else desc
            clauses.append(order_function(model_column))

        # Otherwise sort by primary key (to ensure reproducible output)
        if not clauses:
            clauses.append(asc(getattr(table_model, self.id_field)))

        return clauses

    def _get_cql_filters(self, filterq):
        if not filterq:
            return True  # Let everything through

        # Convert filterq into SQL Alchemy filters
        field_mapping = {
            column_name: getattr(self.table_model, column_name)
            for column_name in self.table_model.__table__.columns.keys()}
        cql_filters = to_filter(filterq, field_mapping)

        return cql_filters

    def _get_property_filters(self, properties):
        if not properties:
            return True  # Let everything through

        # Convert property filters into SQL Alchemy filters
        # Based on https://stackoverflow.com/a/14887813/3508733
        filter_group = []
        for column_name, value in properties:
            column = getattr(self.table_model, column_name)
            filter_group.append(column == value)
        property_filters = and_(*filter_group)

        return property_filters

    def _get_bbox_filter(self, bbox):
        if not bbox:
            return True  # Let everything through

        # Convert bbox to SQL Alchemy clauses
        envelope = ST_MakeEnvelope(*bbox)
        geom_column = getattr(self.table_model, self.geom)
        bbox_filter = geom_column.intersects(envelope)

        return bbox_filter

    def _select_properties_clause(self, select_properties, skip_geometry):
        # List the column names that we want
        if select_properties:
            column_names = set(select_properties)
        else:
            # get_fields() doesn't include geometry column
            column_names = set(self.fields.keys())

        if self.properties:  # optional subset of properties from config
            properties_from_config = set(self.properties)
            column_names = column_names.intersection(properties_from_config)

        if not skip_geometry:
            column_names.add(self.geom)

        # Convert names to SQL Alchemy clause
        selected_columns = []
        for column_name in column_names:
            try:
                column = getattr(self.table_model, column_name)
                selected_columns.append(column)
            except AttributeError:
                pass  # Ignore non-existent columns

        selected_properties_clause = load_only(*selected_columns)

        return selected_properties_clause

    def _get_crs_transform(self, crs_transform_spec=None):
        if crs_transform_spec is not None:
            crs_transform = get_transform_from_crs(
                pyproj.CRS.from_wkt(crs_transform_spec.source_crs_wkt),
                pyproj.CRS.from_wkt(crs_transform_spec.target_crs_wkt),
            )
        else:
            crs_transform = None
        return crs_transform
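
# Note on the filter helpers above: each returns either a SQLAlchemy clause
# or the literal True, so query() can chain .filter() calls unconditionally.
# For an illustrative bbox of [0, 0, 1, 1] and geometry column 'geom',
# _get_bbox_filter renders roughly as the SQL:
#
#   ST_Intersects(geom, ST_MakeEnvelope(0, 0, 1, 1))
#
# whereas an empty bbox yields True, which filters nothing out.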