Skip to content

Commit

Permalink
Refactor to use the new HashedIndexedFileStorage storage
Browse files Browse the repository at this point in the history
  • Loading branch information
dacr committed Dec 31, 2024
1 parent 4b1374c commit e1926c5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.json4s.jackson.Serialization.write
import org.slf4j.LoggerFactory
import webecho.ServiceConfig
import webecho.model.{EchoInfo, EchoWebSocket, EchoesInfo, OperationOrigin}
import webecho.tools.{JsonImplicits, UniqueIdentifiers}
import webecho.tools.{HashedIndexedFileStorageLive, JsonImplicits, UniqueIdentifiers}

import java.io.{File, FileFilter, FilenameFilter}
import java.time.Instant
Expand All @@ -36,8 +36,8 @@ object EchoStoreFileSystem {
// TODO not absolutely "thread safe" move to an actor based implementation
// But not so bad as everything is based on distinct files... => immutable file content ;)
class EchoStoreFileSystem(config: ServiceConfig) extends EchoStore with JsonImplicits {
private val logger = LoggerFactory.getLogger(getClass)
private val storeConfig = config.webEcho.behavior.fileSystemCache
private val logger = LoggerFactory.getLogger(getClass)
private val storeConfig = config.webEcho.behavior.fileSystemCache
private val storeBaseDirectory = {
val path = new File(storeConfig.path)
if (!path.exists()) {
Expand Down Expand Up @@ -70,13 +70,10 @@ class EchoStoreFileSystem(config: ServiceConfig) extends EchoStore with JsonImpl

private def fsEntryFiles(uuid: UUID): Option[Array[File]] = {
val entryFilter = new FilenameFilter {
override def accept(dir: File, name: String): Boolean = name.endsWith(".json")
override def accept(dir: File, name: String): Boolean =
name.endsWith(".data") || name.endsWith(".index")
}
def encodedFileCreatedTimestamp(file: File): Long = {
file.getName.split("-", 2).head.toLongOption.getOrElse(0L)
}

Option(fsEntryBaseDirectory(uuid).listFiles(entryFilter)).map(_.sortBy(f => -encodedFileCreatedTimestamp(f)))
Option(fsEntryBaseDirectory(uuid).listFiles(entryFilter))
}

private def fsEntryUUIDs(): Iterable[UUID] = {
Expand Down Expand Up @@ -115,15 +112,18 @@ class EchoStoreFileSystem(config: ServiceConfig) extends EchoStore with JsonImpl
}

override def echoInfo(uuid: UUID): Option[EchoInfo] = {
fsEntryFiles(uuid).map { files =>
val dest = fsEntryBaseDirectory(uuid)
// TODO add caching to avoid systematic allocation
// TODO switch to effect system to take into account the Try
HashedIndexedFileStorageLive(dest.getAbsolutePath).toOption.map { storage =>
val origin = jsonRead(fsEntryInfo(uuid)).extractOpt[OperationOrigin]
EchoInfo(
count = files.length,
lastUpdated =
files
.map(_.lastModified())
.maxOption
.map(Instant.ofEpochMilli),
count = storage.size().toOption.getOrElse(0),
lastUpdated = storage
.lastUpdated()
.toOption
.flatten
.map(Instant.ofEpochMilli),
origin = origin
)
}
Expand Down Expand Up @@ -154,30 +154,35 @@ class EchoStoreFileSystem(config: ServiceConfig) extends EchoStore with JsonImpl
}

override def echoAdd(uuid: UUID, origin: Option[OperationOrigin]): Unit = {
val dest = fsEntryBaseDirectory(uuid)
dest.mkdir()
val dest = fsEntryBaseDirectory(uuid)
// TODO add caching to avoid systematic allocation
// TODO switch to effect system to take into account the Try
val storage = HashedIndexedFileStorageLive(dest.getAbsolutePath).get
jsonWrite(fsEntryInfo(uuid), decompose(origin))
}

override def echoGet(uuid: UUID): Option[Iterator[JValue]] = {
fsEntryFiles(uuid).map { files =>
files
.to(Iterator)
.map(jsonRead)
val dest = fsEntryBaseDirectory(uuid)
if (!dest.exists()) None else {
// TODO add caching to avoid systematic allocation
// TODO switch to effect system to take into account the Try
HashedIndexedFileStorageLive(dest.getAbsolutePath).toOption
.map { storage =>
storage
.list(reverseOrder = true)
.get
.map(entry => parse(entry))
}
}
}

private def makeEntryValueJsonFile(uuid: UUID) = {
val baseDir = fsEntryBaseDirectory(uuid)
val ts = System.currentTimeMillis()
val fileUUID = UniqueIdentifiers.randomUUID().toString
new File(baseDir, s"$ts-$fileUUID.json")
}

override def echoAddValue(uuid: UUID, value: JValue): Unit = {
// In fact just a new file with a timestamp encoded in its name...
val jsonFile = makeEntryValueJsonFile(uuid)
jsonWrite(jsonFile, value)
val dest = fsEntryBaseDirectory(uuid)
// TODO add caching to avoid systematic allocation
// TODO switch to effect system to take into account the Try
HashedIndexedFileStorageLive(dest.getAbsolutePath).foreach { storage =>
storage.append(write(value))
}
}

private def makeWebSocketJsonFile(entryUUID: UUID, uuid: UUID) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ import scala.io.Codec
trait HashedIndexedFileStorage {
def list(reverseOrder: Boolean = false, fromEpoch: Option[Long] = None): Try[Iterator[String]]
def append(data: String): Try[SHA]
def lastUpdated(): Try[Option[Long]]
def size(): Try[Long]
}
15 changes: 12 additions & 3 deletions src/main/scala/webecho/tools/HashedIndexedFileStorageLive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,19 @@ private class HashedIndexedFileStorageLive(
} yield dataIterator
}

private def getCurrentLastEntrySHA(indexFile: RandomAccessFile): Option[Array[Byte]] = {
private def getIndexLastEntry(indexFile: RandomAccessFile): Option[IndexEntry] = {
if (indexFile.length() == 0) None
else {
val offset = indexFile.length() - indexEntrySize
val entry = indexReadEntry(indexFile, offset)
entry.toOption.map(_.sha.bytes)
entry.toOption
}
}

private def getIndexLastEntrySHA(indexFile: RandomAccessFile): Option[Array[Byte]] = {
getIndexLastEntry(indexFile).map(_.sha.bytes)
}

def append(data: String): Try[SHA] = {
val bytes = data.getBytes(codec.charSet)
if (bytes.length == 0) Failure(IllegalArgumentException("Input string is empty"))
Expand All @@ -169,7 +173,7 @@ private class HashedIndexedFileStorageLive(
dataIndex
}.flatMap { dataIndex =>
Using(new RandomAccessFile(indexFile, "rwd")) { output =>
val prevLastSHA = getCurrentLastEntrySHA(output)
val prevLastSHA = getIndexLastEntrySHA(output)
val dataSHA = shaEngine.digest(bytes, prevLastSHA)
val timestamp = System.currentTimeMillis()
output.seek(output.length())
Expand All @@ -189,4 +193,9 @@ private class HashedIndexedFileStorageLive(
}
}

override def lastUpdated(): Try[Option[Timestamp]] = {
Using(new RandomAccessFile(indexFile, "r")) { indexFile =>
getIndexLastEntry(indexFile).map(_.timestamp)
}
}
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "1.2.9-SNAPSHOT"
ThisBuild / version := "1.3.0-SNAPSHOT"

0 comments on commit e1926c5

Please sign in to comment.