Skip to content

Commit

Permalink
Merge pull request #4226 from armanbilge/feature/selector-helpers
Browse files Browse the repository at this point in the history
Add helpers to `Selector` poller
  • Loading branch information
armanbilge authored Jan 5, 2025
2 parents b6adfcb + 1b79545 commit 625006b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
11 changes: 11 additions & 0 deletions core/jvm/src/main/scala/cats/effect/Selector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package cats.effect

import cats.syntax.all._

import java.nio.channels.SelectableChannel
import java.nio.channels.spi.SelectorProvider

Expand All @@ -34,3 +36,12 @@ trait Selector {
def select(ch: SelectableChannel, ops: Int): IO[Int]

}

object Selector {
def find: IO[Option[Selector]] =
IO.pollers.map(_.collectFirst { case selector: Selector => selector })

def get = find.flatMap(
_.liftTo[IO](new RuntimeException("No Selector installed in this IORuntime"))
)
}
16 changes: 7 additions & 9 deletions tests/jvm/src/test/scala/cats/effect/SelectorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ import java.nio.channels.SelectionKey._

class SelectorSpec extends BaseSpec {

def getSelector: IO[Selector] =
IO.pollers.map(_.collectFirst { case selector: Selector => selector }).map(_.get)

def mkPipe: Resource[IO, Pipe] =
Resource
.eval(getSelector)
.eval(Selector.get)
.flatMap { selector =>
Resource.make(IO(selector.provider.openPipe())) { pipe =>
IO(pipe.sink().close()).guarantee(IO(pipe.source().close()))
Expand All @@ -49,7 +46,7 @@ class SelectorSpec extends BaseSpec {
"notify read-ready events" in real {
mkPipe.use { pipe =>
for {
selector <- getSelector
selector <- Selector.get
buf <- IO(ByteBuffer.allocate(4))
_ <- IO(pipe.sink.write(ByteBuffer.wrap(Array(1, 2, 3)))).background.surround {
selector.select(pipe.source, OP_READ) *> IO(pipe.source.read(buf))
Expand All @@ -64,7 +61,7 @@ class SelectorSpec extends BaseSpec {
"setup multiple callbacks" in real {
mkPipe.use { pipe =>
for {
selector <- getSelector
selector <- Selector.get
_ <- selector.select(pipe.source, OP_READ).parReplicateA_(10) <&
IO(pipe.sink.write(ByteBuffer.wrap(Array(1, 2, 3))))
} yield ok
Expand All @@ -74,7 +71,7 @@ class SelectorSpec extends BaseSpec {
"works after blocking" in real {
mkPipe.use { pipe =>
for {
selector <- getSelector
selector <- Selector.get
_ <- IO.blocking(())
_ <- selector.select(pipe.sink, OP_WRITE)
} yield ok
Expand All @@ -84,7 +81,7 @@ class SelectorSpec extends BaseSpec {
"gracefully handles illegal ops" in real {
mkPipe.use { pipe =>
// get off the wstp to test async codepaths
IO.blocking(()) *> getSelector.flatMap { selector =>
IO.blocking(()) *> Selector.get.flatMap { selector =>
selector.select(pipe.sink, OP_READ).attempt.map {
case Left(_: IllegalArgumentException) => true
case _ => false
Expand All @@ -99,7 +96,8 @@ class SelectorSpec extends BaseSpec {
IORuntime.builder().setCompute(pool, shutdown).addPoller(poller, () => ()).build()

try {
val test = getSelector
val test = Selector
.get
.flatMap { selector =>
mkPipe.allocated.flatMap {
case (pipe, close) =>
Expand Down

0 comments on commit 625006b

Please sign in to comment.