Skip to content

Commit

Permalink
Merge branch 'master' into 4076-annotate-schema-source
Browse files Browse the repository at this point in the history
  • Loading branch information
imsdu authored Aug 7, 2024
2 parents 6b7f102 + ccbcdb9 commit 87f2bad
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.kernel.utils
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceError.{InvalidJson, InvalidJsonObject, ResourcePathNotFound}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.ClasspathResourceLoader.handlebarsExpander
import fs2.io.file.Path
import fs2.text
import io.circe.parser.parse
import io.circe.{Json, JsonObject}
Expand All @@ -21,6 +22,8 @@ class ClasspathResourceLoader private (classLoader: ClassLoader) {
.map(_.getPath)
}

final def absoluteFs2Path(resourcePath: String) = absolutePath(resourcePath).map(Path(_))

/**
* Loads the content of the argument classpath resource as an [[InputStream]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,11 @@ object FileUtils {
}
}

def filenameWithoutExtension(filename: String): Option[String] = {
val lastDotIndex = filename.lastIndexOf('.')
Option.when(lastDotIndex > 0) {
filename.substring(0, lastDotIndex)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,22 @@ class FileUtilsSuite extends FunSuite {
assertEquals(obtained, expected)
}

test("Extract the filename without the extension when there is a single dot") {
val obtained = FileUtils.filenameWithoutExtension("my-file.json")
val expected = Some("my-file")
assertEquals(obtained, expected)
}

test("Extract the filename without the extension when there are several dots") {
val obtained = FileUtils.filenameWithoutExtension("my.dotted.file.json")
val expected = Some("my.dotted.file")
assertEquals(obtained, expected)
}

test("Extract no filename when there is nothing before the dot") {
val obtained = FileUtils.filenameWithoutExtension(".json")
val expected = None
assertEquals(obtained, expected)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ch.epfl.bluebrain.nexus.ship

import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.FileUtils
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import fs2.io.file.Path

object DirectoryReader {

final private case class FileResult(headOffset: Option[Offset], head: Option[Path], tail: List[Path])

private val empty = FileResult(None, None, List.empty)

private def filenameAsOffset(path: Path) =
FileUtils
.filenameWithoutExtension(path.fileName.toString)
.flatMap(_.toLongOption.map(Offset.at))

/**
* From the given files, filter out the ones where given the filename, no event is candidate by import:
* - All files where the filename is an offset greater than the provided one are conserved
* - The file with the offset immediately lower than the provided one is also conserved as it may contain some
* events to import
* - The conserved paths are sorted by name
*/
def apply(paths: List[Path], fromOffset: Offset): List[Path] = {
val result = paths.foldLeft(empty) {
case (acc, path) if path.extName == ".json" =>
filenameAsOffset(path).fold(acc) { newOffset =>
if (newOffset >= fromOffset) {
acc.copy(tail = acc.tail.appended(path))
} else {
if (acc.headOffset.isEmpty || acc.headOffset.exists(_ < newOffset)) {
acc.copy(headOffset = Some(newOffset), head = Some(path))
} else acc
}
}
case (acc, _) => acc
}

(result.head.toList ++ result.tail).sortBy(_.fileName.toString)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ trait EventStreamer {
.filter { event => event.ordering.value >= fromOffset.value }

private def streamFromDirectory(path: Path, fromOffset: Offset): Stream[IO, RowEvent] = {
val sortedImportFiles = fileList(path)
.map(_.filter(_.extName.equals(".json")))
.map(_.sortBy(_.fileName.toString))
val sortedImportFiles = fileList(path).map(DirectoryReader(_, fromOffset))
Stream.evals(sortedImportFiles).flatMap(streamFromFile(_, fromOffset))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
{"ordering":2163821,"type":"project","org":"public","project":"sscx","id":"projects/public/sscx","rev":1,"value":{"rev": 1, "base": "https://bbp.epfl.ch/neurosciencegraph/data/", "uuid": "c7d70522-4305-480a-b190-75d757ed9a49", "@type": "ProjectCreated", "label": "sscx", "vocab": "https://bbp.epfl.ch/nexus/v1/resources/public/sscx/_/", "instant": "2020-01-22T16:03:33.105Z", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}, "apiMappings": {"prov": "http://www.w3.org/ns/prov#", "context": "https://incf.github.io/neuroshapes/contexts/", "schemaorg": "http://schema.org/", "datashapes": "https://neuroshapes.org/dash/", "ontologies": "https://neuroshapes.org/dash/ontology", "taxonomies": "https://neuroshapes.org/dash/taxonomy", "commonshapes": "https://neuroshapes.org/commons/", "provdatashapes": "https://provshapes.org/datashapes/", "provcommonshapes": "https://provshapes.org/commons/"}, "description": "This project contains somatosensorycortex dissemination publication data.", "organizationUuid": "d098a020-508b-4131-a28d-75f73b7f5f0e", "organizationLabel": "public"},"instant":"2020-01-22T17:03:33.105+01:00"}
{"ordering":2163822,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/defaultInProject","rev":1,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/defaultInProject", "rev": 1, "@type": "ResolverCreated", "value": {"name": "Default resolver", "@type": "InProjectValue", "priority": 1, "description": "Resolver created with the project"}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/defaultInProject", "@type": ["InProject", "Resolver"], "@context": "https://bluebrain.github.io/nexus/contexts/resolvers.json", "priority": 1}, "instant": "2020-01-22T16:03:34.841Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-01-22T17:03:34.841+01:00"}
{"ordering":2163826,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject1","rev":1,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject1", "rev": 1, "@type": "ResolverCreated", "value": {"@type": "CrossProjectValue", "priority": 40, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "ProvidedIdentities", "value": [{"@type": "Authenticated", "realm": "bbp"}]}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject1", "@type": ["Resolver", "CrossProject"], "@context": "https://bluebrain.github.io/nexus/contexts/resolvers.json", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "identities": [{"@id": "https://bbp.epfl.ch/nexus/v1/realms/bbp/authenticated", "@type": "Authenticated", "realm": "bbp"}]}, "instant": "2020-01-22T16:03:35.040Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-01-22T17:03:35.04+01:00"}
{"ordering":2173453,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject1","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject1", "rev": 2, "tpe": "CrossProject", "@type": "ResolverDeprecated", "instant": "2020-01-28T08:23:36.270Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-01-28T09:23:36.27+01:00"}
{"ordering":2408475,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":1,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 1, "@type": "ResolverCreated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "ProvidedIdentities", "value": [{"@type": "Authenticated", "realm": "bbp"}]}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["CrossProject", "Resolver"], "@context": "https://bluebrain.github.io/nexus/contexts/resolvers.json", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "identities": [{"@id": "https://bbp.epfl.ch/nexus/v1/realms/bbp/authenticated", "@type": "Authenticated", "realm": "bbp"}]}, "instant": "2020-03-09T13:36:19.246Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-03-09T14:36:19.246+01:00"}
{"ordering":4879496,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 2, "@type": "ResolverUpdated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "UseCurrentCaller"}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["Resolver", "CrossProject"], "@context": ["https://bluebrain.github.io/nexus/contexts/resolvers.json", "https://bluebrain.github.io/nexus/contexts/metadata.json"], "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "useCurrentCaller": true}, "instant": "2022-11-16T13:42:07.498Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2022-11-16T14:42:07.498+01:00"}
{"ordering":2173453,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject1","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject1", "rev": 2, "tpe": "CrossProject", "@type": "ResolverDeprecated", "instant": "2020-01-28T08:23:36.270Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-01-28T09:23:36.27+01:00"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"ordering":2408475,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":1,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 1, "@type": "ResolverCreated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "ProvidedIdentities", "value": [{"@type": "Authenticated", "realm": "bbp"}]}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["CrossProject", "Resolver"], "@context": "https://bluebrain.github.io/nexus/contexts/resolvers.json", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "identities": [{"@id": "https://bbp.epfl.ch/nexus/v1/realms/bbp/authenticated", "@type": "Authenticated", "realm": "bbp"}]}, "instant": "2020-03-09T13:36:19.246Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2020-03-09T14:36:19.246+01:00"}
{"ordering":4879496,"type":"resolver","org":"public","project":"sscx","id":"https://bluebrain.github.io/nexus/vocabulary/crossProject2","rev":2,"value":{"id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "rev": 2, "@type": "ResolverUpdated", "value": {"@type": "CrossProjectValue", "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "identityResolution": {"@type": "UseCurrentCaller"}}, "source": {"@id": "https://bluebrain.github.io/nexus/vocabulary/crossProject2", "@type": ["Resolver", "CrossProject"], "@context": ["https://bluebrain.github.io/nexus/contexts/resolvers.json", "https://bluebrain.github.io/nexus/contexts/metadata.json"], "priority": 50, "projects": ["neurosciencegraph/datamodels"], "resourceTypes": [], "useCurrentCaller": true}, "instant": "2022-11-16T13:42:07.498Z", "project": "public/sscx", "subject": {"@type": "User", "realm": "bbp", "subject": "bob"}},"instant":"2022-11-16T14:42:07.498+01:00"}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ch.epfl.bluebrain.nexus.ship

import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import fs2.io.file.Path

class DirectoryReaderSuite extends NexusSuite {

test("Filter out non json files") {
val input = List(Path("import/0000.txt"))
assertEquals(DirectoryReader(input, Offset.start), List.empty)
}

test("Filter out files with non-offset names") {
val input = List(Path("import/f4il.json"))
assertEquals(DirectoryReader(input, Offset.start), List.empty)
}

test("Sort input files") {
val input = List(
Path("import/004.json"),
Path("import/001.json"),
Path("import/002.json")
)
val expected = List(
Path("import/001.json"),
Path("import/002.json"),
Path("import/004.json")
)
assertEquals(DirectoryReader(input, Offset.start), expected)
}

test("Skip files with offset 21") {
val input = List(
Path("import/0030.json"),
Path("import/0020.json"),
Path("import/0010.json"),
Path("import/0040.json")
)
val expected = List(
Path("import/0020.json"),
Path("import/0030.json"),
Path("import/0040.json")
)
assertEquals(DirectoryReader(input, Offset.at(21L)), expected)
}

test("Skip files with offset 31") {
val input = List(
Path("import/0030.json"),
Path("import/0020.json"),
Path("import/0010.json"),
Path("import/0040.json")
)
val expected = List(
Path("import/0030.json"),
Path("import/0040.json")
)
assertEquals(DirectoryReader(input, Offset.at(31L)), expected)
}

test("Skip files with offset 41") {
val input = List(
Path("import/0030.json"),
Path("import/0020.json"),
Path("import/0010.json"),
Path("import/0040.json")
)
val expected = List(
Path("import/0040.json")
)
assertEquals(DirectoryReader(input, Offset.at(41L)), expected)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ch.epfl.bluebrain.nexus.ship

import cats.kernel.Order
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import fs2.Stream
import munit.Location

class EventStreamerSuite extends NexusSuite {

private def readFrom(fromOffset: Offset)(expectedMin: Offset, expectedMax: Offset)(implicit location: Location) = {
Stream
.eval(loader.absoluteFs2Path("import/multi-part-import"))
.flatMap { path =>
EventStreamer.localStreamer.stream(path, fromOffset)
}
.scan((Offset.at(Long.MaxValue), Offset.at(0L))) { case ((min, max), event) =>
(Order.min(min, event.ordering), Order.max(max, event.ordering))
}
.compile
.lastOrError
.assertEquals((expectedMin, expectedMax))
}

test("Streaming from directory and keep the expected min / max from the beginning") {
readFrom(Offset.start)(Offset.at(2163821L), Offset.at(9999999L))
}

test("Streaming from directory and keep the expected min / max from offset 2500000L") {
readFrom(Offset.at(2500000L))(Offset.at(4879496L), Offset.at(9999999L))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class RunShipSuite
private val fileDigest =
ComputedDigest(DigestAlgorithm.SHA256, Hex.valueOf(DigestAlgorithm.SHA256.digest.digest(fileContent.getBytes)))

private def asPath(path: String): IO[Path] = loader.absolutePath(path).map(Path(_))
private def asPath(path: String): IO[Path] = loader.absoluteFs2Path(path)

private def uploadFile(path: String) = {
val contentAsBuffer = StandardCharsets.UTF_8.encode(fileContent).asReadOnlyBuffer()
Expand Down Expand Up @@ -89,7 +89,7 @@ class RunShipSuite

test("Run import by providing the path to a file") {
for {
events <- eventsStream("import/import.json")
events <- eventsStream("import/single/00263821.json")
_ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport)
_ <- checkFor("elasticsearch", nxv + "defaultElasticSearchIndex", xas).assertEquals(1)
_ <- checkFor("blazegraph", nxv + "defaultSparqlIndex", xas).assertEquals(1)
Expand All @@ -108,7 +108,7 @@ class RunShipSuite
test("Test the increment") {
val start = Offset.at(2)
for {
events <- eventsStream("import/two-projects.json", offset = start)
events <- eventsStream("import/two-projects/000000001.json", offset = start)
_ <- RunShip(events, s3Client, inputConfig, xas).map { report =>
assert(report.offset == Offset.at(2L))
assert(thereIsOneProjectEventIn(report))
Expand All @@ -123,7 +123,7 @@ class RunShipSuite
projectMapping = Map(original -> target)
)
for {
events <- eventsStream("import/import.json")
events <- eventsStream("import/single/00263821.json")
_ <- RunShip(events, s3Client, configWithProjectMapping, xas)
_ <- getDistinctOrgProjects(xas).map { project =>
assertEquals(project, target)
Expand All @@ -137,7 +137,7 @@ class RunShipSuite
test("Import files in S3 and in the primary store") {
val textPlain = MediaTypes.`text/plain`.withMissingCharset
for {
events <- eventsStream("import/file-import.json")
events <- eventsStream("import/file-import/000000001.json")
report <- RunShip(events, s3Client, inputConfig, xas)
project = ProjectRef.unsafe("public", "sscx")
// File with an old path to be rewritten
Expand Down Expand Up @@ -223,7 +223,7 @@ object RunShipSuite {
}
}

// The expected import report for the import.json file, as well as for the /import/multi-part-import directory
// The expected import report for the 00263821.json file, as well as for the /import/multi-part-import directory
val expectedImportReport: ImportReport = ImportReport(
Offset.at(9999999L),
Instant.parse("2099-12-31T22:59:59.999Z"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class S3RunShipSuite
private lazy val (s3Client, fs2S3client, _) = localStackS3Client()

test("Run import from S3 providing a single file") {
val importFilePath = Path("/import/import.json")
val importFilePath = Path("/import/single/00263821.json")
for {
_ <- uploadFileToS3(fs2S3client, bucket, importFilePath)
events = EventStreamer.s3eventStreamer(s3Client, bucket).stream(importFilePath, Offset.start)
Expand All @@ -37,9 +37,10 @@ class S3RunShipSuite
test("Run import from S3 providing a directory") {
val directoryPath = Path("/import/multi-part-import")
for {
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.json"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-05T14:38:31.165389Z.success"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/2024-04-06T11:34:31.165389Z.json"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/002163821.json"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/002408475.json"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/004900000.json"))
_ <- uploadFileToS3(fs2S3client, bucket, Path("/import/multi-part-import/002163821.success"))
events = EventStreamer.s3eventStreamer(s3Client, bucket).stream(directoryPath, Offset.start)
_ <- RunShip(events, s3Client, inputConfig, xas).assertEquals(expectedImportReport)
} yield ()
Expand Down

0 comments on commit 87f2bad

Please sign in to comment.