Skip to content

Commit

Permalink
tweak Fs2ServerCall and Fs2UnaryServerCallHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
naoh87 committed Feb 11, 2022
1 parent 2badf4f commit 6dd15b8
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package server

import cats.effect._
import cats.effect.std.Dispatcher
import fs2.grpc.internal.server.Fs2UnaryServerCallHandler
import fs2.grpc.server.internal.Fs2UnaryServerCallHandler
import io.grpc._

class Fs2ServerCallHandler[F[_]: Async] private (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.grpc.internal.server
package fs2.grpc.server.internal

import cats.effect._
import cats.effect.std.Dispatcher
import fs2._
import fs2.grpc.server.ServerCallOptions
import io.grpc._
import fs2._

object Fs2ServerCall {
private[server] object Fs2ServerCall {
type Cancel = SyncIO[Unit]

def setup[I, O](
Expand All @@ -41,20 +41,20 @@ object Fs2ServerCall {
}
}

final class Fs2ServerCall[Request, Response](
private[server] final class Fs2ServerCall[Request, Response](
call: ServerCall[Request, Response]
) {

import Fs2ServerCall.Cancel

def stream[F[_]](response: Stream[F, Response], dispatcher: Dispatcher[F])(implicit F: Async[F]): SyncIO[Cancel] =
def stream[F[_]](response: Stream[F, Response], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] =
run(
response.pull.peek1
.flatMap {
case Some((_, tail)) =>
case Some((_, stream)) =>
Pull.suspend {
call.sendHeaders(new Metadata())
tail.map(call.sendMessage).pull.echo
stream.map(call.sendMessage).pull.echo
}
case None => Pull.done
}
Expand All @@ -64,7 +64,7 @@ final class Fs2ServerCall[Request, Response](
dispatcher
)

def unary[F[_]](response: F[Response], dispatcher: Dispatcher[F])(implicit F: Async[F]): SyncIO[Cancel] =
def unary[F[_]](response: F[Response], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] =
run(
F.map(response) { message =>
call.sendHeaders(new Metadata())
Expand All @@ -82,25 +82,17 @@ final class Fs2ServerCall[Request, Response](
private def run[F[_]](completed: F[Unit], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = {
SyncIO {
val cancel = dispatcher.unsafeRunCancelable(F.guaranteeCase(completed) {
case Outcome.Succeeded(_) => closeStreamF(Status.OK, new Metadata())
case Outcome.Errored(e) =>
e match {
case ex: StatusException =>
closeStreamF(ex.getStatus, Option(ex.getTrailers).getOrElse(new Metadata()))
case ex: StatusRuntimeException =>
closeStreamF(ex.getStatus, Option(ex.getTrailers).getOrElse(new Metadata()))
case ex =>
closeStreamF(Status.INTERNAL.withDescription(ex.getMessage).withCause(ex), new Metadata())
}
case Outcome.Canceled() => closeStreamF(Status.CANCELLED, new Metadata())
case Outcome.Succeeded(_) => close(Status.OK, new Metadata()).to[F]
case Outcome.Errored(e) => handleError(e).to[F]
case Outcome.Canceled() => close(Status.CANCELLED, new Metadata()).to[F]
})
SyncIO {
cancel()
()
}
SyncIO(cancel()).void
}
}

private def closeStreamF[F[_]](status: Status, metadata: Metadata)(implicit F: Sync[F]): F[Unit] =
F.delay(call.close(status, metadata))
private def handleError(t: Throwable): SyncIO[Unit] = t match {
case ex: StatusException => close(ex.getStatus, Option(ex.getTrailers).getOrElse(new Metadata()))
case ex: StatusRuntimeException => close(ex.getStatus, Option(ex.getTrailers).getOrElse(new Metadata()))
case ex => close(Status.INTERNAL.withDescription(ex.getMessage).withCause(ex), new Metadata())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.grpc.internal.server
package fs2.grpc.server.internal

import cats.effect.Async
import cats.effect.Ref
Expand All @@ -29,7 +29,7 @@ import fs2.grpc.server.ServerCallOptions
import fs2.grpc.server.ServerOptions
import io.grpc._

object Fs2UnaryServerCallHandler {
private[server] object Fs2UnaryServerCallHandler {

import Fs2ServerCall.Cancel

Expand Down Expand Up @@ -111,6 +111,8 @@ object Fs2UnaryServerCallHandler {
)(f: Fs2ServerCall[Request, Response] => Request => SyncIO[Cancel]): SyncIO[ServerCall.Listener[Request]] = {
for {
call <- Fs2ServerCall.setup(options, call)
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it.
_ <- call.request(2)
ctx <- Context.init(f(call))
} yield mkListener[Request, Response](call, ctx)
Expand Down
3 changes: 1 addition & 2 deletions runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import scala.concurrent.duration._
import cats.effect._
import cats.effect.std.Dispatcher
import cats.effect.testkit.TestContext
import fs2._
import fs2.grpc.internal.server._
import fs2.grpc.server.internal.Fs2UnaryServerCallHandler
import io.grpc._

class ServerSuite extends Fs2GrpcSuite {
Expand Down

0 comments on commit 6dd15b8

Please sign in to comment.