GeoMesa Kafka Streams快速入门

本教程演示如何在GeoMesa中使用Kafka Streams。它将演练读写GeoMesa-Kafka主题,并使用Kafka流处理数据。

关于本教程

本教程使用的组件与 GeoMesa Kafka快速入门 并可以被视为它的扩展。在这里,我们将:

  1. 建立新的(静态)SimpleFeatureType

  2. 准备Kafka主题以写入此类型的数据

  3. 创建数千个SimpleFeature示例

  4. 将这些SimpleFeature写入卡夫卡主题

  5. 使用Kafka Streams使用主题中的这些功能

  6. 处理使用的要素

  7. 将结果写回相同的卡夫卡主题

  8. 在Geoserver中可视化更改的数据(可选)

快速入门通过同时查询和编写数千个功能更新来运行。该数据包含两个实体的位置信息。每个实体都使用唯一的特征标识符。实体的更新将重复使用该实体的功能标识符以使每个实体只有一个活动功能。

Kafka Streams拓扑将配置为在两个实体彼此具有距离阈值时查找实例。这些邻近事件使用唯一的要素标识符写回相同的主题,以便它们保留在层中。

使用的数据是沿着I64州际公路在夏洛茨维尔和弗吉尼亚州里士满之间的模拟驾驶。

背景

Apache Kafka 是“将发布-订阅消息传递重新考虑为分布式提交日志”。

在GeoMesa的背景下,Kafka是处理地理空间数据流的有用工具。在GeoMesa中通过实现GeoTool的KafkaDataStore与Kafka进行交互 DataStore 界面。

此外,GeoMesa还配置了一个Kafka Streams拓扑来读取、处理和写入数据到主题。有关Kafka Streams的更多信息,请参阅 official documentation

先决条件

在开始之前,您必须安装并配置以下软件:

  • Java JDK 1.8

  • Apache Maven 3.6 or later

  • GitHub客户端

  • 卡夫卡 2.0 or later 聚类

确保您的Kafka和ZooKeeper实例正在运行。你可以用卡夫卡的 quickstart 快速启动和运行Kafka/ZooKeeper实例。

配置地理服务器(可选)

您可以使用Geoserver访问和可视化存储在GeoMesa中的数据。要使用Geoserver,请下载并安装版本 2.22.2 。然后按照中的说明操作 在Geoserver中安装GeoMesa Kafka 以启用GeoMesa。

下载并构建教程

在您的计算机上选择一个合理的目录,然后运行:

$ git clone https://github.com/geomesa/geomesa-tutorials.git
$ cd geomesa-tutorials

警告

确保下载或检出与您的GeoMesa版本对应的教程项目版本。看见 关于教程版本 了解更多详细信息。

若要确保快速入门适用于您的环境,请修改 pom.xml 为卡夫卡、ZooKeeper等设置合适的版本。

为了便于使用,该项目构建了一个捆绑的构件,该构件在单个JAR中包含所有必需的依赖项。要构建,请运行:

$ mvn clean install -pl geomesa-tutorials-kafka/geomesa-tutorials-kafka-streams-quickstart -am

运行教程

在命令行上,运行:

$ java -cp geomesa-tutorials-kafka/geomesa-tutorials-kafka-streams-quickstart/target/geomesa-tutorials-kafka-streams-quickstart-$VERSION.jar \
    org.geomesa.example.kafka.KafkaStreamsQuickStart \
    --kafka.brokers <brokers>                 \
    --kafka.zookeepers <zookeepers>

其中,您可以提供以下参数:

  • <brokers> your Kafka broker instances, comma separated. For a local install, this would be localhost:9092

  • <zookeepers> your Zookeeper nodes, comma separated. For a local install, this would be localhost:2181

或者,您也可以指定快速入门在完成时删除其数据。使用 --cleanup 标志,当您运行以启用此行为时。

