GeoMesa Storm快速入门

阿帕奇风暴是“一个免费和开源的分布式实时计算系统”。

您可以利用Storm近乎实时地分析数据并将数据摄取到GeoMesa中。在本教程中,我们将:

  1. 使用ApacheKakfa将消息发送到Storm拓扑。

  2. 使用Storm解析开放式街道地图(OSM)数据文件,并将其摄取到Acumulo中。

  3. 利用Geoserver查询和可视化数据。

先决条件

您将需要访问以下内容:

  • 累积式运算的一个实例 2.0 or 2.1 ,

  • 具有CREATE-TABLE和WRITE权限的Acumulo用户,

  • 卡夫卡 2.0 or later 集群,

  • 安装Storm 0.9+,以及

  • Geoserver的一个实例 2.22.2 使用GeoMesa Acumulo插件。安装好

要安装GeoMesa Acumulo Geoserver插件,请参阅 在Geoserver中安装GeoMesa Acumulo

您还需要:

下载并构建教程

在您的计算机上选择一个合理的目录,然后运行:

$ git clone https://github.com/geomesa/geomesa-tutorials.git
$ cd geomesa-tutorials

备注

您可能需要下载教程项目的特定版本才能针对特定的GeoMesa版本。

要构建,请运行

$ mvn clean install -pl geomesa-quickstart-storm

备注

确保根目录中的Acumulo、Hadoop、Storm等版本 pom.xml 与您的环境相匹配。

备注

根据版本的不同,您可能还需要在本地构建GeoMesa。有关说明,请参阅 安装

获取OSM数据

在本演示中,我们将使用 simple-gps-points OSM data that contains only the location of an observation. Download the OSM 数据 here

备注

文件大小约为7 GB。

使用以下命令解压数据:

$ xz simple-gps-points-120312.txt.xz

部署接收拓扑

快速入门拓扑将从卡夫卡主题中读取消息,将它们解析为 SimpleFeature S,并把它们写给阿库库洛。

使用 storm jar 要将拓扑提交到Storm实例,请执行以下操作:

$ storm jar geomesa-quickstart-storm/target/geomesa-quickstart-storm-$VERSION.jar \
    com.example.geomesa.storm.OSMIngest \
    -instanceId <accumulo-instance-id>  \
    -zookeepers <zookeepers>            \
    -user <accumulo username>           \
    -password <accumulo password>       \
    -tableName OSM                      \
    -featureName event                  \
    -topic OSM

在系统中运行数据

我们使用Kafka作为Storm拓扑的输入。首先,创建一个主题来发送数据:

对于Kafka 0.8,使用以下命令。

$ kafka-create-topic.sh      \
    --zookeeper <zookeepers> \
    --replica 3              \
    --partition 10           \
    --topic OSM

对于Kafka 0.9+,请使用以下命令。

$ kafka-topics.sh          \
    --create               \
    --zookeeper localhost  \
    --replication-factor 3 \
    --partitions 10        \
    --topic OSM

请注意,我们创建了一个具有多个分区的主题,以便并行处理来自生产者端和消费者(Storm)端的摄取。

接下来,使用教程代码将OSM文件作为一系列Kafka消息发送:

$ java -cp geomesa-quickstart-storm/target/geomesa-quickstart-storm-$VERSION.jar \
    com.example.geomesa.storm.OSMIngestProducer \
    -ingestFile simple-gps-points-120312.txt    \
    -topic OSM                                  \
    -brokers <kafka broker list>

请注意,Kafka的默认分区程序类基于所提供的键的散列来分配消息分区。如果未提供密钥,则为所有消息分配相同的分区。

for (String x = bufferedReader.readLine(); x != null; x = bufferedReader.readLine()) {
    producer.send(new KeyedMessage<String, String>(topic, String.valueOf(rnd.nextInt()), x));
}

风暴喷口和螺栓

在快速启动代码中,Storm Spout S消费卡夫卡话题消息,通过摄取拓扑发送:

public void nextTuple() {
    if (kafkaIterator.hasNext()) {
        List<Object> messages = new ArrayList<Object>();
        messages.add(kafkaIterator.next().message());
        _collector.emit(messages);
    }
}

这个 Bolt S解析消息,创建并撰写 SimpleFeature S。在 prepare 的方法。 Bolt 类中,我们获取在构造函数中初始化的连接参数,并在 FeatureWriter

