17.3. 数据生产者

GeoMesa Kafka数据存储可以充当Kafka生产者,并将特写写入Kafka主题。

备注

仅用于写入的Kafka数据存储可以通过设置数据存储配置来禁用消费消息 kafka.consumer.count 设置为0。

首先,创建数据存储:

import org.geotools.data.DataStoreFinder;

String brokers = ...
String zookeepers = ...

// build parameters map
Map<String, Serializable> params = new HashMap<>();
params.put("kafka.brokers", brokers);
params.put("kafka.zookeepers", zookeepers);

// create the data store
KafkaDataStore ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);

接下来,创建模式。每个数据存储可以有0到多个模式。例如:

SimpleFeatureType sft = ...
ds.createSchema(sft);

现在,您可以创建或更新简单的要素。请注意,Kafka数据仓库只支持通过功能ID更新。您可以使用ID过滤器显式创建修改功能编写器,也可以简单地使用追加功能编写器,它会覆盖任何具有相同功能ID的已有功能:

// the name of the simple feature type -  will be the same as sft.getTypeName();
String typeName = sft.getTypeName();

SimpleFeatureWriter fw = ds.getFeatureWriterAppend(typeName, Transaction.AUTO_COMMIT);
SimpleFeature sf = fw.next();
// set properties on sf
fw.write();

备注

当使用修改特征编写器时,返回的特征将不具有实际当前特征的属性,但将具有正确的特征ID。

警告

要使Kafka要素编写器使用提供的要素ID,标准GeoTools Hints.USE_PROVIDED_FIDHints.PROVIDED_FID 必须使用。否则,将生成新的功能ID。

删除简单功能:

SimpleFeatureStore store = (SimpleFeatureStore) ds.getFeatureSource(typeName);
FilterFactory2 ff = CommonFactoryFinder.getFilterFactory2();

String id = ...
store.removeFeatures(ff.id(ff.featureId(id)));

并且,清除(删除所有)功能:

store.removeFeatures(Filter.INCLUDE);

创建、修改、删除或清除简单要素的每个操作都会导致向Kafka主题发送一条消息。