11.2. 火花JTS

Spark JTS模块提供了一组用户定义函数(UDF)和用户定义类型(UDT),允许在Spark中执行SQL查询,从而对地理空间数据类型执行地理空间操作。

GeoMesa SparkSQL支持建立在 DataSet/DataFrame Spark SQL模块中提供的API可提供地理空间功能。这包括自定义地理空间数据类型和函数、创建 DataFrame 来自GeoTools DataStore ,以及用于提高SQL查询性能的优化。

此功能位于 geomesa-spark/geomesa-spark-jts 模块:

<properties>
  <geomesa.version>4.0.2</geomesa.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-jts_${scala.binary.version}</artifactId>
  <version>${geomesa.version}</version>
</dependency>

11.2.1. 示例

以下是加载具有用户定义类型的DataFrame的Scala示例:

import org.locationtech.jts.geom._
import org.apache.spark.sql.types._
import org.locationtech.geomesa.spark.jts._

import spark.implicits._

val schema = StructType(Array(
  StructField("name",StringType, nullable=false),
  StructField("pointText", StringType, nullable=false),
  StructField("polygonText", StringType, nullable=false),
  StructField("latitude", DoubleType, nullable=false),
  StructField("longitude", DoubleType, nullable=false)))

val dataFile = this.getClass.getClassLoader.getResource("jts-example.csv").getPath
val df = spark.read
  .schema(schema)
  .option("sep", "-")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .csv(dataFile)

val alteredDF = df
  .withColumn("polygon", st_polygonFromText($"polygonText"))
  .withColumn("point", st_makePoint($"longitude", $"latitude"))

注意初始模式没有UserDefinedType,但是在将我们的用户定义函数应用到适当的列之后,我们得到了一个具有地理空间列类型的数据框。

还可以从地理空间对象列表构建DataFrame:

import spark.implicits._
val point = new GeometryFactory().createPoint(new Coordinate(3.4, 5.6))
val df = Seq(point).toDF("point")

11.2.2. 配置

要启用此行为,请导入 org.locationtech.geomesa.spark.jts._ ,创建一个 SparkSession` and call `` .上面有JTS。这将注册UDF和UDT以及针对这些操作的一些催化剂优化。或者,您也可以致电 initJTS(SQLContext)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.locationtech.geomesa.spark.jts._

val spark: SparkSession = SparkSession.builder() // ... initialize spark session
spark.withJTS

11.2.3. 地理空间用户定义的类型和函数

Spark JTS模块需要几个 classes representing geometry objects (正如OGC所描述的 OpenGIS Simple feature access common architecture 规范并由Java Topology Suite实现),并将它们注册为SparkSQL中的用户定义类型(UDT)。例如, Geometry 类注册为 GeometryUDT 。注册了以下类型:

  • GeometryUDT

  • PointUDT

  • LineStringUDT

  • PolygonUDT

  • MultiPointUDT

  • MultiLineStringUDT

  • MultiPolygonUDT

  • GeometryCollectionUDT

Spark JTS还实现了OGC中描述的功能子集 OpenGIS Simple feature access SQL option 规范为SparkSQL用户定义函数(UDF)。这些函数包括创建几何图形、访问几何图形的属性、将几何图形对象转换为更具体的子类、以其他格式输出几何图形、测量几何图形之间的空间关系以及处理几何图形。

例如,下面的SQL查询

select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)

使用两个UDF--st_contains``和  ``st_makeBBOX --要查找 chicago DataFrame WHERE列 geom 包含在指定的边界框中。

UDF还公开以与DataFrame API一起使用,这意味着上面的示例也可以通过以下代码实现:

import org.locationtech.geomesa.spark.jts._
import spark.implicits. _
chicagoDF.where(st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), $"geom"))

11.2.4. GeoTools用户定义函数

请注意,有三个GeoTool派生的UDF,它们是:

  • st_distanceSpheroid

  • st_lengthSpheroid

  • st_transform

它们在geomesa-spark-SQL JAR中可用,但在默认情况下也捆绑在spark运行时中。示例用法如下:

import org.locationtech.geomesa.spark.geotools._
chicagoDF.where(st_distanceSpheroid(st_point(0.0,0.0), col("geom")) > 10)

下一节给出了已实现的UDF的完整列表 (SparkSQL函数 )。

import org.locationtech.geomesa.spark.jts._
import spark.implicits. _
chicagoDF.where(st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), $"geom"))

11.2.5. 建房

该模块可以通过以下命令独立于GeoMesa构建和使用:

$ mvn install -pl geomesa-spark/geomesa-spark-jts