11.5. SparkSQL

GeoMesa SparkSQL支持建立在 DataSet/DataFrame Spark SQL模块中提供的API可提供地理空间功能。这包括自定义地理空间数据类型和函数、创建 DataFrame 来自GeoTools DataStore ,以及用于提高SQL查询性能的优化。

GeoMesa SparkSQL代码由 geomesa-spark-sql 模块:

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-sql_2.12</artifactId>
  // version, etc.
</dependency>

11.5.1. 示例

以下是通过SparkSQL连接到GeoMesa Acumulo的Scala示例:

// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
  "accumulo.instance.name" -> "instance",
  "accumulo.zookeepers"    -> "zoo1,zoo2,zoo3",
  "accumulo.user"          -> "user",
  "accumulo.password"      -> "*****",
  "accumulo.catalog"       -> "geomesa_catalog",
  "geomesa.security.auths" -> "USER,ADMIN")

// Create SparkSession
val sparkSession = SparkSession.builder()
  .appName("testSpark")
  .config("spark.sql.crossJoin.enabled", "true")
  .master("local[*]")
  .getOrCreate()

// Create DataFrame using the "geomesa" format
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "chicago")
  .load()
dataFrame.createOrReplaceTempView("chicago")

// Query against the "chicago" schema
val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)

resultDataFrame.show
/*
+-------+------+-----------+--------------------+-----------------+
|__fid__|arrest|case_number|                 dtg|             geom|
+-------+------+-----------+--------------------+-----------------+
|      4|  true|          4|2016-01-04 00:00:...|POINT (76.5 38.5)|
|      5|  true|          5|2016-01-05 00:00:...|    POINT (77 38)|
|      6|  true|          6|2016-01-06 00:00:...|    POINT (78 39)|
|      7|  true|          7|2016-01-07 00:00:...|    POINT (20 20)|
|      9|  true|          9|2016-01-09 00:00:...|    POINT (50 50)|
+-------+------+-----------+--------------------+-----------------+
*/

11.5.2. 配置

因为GeoMesa SparkSQL堆叠在 geomesa-spark-core 模块,一个或多个 SpatialRDDProvider 实现必须包含在类路径中。看见 配置 有关设置Spark类路径的详细信息,请参见。

备注

在大多数情况下,不需要设置中所述的Kryo序列化 简单功能序列化 在使用SparkSQL时。但是,在使用 GeoTools RDD提供程序

如果你愿意的话 JOIN -ING多个 DataFrame S在一起,将需要添加 spark.sql.crossJoin.enabled 属性时创建 SparkSession 对象:

val spark = SparkSession.builder().
   // ...
   config("spark.sql.crossJoin.enabled", "true").
   // ...
   getOrCreate()

警告

交叉联接的效率可能非常非常低。注意确保联接的一个或两个数据集非常小,并考虑使用 broadcast() 方法以确保至少一个 DataFrame 加入是在记忆中。

11.5.3. 用法

创建启用了SQL的GeoMesa SparkSQL DataFrame 使用与特定要素类型对应的数据,请执行以下操作:

// dsParams contains the parameters to pass to the data store
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", typeName)
  .load()

具体地说,调用 format("geomesa") 注册GeoMesa SparkSQL数据源,并 option("geomesa.feature", typeName) 告知GeoMesa使用名为的要素类型 typeName 。这还注册了在GeoMesa SparkSQL中实现的自定义用户定义类型和函数。

通过注册一个 DataFrame 作为临时视图,可以在后续的SQL调用中访问此数据框。例如:

dataFrame.createOrReplaceTempView("chicago")

可以通过别名“Chicago”调用此数据框:

val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)

注册用户定义的类型和函数也可以通过调用 SQLTypes.init()SQLContext Spark会话的对象:

SQLTypes.init(sparkSession.sqlContext)

还可以使用以下命令将Spark DataFrame写入GeoMesa表:

dataFrame.write.format("geomesa").options(dsParams).option("geomesa.feature", "featureName").save()

这将自动转换数据框的基础RDD [Row] 变成了RDD [SimpleFeature] 并并行地写入数据存储。要使此功能起作用,要素类型 featureName 数据存储中必须已存在。

写回要素时,可以通过特殊的 __fid__ 专栏:

dataFrame
    .withColumn("__fid__", $"custom_fid")
    .write
    .format("geomesa")
    .options(dsParams)
    .option("geomesa.feature", "featureName")
    .save

11.5.4. 地理空间用户定义的类型和函数

GeoMesa SparkSQL模块需要几个 classes representing geometry objects (正如OGC所描述的 OpenGIS Simple feature access common architecture 规范并由Java Topology Suite实现),并将它们注册为SparkSQL中的用户定义类型(UDT)。例如, Geometry 类注册为 GeometryUDT 。在GeoMesa SparkSQL中注册了以下类型:

  • GeometryUDT

  • PointUDT

  • LineStringUDT

  • PolygonUDT

  • MultiPointUDT

  • MultiLineStringUDT

  • MultiPolygonUDT

  • GeometryCollectionUDT

