-
Notifications
You must be signed in to change notification settings - Fork 0
/
DbRoutes.scala
76 lines (57 loc) · 2.28 KB
/
DbRoutes.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.svend.demo.web
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.Timeout
import com.svend.demo.web.MockDb.{DbProtocol, DbQuery, Rows}
import scala.collection.immutable._
import scala.concurrent.duration._
import scala.language.postfixOps
class DbRoutes(dbReader: ActorRef[DbProtocol], actorContext: ActorContext[Nothing])(implicit val system: ActorSystem[_]) {
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
val pageSize = 20
val dbRoutes: Route =
// curl -X GET 'http://localhost:8080/events?fromRowId=5000' --limit-rate 10K
path("events") {
get {
parameter("fromRowId".as[Long]) { fromRowId =>
complete {
implicit val timeout = Timeout.durationToTimeout(1 minute)
Source
// infinite stream of paginated "fromRowId"
.fromIterator(() => Iterator.from(0, pageSize).map(_ + fromRowId))
// async requests to DB
.via(
ActorFlow
.ask(10)(dbReader)((fromRowId, replyTo: ActorRef[Rows]) => DbQuery(fromRowId, pageSize, replyTo))
)
// flatten rows + fix out-of orderness due to concurrent DB access (assuming a maximum delay...)
.mapConcat(dbResponse => dbResponse.rows)
.statefulMapConcat(sort(pageSize * 20, (row: String) => row.drop(15).toInt))
// output as SSE events
.map(ServerSentEvent(_))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
}
}
}
/**
* partial sort of a stream: wait for <bufferSize> to be buffered, the start flushing them out in order
**/
def sort[T, S](bufferSize: Int, order: T => S)(implicit ordering: Ordering[S]): () => T => Iterable[T] = () => {
var buffer = List.empty[T]
t: T => {
buffer = (buffer :+ t).sortBy(order)
if (buffer.size < bufferSize) Iterable.empty[T]
else {
val r = buffer.head
buffer = buffer.tail
List(r)
}
}
}
}