diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/CursorMark.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/CursorMark.scala
new file mode 100644
index 00000000..59c1f050
--- /dev/null
+++ b/modules/solr-client/src/main/scala/io/renku/solr/client/CursorMark.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2024 Swiss Data Science Center (SDSC)
+ * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+ * Eidgenössische Technische Hochschule Zürich (ETHZ).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.renku.solr.client
+
+import io.bullet.borer.{Decoder, Encoder}
+
+/** Allows paging through sorted results using a cursor, as described here:
+  * https://solr.apache.org/guide/solr/latest/query-guide/pagination-of-results.html#fetching-a-large-number-of-sorted-results-cursors
+  */
+enum CursorMark:
+  /** The beginning of a result set; rendered as `*`. */
+  case Start
+
+  /** A position in a sorted result set, e.g. a response's `nextCursorMark`. */
+  case Mark(value: String)
+
+  /** The string sent to Solr as the value of the `cursorMark` parameter. */
+  def render: String = this match
+    case Start       => "*"
+    case Mark(value) => value
+
+object CursorMark:
+
+  /** Encodes a cursor as the plain string produced by `render`. */
+  given Encoder[CursorMark] =
+    Encoder.forString.contramap(_.render)
+
+  /** Decodes `*` to `Start` and any other string to a `Mark`. */
+  given Decoder[CursorMark] =
+    Decoder.forString.map {
+      case "*"   => CursorMark.Start
+      case other => CursorMark.Mark(other)
+    }
diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala
index 2f59312f..ebc95af5 100644
--- a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala
+++ b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryData.scala
@@ -20,6 +20,7 @@ package io.renku.solr.client
 
 import io.bullet.borer.Encoder
 import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder
+import io.renku.solr.client.SolrSort.Direction
 import io.renku.solr.client.facet.Facets
 import io.renku.solr.client.schema.FieldName
 
