GeoMesa Spark:广播加入和聚合¶
本教程将向您展示如何:
将GeoMesa用于 Apache Spark 在斯卡拉。
通过我们的地理空间用户定义函数创建和使用DataFrame。
使用一组覆盖的多边形来计算聚合统计信息。
创建新的简单要素类型来表示此聚合。
将结果可视化为全息图。
背景¶
GDELT 提供从1979年至今全球广播、印刷和网络新闻媒体报道的事件的全面的按时间和地点索引的存档。
FIPS Codes 是唯一标识美国各县的联邦信息处理标准发布代码。
如果我们假设有更多GDELT事件的县在某种程度上更具政治相关性,我们可能会有兴趣看看哪些县GDELT事件更密集。
由于GDELT是一个包含点几何图形的数据集,我们不能立即知道它属于哪个县。为了解决这个问题,我们将使用第二个数据集,即表示县的边界和FIPS代码的FIPS代码shapefile,并将GDELT点与包含GDELT点的县连接起来。
在这种情况下,县的数量特别少,大约有3000条记录,我们可以通过“广播”县来使我们的查询更高效。“广播”在这里的意思是 Spark Broadcast 。在传统的Spark SQL连接中,数据将根据RDDS的分割器在执行器之间打乱,而且由于在我们的例子中,连接键是一个几何字段,所以没有内置的Spark Partiator可以有效地映射数据。由此产生的跨节点的数据移动代价很高,因此我们可以通过向每个节点发送(广播)一次我们的整个小数据集来实现性能提升。这确保了执行器拥有计算联接所需的所有数据,并且不需要额外的调整。
Spark提供了一种执行这种“广播联接”的方法,尽管它应该仅在正在广播的数据小到足以存储在执行器的内存中时才使用。
先决条件¶
对于本教程,我们将假设您已经将两个数据集摄取到您选择的数据存储中。在没有创建必要的表的情况下遵循本教程将导致错误。
GDELT数据集的转换器, 全球事件、语言和语调数据库(GDELT) ,是随GeoMesa提供的,并且无需转换器即可将FIPS数据作为shapefile摄取。要获得进一步的指导,您可以按照摄取教程之一进行操作 MAP-减少GDELT的摄取 。在GeoMesa中获取数据后,您可以继续本教程的其余部分。
正在初始化Spark¶
要开始使用Spark,我们需要初始化Spark会话,要将GeoMesa的地理空间用户定义类型(UDT)和用户定义函数(UDF)应用到Spark中的数据,我们需要初始化SparkSQL扩展。该功能要求在运行Spark作业时在类路径上具有适当的GeoMesa Spark运行时JAR。GeoMesa为Acumulo、HBase和文件系统数据存储提供Spark运行时JAR。例如,以下代码将启动一个交互式Spark REPL,其中包含在Acumulo数据存储上使用GeoMesa运行Spark所需的所有依赖项。替换 ${VERSION}
使用适当的Scala Plus GeoMesa版本(例如 2.12-4.0.2
):
$ bin/spark-shell --jars geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar
备注
看见 空间RDD提供程序 有关选择正确的GeoMesa Spark运行时JAR的详细信息。
要配置Spark会话以便我们可以序列化简单特征并使用几何UDT和UDF,我们必须按如下方式更改Spark会话。
import org.apache.spark.sql.SparkSession
import org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
import org.locationtech.geomesa.spark.jts._
val spark: SparkSession = SparkSession.builder()
.appName("testSpark")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
.master("local[*]")
.getOrCreate()
.withJTS
请注意 withJTS
,它注册了GeoMesa的UDT和UDF,以及两个 config
这些选项告诉Spark使用GeoMesa的定制Kryo序列化程序和注册器来处理简单功能的序列化。这些配置选项也可以在 conf/spark-defaults.conf
配置文件。
创建DataFrame¶
创建并配置Spark会话后,我们可以继续将数据从数据存储加载到Spark DataFrame。
首先,我们将设置用于连接到数据存储的参数。例如,如果我们的数据位于两个Acumulo目录中,我们将设置以下参数映射:
val fipsParams = Map(
"accumulo.instance.name" -> "instance",
"accumulo.zookeepers" -> "zoo1:2181,zoo2:2181,zoo3:2181",
"accumulo.user" -> "user",
"accumulo.password" -> "password",
"accumulo.catalog" -> "fips")
val gdeltParams = Map(
"accumulo.instance.name" -> "instance",
"accumulo.zookeepers" -> "zoo1:2181,zoo2:2181,zoo3:2181",
"accumulo.user" -> "user",
"accumulo.password" -> "password",
"accumulo.catalog" -> "gdelt")
备注
以上参数假定Acumulo为后备数据存储,但本教程的其余部分与使用哪个数据存储无关。只需适当地调整上述参数即可使用其他受支持的数据存储。
然后我们利用斯帕克的 DataFrameReader
还有我们的 SpatialRDDProvider
要创建 DataFrame
具有地理空间类型。
val fipsDF = spark.read.format("geomesa")
.options(fipsParams)
.option("geomesa.feature", "fips")
.load()
val gdeltDF = spark.read.format("geomesa")
.options(gdeltParams)
.option("geomesa.feature", "gdelt")
.load()
过滤数据帧(可选)¶
根据数据存储中数据的规模和我们的问题的具体程度,我们可能希望在连接之前缩小结果范围。例如,如果我们只想要一周范围内的GDELT事件,我们可以按如下方式筛选DataFrame:
import spark.implicits._
val filteredGdelt = gdeltDF.where("dtg between '2018-01-01 12:00:00' and '2018-01-08 12:00:00'")
广播加入¶
现在我们准备好连接这两个数据集。这是我们将利用我们的地理空间UDF的地方。 st_contains
接受两个几何图形作为输入,并输出第二个几何图形是否位于第一个几何图形内。有关GeoMesa提供的更多文档和UDF的完整列表,请参阅 SparkSQL函数 。
使用这两个UDF,我们可以构建以下联接查询。
import org.apache.spark.sql.functions.broadcast
val joinedDF = gdeltDF.join(broadcast(fipsDF), st_contains($"the_geom", $"geom"))
上述查询如前所述执行广播,将FIPS数据发送给每个执行者,然后根据GDELT事件是否发生在该县连接两个数据集。
聚合¶
现在我们有了一个DataFrame,其中每个GDELT事件都与它发生的美国县配对。要将其转化为有关GDELT事件在美国的分布的有意义的统计数据,我们可以执行以下操作 GROUP BY
操作并使用SparkSQL的一些聚合函数。
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.functions.first
val aggregateDF = joinedDF.groupBy(concat($"STATEFP", $"COUNTYFP"))
.agg(first("NAME").as("name"),
count($"globalEventId").as("eventCount"),
first("the_geom").as("geom"))
上面的查询根据FIPS代码(分为州和县代码)对数据进行分组,并计算每个代码中不同的GDELT事件的数量。结果可用于生成每个县的事件密度的可视化,我们将在下一节中看到这一点。
可视化¶
要可视化该结果,我们首先需要将数据映射到 GeoJSON 格式化。为此,我们使用了GeoMesa的DataFrame到GeoJSON的转换器。
import org.locationtech.geomesa.spark.sql.GeoJSONExtensions._
val geojsonDF = aggregateDF.toGeoJSON
如果结果可以存储在内存中,则可以在驱动程序上收集结果并将其写入文件。如果不是,则每个执行器都可以写入像HDFS这样的分布式文件系统。
val geoJsonString = geojsonDF.collect.mkString("[",",","]")
将数据导出为GeoJSON后,我们可以创建 Leaflet 地图,这是一种可以嵌入到网页中的交互地图。
加载和解析JSON非常简单。在本例中,我们将文件加载包装在 XMLHttpRequest
与Jupyter或Zeppelin等笔记本兼容的回调函数。如果将GeoJSON导出为名为的文件 aggregate.geojson
,然后下面的JavaScript会将该文件加载到传单地图中。
$(document).ready(function() {
var map = L.map('map').setView([35.4746,-44.7022],3);
L.tileLayer("https://{s}.tile.osm.org/{z}/{x}/{y}.png").addTo(map);
var aggFeature = "eventCount";
var colors = ["#1a9850", "#66bd63", "#a6d96a", "#d9ef8b", "#ffffbf", "#fee08b", "#fdae61", "#f46d43", "#d73027"]
var numBins = 8;
var bins = [];
// Load the GeoJSON
var rawFile = new XMLHttpRequest();
rawFile.onreadystatechange = function () {
if(rawFile.readyState === 4) {
if(rawFile.status === 200 || rawFile.status == 0) {
var allText = rawFile.response;
var aggJson = JSON.parse(allText)
L.geoJson(aggJson, {
style: function(feature) { return {
fillColor: getColor(feature.properties[aggFeature]),
weight: 0.5
}},
onEachFeature: decorate
}).addTo(map);
// Css override
$('svg').css("max-width","none")
}
}
}
rawFile.open("GET", "aggregate.geojson", false);
rawFile.send()
});
这确实使用了几个助手函数来设置地图上每个项目的颜色和弹出内容:
// Create the bins of the histogram, allows for coloring features by value
function createBins(json) {
var min = Number.MAX_SAFE_INTEGER;
var max = 0;
json.forEach(function(feature) {
let aggValue = Number(feature.properties[aggFeature])
if (aggValue < min)
min = aggValue
if (aggValue > max)
max = aggValue
});
var interval = (max-min) / numBins;
for (var i = 0; i < numBins; i++) {
bins.push(i*interval);
}
}
// Get the fill color based on which bin a value is in
function getColor(value) {
var fillColor = colorRange[numBins];
for (var x = 0; x < numBins; x++) {
if (Number(value)< bins[x]) {
fillColor = colorRange[x];
break;
}
}
return fillColor;
}
// Decorate a feature with a popup of its properties
function decorate(feature, layer) {
feature.properties.popupContent = Object.entries(feature.properties).join("<br/>").toString();
layer.bindPopup(feature.properties.popupContent);
}
然后,这个简单的HTML将加载一个包含数据的传单地图。
<html>
<meta charset="utf-8"/>
<link rel="stylesheet" href="https://cdn.leafletjs.com/leaflet/v0.7.7/leaflet.css" />
<script src="https://cdn.leafletjs.com/leaflet/v0.7.7/leaflet.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="theAboveJavascriptFile.js"></script>
<body>
<div id="map" style="height: 100%"></div>
</body>
</html>
最终结果将如下所示:
