11.3. 火花芯

geomesa-spark-core 用于直接与 RDD 来自GeoMesa和其他地理空间数据存储的要素的S。

11.3.1. 示例

以下是通过对GeoMesa数据存储进行地理空间查询来创建RDD的完整Scala示例:

// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
  "accumulo.instance.name" -> "instance",
  "accumulo.zookeepers"    -> "zoo1,zoo2,zoo3",
  "accumulo.user"          -> "user",
  "accumulo.password"      -> "*****",
  "accumulo.catalog"       -> "geomesa_catalog",
  "geomesa.security.auths" -> "USER,ADMIN")

// set SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("testSpark")
val sc = SparkContext.getOrCreate(conf)

// create RDD with a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(dsParams)
val filter = ECQL.toFilter("CONTAINS(POLYGON((0 0, 0 90, 90 90, 90 0, 0 0)), geom)")
val query = new Query("chicago", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, dsParams, query)

resultRDD.collect
// Array[org.opengis.feature.simple.SimpleFeature] = Array(
//    ScalaSimpleFeature:4, ScalaSimpleFeature:5, ScalaSimpleFeature:6,
//    ScalaSimpleFeature:7, ScalaSimpleFeature:9)

11.3.2. 配置

geomesa-spark-core 为访问Spark中的地理空间数据提供API,方法是定义一个名为 SpatialRDDProvider 。此接口的不同实现连接到不同的输入源。这些不同的提供程序在中有更详细的描述 用法 下面。

GeoMesa提供了几个JAR依赖项来简化Spark类路径的设置。要在Spark中使用这些库,可以将适当的阴影JAR传递(例如)给 spark-submit 命令通过 --jars 选项:

--jars file://path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar

或通过Jupyter等笔记本服务器中的适当机制传递给Spark(请参见 使用Jupyter笔记本部署GeoMesa Spark )或齐柏林飞艇。

备注

看见 空间RDD提供程序 有关选择正确的GeoMesa Spark运行时JAR的详细信息。

阴影JAR还应该提供 Converter RDD提供程序GeoTools RDD提供程序 ,所以这些罐子可以简单地添加到 --jars 也是如此(尽管在后一种情况下,可能需要额外的JAR来实现访问的GeoTools数据存储)。

11.3.3. 简单功能序列化

序列化 RDD S SimpleFeature S在群集的节点之间,Spark必须使用中提供的Kryo序列化注册器进行配置 geomesa-spark-core

备注

在中运行Spark时不需要配置Kryo序列化 local 模式,因为作业将在单个JVM中执行。

将这两个条目添加到 $SPARK_HOME/conf/spark-defaults.conf (或将其作为 --conf 争论到 spark-submit ):

spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator  org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator

备注

或者,可以在 SparkConf 对象,用于创建 SparkContext

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)

在笔记本服务器中使用Spark时,需要禁用自动创建 SparkContext

设置配置选项后,GeoMesa创建的RDDS SpatialRDDProvider 实现将向序列化程序提供程序正确注册。

11.3.4. 用法

提供的功能的主要入口点 geomesa-spark-coreGeoMesaSpark 对象:

val spatialRDDProvider = GeoMesaSpark(params)

GeoMesaSpark 加载一个 SpatialRDDProvider 当类路径上包含适当的JAR时,通过SPI实现。由返回的实现 GeoMesaSpark 根据作为参数传递的参数进行选择,如下面的Scala代码所示:

// parameters to pass to the SpatialRDDProvider implementation
val params = Map(
  "param1" -> "foo",
  "param2" -> "bar")
// GeoTools Query; may be used to filter results retrieved from the data store
val query = new Query("foo")
// val query = new Query("foo", ECQL.toFilter("name like 'A%'"))
// get the RDD, using the SparkContext configured as above
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

要保存要素,请使用 save() 方法:

GeoMesaSpark(params).save(rdd, params, "gdelt")

警告

这个 save() 方法执行追加写入,当前不支持更新现有功能。重复使用要素ID是一个逻辑错误,可能会导致数据不一致。

请注意,某些提供程序可能是只读的。

看见 空间RDD提供程序 有关特定提供程序实现的详细信息,请参见。

11.3.5. GeoJSON输出

这个 geomesa-spark-core module provides a means of exporting an RDD[SimpleFeature] to a GeoJSON 弦乐。这允许快速可视化许多支持GeoJSON输入的前端映射库中的数据,例如LEAFLE或Open Layers。

若要转换RDD,请导入隐式转换并调用 asGeoJSONString 方法。

import org.locationtech.geomesa.spark.SpatialRDD._
val rdd: RDD[SimpleFeature] = ???
val geojson = rdd.asGeoJSONString