11.3. 火花芯¶
geomesa-spark-core
用于直接与 RDD
来自GeoMesa和其他地理空间数据存储的要素的S。
11.3.1. 示例¶
以下是通过对GeoMesa数据存储进行地理空间查询来创建RDD的完整Scala示例:
// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
"accumulo.instance.name" -> "instance",
"accumulo.zookeepers" -> "zoo1,zoo2,zoo3",
"accumulo.user" -> "user",
"accumulo.password" -> "*****",
"accumulo.catalog" -> "geomesa_catalog",
"geomesa.security.auths" -> "USER,ADMIN")
// set SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("testSpark")
val sc = SparkContext.getOrCreate(conf)
// create RDD with a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(dsParams)
val filter = ECQL.toFilter("CONTAINS(POLYGON((0 0, 0 90, 90 90, 90 0, 0 0)), geom)")
val query = new Query("chicago", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, dsParams, query)
resultRDD.collect
// Array[org.opengis.feature.simple.SimpleFeature] = Array(
// ScalaSimpleFeature:4, ScalaSimpleFeature:5, ScalaSimpleFeature:6,
// ScalaSimpleFeature:7, ScalaSimpleFeature:9)
11.3.2. 配置¶
geomesa-spark-core
为访问Spark中的地理空间数据提供API,方法是定义一个名为 SpatialRDDProvider
。此接口的不同实现连接到不同的输入源。这些不同的提供程序在中有更详细的描述 用法 下面。
GeoMesa提供了几个JAR依赖项来简化Spark类路径的设置。要在Spark中使用这些库,可以将适当的阴影JAR传递(例如)给 spark-submit
命令通过 --jars
选项:
--jars file://path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar
或通过Jupyter等笔记本服务器中的适当机制传递给Spark(请参见 使用Jupyter笔记本部署GeoMesa Spark )或齐柏林飞艇。
备注
看见 空间RDD提供程序 有关选择正确的GeoMesa Spark运行时JAR的详细信息。
阴影JAR还应该提供 Converter RDD提供程序 和 GeoTools RDD提供程序 ,所以这些罐子可以简单地添加到 --jars
也是如此(尽管在后一种情况下,可能需要额外的JAR来实现访问的GeoTools数据存储)。
11.3.3. 简单功能序列化¶
序列化 RDD
S SimpleFeature
S在群集的节点之间,Spark必须使用中提供的Kryo序列化注册器进行配置 geomesa-spark-core
。
备注
在中运行Spark时不需要配置Kryo序列化 local
模式,因为作业将在单个JVM中执行。
将这两个条目添加到 $SPARK_HOME/conf/spark-defaults.conf
(或将其作为 --conf
争论到 spark-submit
):
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
备注
或者,可以在 SparkConf
对象,用于创建 SparkContext
:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
在笔记本服务器中使用Spark时,需要禁用自动创建 SparkContext
。
设置配置选项后,GeoMesa创建的RDDS SpatialRDDProvider
实现将向序列化程序提供程序正确注册。
11.3.4. 用法¶
提供的功能的主要入口点 geomesa-spark-core
是 GeoMesaSpark
对象:
val spatialRDDProvider = GeoMesaSpark(params)
GeoMesaSpark
加载一个 SpatialRDDProvider
当类路径上包含适当的JAR时,通过SPI实现。由返回的实现 GeoMesaSpark
根据作为参数传递的参数进行选择,如下面的Scala代码所示:
// parameters to pass to the SpatialRDDProvider implementation
val params = Map(
"param1" -> "foo",
"param2" -> "bar")
// GeoTools Query; may be used to filter results retrieved from the data store
val query = new Query("foo")
// val query = new Query("foo", ECQL.toFilter("name like 'A%'"))
// get the RDD, using the SparkContext configured as above
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
要保存要素,请使用 save()
方法:
GeoMesaSpark(params).save(rdd, params, "gdelt")
警告
这个 save()
方法执行追加写入,当前不支持更新现有功能。重复使用要素ID是一个逻辑错误,可能会导致数据不一致。
请注意,某些提供程序可能是只读的。
看见 空间RDD提供程序 有关特定提供程序实现的详细信息,请参见。
11.3.5. GeoJSON输出¶
这个 geomesa-spark-core
module provides a means of exporting an RDD[SimpleFeature]
to a GeoJSON 弦乐。这允许快速可视化许多支持GeoJSON输入的前端映射库中的数据,例如LEAFLE或Open Layers。
若要转换RDD,请导入隐式转换并调用 asGeoJSONString
方法。
import org.locationtech.geomesa.spark.SpatialRDD._
val rdd: RDD[SimpleFeature] = ???
val geojson = rdd.asGeoJSONString