Skip to content

Commit

Permalink
fix #1722
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Nov 20, 2023
1 parent cf1fe6a commit 1c7404f
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 15 deletions.
37 changes: 35 additions & 2 deletions otoroshi/app/env/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import otoroshi.gateway.{AnalyticsQueue, CircuitBreakersHolder}
import otoroshi.health.HealthCheckerActor
import otoroshi.jobs.updates.Version
import otoroshi.models._
import otoroshi.next.extensions.{AdminExtensionConfig, AdminExtensions}
import otoroshi.next.extensions.{AdminExtensionConfig, AdminExtensionId, AdminExtensions}
import otoroshi.next.models.NgRoute
import otoroshi.next.proxy.NgProxyState
import otoroshi.next.tunnel.{TunnelAgent, TunnelManager}
Expand All @@ -31,7 +31,7 @@ import otoroshi.script.{AccessValidatorRef, JobManager, ScriptCompiler, ScriptMa
import otoroshi.security.{ClaimCrypto, IdGenerator}
import otoroshi.ssl.pki.BouncyCastlePki
import otoroshi.ssl.{Cert, DynamicSSLEngineProvider}
import otoroshi.storage.DataStores
import otoroshi.storage.{DataStores, DataStoresBuilder}
import otoroshi.storage.drivers.cassandra._
import otoroshi.storage.drivers.inmemory._
import otoroshi.storage.drivers.lettuce._
Expand Down Expand Up @@ -877,6 +877,39 @@ class Env(
configuration.getOptionalWithFileSupport[String]("app.storage").getOrElse("lettuce") match {
case _ if clusterConfig.mode == ClusterMode.Worker =>
new SwappableInMemoryDataStores(configuration, environment, lifecycle, this)
case v if v.startsWith("cp:") =>
scriptManager.getAnyScript[DataStoresBuilder](v)(otoroshiExecutionContext) match {
case Left(err) => {
logger.error(s"specified datastore with name '${v}' does not exists or failed to instanciate: ${err}")
System.exit(-1)
???
}
case Right(dsb) => dsb.build(configuration, environment, lifecycle, clusterConfig.mode, this)
}
case v if v.startsWith("ext:") =>
val parts = v.split(":").toSeq
if (parts.size == 2) {
val name = parts.apply(1)
adminExtensions.datastore(name) match {
case None => {
logger.error(s"specified datastore with name '${v}' does not exists")
System.exit(-1)
???
}
case Some(dsb) => dsb.build(configuration, environment, lifecycle, clusterConfig.mode, this)
}
} else {
val extensionId = parts.apply(1)
val name = parts.apply(2)
adminExtensions.datastoreFrom(AdminExtensionId(extensionId), name) match {
case None => {
logger.error(s"specified datastore with name '${v}' does not exists")
System.exit(-1)
???
}
case Some(dsb) => dsb.build(configuration, environment, lifecycle, clusterConfig.mode, this)
}
}
case "redis-pool" if clusterConfig.mode == ClusterMode.Leader =>
new RedisCPDataStores(configuration, environment, lifecycle, this)
case "redis-mpool" if clusterConfig.mode == ClusterMode.Leader =>
Expand Down
99 changes: 90 additions & 9 deletions otoroshi/app/next/extensions/example.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
package otoroshi.next.extensions

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.util.ByteString
import otoroshi.api._
import otoroshi.cluster.ClusterMode
import otoroshi.env.Env
import otoroshi.models._
import otoroshi.next.plugins.api.{
NgAccess,
NgAccessContext,
NgAccessValidator,
NgPluginCategory,
NgPluginConfig,
NgPluginVisibility,
NgStep
}
import otoroshi.storage._
import otoroshi.utils.cache.types.UnboundedTrieMap
import otoroshi.utils.syntax.implicits._
import play.api.inject.ApplicationLifecycle
import play.api.libs.json._
import play.api.mvc.Results
import play.api.{Configuration, Environment, Logger}
import storage.drivers.generic.{GenericDataStores, GenericRedisLike, GenericRedisLikeBuilder}

import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -89,6 +88,84 @@ class FooAdminExtensionState(env: Env) {
}
}

class FooRedisLike(env: Env, actorSystem: ActorSystem) extends GenericRedisLike {

val redis = new otoroshi.storage.drivers.inmemory.SwappableInMemoryRedis(false, env, actorSystem)
implicit val ec = actorSystem.dispatcher

override def setCounter(key: String, value: Long): Future[Unit] = redis.set(key, value.toString).map(_ => ())
override def rawGet(key: String): Future[Option[Any]] = redis.rawGet(key)
override def health()(implicit ec: ExecutionContext): Future[DataStoreHealth] = redis.health()
override def stop(): Unit = redis.stop()
override def flushall(): Future[Boolean] = redis.flushall()

override def get(key: String): Future[Option[ByteString]] = redis.get(key)
override def mget(keys: String*): Future[Seq[Option[ByteString]]] = redis.mget(keys: _*)
override def set(key: String, value: String, exSeconds: Option[Long], pxMilliseconds: Option[Long]): Future[Boolean] = redis.set(key, value, exSeconds, pxMilliseconds)
override def setBS(key: String, value: ByteString, exSeconds: Option[Long], pxMilliseconds: Option[Long]): Future[Boolean] = redis.setBS(key, value, exSeconds, pxMilliseconds)
override def del(keys: String*): Future[Long] = redis.del(keys: _*)
override def incr(key: String): Future[Long] = redis.incr(key)
override def incrby(key: String, increment: Long): Future[Long] = redis.incrby(key, increment)
override def exists(key: String): Future[Boolean] = redis.exists(key)
override def keys(pattern: String): Future[Seq[String]] = redis.keys(pattern)

override def hdel(key: String, fields: String*): Future[Long] = redis.hdel(key, fields: _*)
override def hgetall(key: String): Future[Map[String, ByteString]] = redis.hgetall(key)
override def hset(key: String, field: String, value: String): Future[Boolean] = redis.hset(key, field, value)
override def hsetBS(key: String, field: String, value: ByteString): Future[Boolean] = redis.hsetBS(key, field, value)

override def llen(key: String): Future[Long] = redis.llen(key)
override def lpush(key: String, values: String*): Future[Long] = redis.lpush(key, values:_*)
override def lpushLong(key: String, values: Long*): Future[Long] = redis.lpushLong(key, values:_*)
override def lpushBS(key: String, values: ByteString*): Future[Long] = redis.lpushBS(key, values:_*)
override def lrange(key: String, start: Long, stop: Long): Future[Seq[ByteString]] = redis.lrange(key, start, stop)
override def ltrim(key: String, start: Long, stop: Long): Future[Boolean] = redis.ltrim(key, start, stop)

override def pttl(key: String): Future[Long] = redis.pttl(key)
override def ttl(key: String): Future[Long] = redis.ttl(key)
override def expire(key: String, seconds: Int): Future[Boolean] = redis.expire(key, seconds)
override def pexpire(key: String, milliseconds: Long): Future[Boolean] = redis.pexpire(key, milliseconds)

override def sadd(key: String, members: String*): Future[Long] = redis.sadd(key, members: _*)
override def saddBS(key: String, members: ByteString*): Future[Long] = redis.saddBS(key, members: _*)
override def sismember(key: String, member: String): Future[Boolean] = redis.sismember(key, member)
override def sismemberBS(key: String, member: ByteString): Future[Boolean] = redis.sismemberBS(key, member)
override def smembers(key: String): Future[Seq[ByteString]] = redis.smembers(key)
override def srem(key: String, members: String*): Future[Long] = redis.srem(key, members: _*)
override def sremBS(key: String, members: ByteString*): Future[Long] = redis.sremBS(key, members: _*)
override def scard(key: String): Future[Long] = redis.scard(key)

override def typ(key: String): Future[String] = {
rawGet(key) map {
case Some(_: String) => "string"
case Some(_: ByteString) => "string"
case Some(_: Long) => "string"
case Some(_: java.util.concurrent.ConcurrentHashMap[String, ByteString]) => "hash"
case Some(_: TrieMap[String, ByteString]) => "hash"
case Some(_: java.util.concurrent.CopyOnWriteArrayList[ByteString]) => "list"
case Some(_: scala.collection.mutable.MutableList[ByteString]) => "list"
case Some(_: java.util.concurrent.CopyOnWriteArraySet[ByteString]) => "set"
case Some(_: scala.collection.mutable.HashSet[ByteString]) => "set"
case _ => "none"
}
}
}

class FooRedisLikeBuilder extends GenericRedisLikeBuilder {
override def build(configuration: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, clusterMode: ClusterMode, redisStatsItems: Int, actorSystem: ActorSystem, mat: Materializer, logger: Logger, env: Env): GenericRedisLike = {
new FooRedisLike(env, actorSystem)
}
}
class FooDataStoresBuilder extends DataStoresBuilder {
override def build(configuration: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, clusterMode: ClusterMode, env: Env): DataStores = {
new GenericDataStores(configuration, environment, lifecycle, clusterMode, redisStatsItems = 100, builder = new FooRedisLikeBuilder(), env)
}
}

object FooDataStoresBuilder {
def apply(): DataStoresBuilder = new FooDataStoresBuilder()
}

class FooAdminExtension(val env: Env) extends AdminExtension {

private lazy val datastores = new FooAdminExtensionDatastores(env, id)
Expand All @@ -110,6 +187,10 @@ class FooAdminExtension(val env: Env) extends AdminExtension {
"stop example extension".debugPrintln
}

override def datastoreBuilders(): Map[String, DataStoresBuilder] = Map(
"foo" -> FooDataStoresBuilder()
)

override def syncStates(): Future[Unit] = {
implicit val ec = env.otoroshiExecutionContext
implicit val ev = env
Expand Down
15 changes: 13 additions & 2 deletions otoroshi/app/next/extensions/extension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import otoroshi.api.Resource
import otoroshi.env.Env
import otoroshi.models.{ApiKey, BackOfficeUser, EntityLocationSupport, PrivateAppsUser}
import otoroshi.next.utils.Vault
import otoroshi.storage.BasicStore
import otoroshi.storage.DataStoresBuilder
import otoroshi.utils.cache.types.UnboundedTrieMap
import otoroshi.utils.syntax.implicits._
import play.api.Configuration
import play.api.libs.json.{Format, JsObject, JsResult, JsSuccess, JsValue, Reads}
import play.api.libs.json.{JsObject, JsResult, JsValue, Reads}
import play.api.mvc._
import play.twirl.api.Html

Expand Down Expand Up @@ -160,6 +160,7 @@ trait AdminExtension {

// TODO: add util function to access and update global_config extensions with id cleanup as key

def datastoreBuilders(): Map[String, DataStoresBuilder] = Map.empty
def entities(): Seq[AdminExtensionEntity[EntityLocationSupport]] = Seq.empty
def frontendExtensions(): Seq[AdminExtensionFrontendExtension] = Seq.empty
def globalConfigExtensions(): Seq[AdminExtensionGlobalConfigExtension] = Seq.empty
Expand Down Expand Up @@ -302,6 +303,16 @@ class AdminExtensions(env: Env, _extensions: Seq[AdminExtension]) {
}
}

def datastoreFrom(extId: AdminExtensionId, name: String): Option[DataStoresBuilder] = {
extensions.find(_.id == extId).flatMap(_.datastoreBuilders().get(name))
}

def datastore(name: String): Option[DataStoresBuilder] = {
extensions.collect {
case e if e.datastoreBuilders().nonEmpty => e.datastoreBuilders()
}.foldLeft(Map.empty[String, DataStoresBuilder])((a, b) => a ++ b).get(name)
}

def getAssetsCallHandler(
request: RequestHeader,
actionBuilder: ActionBuilder[Request, AnyContent],
Expand Down
Loading

0 comments on commit 1c7404f

Please sign in to comment.