6. GeoTools概述¶
GeoMesa中的主要抽象是GeoTool DataStore
. Understanding the GeoTools API is important to integrating with GeoMesa. The full GeoTools documentation is available here ,但本节简要概述了与数据存储交互的主要方式。
备注
本部分主要针对希望通过代码与GeoMesa集成的用户。许多用例不需要这样做;数据可以使用GeoMesa命令行工具或ApacheNiFi处理器获取,并通过Geoserver OGC请求或Spark访问。即便如此,本页仍可提供有关这些操作背后的概念的有用背景。
数据存储提供对空间数据的读写访问。API本身不区分不同的存储格式。因此,用于访问存储在本地形状文件中的数据的API将与用于访问存储在HBase集群中的数据的API相同。GeoMesa提供了几种不同的数据存储实现,包括HBase、Acumulo、Kafka等。看见 GeoMesa数据存储 有关可用的不同数据存储的更多信息。
6.1. SimpleFeatureType和SimpleFeature¶
在GeoTools中, SimpleFeatureType
定义给定架构中的属性的名称和类型。它类似于关系数据库的表定义。 SimpleFeatureType
S可以用类型名称和规范来描述,通常是表示属性名称和类型的字符串。一个 SimpleFeature
是一种结构数据类型,相当于关系数据库表中的单行。每个 SimpleFeature
与一个 SimpleFeatureType
,并具有唯一的标识符(要素ID)和与 SimpleFeatureType
。有关创建和管理的示例,请参阅以下内容 SimpleFeatureType
S。
中的“简单” SimpleFeatureType
指的是它的平面数据结构。也可以具有“复杂”的要素类型,这类似于关系数据库中的连接。但是,复杂的要素类型并未得到广泛使用或支持。
6.2. 获取数据存储实例¶
可通过访问数据存储 org.geotools.data.DataStoreFinder#getDataStore
。该函数接受一个参数映射,该参数映射用于动态加载数据存储。例如,要加载GeoMesa HBase数据存储,请包括参数键 "hbase.catalog"
。数据存储是动态加载的;适当的数据存储实现及其所有必需的依赖项必须位于类路径上。
GeoMesa数据存储是线程安全的(尽管并非数据存储上的所有方法都返回线程安全的对象)。通常,数据存储应该加载一次,然后重复使用。当不再需要数据存储时,应通过调用 dispose()
方法。
查看中的链接 GeoMesa数据存储 有关每个数据存储实现的参数的说明,请参见。
import org.geotools.data.DataStore;
import org.geotools.data.DataStoreFinder;
import org.locationtech.geomesa.hbase.data.HBaseDataStoreParams;
Map<String, String> parameters = new HashMap<>();
// HBaseDataStoreParams.HBaseCatalogParam().key is the string "hbase.catalog"
// the GeoMesa HBase data store will recognize the key and attempt to load itself
parameters.put(HBaseDataStoreParams.HBaseCatalogParam().key, "mycatalog");
DataStore store = null;
try {
store = DataStoreFinder.getDataStore(parameters);
} catch (IOException e) {
e.printStackTrace();
}
// when finished, be sure to clean up the store
if (store != null) {
store.dispose();
}
import org.geotools.data.DataStoreFinder
import org.locationtech.geomesa.hbase.data.HBaseDataStoreParams
import scala.collection.JavaConverters._
// HBaseDataStoreParams.HBaseCatalogParam.key is the string "hbase.catalog"
// the GeoMesa HBase data store will recognize the key and attempt to load itself
val params = Map(HBaseDataStoreParams.HBaseCatalogParam.key -> "mycatalog")
val store = DataStoreFinder.getDataStore(params.asJava)
// when finished, be sure to clean up the store
store.dispose()
6.3. 创建架构¶
每个数据存储可以包含多个 SimpleFeatureType
S,或图式。现有架构可以使用 getTypeNames
和 getSchema
方法:研究方法。可以通过创建、更新和删除架构 createSchema
, updateSchema
和 removeSchema
方法分别进行了实验研究。
看见 GeoTools要素类型 以获取可用的属性类型绑定的列表。
import org.locationtech.geomesa.utils.interop.SimpleFeatureTypes;
import org.opengis.feature.simple.SimpleFeatureType;
try {
String[] types = store.getTypeNames();
boolean exists = false;
for (String type: types) {
if (type.equals("purchases")) {
exists = true;
break;
}
}
if (!exists) {
SimpleFeatureType myType =
SimpleFeatureTypes.createType(
"purchases", "item:String,amount:Double,date:Date,location:Point:srid=4326");
store.createSchema(myType);
}
} catch (IOException e) {
e.printStackTrace();
}
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
if (!store.getTypeNames.contains("purchases")) {
val myType =
SimpleFeatureTypes.createType(
"purchases", "item:String,amount:Double,date:Date,location:Point:srid=4326")
store.createSchema(myType)
}
6.4. 正在写入数据¶
数据存储支持逐行写入数据。有两种不同的写入路径-附加写入和修改写入。
警告
密切关注……的使用 PROVIDED_FID
在以下各节中。此提示控制每个功能ID的行为。
一些数据存储支持事务,事务可用于隔离一组操作。GeoMesa不支持事务,因此默认的GeoTools Transaction.AUTO_COMMIT
在示例中使用。通常,一旦成功关闭编写器,数据就会持久化到基础存储区。在此之前,数据可能会在本地缓存和缓冲,并且可能无法持久化或可供查询。
6.4.1. 追加写入¶
附加编写器可以通过 getFeatureWriterAppend
方法。特写人员类似于迭代器; next
以获取新要素,则使用要写入的值更新该要素,然后 write
被调用来持久化它。一旦所有写入完成,特写程序就应该关闭。
用于唯一标识要素的ID称为要素ID,或 FID
。默认情况下,GeoTools将为每个要素生成一个新要素ID。若要指定功能ID,请将 PROVIDED_FID
提示在功能用户数据中,如下所示。
警告
使用附加的特征编写器多次写入相同的特征ID是逻辑错误。这可能导致持久化数据中的不一致。有关如何安全更新现有功能的信息,请参阅下一节。
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.util.factory.Hints;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
// use try-with-resources to close the writer when done
try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
store.getFeatureWriterAppend("purchases", Transaction.AUTO_COMMIT)) {
// repeat as needed, once per feature
// note: hasNext() will always return false, but can be ignored
SimpleFeature next = writer.next();
next.getUserData().put(Hints.PROVIDED_FID, "id-01");
next.setAttribute("item", "swag");
next.setAttribute("amount", 20.0);
// attributes will be converted to the appropriate type if needed
next.setAttribute("date", "2020-01-01T00:00:00.000Z");
next.setAttribute("location", "POINT (-82.379 34.1782)");
writer.write();
} catch (IOException e) {
e.printStackTrace();
}
import org.geotools.util.factory.Hints
val writer = store.getFeatureWriterAppend("purchases", Transaction.AUTO_COMMIT)
try {
// repeat as needed, once per feature
// note: hasNext will always return false, but can be ignored
val next = writer.next()
next.getUserData.put(Hints.PROVIDED_FID, "id-01")
next.setAttribute("item", "swag")
next.setAttribute("amount", 20.0)
// attributes will be converted to the appropriate type if needed
next.setAttribute("date", "2020-01-01T00:00:00.000Z")
next.setAttribute("location", "POINT (-82.379 34.1782)")
writer.write()
} finally {
writer.close()
}
进行追加写入的另一种方法是使用 FeatureStore
。GeoTools定义了 FeatureSource
为只读。 FeatureStore
延展 FeatureSource
并提供写入功能,但必须使用运行时强制转换进行检查。
import org.geotools.data.simple.SimpleFeatureCollection;
import org.geotools.data.simple.SimpleFeatureSource;
import org.geotools.data.simple.SimpleFeatureStore;
import org.geotools.feature.DefaultFeatureCollection;
try {
SimpleFeatureSource source = store.getFeatureSource("purchases");
if (source instanceof SimpleFeatureStore) {
SimpleFeatureCollection collection = new DefaultFeatureCollection();
// omitted - add features to the collection
((SimpleFeatureStore) source).addFeatures(collection);
} else {
throw new IllegalStateException("Store is read only");
}
} catch (IOException e) {
e.printStackTrace();
}
import org.geotools.data.simple.SimpleFeatureStore
import org.geotools.feature.DefaultFeatureCollection
store.getFeatureSource("purchases") match {
case s: SimpleFeatureStore =>
val collection = new DefaultFeatureCollection()
collection.add(???)
s.addFeatures(collection)
case _ => throw new IllegalStateException("Store is read only")
}
6.4.2. 修改写入¶
要更新现有要素,必须通过方法使用修改编写器 getFeatureWriter
,这需要指定要更新的要素的过滤器。修改要素编写器类似于附加要素编写器,不同之处在于该方法 hasNext
会回来的 true
只要有其他功能需要修改即可。从返回的要素 next
将预先填充每个要素的当前数据。
可通过GeoTools方法创建过滤器 ECQL.toFilter
. See the GeoTools documentation 有关CQL筛选器的详细信息。
import org.geotools.data.FeatureWriter;
import org.geotools.data.Transaction;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
store.getFeatureWriter("purchases", ECQL.toFilter("IN ('id-01')"), Transaction.AUTO_COMMIT)) {
while (writer.hasNext()) {
SimpleFeature next = writer.next();
next.setAttribute("amount", 21.0);
writer.write(); // or, to delete it: writer.remove();
}
} catch (IOException | CQLException e) {
e.printStackTrace();
}
import org.geotools.data.Transaction
import org.geotools.filter.text.ecql.ECQL
val filter = ECQL.toFilter("IN ('id-01')")
val writer = store.getFeatureWriter("purchases", filter, Transaction.AUTO_COMMIT)
try {
while (writer.hasNext) {
val next = writer.next
next.setAttribute("amount", 21.0)
writer.write() // or, to delete it: writer.remove()
}
} finally {
writer.close()
}
6.5. 正在读取数据¶
数据持久化后,可以通过 getFeatureReader
方法。GeoTools返回可能指向远程位置的结果的“实时”迭代器。通常,直到需要数据时才会从后备存储中实际读取数据,因此可以在不读取整个结果集的情况下读取几条记录。
要过滤返回的结果,可以使用“公共查询语言”CQL创建谓词。可通过GeoTools方法创建过滤器 ECQL.toFilter
. See the GeoTools documentation 有关CQL筛选器的详细信息。
import org.geotools.data.DataUtilities;
import org.geotools.data.FeatureReader;
import org.geotools.data.Query;
import org.geotools.data.Transaction;
import org.geotools.filter.text.cql2.CQLException;
import org.geotools.filter.text.ecql.ECQL;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
try {
Query query = new Query("purchases", ECQL.toFilter("bbox(location,-85,30,-80,35)"));
try (FeatureReader<SimpleFeatureType, SimpleFeature> reader =
store.getFeatureReader(query, Transaction.AUTO_COMMIT)) {
while (reader.hasNext()) {
SimpleFeature next = reader.next();
System.out.println(DataUtilities.encodeFeature(next));
}
}
} catch (IOException | CQLException e) {
e.printStackTrace();
}
import org.geotools.data.{DataUtilities, Query, Transaction}
import org.geotools.filter.text.ecql.ECQL
val query = new Query("purchases", ECQL.toFilter("bbox(location,-85,30,-80,35)"))
val reader = store.getFeatureReader(query, Transaction.AUTO_COMMIT)
try {
while (reader.hasNext) {
val next = reader.next
println(DataUtilities.encodeFeature(next))
}
} finally {
reader.close()
}