11.7. GeoMesa火花源

GeoMesa提供与Spark PythonAPI的集成,用于访问GeoMesa数据存储中的数据。

11.7.1. 先决条件

  • Spark 3.3 应该安装。

  • Python 应安装2.7或3.x。

  • pippip3 应该安装。

  • conda-pack 是可选的。

11.7.2. 安装

这个 geomesa_pyspark 程序包不可下载。使用配置文件在本地构建构件 -Ppython 。然后使用以下工具安装 pippip3 如下所示。您还需要一个适当的 geomesa-spark-runtime 罐子。我们假定此处使用的是Acumulo,但您也可以选择使用 空间RDD提供程序

mvn clean install -Ppython
pip3 install geomesa-spark/geomesa_pyspark/target/geomesa_pyspark-$VERSION.tar.gz
cp  geomesa-accumulo/geomesa-accumulo-spark-runtime-accumulo2/target/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar /path/to/

或者,您可以使用 conda-pack 来捆绑项目的依赖项。如果您有其他依赖项,这可能更合适。

export ENV_NAME=geomesa-pyspark

conda create --name $ENV_NAME -y python=3.7
conda activate $ENV_NAME

pip install geomesa-spark/geomesa_pyspark/target/geomesa_pyspark-$VERSION.tar.gz
# Install additional dependencies using conda or pip here

conda pack -o environment.tar.gz
cp geomesa-accumulo/geomesa-accumulo-spark-runtime-accumulo2/target/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar /path/to/

警告

conda-pack 目前有关于Python3.8的问题,以及 pyspark 在Python3.9中有问题,因此明确使用了Python3.7

11.7.3. 使用GeoMesa PySpark

然后,默认情况下,您可以使用纱线母版访问Spark。重要的是,因为 geomesa_pyspark 库与底层Java库交互,必须先设置GeoMesa配置,然后才能引用 pyspark 类库。

import geomesa_pyspark
conf = geomesa_pyspark.configure(
    jars=['/path/to/geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar'],
    packages=['geomesa_pyspark','pytz'],
    spark_home='/path/to/spark/').\
    setAppName('MyTestApp')

conf.get('spark.master')
# u'yarn'

from pyspark.sql import SparkSession

spark = ( SparkSession
    .builder
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

或者,如果您使用 conda-pack 然后,您不需要如上所述设置GeoMesa配置,但您必须启动 pyspark 或您的应用程序,如下所示,根据需要更新路径:

PYSPARK_DRIVER_PYTHON=/opt/anaconda3/envs/$ENV_NAME/bin/python PYSPARK_PYTHON=./environment/bin/python pyspark \
--jars /path/to/geomesa-accumulo-spark-runtime_${VERSION}.jar \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn --deploy-mode client --archives environment.tar.gz#environment

此时,您已经准备好为Acumulo数据存储创建一组连接参数,并获得一个空间数据框。

params = {
    "accumulo.instance.name": "myInstance",
    "accumulo.zookeepers": "zoo1,zoo2,zoo3",
    "accumulo.user": "user",
    "accumulo.password": "password",
    "accumulo.catalog": "myCatalog"
}
feature = "mySchema"
df = ( spark
    .read
    .format("geomesa")
    .options(**params)
    .option("geomesa.feature", feature)
    .load()
)

df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()

# Count features in a bounding box.
spark.sql("""
select count(*)
from tbl
where st_contains(st_makeBBOX(-72.0, 40.0, -71.0, 41.0), geom)
""").show()

在没有GeoMesa数据存储的情况下,也可以使用GeoMesa PySpark。注册用户定义的类型和函数可以通过调用 geomesa_pyspark.init_sql() 在Spark Session对象上:

geomesa_pyspark.init_sql(spark)

您可以使用以下命令终止Spark作业 spark.stop()

11.7.4. 在PySpark中使用Geomesa UDF

有3种不同的方式可以从PySpark使用Geomesa UDF:从SQL API,通过SQL表达式从连贯API,或通过Python包装器从连贯API。这些方法在性能方面是相同的,因此为您的项目选择最好的方法取决于您的偏好。

11.7.4.1. 1.从SQL API访问Geomesa UDF

我们只需在SQL表达式中包含函数,就可以通过SQL API访问Geomesa UDF。

df.createOrReplaceTempView("tbl")

spark.sql("""
select count(*) from tbl
where st_contains(st_makeBBOX(-72.0, 40.0, -71.0, 41.0), geom)
""").show()

11.7.4.2. 2.通过SQL表达式从流畅的API访问Geomesa UDF

我们还可以从Fluent API通过 pyspark.sql.functions 模块。此模块有一个 expr 我们可以用来访问Geomesa UDF的函数。

import pyspark.sql.functions as F

# add a new column
df = df.withColumn("geom_wkt", F.expr("st_asText(geom)"))

# filter using SQL where expression
df = df.select("*").where("st_area(geom) > 0.001")

df.show()

11.7.4.3. 3.通过Python包装器从流畅的API访问Geomesa UDF

我们还支持通过使用Python包装器将Geomesa UDF用作独立函数。Geomesa UDF的Python包装器在JVM上运行,并且比逻辑上等价的PythonUDF更快。

from geomesa_pyspark.scala.functions import st_asText, st_area

df = df.withColumn("geom_wkt", st_asText("geom"))
df = df.withColumn("geom_area", st_area("geom"))

df.show()

11.7.5. 使用来自PySpark的自定义Scala UDF

我们在中提供了一些实用函数 geomesa_pyspark 允许您使用自己的Scala UDF作为来自PySpark的独立函数。这里的优势在于,您可以用Java或Scala编写您的UDF(因此它们可以在JVM上运行),但是可以从PySpark中自然地使用,就好像它是流畅API的一部分一样。这使我们能够从PySpark编写和使用高性能的UDF,而不必依赖于PythonUDF,后者对于较大的数据集来说通常会慢得令人望而却步。

from functools import partial
from geomesa_pyspark.scala.udf import build_scala_udf, scala_udf, ColumnOrName
from pyspark import SparkContext
from pyspark.sql.column import Column

sc = SparkContext.getOrCreate()
custom_udfs = sc._jvm.path.to.your.CustomUserDefinedFunctions

# use the helper function for building your udf
def my_scala_udf(col: ColumnOrName) -> Column:
    """helpful docstring that explains what col is"""
    return build_scala_udf(sc, custom_udfs.my_scala_udf)(col)

# or alternatively, build it directly by partially applying the scala udf
my_other_udf = partial(scala_udf, sc, custom_udfs.my_other_udf())

df.withColumn("edited_field_1", my_scala_udf("field_1")).show()
df.withColumn("edited_field_2", my_other_udf("field_2")).show()

回想一下,这些UDF实际上可以接受 pyspark.sql.column.Column 或我们要操作的列的字符串名称,因此以下内容是等效的:

# this is more readable
df.withColumn("edited_field_1", my_scala_udf("field_1")).show()

# but we can also do this
df.withColumn("edited_field_1", my_scala_udf(col("field_1"))).show()

11.7.6. 朱庇特

要使用 geomesa_pyspark 在Jupyter包中,您只需要一个默认提供的Python2或Python3内核。在上面的代码块中替换适当的Spark Home和运行时JAR路径。确保GeoMesa Acumulo客户端和服务器端版本匹配,如中所述 安装GeoMesa Acumulo