Skip to content

Commit

Permalink
GEOMESA-3420 Add Kafka readiness endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Nov 25, 2024
1 parent 366052f commit 9c05bb3
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 30 deletions.
15 changes: 0 additions & 15 deletions build/assembly.xml

This file was deleted.

7 changes: 7 additions & 0 deletions geomesa-gs-kafka-status-endpoint/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# geomesa-gs-kafka-status-endpoint

This module provides a readiness check available as a REST endpoint which indicates when any initial Kafka loads have
completed. In order for this to matter, Kafka DataStores need to have `kafka.consumer.start-on-demand` set to false and
`kafka.consumer.read-back` set to a non-zero value.

The endpoint is available at `/geoserver/rest/kafka` (assuming a default context path).
35 changes: 35 additions & 0 deletions geomesa-gs-kafka-status-endpoint/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.geomesa.geoserver</groupId>
<artifactId>geomesa-geoserver_2.12</artifactId>
<version>5.2.0-SNAPSHOT</version>
</parent>

<artifactId>geomesa-gs-kafka-status-endpoint_2.12</artifactId>
<name>GeoMesa GeoServer Kafka Status Endpoint</name>

<dependencies>
<dependency>
<groupId>org.geoserver</groupId>
<artifactId>gs-rest</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-kafka-datastore_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="org.geomesa.gs.kafka.status"/>
</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the GNU GENERAL PUBLIC LICENSE,
* Version 2 which accompanies this distribution and is available at
* https://opensource.org/licenses/GPL-2.0.
***********************************************************************/

package org.geomesa.gs.kafka.status

import com.typesafe.scalalogging.StrictLogging
import org.geoserver.catalog.event._
import org.geoserver.catalog.{Catalog, DataStoreInfo, FeatureTypeInfo}
import org.geoserver.rest.RestBaseController
import org.locationtech.geomesa.kafka.data.KafkaCacheLoader
import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
import org.springframework.beans.factory.InitializingBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.{HttpStatus, MediaType, ResponseEntity}
import org.springframework.web.bind.annotation.{GetMapping, RequestMapping, RestController}

@RestController
@RequestMapping(path = Array("/rest/kafka"), produces = Array(MediaType.APPLICATION_JSON_VALUE))
class KafkaLoadStatusController extends RestBaseController with CatalogListener with InitializingBean with StrictLogging {

import scala.collection.JavaConverters._

@Autowired
private var catalog: Catalog = _

@volatile
private var loaded: Boolean = false

@GetMapping
// noinspection ScalaUnusedSymbol
def status(): ResponseEntity[String] = {
if (loaded && KafkaCacheLoader.LoaderStatus.allLoaded()) {
new ResponseEntity("", HttpStatus.OK)
} else {
new ResponseEntity("Kafka layers are still loading", HttpStatus.SERVICE_UNAVAILABLE)
}
}

override def afterPropertiesSet(): Unit = {
catalog.addListener(this)
reloaded()
}

override def handleAddEvent(event: CatalogAddEvent): Unit = loadStore(event)
override def handleModifyEvent(event: CatalogModifyEvent): Unit = loadStore(event)
override def handlePostModifyEvent(event: CatalogPostModifyEvent): Unit = loadStore(event)
override def handleRemoveEvent(event: CatalogRemoveEvent): Unit = {}

override def reloaded(): Unit = {
logger.info("Starting to load all datastores")
val start = System.currentTimeMillis()
CachedThreadPool.submit(() => {
try {
val futures = catalog.getDataStores.asScala.toList.map { dsi =>
CachedThreadPool.submit(() => {
val start = System.currentTimeMillis()
try { loadStore(dsi) } finally {
logger.info(s"Loaded store ${name(dsi)} in ${System.currentTimeMillis() - start}ms")
}
})
}
futures.foreach(_.get)
logger.info(s"Finished loading datastores in ${System.currentTimeMillis() - start}ms")
} finally {
loaded = true
}
})
}

private def loadStore(event: CatalogEvent): Unit = {
logger.debug(s"Received event: $event")
event.getSource match {
case dsi: DataStoreInfo => loadStore(dsi)
case fti: FeatureTypeInfo => loadStore(fti.getStore)
case _ => // not a new layer - no action necessary
}
}

private def loadStore(dsi: DataStoreInfo): Unit = {
try { dsi.getDataStore(null) } catch {
case e: Throwable => logger.error(s"Error loading store ${name(dsi)}:", e)
}
}

private def name(dsi: DataStoreInfo): String = s"${dsi.getWorkspace.getName}:${dsi.getName}"

def setCatalog(catalog: Catalog): Unit = this.catalog = catalog
def getCatalog: Catalog = this.catalog
}
28 changes: 13 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<packaging>pom</packaging>

<modules>
<module>geomesa-gs-kafka-status-endpoint</module>
<module>geomesa-gs-monitor-elasticsearch</module>
<module>geomesa-gs-styling</module>
<module>geomesa-gs-wfs</module>
Expand Down Expand Up @@ -224,6 +225,18 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.geoserver</groupId>
<artifactId>gs-rest</artifactId>
<version>${geoserver.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.geoserver.extension</groupId>
<artifactId>gs-wps-core</artifactId>
Expand Down Expand Up @@ -402,21 +415,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven.assembly.plugin.version}</version>
<executions>
<execution>
<id>create-archive</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>${project.basedir}/../build/assembly.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
Expand Down

0 comments on commit 9c05bb3

Please sign in to comment.