GeoMesa SparkSQL还实现了OGC中描述的函数子集 OpenGIS Simple feature access SQL option 规范为SparkSQL用户定义函数(UDF)。这些函数包括创建几何图形、访问几何图形的属性、将几何图形对象转换为更具体的子类、以其他格式输出几何图形、测量几何图形之间的空间关系以及处理几何图形。

例如,下面的SQL查询

select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)

使用两个UDF--st_contains``和  ``st_makeBBOX --要查找 chicago DataFrame WHERE列 geom 包含在指定的边界框中。

下一节给出了已实现的UDF的完整列表 (SparkSQL函数 )。

11.5.5. 内存中索引

如果您的数据足够小,可以放入执行器的内存中,则可以告诉GeoMesa SparkSQL将RDDS持久化到内存中,并利用CQEngine作为内存中的索引数据存储。要执行此操作,请添加选项 option("cache", "true") 在创建数据框时。这将为每个属性(不包括 fid 和几何图形。要基于几何建立索引,请添加选项 option("indexGeom", "true") 。对此关系的查询将自动命中缓存的RDD,并查询驻留在每个分区上的内存中数据存储,这可以带来显著的加速。

在对数据有一定了解的情况下,还可以通过应用初始查询来确保数据可以放入内存。这可以使用 query 选择。例如, option("query", "dtg AFTER 2016-12-31T23:59:59Z")

11.5.6. GeoJSON输出

这个 geomesa-spark-sql module provides a means of exporting a DataFrame to a GeoJSON 弦乐。这允许快速可视化许多支持GeoJSON输入的前端映射库中的数据,例如LEAFLE或Open Layers。

若要转换DataFrame,请导入隐式转换并调用 toGeoJSON 方法。

import org.locationtech.geomesa.spark.sql.GeoJSONExtensions._
val df: DataFrame = // Some data frame
val geojsonDf = df.toGeoJSON

如果结果可以存储在内存中,则可以在驱动程序上收集结果并将其写入文件。如果不是,则每个执行器都可以写入像HDFS这样的分布式文件系统。

val geoJsonString = geojsonDF.collect.mkString("[",",","]")

备注

要执行此操作,数据框应具有几何字段,这意味着其方案应具有 StructField 这是GeoMesa提供的JTS几何体类型之一。

11.5.7. 在ApacheSedona中使用GeoMesa SparkSQL

GeoMesa SparkSQL可以与 Apache Sedona 。您可以通过将ApacheSedona JAR添加到您的类路径来启用此功能。例如,您可以使用以下命令提交Spark作业 sedona-python-adapter-${spark-version}_${scala-version}-${sedona-version}.jar 添加到 --jars 选项:

spark-submit --jars /path/to/geomesa-spark-runtime-jar.jar,/path/to/sedona-python-adapter-jar.jar ...

备注

一旦ApacheSedona提供的类可用,就会自动启用ApacheSedona集成。您可以通过设置系统属性来手动禁用此功能 geomesa.use.sedonafalse

spark-submit --conf "spark.driver.extraJavaOptions=-Dgeomesa.use.sedona=false" \
              --conf "spark.executor.extraJavaOptions=-Dgeomesa.use.sedona=false" \
              ...

创建Spark Session对象时需要注意几个配置:

val spark = SparkSession.builder().
  // ...
  config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  config("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName).
  config("spark.geomesa.sedona.udf.prefix", "sedona_").
  // ...
  getOrCreate()
SQLTypes.init(spark.sqlContext)
  • spark.serializerspark.kryo.registrator 应配置为使用由GeoMesa Spark提供的Kryo序列化程序。 GeoMesaSparkKryoRegistrator 将自动注册由阿帕奇·塞多纳提供的其他kryo序列化程序。

  • spark.geomesa.sedona.udf.prefix 选项指定要添加到由ApacheSedona提供的Spark SQL函数的公共前缀。有很多功能都是由提供的 火花JTS 和阿帕奇·塞多纳。例如, st_pointFromText 由Spark JTS提供,接受单个参数,其中 ST_PointFromText 由ApacheSedona提供,带有两个参数。配置 config("spark.geomesa.sedona.udf.prefix", "sedona_") 使我们能够区分这两种功能:

    spark.sql("SELECT st_pointFromText('POINT (10 20)')")
    spark.sql("SELECT sedona_ST_PointFromText('10,20', ',')")
    

    的默认值为 spark.geomesa.sedona.udf.prefix"sedona_" 。当此选项显式设置为空字符串时,Spark JTS函数将被来自ApacheSedona的同名函数覆盖。

  • SQLTypes.init 必须调用才能注册由ApacheSedona提供的UDF和UDAF。

GeoMesa SparkSQL可以将几何谓词函数调用推送到数据存储区:

spark.sql("SELECT geom FROM schema WHERE sedona_ST_Intersects(geom, sedona_ST_PolygonFromEnvelope(100.0,20.0,110.0,30.0))").explain()
// == Physical Plan ==
// *(1) Scan GeoMesaRelation(...,Some([ geom intersects POLYGON ((100 20, 100 30, 110 30, 110 20, 100 20)) ]),None,None) [geom#32] ...

在执行空间连接时,连接两个数据集的谓词应该是由ApacheSedona提供的函数,否则ApacheSedona的CATALYST优化规则将不会提取和优化您的连接。

// This join is accelerated by Apache Sedona as a RangeJoin
spark.sql("SELECT linestrings.geom, polygons.the_geom FROM linestrings JOIN polygons ON sedona_ST_Intersects(linestrings.geom, polygons.the_geom)").explain()
// == Physical Plan ==
// RangeJoin geom#32: linestring, the_geom#101: multipolygon, true
// :- *(1) Scan GeoMesaRelation...
// +- *(2) Scan GeoMesaRelation...

// This is just a normal CartesianProduct or BroadcastNestedLoopJoin
spark.sql("SELECT linestrings.geom, polygons.the_geom FROM linestrings JOIN polygons ON ST_Intersects(linestrings.geom, polygons.the_geom)").explain()
// == Physical Plan ==
// CartesianProduct UDF:st_intersects(geom#32, the_geom#101)
// :- *(1) Scan GeoMesaRelation...
// +- *(2) Scan GeoMesaRelation...

// Calling DataFrame functions provided by GeoMesa Spark JTS also yields CartesianProduct or BroadcastNestedLoopJoin
dfLineString.join(dfPolygon, st_intersects($"geom", $"the_geom")).explain()
// == Physical Plan ==
// CartesianProduct UDF(geom#32, the_geom#101)
// :- *(1) Scan GeoMesaRelation...
// +- *(2) Scan GeoMesaRelation...

警告

option("spatial", "true") 和中描述的任何其他选项 空间分区和更快的连接 在使用ApacheSedona时配置空间连接无济于事。有关可用的配置选项,请参阅ApacheSedona文档。

用户在使用PySpark时也可以利用与ApacheSedona的集成,请确保

  • apache-sedona 包在您的Python环境中可用。

  • sedona.register 包最终还是导入了 geomesa_pyspark 包裹。

  • SedonaRegister.registerAll 在调用 geomesa_pyspark.init_sql 或从GeoMesa数据存储加载DataFrame。

以下是在PySpark中使用ApacheSedona集成功能的示例启动代码:

import geomesa_pyspark
...
from pyspark.sql import SparkSession
...
from sedona.register import SedonaRegistrator

spark = SparkSession.builder.config(...).getOrCreate()
...
geomesa_pyspark.init_sql(spark)
SedonaRegistrator.registerAll(spark)

11.5.8. 空间分区和更快的连接

备注

推荐使用ApacheSedona来加快连接速度。看见 在ApacheSedona中使用GeoMesa SparkSQL 了解更多细节。

还可以通过对数据进行空间分区来实现额外的加速。添加选项 option("spatial", "true") 将确保空间上彼此接近的数据将放置在同一分区上。默认情况下,您的数据将被分区到一个NxN网格中,但总共有4种分区策略,每种策略都可以通过名称指定 option("strategy", strategyName)

等于-计算数据的边界并将其划分为大小相等的N×N格网,其中 N = sqrt(numPartitions)

加权-类似相等,但确保沿每个轴的相同比例的数据位于每个网格单元格中。

与地球相似,但使用整个地球作为界限,而不是根据数据计算它们。

RTREE-基于数据样本构建R树,并使用边界矩形的子集作为分区包络。

空间分区的优势有两个方面:

1)具有完全位于一个分区中的空间谓词的查询可以直接转到该分区,从而跳过扫描肯定不包括所需数据的分区的开销。

2)如果两个数据集被相同的方案划分,导致两个关系的分区包络相同,则空间连接可以使用分区包络作为连接中的关键字。这大大减少了完成联接所需的比较次数。

其他数据框选项允许更好地控制如何创建分区。对于需要数据样本(加权和RTREE)的策略, sampleSizethresholdMultiplier 可用于控制决策过程中使用了多少底层数据,以及允许在RTree信封中包含多少项。

其他有用的选项如下:

option("partitions", "n") -指定基础RDDS将成为的分区数(覆盖默认并行度)

option("bounds", "POLYGON in WellKnownText") -限制网格的边界 WEIGHTEDEQUAL 策略使用。所有不在这些界限内的数据都将放置在单独的分区中

option("cover", "true") -由于只有EQUAL和Earth分区策略才能保证分区信封在所有关系中都相同,因此设置了此选项的数据帧将强制它们所连接的数据帧的分区方案与其自身的分区方案相匹配。