GeoMesa Kafka快速入门¶
本教程是使用Kafka进行数据流传输的GeoMesa入门的最快、最简单的方法。它是通向其他教程的一个很好的垫脚石,这些教程提供了如何使用GeoMesa的越来越复杂的示例。
关于本教程¶
本着简单的精神,本教程中的代码只做了几件小事:
建立新的(静态)SimpleFeatureType
准备Kafka主题以写入此类型的数据
创建数千个SimpleFeature示例
将这些SimpleFeature写入卡夫卡主题
在Geoserver中可视化更改的数据(可选)
为SimpleFeature更新创建事件侦听器(可选)
快速入门通过同时查询和编写数千个功能更新来运行。每次更新都使用相同的功能标识,因此在任何时候都只有一个“活动”功能。
使用的数据来自伊利诺伊大学发布的纽约市出租车活动数据。有关数据集的更多信息可用 here 。
在此演示中,只有一辆出租车被跟踪。
背景¶
Apache Kafka 是“将发布-订阅消息传递重新考虑为分布式提交日志”。
在GeoMesa的背景下,Kafka是处理地理空间数据流的有用工具。在GeoMesa中通过实现GeoTool的KafkaDataStore与Kafka进行交互 DataStore 界面。
先决条件¶
在开始之前,您必须安装并配置以下软件:
确保您的Kafka和ZooKeeper实例正在运行。你可以用卡夫卡的 quickstart 快速启动和运行Kafka/ZooKeeper实例。
配置地理服务器(可选)¶
您可以使用Geoserver访问和可视化存储在GeoMesa中的数据。要使用Geoserver,请下载并安装版本 2.22.2 。然后按照中的说明操作 在Geoserver中安装GeoMesa Kafka 以启用GeoMesa。
下载并构建教程¶
在您的计算机上选择一个合理的目录,然后运行:
$ git clone https://github.com/geomesa/geomesa-tutorials.git
$ cd geomesa-tutorials
警告
确保下载或检出与您的GeoMesa版本对应的教程项目版本。看见 关于教程版本 了解更多详细信息。
若要确保快速入门适用于您的环境,请修改 pom.xml
为卡夫卡、ZooKeeper等设置合适的版本。
为了便于使用,该项目构建了一个捆绑的构件,该构件在单个JAR中包含所有必需的依赖项。要构建,请运行:
$ mvn clean install -pl geomesa-tutorials-kafka/geomesa-tutorials-kafka-quickstart -am
运行教程¶
在命令行上,运行:
$ java -cp geomesa-tutorials-kafka/geomesa-tutorials-kafka-quickstart/target/geomesa-tutorials-kafka-quickstart-$VERSION.jar \
org.geomesa.example.kafka.KafkaQuickStart \
--kafka.brokers <brokers> \
--kafka.zookeepers <zookeepers>
其中,您可以提供以下参数:
<brokers>
your Kafka broker instances, comma separated. For a local install, this would belocalhost:9092
<zookeepers>
your Zookeeper nodes, comma separated. For a local install, this would belocalhost:2181
或者,您也可以指定快速入门在完成时删除其数据。使用 --cleanup
标志,当您运行以启用此行为时。
一旦运行,快速入门将创建卡夫卡主题,然后暂停并提示您在Geoserver中注册该层。如果您不想使用Geoserver,则可以跳过此步骤。否则,请在返回此处之前按照下一节中的说明进行操作。
继续后,教程应该运行大约30秒。您应该会看到以下输出:
Loading datastore
Creating schema: taxiId:String,dtg:Date,geom:Point
Generating test data
Feature type created - register the layer 'tdrive-quickstart' in geoserver with bounds: MinX[116.22366] MinY[39.72925] MaxX[116.58804] MaxY[40.09298]
Press <enter> to continue
Writing features to Kafka... refresh GeoServer layer preview to see changes
Current consumer state:
1277=1277|2008-02-03T04:32:53.000Z|POINT (116.35 39.90003)
Current consumer state:
1277=1277|2008-02-03T17:58:49.000Z|POINT (116.38812 39.93196)
Current consumer state:
1277=1277|2008-02-04T06:46:26.000Z|POINT (116.40218 39.94439)
Current consumer state:
1277=1277|2008-02-04T19:55:45.000Z|POINT (116.3631 39.94646)
Current consumer state:
1277=1277|2008-02-05T09:39:48.000Z|POINT (116.58264 40.07556)
Current consumer state:
1277=1277|2008-02-05T22:24:50.000Z|POINT (116.34112 39.95363)
Current consumer state:
1277=1277|2008-02-06T14:17:29.000Z|POINT (116.54203 39.91476)
Current consumer state:
1277=1277|2008-02-07T02:53:55.000Z|POINT (116.35683 39.89809)
Current consumer state:
1277=1277|2008-02-07T15:48:47.000Z|POINT (116.36785 39.99471)
Current consumer state:
1277=1277|2008-02-08T04:20:19.000Z|POINT (116.42872 39.91531)
Current consumer state:
1277=1277|2008-02-08T17:14:15.000Z|POINT (116.34609 39.93924)
Done
使用Geoserver可视化数据(可选)¶
您可以使用Geoserver访问和可视化存储在GeoMesa中的数据。要使用Geoserver,请下载并安装版本 2.22.2 。然后按照中的说明操作 在Geoserver中安装GeoMesa Kafka 以启用GeoMesa。
向Geoserver注册GeoMesa商店¶
使用您的用户和密码凭据登录到Geoserver。点击“商店”和“添加新商店”。选择 Kafka (GeoMesa)
矢量数据源,并填写所需参数。
基本店铺信息:
workspace
这取决于您的Geoserver安装data source name
pick a sensible name, such asgeomesa_quick_start
description
this is strictly decorative;GeoMesa quick start
连接参数:
这些参数值与您在运行本教程时在命令行上提供的参数值相同;它们描述了如何连接到数据所在的Kafka实例
点击“保存”,Geoserver将在ZooKeeper中搜索任何GeoMesa管理的要素类型。
发布该层¶
如果您已经运行了该命令来启动教程,那么Geoserver应该会识别 tdrive-quickstart
要素类型,并应将其显示为可发布的图层。点击“发布”链接。如果不是,则按照上文中所述运行教程 Running the Tutorial 。当教程暂停时,转到“层”和“添加新层”。选择您刚刚创建的GeoMesa Kafka商店,然后在 tdrive-quickstart
一层。
您将被带到编辑层屏幕。您需要为数据边框输入值。对于本演示,请使用值Minx:116.22366、Miny:39.72925、Maxx:116.58804、Maxy:40.09298。
完成后,请单击“保存”按钮。
瞧一瞧¶
点击左手边沟中的“层预览”链接。如果在结果的第一页上看不到快速启动图层,请在搜索框中输入刚创建的图层的名称,然后按 <Enter>
。
起初,不会显示任何数据。达到此点后,返回快速入门控制台,然后按“<Enter>”继续本教程。随着数据在Kafka中的更新,您可以刷新图层预览页面以查看要素的移动。
Geoserver发生了什么¶
Geoserver的层预览使用 KafkaFeatureStore
以显示数据流当前状态的实时视图。有一间单人房 SimpleFeature
随着时间的推移在Kafka中更新,这反映在Geoserver显示屏上。
刷新页面时,您应该会看到 SimpleFeature
四处走动。由于出租车路线的性质,以及回放数据的速度较快,移动没有太多模式。
看《守则》¶
源代码对于本教程来说是可以访问的。该逻辑包含在泛型 org.geomesa.example.quickstart.GeoMesaQuickStart
在 geomesa-quickstart-common
模块和特定于卡夫卡的 org.geomesa.example.kafka.KafkaQuickStart
在 geomesa-quickstart-kafka
模块。一些相关的方法包括:
createDataStore
中被重写KafkaQuickStart
使用输入配置获取一对数据存储实例,一个用于写入数据,另一个用于读取数据。createSchema
在数据存储中创建架构,作为写入数据的前提条件writeFeatures
中被重写KafkaQuickStart
同时写入和读取卡夫卡的特写queryFeatures
在本教程中未使用cleanup
删除示例数据并处置数据存储实例
QuickStart使用出租车数据的一小部分。将数据解析为GeoTools SimpleFeature的代码包含在 org.geomesa.example.data.TDriveData
:
getSimpleFeatureType
创建SimpleFeatureType
表示数据getTestData
解析嵌入的CSV文件以创建SimpleFeature
对象getTestQueries
在本教程中未使用
侦听功能事件(可选)¶
GeoTools API还包括一种机制来触发 FeatureEvent 每次发生事件时, DataStore
(typically when the data is changed). A client may implement a FeatureListener ,它只有一个名为 changed()
作为每个调用的 FeatureEvent
被解雇了。
中的代码 KafkaListener
实现了一个简单的 FeatureListener
打印收到的消息。打开第二个终端窗口并运行:
$ java -cp geomesa-tutorials-kafka/geomesa-tutorials-kafka-quickstart/target/geomesa-tutorials-kafka-quickstart-$VERSION.jar \
org.geomesa.example.kafka.KafkaListener \
--kafka.brokers <brokers> \
--kafka.zookeepers <zookeepers>
将相同的设置用于 <brokers>
和 <zookeepers>
就像你之前做的那样。然后在原始终端窗口中,重新运行 KafkaQuickStart
代码与之前一样。这个 KafkaListener
终端应产生如下消息:
Received FeatureEvent from schema 'tdrive-quickstart' of type 'CHANGED'
1277=1277|2008-02-02T13:34:51.000Z|POINT (116.32674 39.89577)
这个 KafkaListener
代码将一直运行,直到被中断(通常使用ctrl-c)。
的那部分 KafkaListener
它创建并实现 FeatureListener
是:
FeatureListener listener = featureEvent -> {
System.out.println("Received FeatureEvent from schema '" + typeName + "' of type '" + featureEvent.getType() + "'");
if (featureEvent.getType() == FeatureEvent.Type.CHANGED &&
featureEvent instanceof KafkaFeatureChanged) {
System.out.println(DataUtilities.encodeFeature(((KafkaFeatureChanged) featureEvent).feature()));
} else if (featureEvent.getType() == FeatureEvent.Type.REMOVED) {
System.out.println("Received Delete for filter: " + featureEvent.getFilter());
}
};
datastore.getFeatureSource(typeName).addFeatureListener(listener);
(请注意使用lambda表达式来创建侦听器)