GeoMesa Storm快速入门¶
阿帕奇风暴是“一个免费和开源的分布式实时计算系统”。
您可以利用Storm近乎实时地分析数据并将数据摄取到GeoMesa中。在本教程中,我们将:
使用ApacheKakfa将消息发送到Storm拓扑。
使用Storm解析开放式街道地图(OSM)数据文件,并将其摄取到Acumulo中。
利用Geoserver查询和可视化数据。
先决条件¶
您将需要访问以下内容:
累积式运算的一个实例 2.0 or 2.1 ,
具有CREATE-TABLE和WRITE权限的Acumulo用户,
卡夫卡 2.0 or later 集群,
安装Storm 0.9+,以及
Geoserver的一个实例 2.22.2 使用GeoMesa Acumulo插件。安装好
要安装GeoMesa Acumulo Geoserver插件,请参阅 在Geoserver中安装GeoMesa Acumulo 。
您还需要:
这个 xz 数据压缩工具,
Apache Maven 3.6 or later, and
一个 git 客户。
下载并构建教程¶
在您的计算机上选择一个合理的目录,然后运行:
$ git clone https://github.com/geomesa/geomesa-tutorials.git
$ cd geomesa-tutorials
备注
您可能需要下载教程项目的特定版本才能针对特定的GeoMesa版本。
要构建,请运行
$ mvn clean install -pl geomesa-quickstart-storm
备注
确保根目录中的Acumulo、Hadoop、Storm等版本 pom.xml
与您的环境相匹配。
备注
根据版本的不同,您可能还需要在本地构建GeoMesa。有关说明,请参阅 安装 。
获取OSM数据¶
在本演示中,我们将使用 simple-gps-points
OSM data that contains only the location of an observation. Download the OSM 数据 here 。
备注
文件大小约为7 GB。
使用以下命令解压数据:
$ xz simple-gps-points-120312.txt.xz
部署接收拓扑¶
快速入门拓扑将从卡夫卡主题中读取消息,将它们解析为 SimpleFeature
S,并把它们写给阿库库洛。
使用 storm jar
要将拓扑提交到Storm实例,请执行以下操作:
$ storm jar geomesa-quickstart-storm/target/geomesa-quickstart-storm-$VERSION.jar \
com.example.geomesa.storm.OSMIngest \
-instanceId <accumulo-instance-id> \
-zookeepers <zookeepers> \
-user <accumulo username> \
-password <accumulo password> \
-tableName OSM \
-featureName event \
-topic OSM
在系统中运行数据¶
我们使用Kafka作为Storm拓扑的输入。首先,创建一个主题来发送数据:
对于Kafka 0.8,使用以下命令。
$ kafka-create-topic.sh \
--zookeeper <zookeepers> \
--replica 3 \
--partition 10 \
--topic OSM
对于Kafka 0.9+,请使用以下命令。
$ kafka-topics.sh \
--create \
--zookeeper localhost \
--replication-factor 3 \
--partitions 10 \
--topic OSM
请注意,我们创建了一个具有多个分区的主题,以便并行处理来自生产者端和消费者(Storm)端的摄取。
接下来,使用教程代码将OSM文件作为一系列Kafka消息发送:
$ java -cp geomesa-quickstart-storm/target/geomesa-quickstart-storm-$VERSION.jar \
com.example.geomesa.storm.OSMIngestProducer \
-ingestFile simple-gps-points-120312.txt \
-topic OSM \
-brokers <kafka broker list>
请注意,Kafka的默认分区程序类基于所提供的键的散列来分配消息分区。如果未提供密钥,则为所有消息分配相同的分区。
for (String x = bufferedReader.readLine(); x != null; x = bufferedReader.readLine()) {
producer.send(new KeyedMessage<String, String>(topic, String.valueOf(rnd.nextInt()), x));
}
风暴喷口和螺栓¶
在快速启动代码中,Storm Spout
S消费卡夫卡话题消息,通过摄取拓扑发送:
public void nextTuple() {
if (kafkaIterator.hasNext()) {
List<Object> messages = new ArrayList<Object>();
messages.add(kafkaIterator.next().message());
_collector.emit(messages);
}
}
这个 Bolt
S解析消息,创建并撰写 SimpleFeature
S。在 prepare
的方法。 Bolt
类中,我们获取在构造函数中初始化的连接参数,并在 FeatureWriter
:
ds = DataStoreFinder.getDataStore(connectionParams);
SimpleFeatureType featureType = ds.getSchema(featureName);
featureBuilder = new SimpleFeatureBuilder(featureType);
featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT);
对的输入 Bolt
的Execute方法是一个 Tuple
包含一个 String
。我们平分了 String
在‘%’上获得个人积分。对于每个点,我们使用逗号分隔以提取属性。我们解析纬度和经度字段以设置 SimpleFeature
。请注意,OSM纬度和经度值存储为必须除以107的整数。
private Geometry getGeometry(final String[] attributes) {
...
final Double lat = (double) Integer.parseInt(attributes[LATITUDE_COL_IDX]) / 1e7;
final Double lon = (double) Integer.parseInt(attributes[LONGITUDE_COL_IDX]) / 1e7;
return geometryFactory.createPoint(new Coordinate(lon, lat));
}
public void execute(Tuple tuple) {
...
featureBuilder.reset();
final SimpleFeature simpleFeature = featureBuilder.buildFeature(String.valueOf(UUID.randomUUID().getMostSignificantBits()));
simpleFeature.setDefaultGeometry(getGeometry(attributes));
try {
final SimpleFeature next = featureWriter.next();
for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
next.setAttribute(i, simpleFeature.getAttribute(i));
}
((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
featureWriter.write();
} catch (Exception e) {
...
}
}
在Geoserver中注册该图层¶
使用您的凭据登录到Geoserver。点击左侧边沟的“Stores”和“Add new Store”。如果未在矢量数据源下看到Acumulo Data Store,请确保插件和依存关系位于正确的目录中,然后重新启动Geoserver。
选择 Accumulo (GeoMesa)
向量数据源,并使用上面使用的命令行参数配置它。使用 geomesa
作为工作空间-如果您使用不同的内容,则需要修改下面的WMS请求。
将所有其他字段留空或保留缺省值。
单击“保存”,Geoserver将在您的数据存储中搜索任何可用的要素类型。
发布该层¶
Geoserver应该找到 OSM
输入要素类型并将其显示为可以发布的图层。点击“发布”链接。您将被带到编辑层屏幕。
您可以将大多数字段保留为默认字段。在数据窗格中,您需要输入边界框的值。在这种情况下,您可以单击链接以根据数据计算这些值。点击“保存”。
将数据可视化¶
让我们来看看芝加哥的事件。默认点样式是不符合我们的目的的红色正方形。添加 OSMPoint.sld
文件到Geoserver,然后浏览到以下URL:
http://localhost:8080/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=geomesa:OSM&styles=OSMPoint&bbox=-87.63,41.88,-87.61,41.9&width=1400&height=600&srs=EPSG:4326&format=application/openlayers

显示2012年3月12日之前在芝加哥举行的所有OSM活动¶
热图¶
使用热图可以更清晰地显示同一位置的大量数据。
备注
热点图样式要求在您的Geoserver中安装Geoserver WPS插件,如中所述 GeoMesa过程 。
添加 heatmap.sld
文件到Geoserver,然后浏览到以下URL:
http://localhost:8080/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=geomesa:OSM&styles=heatmap&bbox=-87.63,41.88,-87.61,41.9&width=1400&height=600&srs=EPSG:4326&format=application/openlayers

显示2012年3月12日之前芝加哥的OSM活动热图¶
结论¶
尽管这款快速入门软件使用静态文件作为输入,但Storm在读取实时数据方面表现出色。当数据进入时,Storm拓扑可以解析它并将其摄取到GeoMesa中进行检索。可以对拓扑中的数据运行其他分析,以进一步增强或通知输出。对于实时可视化,GeoMesa还支持由Kafka而不是Acumulo提供支持的地图。请参阅 GeoMesa Kafka快速入门 教程以了解更多详细信息。