一旦运行,快速入门将创建卡夫卡主题,然后暂停并提示您在Geoserver中注册该层。如果您不想使用Geoserver,则可以跳过此步骤。否则,请在返回此处之前按照下一节中的说明进行操作。

继续后,教程应该运行大约30秒。您应该会看到以下输出:

Loading datastore

Loading datastore

Creating schema: entityId:String,dtg:Date,geom:Point

Generating test data

Configuring Streams Topology
Feature type created - register the layer 'cvilleric-quickstart' in geoserver with bounds: MinX[-78.4696824929457] MinY[37.532442090296044] MaxX[-77.42668269989638] MaxY[38.03920921521279]
Press <enter> to continue


Writing features to Kafka... refresh GeoServer layer preview to see changes
Current consumer state:
a=a|2022-09-21T21:03:02.675Z|POINT (-78.2742794712714 37.995618168053184)
b=b|2022-09-21T21:03:02.675Z|POINT (-77.56747216770198 37.6305975318267)
Current consumer state:
a=a|2022-09-21T21:28:02.675Z|POINT (-78.01751112645616 37.872800086051654)
b=b|2022-09-21T21:28:02.675Z|POINT (-77.87883454073382 37.772794168668476)
Current consumer state:
b=b|2022-09-21T21:53:02.675Z|POINT (-78.14780655790103 37.95424382536054)
a=a|2022-09-21T21:53:02.675Z|POINT (-77.711327871061 37.694257161353974)
proximity0ab51dd3-2e48-4827-9388-c76c7f95279b=proximity-a-b|2022-09-21T21:35:02.675Z|POINT (-77.94037514437152 37.81389651562376)
proximity911fd4dd-40c8-4336-90aa-0315e4d896b5=proximity-b-a|2022-09-21T21:33:02.675Z|POINT (-77.94037514437152 37.81389651562376)
proximity70a19c33-8d77-4539-b2a0-5d4f0abfcd9a=proximity-a-b|2022-09-21T21:33:02.675Z|POINT (-77.96397858370257 37.828337948614255)
proximityaef4c251-9edb-4d96-8a1a-65da5a40c11d=proximity-b-a|2022-09-21T21:34:02.675Z|POINT (-77.95393639315081 37.82182948351288)
proximity3025cd2b-699a-4625-9760-2781acf98edf=proximity-a-b|2022-09-21T21:34:02.675Z|POINT (-77.95393639315081 37.82182948351288)
proximity0eb6874d-19c1-4c55-887f-ff8e50455662=proximity-b-a|2022-09-21T21:35:02.675Z|POINT (-77.96397858370257 37.828337948614255)
Current consumer state:
b=b|2022-09-21T22:18:02.675Z|POINT (-78.40589688999782 38.018104630123695)
a=a|2022-09-21T22:18:02.675Z|POINT (-77.46880947199425 37.579440835126896)
proximity0ab51dd3-2e48-4827-9388-c76c7f95279b=proximity-a-b|2022-09-21T21:35:02.675Z|POINT (-77.94037514437152 37.81389651562376)
proximity911fd4dd-40c8-4336-90aa-0315e4d896b5=proximity-b-a|2022-09-21T21:33:02.675Z|POINT (-77.94037514437152 37.81389651562376)
proximity70a19c33-8d77-4539-b2a0-5d4f0abfcd9a=proximity-a-b|2022-09-21T21:33:02.675Z|POINT (-77.96397858370257 37.828337948614255)
proximityaef4c251-9edb-4d96-8a1a-65da5a40c11d=proximity-b-a|2022-09-21T21:34:02.675Z|POINT (-77.95393639315081 37.82182948351288)
proximity3025cd2b-699a-4625-9760-2781acf98edf=proximity-a-b|2022-09-21T21:34:02.675Z|POINT (-77.95393639315081 37.82182948351288)
proximity0eb6874d-19c1-4c55-887f-ff8e50455662=proximity-b-a|2022-09-21T21:35:02.675Z|POINT (-77.96397858370257 37.828337948614255)


Done

使用Geoserver可视化数据(可选)