@@ -37,12 +38,32 @@ final case class QueryData(
     copy(offset = offset + limit)
 
   def withSort(sort: SolrSort): QueryData = copy(sort = sort)
+  /** Adds `field` to the current sort, after the criteria it already contains. */
+  def appendSort(field: FieldName, dir: Direction = Direction.Asc): QueryData =
+    copy(sort = sort + (field -> dir))
   def withFields(field: FieldName*) = copy(fields = field)
   def withFilter(fq: Seq[String]): QueryData = copy(filter = fq)
   def addFilter(q: String*): QueryData = copy(filter = filter ++ q)
   def withFacet(facet: Facets): QueryData = copy(facet = facet)
   def withLimit(limit: Int): QueryData = copy(limit = limit)
   def withOffset(offset: Int): QueryData = copy(offset = offset)
+
+  /** Sets Solr's `cursorMark` parameter to continue (or, with `CursorMark.Start`, begin) a
+    * cursor based traversal. Solr only accepts a cursor together with a start offset of 0,
+    * so the offset is reset. The sort must contain the `uniqueKey` field; use the overload
+    * taking a `keyField` to have it added.
+    */
+  def withCursor(cursorMark: CursorMark): QueryData =
+    copy(offset = 0, params = params.updated("cursorMark", cursorMark.render))
+
+  
/** Like `withCursor(cursorMark)`, and additionally makes sure `keyField` is part of the
+    * sort: a cursor requires the `uniqueKey` field in the sort clause to guarantee a
+    * deterministic order. The field is appended ascending unless the sort already contains
+    * it, so calling this again for every page keeps the sort (and thus the cursor) valid.
+    */
+  def withCursor(cursorMark: CursorMark, keyField: FieldName): QueryData =
+    val keyed = if (sort.hasField(keyField)) sort else sort + (keyField -> Direction.Asc)
+    withCursor(cursorMark).withSort(keyed)
 
   def addSubQuery(field: FieldName, sq: SubQuery): QueryData =
     copy(
diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala
index 6ea476be..2ff3db55 100644
--- a/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala
+++ b/modules/solr-client/src/main/scala/io/renku/solr/client/QueryResponse.scala
@@ -27,7 +27,8 @@ import io.renku.solr.client.facet.FacetResponse
 final case class QueryResponse[A](
     responseHeader: ResponseHeader,
     @key("response") responseBody: ResponseBody[A],
-    @key("facets") facetResponse: Option[FacetResponse] = None
+    @key("facets") facetResponse: Option[FacetResponse] = None,
+    @key("nextCursorMark") nextCursor: Option[CursorMark] = None
 ):
   def map[B](f: A => B): QueryResponse[B] =
     copy(responseBody = responseBody.map(f))
diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrSort.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrSort.scala
index 41d061c8..9cdb2a05 100644
--- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrSort.scala
+++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrSort.scala
@@ -50,6 +50,15 @@ object SolrSort:
     def nonEmpty: Boolean = !self.isEmpty
     def ++(next: SolrSort): SolrSort =
       Monoid[SolrSort].combine(self, next)
+
+    /** Adds a single criterion after the ones already contained in this sort. */
+    def +(n: (FieldName, Direction)): SolrSort =
+      self ++ Seq(n)
+
+    /** Whether `field` already occurs in this sort, with either direction. */
+    private[client] def hasField(field: FieldName): Boolean =
+      self.exists { case (f, _) => f.name == field.name }
+
     private[client] def toSolr: String =
       self.map { case (f, d) => s"${f.name} ${d.name}" }.mkString(",")
 
diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala
b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala
index de42cbb9..78c9fa50 100644
--- a/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala
+++ b/modules/solr-client/src/main/scala/io/renku/solr/client/schema/FieldName.scala
@@ -25,6 +25,7 @@ opaque type FieldName = String
 object FieldName:
   val all: FieldName = "*"
   val score: FieldName = "score"
+  val id: FieldName = "id" // NOTE(review): presumably the schema's uniqueKey field — confirm
 
   def apply(name: String): FieldName = name
 
diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala
index 23a29cd2..285356b3 100644
--- a/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala
+++ b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala
@@ -48,6 +48,44 @@ class SolrClientSpec
   override def munitFixtures: Seq[munit.AnyFixture[?]] =
     List(solrServer, solrClient)
 
+  test("use cursor for pagination") {
+    val courses = List.unfold(0) { n =>
+      if (n > 10) None // stop after n = 10: 11 courses with ids c-000 .. c-010
+      else Some((Course(f"c-$n%03d", s"fp in scala $n", DocVersion.NotExists), n + 1))
+    }
+    for
+      client <- IO(solrClient())
+      _ <- client.deleteIds(NonEmptyList.fromListUnsafe(courses.map(_.id)))
+      r0 <- client.upsert(courses)
+      _ = assert(r0.isSuccess, clue = "Expected successful insert")
+      // cursor query from the start: adds `id` (the cursor's key field) and `name_s` to the sort
+      query = QueryData(QueryString("*:*")) // NOTE(review): assumes the core holds only `courses`
+        .withCursor(CursorMark.Start, FieldName.id)
+        .appendSort(FieldName("name_s"))
+        .withLimit(4)
+      // page 1: expects the first 4 courses and a cursor for the next page
+      r1 <- client.query[Course](query)
+      _ = assert(r1.nextCursor.isDefined)
+      _ = assertEquals(r1.responseBody.docs.map(_.id), courses.take(4).map(_.id))
+      // page 2: same query, continued from the cursor returned with page 1
+      r2 <- client.query[Course](query.withCursor(r1.nextCursor.get))
+      _ = assert(r2.nextCursor.isDefined)
+      _ = assertEquals(r2.responseBody.docs.map(_.id), courses.drop(4).take(4).map(_.id))
+      // page 3: only the remaining 3 courses (c-008 .. c-010) are left
+      r3 <- client.query[Course](query.withCursor(r2.nextCursor.get))
+      _ = assert(r3.nextCursor.isDefined)
+      _ = assertEquals(r3.responseBody.docs.map(_.id), courses.drop(8).take(4).map(_.id))
+      // past the end: no documents, but a cursor is still expected in the response
+      r4 <- client.query[Course](query.withCursor(r3.nextCursor.get))
+      _ = assert(r4.nextCursor.isDefined)
+      _ = assertEquals(r4.responseBody.docs, Nil)
+      // querying again with that cursor is expected to return the very same cursor
+      r5 <- client.query[Course](query.withCursor(r4.nextCursor.get))
+      _ = assert(r5.nextCursor.isDefined)
+      _ = assertEquals(r5.nextCursor, r4.nextCursor)
+    yield ()
+  }
+
   test("optimistic locking: fail if exists") {
     val c0 = Course("c1", "fp in scala", DocVersion.NotExists)
     for {