GeoMesa Spark:空间连接和聚合

本教程将向您展示如何:

  1. 将GeoMesa用于 Apache Spark 在斯卡拉。

  2. 通过我们的地理空间用户定义函数创建和使用DataFrame。

  3. 根据阈值距离计算聚合统计数据。

  4. 创建新的简单要素类型来表示此聚合。

  5. 将结果写回数据存储。

背景

NYCTaxi 是伊利诺伊大学发布的出租车活动数据,从信息自由法的要求到纽约市出租车和豪华轿车委员会。

GeoNames 是一个包含1000多万个地名和900多万个独特特征的地理数据库。

假设我们想要回答这样的问题:“出租车接送是集中在特定的兴趣点附近吗?”,“人们更有可能要求接送还是在兴趣点下车?”为了找出答案,我们需要将这两个数据集结合在一起,并汇总结果的统计数据。

由于GeoNames是一个点的数据集,而nyctaxi只提供行程的上下车点,因此行程不太可能准确地在标记的兴趣点开始或结束。因此,我们不能天真地根据点相等的位置连接数据集。取而代之的是,我们需要将这些点连接在彼此之间可以容忍的距离内。这在下文中称为D-in联接。

先决条件

对于本教程,我们将假设您已经将两个数据集摄取到您选择的数据存储中。在没有创建必要的表的情况下遵循本教程将导致错误。

GeoNames数据集的转换器, GeoNames ,和nyctaxi数据集, 纽约市出租车 ,均随GeoMesa提供。要获得进一步的指导,您可以按照摄取教程之一进行操作 MAP-减少GDELT的摄取 。在GeoMesa中获取数据后,您可以继续本教程的其余部分。

正在初始化Spark

要开始使用Spark,我们需要初始化Spark会话,要将GeoMesa的地理空间用户定义类型(UDT)和用户定义函数(UDF)应用到Spark中的数据,我们需要初始化SparkSQL扩展。该功能要求在运行Spark作业时在类路径上具有适当的GeoMesa Spark运行时JAR。GeoMesa为Acumulo、HBase和文件系统数据存储提供了Spark运行时JAR。例如,以下代码将启动一个交互式Spark REPL,其中包含在Acumulo数据存储上使用GeoMesa运行Spark所需的所有依赖项。替换 ${VERSION} 使用适当的Scala Plus GeoMesa版本(例如 2.12-4.0.2 ):

$ bin/spark-shell --jars geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar

备注

看见 空间RDD提供程序 有关选择正确的GeoMesa Spark运行时JAR的详细信息。

要配置Spark会话以便我们可以序列化简单特征并使用几何UDT和UDF,我们必须按如下方式更改Spark会话。

import org.apache.spark.sql.SparkSession
import org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
import org.locationtech.geomesa.spark.jts._

val spark: SparkSession = SparkSession.builder()
    .appName("testSpark")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
    .master("local[*]")
    .getOrCreate()
    .withJTS

请注意 withJTS ,它注册了GeoMesa的UDT和UDF,以及两个 config 这些选项告诉Spark使用GeoMesa的定制Kryo序列化程序和注册器来处理简单功能的序列化。这些配置选项也可以在 conf/spark-defaults.conf 配置文件。

创建DataFrame

创建并配置Spark会话后,我们可以继续将数据从数据存储加载到Spark DataFrame。

首先,我们将设置用于连接到数据存储的参数。例如,如果我们的数据位于两个Acumulo目录中,我们将设置以下参数映射:

val taxiParams = Map(
  "accumulo.instance.name" -> "instance",
  "accumulo.zookeepers"    -> "zoo1:2181,zoo2:2181,zoo3:2181",
  "accumulo.user"          -> "user",
  "accumulo.password"      -> "password",
  "accumulo.catalog"       -> "nyctaxi")

val geonamesParams = Map(
  "accumulo.instance.name" -> "instance",
  "accumulo.zookeepers"    -> "zoo1:2181,zoo2:2181,zoo3:2181",
  "accumulo.user"          -> "user",
  "accumulo.password"      -> "password",
  "accumulo.catalog"       -> "geonames")

