diff --git a/core/jvm/src/main/scala/cats/effect/Selector.scala b/core/jvm/src/main/scala/cats/effect/Selector.scala index da39e74d90..1777ea58b8 100644 --- a/core/jvm/src/main/scala/cats/effect/Selector.scala +++ b/core/jvm/src/main/scala/cats/effect/Selector.scala @@ -16,6 +16,8 @@ package cats.effect +import cats.syntax.all._ + import java.nio.channels.SelectableChannel import java.nio.channels.spi.SelectorProvider @@ -34,3 +36,12 @@ trait Selector { def select(ch: SelectableChannel, ops: Int): IO[Int] } + +object Selector { + def find: IO[Option[Selector]] = + IO.pollers.map(_.collectFirst { case selector: Selector => selector }) + + def get = find.flatMap( + _.liftTo[IO](new RuntimeException("No Selector installed in this IORuntime")) + ) +} diff --git a/tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala b/tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala index 78ea55d4aa..3fb0341c30 100644 --- a/tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala @@ -26,12 +26,9 @@ import java.nio.channels.SelectionKey._ class SelectorSpec extends BaseSpec { - def getSelector: IO[Selector] = - IO.pollers.map(_.collectFirst { case selector: Selector => selector }).map(_.get) - def mkPipe: Resource[IO, Pipe] = Resource - .eval(getSelector) + .eval(Selector.get) .flatMap { selector => Resource.make(IO(selector.provider.openPipe())) { pipe => IO(pipe.sink().close()).guarantee(IO(pipe.source().close())) @@ -49,7 +46,7 @@ class SelectorSpec extends BaseSpec { "notify read-ready events" in real { mkPipe.use { pipe => for { - selector <- getSelector + selector <- Selector.get buf <- IO(ByteBuffer.allocate(4)) _ <- IO(pipe.sink.write(ByteBuffer.wrap(Array(1, 2, 3)))).background.surround { selector.select(pipe.source, OP_READ) *> IO(pipe.source.read(buf)) @@ -64,7 +61,7 @@ class SelectorSpec extends BaseSpec { "setup multiple callbacks" in real { mkPipe.use { pipe => for { - selector <- getSelector + selector <- Selector.get _ <- selector.select(pipe.source, OP_READ).parReplicateA_(10) <& IO(pipe.sink.write(ByteBuffer.wrap(Array(1, 2, 3)))) } yield ok @@ -74,7 +71,7 @@ class SelectorSpec extends BaseSpec { "works after blocking" in real { mkPipe.use { pipe => for { - selector <- getSelector + selector <- Selector.get _ <- IO.blocking(()) _ <- selector.select(pipe.sink, OP_WRITE) } yield ok @@ -84,7 +81,7 @@ class SelectorSpec extends BaseSpec { "gracefully handles illegal ops" in real { mkPipe.use { pipe => // get off the wstp to test async codepaths - IO.blocking(()) *> getSelector.flatMap { selector => + IO.blocking(()) *> Selector.get.flatMap { selector => selector.select(pipe.sink, OP_READ).attempt.map { case Left(_: IllegalArgumentException) => true case _ => false @@ -99,7 +96,8 @@ class SelectorSpec extends BaseSpec { IORuntime.builder().setCompute(pool, shutdown).addPoller(poller, () => ()).build() try { - val test = getSelector + val test = Selector + .get .flatMap { selector => mkPipe.allocated.flatMap { case (pipe, close) =>