17.4. 数据消费者

一个GeoMesa Kafka数据存储将读取由另一个GeoMesa Kafka数据存储(或其自身)写入Kafka的要素。它将不断地从Kafka消息队列中提取数据,并始终表示简单功能的最新状态。

备注

仅用于写入的Kafka数据存储可以通过设置数据存储配置来禁用消费消息 kafka.consumer.count 设置为0。

首先,创建数据存储。例如:

import org.geotools.data.DataStoreFinder;

 String brokers = ...
 String zookeepers = ...

 // build parameters map
 Map<String, Serializable> params = new HashMap<>();
 params.put("kafka.brokers", brokers);
 params.put("kafka.zookeepers", zookeepers);

 // optional - to read all existing messages on the kafka topic
 params.put("kafka.consumer.from-beginning", java.lang.Boolean.TRUE);

 // create the data store
 KafkaDataStore ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);

kakfa.brokerskafka.zookeepers (和可选 kafka.zk.path )必须与用于创建Kafka数据存储生产者的值一致。

据推测 createSchema 已经被一家卡夫卡数据存储生产商调用。

String typeName = ...
SimpleFeatureStore store = ds.getFeatureSource(typeName);

Filter filter = ...
store.getFeatures(filter);

请注意,数据存储不会开始使用给定的Kafka中的要素 SimpleFeatureType 直到可以通过以下任一方式访问它 getFeatureSource()getFeatureReader() 。一旦被访问,存储将继续消费消息,直到 dispose() 被称为。