ds = DataStoreFinder.getDataStore(connectionParams);
SimpleFeatureType featureType = ds.getSchema(featureName);
featureBuilder = new SimpleFeatureBuilder(featureType);
featureWriter = ds.getFeatureWriter(featureName, Transaction.AUTO_COMMIT);

对的输入 Bolt 的Execute方法是一个 Tuple 包含一个 String 。我们平分了 String 在‘%’上获得个人积分。对于每个点,我们使用逗号分隔以提取属性。我们解析纬度和经度字段以设置 SimpleFeature 。请注意,OSM纬度和经度值存储为必须除以107的整数。

private Geometry getGeometry(final String[] attributes) {
    ...
    final Double lat = (double) Integer.parseInt(attributes[LATITUDE_COL_IDX]) / 1e7;
    final Double lon = (double) Integer.parseInt(attributes[LONGITUDE_COL_IDX]) / 1e7;
    return geometryFactory.createPoint(new Coordinate(lon, lat));
}

public void execute(Tuple tuple) {
    ...
    featureBuilder.reset();
    final SimpleFeature simpleFeature = featureBuilder.buildFeature(String.valueOf(UUID.randomUUID().getMostSignificantBits()));
    simpleFeature.setDefaultGeometry(getGeometry(attributes));

    try {
        final SimpleFeature next = featureWriter.next();
        for (int i = 0; i < simpleFeature.getAttributeCount(); i++) {
            next.setAttribute(i, simpleFeature.getAttribute(i));
        }
        ((FeatureIdImpl) next.getIdentifier()).setID(simpleFeature.getID());
        featureWriter.write();
    } catch (Exception e) {
      ...
    }
}

在Geoserver中注册该图层

使用您的凭据登录到Geoserver。点击左侧边沟的“Stores”和“Add new Store”。如果未在矢量数据源下看到Acumulo Data Store,请确保插件和依存关系位于正确的目录中,然后重新启动Geoserver。

选择 Accumulo (GeoMesa) 向量数据源,并使用上面使用的命令行参数配置它。使用 geomesa 作为工作空间-如果您使用不同的内容,则需要修改下面的WMS请求。

将所有其他字段留空或保留缺省值。

单击“保存”,Geoserver将在您的数据存储中搜索任何可用的要素类型。

发布该层

Geoserver应该找到 OSM 输入要素类型并将其显示为可以发布的图层。点击“发布”链接。您将被带到编辑层屏幕。

您可以将大多数字段保留为默认字段。在数据窗格中,您需要输入边界框的值。在这种情况下,您可以单击链接以根据数据计算这些值。点击“保存”。

将数据可视化

让我们来看看芝加哥的事件。默认点样式是不符合我们的目的的红色正方形。添加 OSMPoint.sld 文件到Geoserver,然后浏览到以下URL:

http://localhost:8080/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=geomesa:OSM&styles=OSMPoint&bbox=-87.63,41.88,-87.61,41.9&width=1400&height=600&srs=EPSG:4326&format=application/openlayers
显示2012年3月12日之前在芝加哥举行的所有OSM活动

显示2012年3月12日之前在芝加哥举行的所有OSM活动

热图

使用热图可以更清晰地显示同一位置的大量数据。

备注

热点图样式要求在您的Geoserver中安装Geoserver WPS插件,如中所述 GeoMesa过程

添加 heatmap.sld 文件到Geoserver,然后浏览到以下URL:

http://localhost:8080/geoserver/wms?service=WMS&version=1.1.0&request=GetMap&layers=geomesa:OSM&styles=heatmap&bbox=-87.63,41.88,-87.61,41.9&width=1400&height=600&srs=EPSG:4326&format=application/openlayers
显示2012年3月12日之前芝加哥的OSM活动热图

显示2012年3月12日之前芝加哥的OSM活动热图

结论

尽管这款快速入门软件使用静态文件作为输入,但Storm在读取实时数据方面表现出色。当数据进入时,Storm拓扑可以解析它并将其摄取到GeoMesa中进行检索。可以对拓扑中的数据运行其他分析,以进一步增强或通知输出。对于实时可视化,GeoMesa还支持由Kafka而不是Acumulo提供支持的地图。请参阅 GeoMesa Kafka快速入门 教程以了解更多详细信息。