Skip to content

Commit

Permalink
Add pagination with cursor to solr client (#230)
Browse files Browse the repository at this point in the history
This adds the ability to use a cursor for paginated results.
  • Loading branch information
eikek authored Oct 28, 2024
1 parent ab6384d commit 5d262ec
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.solr.client

import io.bullet.borer.{Decoder, Encoder}

/** Allow paged results using a cursor as described here:
* https://solr.apache.org/guide/solr/latest/query-guide/pagination-of-results.html#fetching-a-large-number-of-sorted-results-cursors
*/
enum CursorMark:
case Start
case Mark(value: String)

def render: String = this match
case Start => "*"
case Mark(v) => v

object CursorMark:

given Encoder[CursorMark] =
Encoder.forString.contramap(_.render)

given Decoder[CursorMark] =
Decoder.forString.map(s => if ("*" == s) CursorMark.Start else CursorMark.Mark(s))
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.renku.solr.client

import io.bullet.borer.Encoder
import io.bullet.borer.derivation.MapBasedCodecs.deriveEncoder
import io.renku.solr.client.SolrSort.Direction
import io.renku.solr.client.facet.Facets
import io.renku.solr.client.schema.FieldName

Expand All @@ -37,12 +38,25 @@ final case class QueryData(
copy(offset = offset + limit)

def withSort(sort: SolrSort): QueryData = copy(sort = sort)
def appendSort(field: FieldName, dir: Direction = Direction.Asc): QueryData =
copy(sort = sort + (field -> dir))
def withFields(field: FieldName*) = copy(fields = field)
def withFilter(fq: Seq[String]): QueryData = copy(filter = fq)
def addFilter(q: String*): QueryData = copy(filter = filter ++ q)
def withFacet(facet: Facets): QueryData = copy(facet = facet)
def withLimit(limit: Int): QueryData = copy(limit = limit)
def withOffset(offset: Int): QueryData = copy(offset = offset)
def withCursor(cursorMark: CursorMark): QueryData =
copy(params = params.updated("cursorMark", cursorMark.render))

/** When using a cursor, it is required to add a `uniqueKey`field to the sort clause to
* guarantee a deterministic order.
*/
def withCursor(cursorMark: CursorMark, keyField: FieldName): QueryData =
copy(
params = params.updated("cursorMark", cursorMark.render),
sort = sort + (keyField -> SolrSort.Direction.Asc)
)

def addSubQuery(field: FieldName, sq: SubQuery): QueryData =
copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import io.renku.solr.client.facet.FacetResponse
final case class QueryResponse[A](
responseHeader: ResponseHeader,
@key("response") responseBody: ResponseBody[A],
@key("facets") facetResponse: Option[FacetResponse] = None
@key("facets") facetResponse: Option[FacetResponse] = None,
@key("nextCursorMark") nextCursor: Option[CursorMark] = None
):
def map[B](f: A => B): QueryResponse[B] =
copy(responseBody = responseBody.map(f))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ object SolrSort:
def nonEmpty: Boolean = !self.isEmpty
def ++(next: SolrSort): SolrSort =
Monoid[SolrSort].combine(self, next)

def +(n: (FieldName, Direction)): SolrSort =
self ++ Seq(n)

private[client] def toSolr: String =
self.map { case (f, d) => s"${f.name} ${d.name}" }.mkString(",")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ opaque type FieldName = String
object FieldName:
val all: FieldName = "*"
val score: FieldName = "score"
val id: FieldName = "id"

def apply(name: String): FieldName = name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,44 @@ class SolrClientSpec
override def munitFixtures: Seq[munit.AnyFixture[?]] =
List(solrServer, solrClient)

test("use cursor for pagination") {
val courses = List.unfold(0) { n =>
if (n > 10) None
else Some((Course(f"c-$n%03d", s"fp in scala $n", DocVersion.NotExists), n + 1))
}
for
client <- IO(solrClient())
_ <- client.deleteIds(NonEmptyList.fromListUnsafe(courses.map(_.id)))
r0 <- client.upsert(courses)
_ = assert(r0.isSuccess, clue = "Expected successful insert")

query = QueryData(QueryString("*:*"))
.withCursor(CursorMark.Start, FieldName.id)
.appendSort(FieldName("name_s"))
.withLimit(4)

r1 <- client.query[Course](query)
_ = assert(r1.nextCursor.isDefined)
_ = assertEquals(r1.responseBody.docs.map(_.id), courses.take(4).map(_.id))

r2 <- client.query[Course](query.withCursor(r1.nextCursor.get))
_ = assert(r2.nextCursor.isDefined)
_ = assertEquals(r2.responseBody.docs.map(_.id), courses.drop(4).take(4).map(_.id))

r3 <- client.query[Course](query.withCursor(r2.nextCursor.get))
_ = assert(r3.nextCursor.isDefined)
_ = assertEquals(r3.responseBody.docs.map(_.id), courses.drop(8).take(4).map(_.id))

r4 <- client.query[Course](query.withCursor(r3.nextCursor.get))
_ = assert(r4.nextCursor.isDefined)
_ = assertEquals(r4.responseBody.docs, Nil)

r5 <- client.query[Course](query.withCursor(r4.nextCursor.get))
_ = assert(r5.nextCursor.isDefined)
_ = assertEquals(r5.nextCursor, r4.nextCursor)
yield ()
}

test("optimistic locking: fail if exists") {
val c0 = Course("c1", "fp in scala", DocVersion.NotExists)
for {
Expand Down

0 comments on commit 5d262ec

Please sign in to comment.