备注

以上参数假定Acumulo为后备数据存储,但本教程的其余部分与使用哪个数据存储无关。只需适当地调整上述参数即可使用其他受支持的数据存储。

然后我们就可以利用斯帕克的 DataFrameReader 还有我们的 SpatialRDDProvider 要创建 DataFrame

val taxiDF = spark.read.format("geomesa")
  .options(taxiParams)
  .option("geomesa.feature", "nyctaxi-single")
  .load()

val geonamesDF = spark.read.format("geomesa")
  .options(geonamesParams)
  .option("geomesa.feature", "geonames")
  .load()

因为我们知道我们的出租车数据仅限于纽约州,所以我们可以过滤我们的地理名称数据。

import spark.implicits._
import org.apache.spark.sql.functions._

val geonamesNY = geonamesDF.where($"admin1Code" === lit("NY"))

D-在连接内

现在我们准备好连接这两个数据集。这就是我们将使用两个地理空间UDF的地方。 st_contains 接受两个几何图形作为输入,并输出第二个几何图形是否位于第一个几何图形内。 st_bufferPoint 将一个点和一个以米为单位的距离作为输入,并输出一个半径等于提供的距离的圆围绕该点。有关GeoMesa提供的更多文档和UDF的完整列表,请参阅 SparkSQL函数

使用这两个UDF,我们可以构建以下联接查询。

val joinedDF = geonamesNY
  .select(st_bufferPoint($"geom", lit(50)).as("buffer"), $"name", $"geonameId")
  .join(taxiDF, st_contains($"buffer", $"pickup_point"))

上面的查询将每个GeoName点的几何图形转换为半径为50米的圆,并将结果与该圆内任何位置的出租车记录连接在一起。

聚合

现在我们有了一个DataFrame,其中纽约的每个兴趣点都与出租车记录相结合,其中大约从那个位置发出了一辆皮卡。为了将其转化为关于该地区出租车习惯的有意义的统计数据,我们可以做一个 GROUP BY 操作并使用SparkSQL的一些聚合函数。

val aggregateDF = joinedDF.groupBy($"geonameId")
  .agg(first("name").as("name"),
       countDistinct($"trip_id")).as(s"numPickups"),
       first("buffer").as("buffer"))

上面的查询根据兴趣点对数据进行分组,并计算不同拾取的数量。结果可以用来生成基于皮卡密度的兴趣点热图,但要快速查看哪些兴趣点最远离出租车,我们可以对结果进行排序并查看前十名。

val top10 = aggregateDF.orderBy($"numPickups".desc).take(10)
top10.foreach { row => println(row.getAs[String]("name") + row.getAs[Int]("numPickups")) }

这告诉我们,Gansevoort酒店有最多的出租车接送。

回写

如果我们想要在Spark会话之后持久保存此聚合结果,则需要将其写回底层数据存储。这是完成这两个步骤。

首先,我们创建一个与聚合结果一致的SimpleFeatureType:

import org.locationtech.geomesa.utils.geotools.SchemaBuilder

val aggregateSft = SchemaBuilder.builder()
    .addString("name")
    .addInt("numPickups")
    .addPolygon("buffer")
    .build("aggregate")

接下来,我们可以在数据存储中创建模式,然后安全地写入数据。

import org.geotools.data.DataStoreFinder
DataStoreFinder.getDataStore(taxiParams).createSchema(aggregateSft)
aggregateDF.write.format("geomesa").options(taxiParams).option("geomesa.feature", "aggregate").save()

如果您遵循上述所有步骤,最终结果将是一个数据集,其中包含纽约所有兴趣点的出租车接送密度,并可选择写回数据存储。如果您想进一步将此结果与出租车落客点的分布进行比较,则可以很容易地将上述代码修改为使用落客点。

可以按照中的示例采取进一步的步骤来可视化此结果 GeoMesa Spark:广播加入和聚合 。这将导致如下所示:

../_images/aggregate-NYCTaxi.png