17.13. Kafka Streams Integration¶
The Kafka Data Store can integrate with Kafka Streams applications as both a source and a sink. To read or write GeoMesa Kafka topics, use the org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder class. This class wraps a standard Kafka StreamsBuilder with convenience methods for mapping feature types to topics and for loading serializers.
17.13.1. Streams Data Model¶
Messages are based on SimpleFeatures and are modeled as a simple case class:
case class GeoMesaMessage(action: MessageAction, attributes: Seq[AnyRef], userData: Map[String, String])

object MessageAction extends Enumeration {
  type MessageAction = Value
  val Upsert, Delete = Value
}
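For instance, an update for a feature type with two attributes can be expressed either through the case class constructor or through the upsert convenience method used in the write example below. A minimal sketch (the attribute values and the MessageAction import location are illustrative assumptions):

import org.locationtech.geomesa.kafka.streams.{GeoMesaMessage, MessageAction}

// two equivalent ways to express an upsert for a feature type with
// two attributes, e.g. a name and a WKT point geometry (illustrative values)
val viaConstructor = GeoMesaMessage(MessageAction.Upsert, Seq("name-01", "POINT (45 55)"), Map.empty)
val viaHelper = GeoMesaMessage.upsert(Seq("name-01", "POINT (45 55)"))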
17.13.2. Read Example¶
The following is adapted from the word-count example in Kafka Streams:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable, Materialized}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// these are the parameters normally passed to DataStoreFinder
val params = Map[String, String](
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder = GeoMesaStreamsBuilder(params)

// read in the feature type and turn the attributes into a space-separated string
val textLines: KStream[String, String] =
  builder.stream("my-feature-type")
    .map((k, v) => (k, v.attributes.map(_.toString.replaceAll(" ", "_")).mkString(" ")))

val wordCounts: KTable[String, Long] =
  textLines
    .flatMapValues(textLine => textLine.split(" +"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))

wordCounts.toStream.to("word-count")

val topology: Topology = builder.build()

// construct the streams app as normal
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.locationtech.geomesa.kafka.jstreams.GeoMesaStreamsBuilder;

// these are the parameters normally passed to DataStoreFinder
Map<String, String> params = new HashMap<>();
params.put("kafka.brokers", "localhost:9092");
params.put("kafka.zookeepers", "localhost:2181");

GeoMesaStreamsBuilder builder = GeoMesaStreamsBuilder.create(params);

// read in the feature type and turn the attributes into a space-separated string
KStream<String, String> textLines =
    builder.stream("my-feature-type")
        .mapValues(v -> v.asJava().stream().map(Object::toString).collect(Collectors.joining(" ")));

KTable<String, Long> wordCounts =
    textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.split(" +")))
        .groupBy((k, word) -> word)
        .count(Materialized.as("counts-store"));

wordCounts.toStream().to("word-count", Produced.with(Serdes.String(), Serdes.Long()));

Topology topology = builder.build();

// construct the streams app as normal
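In either language, the resulting topology runs like any other Kafka Streams application. A minimal sketch in Scala, assuming a hypothetical application id ("word-count-app"):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app") // hypothetical id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

// run the topology built above
val streams = new KafkaStreams(topology, props)
streams.start()
sys.addShutdownHook(streams.close()) // stop cleanly on exit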
17.13.3. Write Example¶
The following shows how to persist data to a GeoMesa topic:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// these are the parameters normally passed to DataStoreFinder
val params = Map[String, String](
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder = GeoMesaStreamsBuilder(params)

// use the wrapped native streams builder to create an input based on csv records
val input: KStream[String, String] =
  builder.wrapped.stream[String, String]("input-csv-topic")

// the columns in the csv need to map to the attributes in the feature type
val output: KStream[String, GeoMesaMessage] =
  input.mapValues(lines => GeoMesaMessage.upsert(lines.split(",")))

// write the output to GeoMesa - the feature type must already exist
builder.to("my-feature-type", output)

val topology: Topology = builder.build()

// construct the streams app as normal
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.locationtech.geomesa.kafka.jstreams.GeoMesaStreamsBuilder;
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage;

// these are the parameters normally passed to DataStoreFinder
Map<String, String> params = new HashMap<>();
params.put("kafka.brokers", "localhost:9092");
params.put("kafka.zookeepers", "localhost:2181");

GeoMesaStreamsBuilder builder = GeoMesaStreamsBuilder.create(params);

// use the wrapped native streams builder to create an input based on csv records
KStream<String, String> input =
    builder.wrapped().stream("input-csv-topic",
        Consumed.with(Serdes.String(), Serdes.String()).withTimestampExtractor(new WallclockTimestampExtractor()));

// the columns in the csv need to map to the attributes in the feature type
KStream<String, GeoMesaMessage> output =
    input.mapValues(lines -> GeoMesaMessage.upsert(Arrays.asList(lines.split(","))));

// write the output to GeoMesa - the feature type must already exist
builder.to("my-feature-type", output);

Topology topology = builder.build();

// construct the streams app as normal
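Since the feature type must already exist before writing, it can be created ahead of time through the data store. A sketch, assuming a hypothetical schema whose attributes match the csv columns (the DataStoreFinder package may differ across GeoTools versions):

import scala.collection.JavaConverters._
import org.geotools.data.DataStoreFinder
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

// hypothetical schema - the attribute order must match the csv columns
val sft = SimpleFeatureTypes.createType("my-feature-type", "name:String,dtg:Date,*geom:Point:srid=4326")

// params is the same parameter map used for the streams builder
val ds = DataStoreFinder.getDataStore(params.asJava)
ds.createSchema(sft)
ds.dispose()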
17.13.4. Joins and Topic Partitioning¶
Warning
Kafka Streams operations that require co-partitioning generally require a re-partitioning step when consuming data from GeoMesa 3.4.x or earlier.
For feature types created before version 3.5.0, GeoMesa used a custom partitioner for data written to Kafka. When using Kafka Streams operations that require co-partitioning, such as joins, the GeoMesa topic must first be re-partitioned.
Existing feature types can be updated to use the default Kafka partitioner, which allows joins without re-partitioning. Note, however, that after the change, updates to a given feature may go to the wrong partition, which may cause stale data to be returned when querying GeoMesa. This problem will generally resolve itself over time as the Kafka log retention policy deletes the older data (unless the topic has Kafka log compaction enabled).
To update an existing feature type, add the user data entry geomesa.kafka.partitioning=default. Through the GeoMesa command-line tools, the following command will disable custom partitioning:
geomesa-kafka update-schema --add-user-data "geomesa.kafka.partitioning:default"
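When creating a new feature type programmatically, the same hint can be set in the user data before the schema is created, avoiding the need for a later update. A sketch using the schema helper from GeoMesa's utils module (the schema itself is illustrative):

import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

val sft = SimpleFeatureTypes.createType("my-feature-type", "name:String,*geom:Point:srid=4326")
// opt in to default Kafka partitioning so joins work without re-partitioning
sft.getUserData.put("geomesa.kafka.partitioning", "default")
// pass sft to DataStore.createSchema as usual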