17.8. 数据管理

17.8.1. 卡夫卡主题名称

每个SimpleFeatureType(或模式)都将写入一个唯一的Kafka主题。默认情况下,主题将基于 kafka.zk.path data store parameter and the SimpleFeatureType name, by appending the two together and replacing any / characters with `` -。例如,使用默认的ZooKeeper路径 (``geomesa/ds/kafka ),则SimpleFeatureType名称‘foo’将导致主题 geomesa-ds-kafka-foo

如果需要,可以通过设置用户数据密钥将主题名称设置为任意值 geomesa.kafka.topic 在呼叫之前 createSchema

SimpleFeatureType sft = ....;
sft.getUserData().put("geomesa.kafka.topic", "myTopicName");

有关如何设置架构选项的详细信息,请参见 设置架构选项

17.8.2. 卡夫卡主题配置

调用时将创建给定SimpleFeatureType的Kafka主题 createSchema (如果它还不存在)。GeoMesa通过数据存储参数公开了一些配置选项。可以通过设置用户数据密钥来配置其他选项 kafka.topic.config 在呼叫之前 createSchema

SimpleFeatureType sft = ....;
sft.getUserData().put("kafka.topic.config", "cleanup.policy=compact\nretention.ms=86400000");

该值应采用标准Java属性格式。有关可用配置的列表,请参阅 Kafka documentation 。有关如何设置架构选项的详细信息,请参见 设置架构选项

卡夫卡中的并行是通过使用多个主题划分来实现的。每个分区只能由一个Kafka消费者读取。消费者的数量可以通过 kafka.consumer.count 数据存储参数;但是,如果只有一个主题分区,这将不起作用。若要创建多个分区,请使用 kafka.topic.partitions 数据存储参数。

Kafka中的复制可确保数据不会丢失。要启用复制,请使用 kafka.topic.replication 数据存储参数。

17.8.3. 卡夫卡话题压缩

卡夫卡有多种选择来防止数据无限增长。最简单的方法是设置基于大小或时间的保留策略。当主题达到某个阈值时,这将导致较旧的消息被删除。

从GeoMesa 2.1.0开始,Kafka数据存储支持Kafka log compaction 。这允许管理主题大小,同时保留每个功能的最新状态。当与 初始加载(重播) ,系统的持久状态可以通过重启和停机时间来维护。请注意,在使用日志压缩时,为每个要素发送显式删除非常重要,否则该要素将永远不会从日志中压缩出来,并且日志大小将开始无限增长。

如果从2.1.0之前的GeoMesa版本升级,则在启用压缩之前,应使用基于大小或时间的保留策略运行该主题一段时间,因为使用较早版本的GeoMesa编写的消息永远不会被压缩。

17.8.4. 与其他系统集成

通过消费卡夫卡话题,卡夫卡数据存储很容易集成。这些消息是更新的更改日志。消息密钥由简单的功能ID组成,如UTF-8字节。消息正文是序列化的简单功能,或者为空以指示删除。内部序列化版本被设置为项下的消息头 "v"

默认情况下,消息正文使用自定义Kryo序列化程序进行序列化。对于Java/Scala客户端, org.locationtech.geomesa.features.kryo.KryoFeatureSerializer 类可用于对消息进行解码,可在 geomesa-feature-kryo 模块通过Maven.或者,也可以将生产者配置为通过 kafka.serialization.type 数据存储参数。Avro库以多种语言存在,并且Avro消息遵循允许跨平台解析的定义架构。

如果您正在使用合流平台管理Kafka,请参阅 融合集成