您可以使用Geoserver访问和可视化存储在GeoMesa中的数据。要使用Geoserver,请下载并安装版本 2.22.2 。然后按照中的说明操作 在Geoserver中安装GeoMesa Kafka 以启用GeoMesa。

向Geoserver注册GeoMesa商店

使用您的用户和密码凭据登录到Geoserver。点击“商店”和“添加新商店”。选择 Kafka (GeoMesa) 矢量数据源,并填写所需参数。

基本店铺信息:

  • workspace 这取决于您的Geoserver安装

  • data source name pick a sensible name, such as geomesa_quick_start

  • description this is strictly decorative; GeoMesa quick start

连接参数:

  • 这些参数值与您在运行本教程时在命令行上提供的参数值相同;它们描述了如何连接到数据所在的Kafka实例

点击“保存”,Geoserver将在ZooKeeper中搜索任何GeoMesa管理的要素类型。

发布该层

如果您已经运行了该命令来启动教程,那么Geoserver应该会识别 cvilleric-quickstart 要素类型,并应将其显示为可发布的图层。点击“发布”链接。如果不是,则按照上文中所述运行教程 Running the Tutorial 。当教程暂停时,转到“层”和“添加新层”。选择您刚刚创建的GeoMesa Kafka商店,然后在 cvilleric-quickstart 一层。

您将被带到编辑层屏幕。您需要为数据边框输入值。对于此演示,请使用值Minx:-78.46969、Miny:37.53245、Maxx:-77.42669、Maxy:38.03921。

完成后,请单击“保存”按钮。

设置层的样式(可选)

为了更好地可视化输入数据和由Kafka Stream拓扑生成的数据之间的交互,应用一些简单的样式规则会很有帮助。要做到这一点,首先创建一个新样式。

点击“样式”和“添加新样式”。给它一个合理的名称,并将格式设置为css。将以下css插入到编辑器窗口中。

* {
    mark: symbol(circle);
    mark-size: 9px;
    fill: #1e8003;
}

[entityId = 'a'] :mark {
    fill: #AD0000;
}

[entityId = 'b'] :mark {
    fill: #001AAD;
}

单击“提交”以保存该样式。接下来,必须将样式添加到层中并设置为默认样式。在“层”下,选择你创建的层。在“发布”选项卡上的“WMS设置”和“层设置”下,将“默认样式”设置为您创建的样式。在页面底部,单击“保存”继续。

瞧一瞧

点击左手边沟中的“层预览”链接。如果在结果的第一页上看不到快速启动图层,请在搜索框中输入刚创建的图层的名称,然后按 <Enter>

起初,不会显示任何数据。达到此点后,返回快速入门控制台,然后按“<Enter>”继续本教程。随着数据在Kafka中的更新,您可以刷新图层预览页面以查看要素的移动。

Geoserver发生了什么

Geoserver的层预览使用 KafkaFeatureStore 以显示数据流当前状态的实时视图。有两个 SimpleFeatures 随着时间的推移在Kafka中更新,这反映在Geoserver显示屏上。

刷新页面时,您应该会看到 SimpleFeatures 四处走动。当这两个点(下面的红点和蓝点)彼此靠近时,您将看到 SimpleFeatures 表示添加到数据流的邻近事件(下面的灰点)。这些功能将保持不变,因为没有使用相同功能ID发送的更新。

使用Geoserver可视化快速入门数据

使用Geoserver可视化快速入门数据

看《守则》

源代码对于本教程来说是可以访问的。该逻辑包含在泛型 org.geomesa.example.quickstart.GeoMesaQuickStartgeomesa-quickstart-common 模块和特定于Kafka-Streams的 org.geomesa.example.kafka.KafkaStreamsQuickStartgeomesa-quickstart-kafka-streams 模块。一些相关的方法包括:

  • createDataStore 中被重写 KafkaQuickStart 使用输入配置获取一对数据存储实例,一个用于写入数据,另一个用于读取数据。此外, GeoMesaStreamsBuilder 用于创建Kafka Streams拓扑构建器。

  • createSchema 在数据存储中创建架构,作为写入数据的前提条件

  • writeFeatures 中被重写 KafkaQuickStart 同时从Kafka写入和读取要素以及设置和运行STREAMS拓扑

  • queryFeatures 在本教程中未使用

  • cleanup 删除示例数据并处置数据存储实例

