17.4. 数据消费者¶
一个GeoMesa Kafka数据存储将读取由另一个GeoMesa Kafka数据存储(或其自身)写入Kafka的要素。它将不断地从Kafka消息队列中提取数据,并始终表示简单功能的最新状态。
备注
仅用于写入的Kafka数据存储可以通过设置数据存储配置来禁用消费消息 kafka.consumer.count
设置为0。
首先,创建数据存储。例如:
import org.geotools.data.DataStoreFinder;
String brokers = ...
String zookeepers = ...
// build parameters map
Map<String, Serializable> params = new HashMap<>();
params.put("kafka.brokers", brokers);
params.put("kafka.zookeepers", zookeepers);
// optional - to read all existing messages on the kafka topic
params.put("kafka.consumer.from-beginning", java.lang.Boolean.TRUE);
// create the data store
KafkaDataStore ds = (KafkaDataStore) DataStoreFinder.getDataStore(params);
kakfa.brokers
, kafka.zookeepers
(和可选 kafka.zk.path
)必须与用于创建Kafka数据存储生产者的值一致。
据推测 createSchema
已经被一家卡夫卡数据存储生产商调用。
String typeName = ...
SimpleFeatureStore store = ds.getFeatureSource(typeName);
Filter filter = ...
store.getFeatures(filter);
请注意,数据存储不会开始使用给定的Kafka中的要素 SimpleFeatureType
直到可以通过以下任一方式访问它 getFeatureSource()
或 getFeatureReader()
。一旦被访问,存储将继续消费消息,直到 dispose()
被称为。