17.3. 数据生产者¶
GeoMesa Kafka数据存储可以充当Kafka生产者,并将特写写入Kafka主题。
备注
仅用于写入的Kafka数据存储可以通过设置数据存储配置来禁用消费消息 kafka.consumer.count
设置为0。
首先,创建数据存储:
import org.geotools.data.DataStoreFinder;
String brokers = ...
String zookeepers = ...
// build parameters map
Map<String, Serializable> params = new HashMap<>();
params.put("kafka.brokers", brokers);
params.put("kafka.zookeepers", zookeepers);
// create the data store
KafkaDataStore ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);
接下来,创建模式。每个数据存储可以有0到多个模式。例如:
SimpleFeatureType sft = ...
ds.createSchema(sft);
现在,您可以创建或更新简单的要素。请注意,Kafka数据仓库只支持通过功能ID更新。您可以使用ID过滤器显式创建修改功能编写器,也可以简单地使用追加功能编写器,它会覆盖任何具有相同功能ID的已有功能:
// the name of the simple feature type - will be the same as sft.getTypeName();
String typeName = sft.getTypeName();
SimpleFeatureWriter fw = ds.getFeatureWriterAppend(typeName, Transaction.AUTO_COMMIT);
SimpleFeature sf = fw.next();
// set properties on sf
fw.write();
备注
当使用修改特征编写器时,返回的特征将不具有实际当前特征的属性,但将具有正确的特征ID。
警告
要使Kafka要素编写器使用提供的要素ID,标准GeoTools Hints.USE_PROVIDED_FID
或 Hints.PROVIDED_FID
必须使用。否则,将生成新的功能ID。
删除简单功能:
SimpleFeatureStore store = (SimpleFeatureStore) ds.getFeatureSource(typeName);
FilterFactory2 ff = CommonFactoryFinder.getFilterFactory2();
String id = ...
store.removeFeatures(ff.id(ff.featureId(id)));
并且,清除(删除所有)功能:
store.removeFeatures(Filter.INCLUDE);
创建、修改、删除或清除简单要素的每个操作都会导致向Kafka主题发送一条消息。