17.13. Kafka Streams Integration

The Kafka Data Store can integrate with Kafka Streams applications as both a source and a sink.

To read or write GeoMesa Kafka topics, use the org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder class. This class wraps a regular Kafka StreamsBuilder with convenience methods for mapping feature types to topics and for loading the appropriate serializers.
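
For orientation, the following is a minimal sketch of the builder entry points demonstrated in the sections below; reading and immediately re-writing the same feature type is for illustration only:

import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// connection parameters, as in the examples below
val params = Map(
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder  = GeoMesaStreamsBuilder(params)      // wraps a regular Kafka StreamsBuilder
val features = builder.stream("my-feature-type")  // source: a stream of GeoMesa messages
builder.to("my-feature-type", features)           // sink: write a stream back to GeoMesa
val wrapped  = builder.wrapped                    // the underlying Kafka StreamsBuilder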

17.13.1. Streams Data Model

Messages are based on SimpleFeatures, and are modeled as a simple case class:

case class GeoMesaMessage(action: MessageAction, attributes: Seq[AnyRef], userData: Map[String, String])

object MessageAction extends Enumeration {
  type MessageAction = Value
  val Upsert, Delete = Value
}
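
For example, an insert-or-update message for a feature type with two attributes can be constructed directly, or through the GeoMesaMessage.upsert convenience method used in the write example below. A minimal sketch; the attribute values are hypothetical:

import org.locationtech.geomesa.kafka.streams.{GeoMesaMessage, MessageAction}

// insert-or-update a feature with two attributes: a name and a WKT point
val update = GeoMesaMessage(MessageAction.Upsert, Seq("name-01", "POINT (0 0)"), Map.empty)

// the equivalent convenience factory, as used in the write example below
val same = GeoMesaMessage.upsert(Seq("name-01", "POINT (0 0)"))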

17.13.2. Read Example

The following is adapted from the word-count example in kafka-streams:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// these are the parameters normally passed to DataStoreFinder
val params = Map[String, String](
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder = GeoMesaStreamsBuilder(params)

// read in the feature type and turn the attributes into a space-separated string
val textLines: KStream[String, String] =
  builder.stream("my-feature-type")
      .map((k, v) => (k, v.attributes.map(_.toString.replaceAll(" ", "_")).mkString(" ")))

val wordCounts: KTable[String, Long] =
  textLines
      .flatMapValues(textLine => textLine.split(" +"))
      .groupBy((_, word) => word)
      .count()(Materialized.as("counts-store"))

wordCounts.toStream.to("word-count")

val topology = builder.build()
// construct the streams app as normal
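
The last step ("construct the streams app as normal") is standard Kafka Streams boilerplate. A minimal sketch, using a hypothetical application id:

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "geomesa-word-count") // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val streams = new KafkaStreams(topology, props)
streams.start()

// close the app cleanly on shutdown
sys.addShutdownHook(streams.close())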

17.13.3. Write Example

The following shows how to persist data to a GeoMesa topic:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder

// these are the parameters normally passed to DataStoreFinder
val params = Map[String, String](
  "kafka.brokers"    -> "localhost:9092",
  "kafka.zookeepers" -> "localhost:2181"
)

val builder = GeoMesaStreamsBuilder(params)

// use the wrapped native streams builder to create an input based on csv records
val input: KStream[String, String] =
  builder.wrapped.stream[String, String]("input-csv-topic")

// the columns in the csv need to map to the attributes in the feature type
val output: KStream[String, GeoMesaMessage] =
  input.mapValues(lines => GeoMesaMessage.upsert(lines.split(",")))

// write the output to GeoMesa - the feature type must already exist
builder.to("my-feature-type", output)

val topology = builder.build()
// construct the streams app as normal
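
As noted in the example, the feature type must already exist before writing. A minimal sketch of creating it up front through the GeoTools data store API, reusing the params map above and assuming a hypothetical two-attribute schema matching the CSV columns (and a GeoTools version where DataStoreFinder lives in org.geotools.data):

import org.geotools.data.DataStoreFinder
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

import scala.collection.JavaConverters._

// hypothetical schema: one attribute per CSV column, plus a point geometry
val sft = SimpleFeatureTypes.createType("my-feature-type", "name:String,*geom:Point:srid=4326")

val ds = DataStoreFinder.getDataStore(params.asJava)
ds.createSchema(sft)
ds.dispose()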

17.13.4. Joins and Topic Partitioning

Warning

When consuming data from GeoMesa 3.4.x or earlier, Kafka Streams operations that require co-partitioning will generally need a re-partition step.

For feature types created before version 3.5.0, GeoMesa uses a custom partitioner for the data written to Kafka. When using Kafka Streams operations that require co-partitioning, such as joins, the GeoMesa topic must be re-partitioned first.
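
For example, a stream read from such a feature type can be routed through an explicit re-partition step before a join. A minimal sketch using the standard Kafka Streams Repartitioned API (Kafka 2.6+), reusing the builder and imports from the read example above; the values are first mapped to plain strings so the built-in serdes apply, and the re-partition topic name is hypothetical:

import org.apache.kafka.streams.scala.kstream.Repartitioned

// map to a string value (covered by the built-in serdes), then force a
// re-partition so that downstream joins see standard Kafka partitioning
val repartitioned: KStream[String, String] =
  builder.stream("my-feature-type")
      .mapValues(_.attributes.mkString(","))
      .repartition(Repartitioned.`with`[String, String]("my-feature-type-repartition"))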

Alternatively, existing feature types can be updated to use the default Kafka partitioner, which allows joins without re-partitioning. However, note that after the change, updates to a given feature may go to the wrong partition, which can cause older data to be returned when querying GeoMesa. This problem will generally resolve itself over time, as Kafka log retention policies delete the older data (unless the topic has Kafka log compaction enabled).

To update an existing feature type, add the user data entry geomesa.kafka.partitioning=default. With the GeoMesa command-line tools, the following command will disable custom partitioning:

geomesa-kafka update-schema --add-user-data "geomesa.kafka.partitioning:default"