将数据解析为GeoTools SimpleFeature的代码包含在 org.geomesa.example.data.CvilleRICData

  • getSimpleFeatureType 创建 SimpleFeatureType 表示数据

  • getTestData 解析嵌入的CSV文件以创建 SimpleFeature 对象

  • getTestQueries 在本教程中未使用

溪流拓扑

中的代码 setupStreams 使用GeoMesa Kafka Streams集成构建Kafka Streams拓扑。这个 GeoMesaStreamsBuilder 类包装了一个内部Kafka StreamsBuilder 举个例子。这使得GeoMesa能够提供卡夫卡 Serde 当向基础Kafka主题读取和写入数据时,并提供 TimestampExtractor 适用于 SimpleFeatureType 。此外,GeoMesa能够为给定的TypeName解析正确的Kafka主题。

QuickStart拓扑会将数据从QuickStart主题读取到 KStream ,利用 SerdeTimestampExtractor 来自格罗梅萨的。

KStream<String, GeoMesaMessage> input = builder.stream(typeName);

接下来,对输入流进行筛选,以删除不是对两个实体进行更新的任何消息。如果不执行此步骤,将允许我们稍后编写的邻近消息被拓扑拾取和处理。过滤后,重新设置数据的关键字。这个 GeoPartitioner 类是一个 KeyValueMapper 它用于为每条记录选择一个新键。通过利用GeoMesa来确定新密钥 Z2SFC 以确定给定记录包含在哪个地理空间Z-Bin中。有关Z2曲线和索引的更多信息,请参阅 索引概述 。更改记录键将导致Kafka Stream对数据流进行重新分区。这将创建一个中间主题,但将确保数据与空间上邻近的其他数据位于同一位置。

KStream<String, GeoMesaMessage> geoPartioned = input
    .filter((k, v) -> !Objects.equals(getFID(v), "") && !getFID(v).startsWith(proximityId))
    .selectKey(new GeoPartitioner(numbits, defaultGeomIndex));

要找出一个点是否在另一个点附近,需要计算到其他点的距离。要找到一组点中的所有邻接关系,需要所有点的笛卡尔乘积。这可能是一项非常昂贵的操作,因此减少需要比较的点数很重要。通过对数据进行空间分区,我们可以通过排除空间区域来减少比较次数。只需要计算共享相同Z-Bin的记录的笛卡尔乘积(本教程忽略Z-Bin边界的问题)。

快速入门下一步使用地理分区 KStream 要使用现在的空间键执行自连接,请执行以下操作。这允许我们为需要评估的每个比较创建一个邻近对象。

自联接本质上会将一条记录联接到自身。过滤步骤首先删除它们,然后执行实际的邻近度计算和阈值检查。最后,我们将 Proximity 事件进入 GeoMesaMessage 并设置一个关键点,表示它是接近信息(在上一步过滤步骤中使用)。

KStream<String, GeoMesaMessage> proximities = geoPartioned
    .join(geoPartioned,
        (left, right) -> new Proximity(left, right, defaultGeomIndex),
        JoinWindows.of(Duration.ofMinutes(2)),
        StreamJoined.with(Serdes.String(), serde, serde))
    .filter((k, v) -> v.areDifferent() && v.getDistance() < proximityDistanceMeters)
    .mapValues(Proximity::toGeoMesaMessage)
    .selectKey((k, v) -> proximityId + UUID.randomUUID());

最后, GeoMesaStreamsBuilder 再次用于从提供的typeName配置目标主题,并处理 Serde 对我们来说。

builder.to(typeName, proximities);