11.7. GeoMesa火花源¶
GeoMesa提供与Spark PythonAPI的集成,用于访问GeoMesa数据存储中的数据。
11.7.1. 先决条件¶
Spark 3.3 应该安装。
Python 应安装2.7或3.x。
pip 或
pip3
应该安装。conda-pack 是可选的。
11.7.2. 安装¶
这个 geomesa_pyspark
程序包不可下载。使用配置文件在本地构建构件 -Ppython
。然后使用以下工具安装 pip
或 pip3
如下所示。您还需要一个适当的 geomesa-spark-runtime
罐子。我们假定此处使用的是Acumulo,但您也可以选择使用 空间RDD提供程序 。
mvn clean install -Ppython
pip3 install geomesa-spark/geomesa_pyspark/target/geomesa_pyspark-$VERSION.tar.gz
cp geomesa-accumulo/geomesa-accumulo-spark-runtime-accumulo2/target/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar /path/to/
或者,您可以使用 conda-pack
来捆绑项目的依赖项。如果您有其他依赖项,这可能更合适。
export ENV_NAME=geomesa-pyspark
conda create --name $ENV_NAME -y python=3.7
conda activate $ENV_NAME
pip install geomesa-spark/geomesa_pyspark/target/geomesa_pyspark-$VERSION.tar.gz
# Install additional dependencies using conda or pip here
conda pack -o environment.tar.gz
cp geomesa-accumulo/geomesa-accumulo-spark-runtime-accumulo2/target/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar /path/to/
警告
conda-pack
目前有关于Python3.8的问题,以及 pyspark
在Python3.9中有问题,因此明确使用了Python3.7
11.7.3. 使用GeoMesa PySpark¶
然后,默认情况下,您可以使用纱线母版访问Spark。重要的是,因为 geomesa_pyspark
库与底层Java库交互,必须先设置GeoMesa配置,然后才能引用 pyspark
类库。
import geomesa_pyspark
conf = geomesa_pyspark.configure(
jars=['/path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar'],
packages=['geomesa_pyspark','pytz'],
spark_home='/path/to/spark/').\
setAppName('MyTestApp')
conf.get('spark.master')
# u'yarn'
from pyspark.sql import SparkSession
spark = ( SparkSession
.builder
.config(conf=conf)
.enableHiveSupport()
.getOrCreate()
)
或者,如果您使用 conda-pack
然后,您不需要如上所述设置GeoMesa配置,但您必须启动 pyspark
或您的应用程序,如下所示,根据需要更新路径:
PYSPARK_DRIVER_PYTHON=/opt/anaconda3/envs/$ENV_NAME/bin/python PYSPARK_PYTHON=./environment/bin/python pyspark \
--jars /path/to/geomesa-accumulo-spark-runtime_${VERSION}.jar \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn --deploy-mode client --archives environment.tar.gz#environment
此时,您已经准备好为Acumulo数据存储创建一组连接参数,并获得一个空间数据框。
params = {
"accumulo.instance.name": "myInstance",
"accumulo.zookeepers": "zoo1,zoo2,zoo3",
"accumulo.user": "user",
"accumulo.password": "password",
"accumulo.catalog": "myCatalog"
}
feature = "mySchema"
df = ( spark
.read
.format("geomesa")
.options(**params)
.option("geomesa.feature", feature)
.load()
)
df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()
# Count features in a bounding box.
spark.sql("""
select count(*)
from tbl
where st_contains(st_makeBBOX(-72.0, 40.0, -71.0, 41.0), geom)
""").show()
在没有GeoMesa数据存储的情况下,也可以使用GeoMesa PySpark。注册用户定义的类型和函数可以通过调用 geomesa_pyspark.init_sql()
在Spark Session对象上:
geomesa_pyspark.init_sql(spark)
您可以使用以下命令终止Spark作业 spark.stop()
。
11.7.4. 在PySpark中使用Geomesa UDF¶
有3种不同的方式可以从PySpark使用Geomesa UDF:从SQL API,通过SQL表达式从连贯API,或通过Python包装器从连贯API。这些方法在性能方面是相同的,因此为您的项目选择最好的方法取决于您的偏好。
11.7.4.1. 1.从SQL API访问Geomesa UDF¶
我们只需在SQL表达式中包含函数,就可以通过SQL API访问Geomesa UDF。
df.createOrReplaceTempView("tbl")
spark.sql("""
select count(*) from tbl
where st_contains(st_makeBBOX(-72.0, 40.0, -71.0, 41.0), geom)
""").show()
11.7.4.2. 2.通过SQL表达式从流畅的API访问Geomesa UDF¶
我们还可以从Fluent API通过 pyspark.sql.functions 模块。此模块有一个 expr 我们可以用来访问Geomesa UDF的函数。
import pyspark.sql.functions as F
# add a new column
df = df.withColumn("geom_wkt", F.expr("st_asText(geom)"))
# filter using SQL where expression
df = df.select("*").where("st_area(geom) > 0.001")
df.show()
11.7.4.3. 3.通过Python包装器从流畅的API访问Geomesa UDF¶
我们还支持通过使用Python包装器将Geomesa UDF用作独立函数。Geomesa UDF的Python包装器在JVM上运行,并且比逻辑上等价的PythonUDF更快。
from geomesa_pyspark.scala.functions import st_asText, st_area
df = df.withColumn("geom_wkt", st_asText("geom"))
df = df.withColumn("geom_area", st_area("geom"))
df.show()
11.7.5. 使用来自PySpark的自定义Scala UDF¶
我们在中提供了一些实用函数 geomesa_pyspark 允许您使用自己的Scala UDF作为来自PySpark的独立函数。这里的优势在于,您可以用Java或Scala编写您的UDF(因此它们可以在JVM上运行),但是可以从PySpark中自然地使用,就好像它是流畅API的一部分一样。这使我们能够从PySpark编写和使用高性能的UDF,而不必依赖于PythonUDF,后者对于较大的数据集来说通常会慢得令人望而却步。
from functools import partial
from geomesa_pyspark.scala.udf import build_scala_udf, scala_udf, ColumnOrName
from pyspark import SparkContext
from pyspark.sql.column import Column
sc = SparkContext.getOrCreate()
custom_udfs = sc._jvm.path.to.your.CustomUserDefinedFunctions
# use the helper function for building your udf
def my_scala_udf(col: ColumnOrName) -> Column:
"""helpful docstring that explains what col is"""
return build_scala_udf(sc, custom_udfs.my_scala_udf)(col)
# or alternatively, build it directly by partially applying the scala udf
my_other_udf = partial(scala_udf, sc, custom_udfs.my_other_udf())
df.withColumn("edited_field_1", my_scala_udf("field_1")).show()
df.withColumn("edited_field_2", my_other_udf("field_2")).show()
回想一下,这些UDF实际上可以接受 pyspark.sql.column.Column 或我们要操作的列的字符串名称,因此以下内容是等效的:
# this is more readable
df.withColumn("edited_field_1", my_scala_udf("field_1")).show()
# but we can also do this
df.withColumn("edited_field_1", my_scala_udf(col("field_1"))).show()
11.7.6. 朱庇特¶
要使用 geomesa_pyspark
在Jupyter包中,您只需要一个默认提供的Python2或Python3内核。在上面的代码块中替换适当的Spark Home和运行时JAR路径。确保GeoMesa Acumulo客户端和服务器端版本匹配,如中所述 安装GeoMesa Acumulo 。