17.10. 侦听要素事件

GeoTools API包括一种机制,用于启动 FeatureEvent 对象中添加、更改或删除数据时发生的“事件”。 SimpleFeatureSource 。客户端可以实现 FeatureListener ,它只有一个名为 changed() 在每次调用时, FeatureEvent 被解雇了。

GeoMesa Kafka制作人制作了三种类型的消息。每条消息都会导致 FeatureEvent 当GeoMesa Kafka消费者阅读时将被触发。所有要素事件类都会扩展 org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent 并包含在同名的伴随对象中。

邮件已读

激发的事件类别

FeatureEvent.Type

滤器

CreateOrUpdate

KafkaFeatureChanged

CHANGED

IN (<id>)

已添加具有给定ID的单个要素;这可能是新要素或现有要素的更新

删除

KafkaFeatureRemoved

REMOVED

IN (<id>)

已移除具有给定ID的要素

清除

KafkaFeatureCleared

REMOVED

Filter.INCLUDE

所有功能均已移除

中的常规信息之外 FeatureEvent ,CreateOrUpdate消息会暴露相关的 SimpleFeature 用这种方法 feature() 。删除消息使用方法公开要素ID id() ,还包括 SimpleFeature 如果它可用(它可能为空)。所有事件都使用该方法公开原始的Kafka时间戳 time()

要注册一个 FeatureListener ,创建 SimpleFeatureSource 来自GeoMesa Kafka消费者数据存储,并使用 addFeatureListener() 方法。例如,下面的监听程序简单地打印出它收到的事件:

import org.geotools.data.FeatureEvent;
import org.geotools.data.FeatureListener;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureChanged;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureRemoved;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent.KafkaFeatureCleared;

// unless specified, the consumer will only read data written after its instantiation
SimpleFeatureSource source = ds.getFeatureSource(sftName);
FeatureListener listener = new FeatureListener() {
  @Override
  public void changed(FeatureEvent featureEvent) {
    if (featureEvent instanceof KafkaFeatureChanged) {
      KafkaFeatureChanged event = ((KafkaFeatureChanged) featureEvent);
      System.out.println("Received add/update for " + event.feature() +
                         " at " + new java.util.Date(event.time()));
    } else if (featureEvent instanceof KafkaFeatureRemoved) {
      KafkaFeatureRemoved event = ((KafkaFeatureRemoved) featureEvent);
      System.out.println("Received delete for " + event.id() + " " + event.feature() +
                         " at " + new java.util.Date(event.time()));
    } else if (featureEvent instanceof KafkaFeatureCleared) {
      KafkaFeatureCleared event = ((KafkaFeatureCleared) featureEvent);
      System.out.println("Received clear at " + new java.util.Date(event.time()));
    }
  }
};
store.addFeatureListener(listener);

清理时,使用取消注册要素侦听器非常重要 removeFeatureListener() 。例如,对于在Geoserver的Bean中运行的代码, javax.annotation.PreDestroy 注释可用于标记执行注销的方法:

@PreDestroy
public void dispose() throws Exception {
    store.removeFeatureListener(listener);
    // other cleanup
}