12.11. 实例

12.11.1. 在HBase中加入一个GeoMesa Kafka话题

常见的使用案例是将对象的最新状态存储在GeoMesa Kafka数据存储中,以驱动实时地图视图,并将对象的每个状态持久存储到HBase或Acumulo中的长期存储中,以用于历史记录和分析。此示例将展示如何从GeoMesa Kafka主题中提取数据,并以最小的配置将其持久化到HBase中。

12.11.1.1. 安装GeoMesa Nars

按照下面的步骤操作 安装 在您的NiFi实例中安装GeoMesa处理器。本教程需要使用GeoMesa Kafka和HBase Nars,以及标准的GeoMesa服务Nars。

12.11.1.2. 添加和配置处理器

第一步是向流中添加两个处理器,一个 GetGeoMesaKafkaRecord 处理器和AN AvroToPutGeoMesa 处理器。将输出连接到 Get 处理器的输入端 Put 处理器:

../../_images/nifi-kafka-to-hbase-flow.png

对于健壮的系统,您可能希望添加进一步的处理来处理成功和失败。在本例中,为简单起见,只需自动终止所有其他连接。

接下来,通过点击NiFi流上的‘配置’创建一个新的控制器服务,然后转到控制器服务选项卡并点击 + 按钮,然后选择 GeoAvroRecordSetWriterFactory 。这个 GeoAvroRecordSetWriterFactory 可以保留其默认配置,即使用NiFi表达式。这个 GetGeoMesaKafkaRecord 处理器将根据正在读取的要素类型填充必要的属性:

../../_images/nifi-avro-record-writer-config.png

这个 GetGeoMesaKafkaRecord 处理器将从GeoMesa Kafka主题中提取数据,并使用NiFi记录器将其写出。处理器可以以NiFi Record API支持的任何格式输出数据,但在本例中,我们将使用GeoAvro来简化对HBase的摄取。通过设置适当的Kafka连接参数和设置 GeoAvroRecordWriterFactory 您刚刚为输出创建了。设置唯一的Kafka群组ID,以确保处理器读取来自Theme的所有数据:

../../_images/nifi-get-kafka-record-config.png

这个 AvroToPutGeoMesa 处理器需要配置有 DataStoreService 为了HBase。返回到NiFi中的控制器服务选项卡,并添加另一个服务,这次选择 HBaseDataStoreService 。您将需要使用指向 hbase-site.xml 用于您的集群的文件,它将使其连接到HBase,以及一个目录表,它将在其中写出数据:

../../_images/nifi-hbase-service-config.png

启用控制器服务后,返回以配置 AvroToPutGeoMesa 处理器带有 HBaseDataStoreService 。集 Use provided feature ID 设置为False,因为我们希望保留所有历史要素,而不仅仅是每个要素的最新实例:

../../_images/nifi-avro-put-config.png

配置完所有处理器和控制器服务后,在NiFi用户界面中启用它们。如果一切正常,你会开始看到你的卡夫卡数据出现在HBase中。