Skip to content

Commit

Permalink
Introduce PollResult
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Jan 8, 2025
1 parent e6570f8 commit 60204b6
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,14 @@ abstract class PollingSystem {
* indefinitely.
*
* @return
* whether any events are ready. e.g. if the method returned due to timeout, this should be
* `false`. If `true`, [[processReadyEvents]] must be called before calling any other method
* on this poller.
* whether any ready events were polled and should be handled with [[processReadyEvents]].
* If result is incomplete, then [[poll]] should be called again after
* [[processReadyEvents]].
*/
def poll(poller: Poller, nanos: Long): Boolean
def poll(poller: Poller, nanos: Long): PollResult

/**
* Processes ready events e.g. collects their results and resumes the corresponding tasks.
* This method should only be called after [[poll]] and only if it returned `true`.
*
* @param poller
* the thread-local [[Poller]] with ready events
Expand Down Expand Up @@ -149,3 +148,23 @@ object PollingSystem {
type Poller = P
}
}

sealed abstract class PollResult
object PollResult {

/**
* Polled all of the available ready events.
*/
case object Complete extends PollResult

/**
* Polled some, but not all, of the available ready events. Poll should be called again to
* reap additional ready events.
*/
case object Incomplete extends PollResult

/**
* The poll was interrupted or timed out before any events became ready.
*/
case object Interrupted extends PollResult
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
def closePoller(poller: Poller): Unit =
poller.selector.close()

def poll(poller: Poller, nanos: Long): Boolean = {
def poll(poller: Poller, nanos: Long): PollResult = {
val millis = if (nanos >= 0) nanos / 1000000 else -1
val selector = poller.selector

Expand All @@ -50,7 +50,10 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
else selector.select()

// closing selector interrupts select
selector.isOpen() && !selector.selectedKeys().isEmpty()
if (selector.isOpen() && !selector.selectedKeys().isEmpty())
PollResult.Complete
else
PollResult.Interrupted
}

def processReadyEvents(poller: Poller): Boolean = {
Expand Down
4 changes: 2 additions & 2 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ object SleepSystem extends PollingSystem {

def closePoller(Poller: Poller): Unit = ()

def poll(poller: Poller, nanos: Long): Boolean = {
def poll(poller: Poller, nanos: Long): PollResult = {
if (nanos < 0)
LockSupport.park()
else if (nanos > 0)
LockSupport.parkNanos(nanos)
else
()
false
PollResult.Interrupted
}

def processReadyEvents(poller: Poller): Boolean = false
Expand Down
25 changes: 18 additions & 7 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -421,24 +421,34 @@ private[effect] final class WorkerThread[P <: AnyRef](
}
}

@tailrec
def drainReadyEvents(result: PollResult, acc: Boolean): Boolean =
if (result ne PollResult.Interrupted) {
val tasksScheduled = system.processReadyEvents(_poller) | acc
if (result eq PollResult.Complete) tasksScheduled
else drainReadyEvents(system.poll(_poller, 0), tasksScheduled)
} else {
acc
}

// returns true if polled event, false if unparked
def parkLoop(): Boolean = {
while (!done.get()) {
// Park the thread until further notice.
val start = System.nanoTime()
metrics.incrementPolledCount()
val polled = system.poll(_poller, -1)
val pollResult = system.poll(_poller, -1)
now = System.nanoTime() // update now
metrics.addIdleTime(now - start)

// the only way we can be interrupted here is if it happened *externally* (probably sbt)
if (isInterrupted()) {
pool.shutdown()
} else if (polled) {
} else if (pollResult ne PollResult.Interrupted) {
if (parked.getAndSet(false))
pool.doneSleeping()
// TODO, if no tasks scheduled could fastpath back to park?
val _ = system.processReadyEvents(_poller)
val _ = drainReadyEvents(pollResult, false)
return true
} else if (!parked.get()) { // Spurious wakeup check.
return false
Expand All @@ -465,7 +475,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
if (nanos > 0L) {
val start = now
metrics.incrementPolledCount()
val polled = system.poll(_poller, nanos)
val pollResult = system.poll(_poller, nanos)
// we already parked and time passed, so update time again
// it doesn't matter if we timed out or were awakened, the update is free-ish
now = System.nanoTime()
Expand All @@ -476,13 +486,14 @@ private[effect] final class WorkerThread[P <: AnyRef](
false // we know `done` is `true`
} else {
// no matter why we woke up, there may be timers or events ready
val polled = pollResult ne PollResult.Interrupted
if (polled || (triggerTime - now <= 0)) {
// we timed out or polled an event
if (parked.getAndSet(false)) {
pool.doneSleeping()
}
if (polled) { // TODO, if no tasks scheduled and no timers could fastpath back to park?
val _ = system.processReadyEvents(_poller)
val _ = drainReadyEvents(pollResult, false)
}
true
} else { // we were either awakened spuriously or intentionally
Expand Down Expand Up @@ -583,8 +594,8 @@ private[effect] final class WorkerThread[P <: AnyRef](
sleepers.packIfNeeded()
// give the polling system a chance to discover events
metrics.incrementPolledCount()
if (system.needsPoll(_poller) && system.poll(_poller, 0)) {
val _ = system.processReadyEvents(_poller)
if (system.needsPoll(_poller)) {
val _ = drainReadyEvents(system.poll(_poller, 0), false)
}

// Obtain a fiber or batch of fibers from the external queue.
Expand Down
27 changes: 8 additions & 19 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import cats.syntax.all._

import org.typelevel.scalaccompat.annotation._

import scala.annotation.tailrec
import scala.scalanative.annotation.alwaysinline
import scala.scalanative.libc.errno._
import scala.scalanative.meta.LinktimeInfo
Expand Down Expand Up @@ -60,7 +59,7 @@ object EpollSystem extends PollingSystem {

def closePoller(poller: Poller): Unit = poller.close()

def poll(poller: Poller, nanos: Long): Boolean =
def poll(poller: Poller, nanos: Long): PollResult =
poller.poll(nanos)

def processReadyEvents(poller: Poller): Boolean =
Expand Down Expand Up @@ -193,21 +192,22 @@ object EpollSystem extends PollingSystem {
if (unistd.close(epfd) != 0)
throw new IOException(fromCString(strerror(errno)))

private[EpollSystem] def poll(timeout: Long): Boolean = {
private[EpollSystem] def poll(timeout: Long): PollResult = {

val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt
val rtn = epoll_wait(epfd, events, MaxEvents, timeoutMillis)
if (rtn >= 0) {
readyEventCount = rtn
rtn > 0
if (rtn > 0) {
if (rtn < MaxEvents) PollResult.Complete else PollResult.Incomplete
} else PollResult.Interrupted
} else if (errno == EINTR) { // spurious wake-up by signal
false
PollResult.Interrupted
} else {
throw new IOException(fromCString(strerror(errno)))
}
}

@tailrec
private[EpollSystem] def processReadyEvents(): Boolean = {
var i = 0
while (i < readyEventCount) {
Expand All @@ -216,19 +216,8 @@ object EpollSystem extends PollingSystem {
handle.notify(event.events.toInt)
i += 1
}

if (readyEventCount >= MaxEvents) { // drain the ready list
val rtn = epoll_wait(epfd, events, MaxEvents, 0)
if (rtn >= 0) {
readyEventCount = rtn
processReadyEvents()
} else {
throw new IOException(fromCString(strerror(errno)))
}
} else {
readyEventCount = 0
true
}
readyEventCount = 0
true
}

private[EpollSystem] def needsPoll(): Boolean = !handles.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cats.effect
package unsafe

import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration._
import scala.scalanative.libc.errno._
Expand Down Expand Up @@ -128,9 +129,13 @@ private[effect] final class EventLoopExecutorScheduler[P](
* test framework, including MUnit, specs2, and Weaver.
*/
if (system.needsPoll(poller) || timeout != -1) {
if (system.poll(poller, timeout)) {
val _ = system.processReadyEvents(poller)
}
@tailrec def loop(result: PollResult): Unit =
if (result ne PollResult.Interrupted) {
system.processReadyEvents(poller)
if (result eq PollResult.Incomplete) loop(system.poll(poller, 0))
}

loop(system.poll(poller, timeout))
}

continue = !executeQueue.isEmpty() || !sleepQueue.isEmpty() || system.needsPoll(poller)
Expand Down
43 changes: 17 additions & 26 deletions core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import cats.syntax.all._

import org.typelevel.scalaccompat.annotation._

import scala.annotation.tailrec
import scala.collection.mutable.LongMap
import scala.scalanative.libc.errno._
import scala.scalanative.posix.errno._
Expand Down Expand Up @@ -59,7 +58,7 @@ object KqueueSystem extends PollingSystem {

def closePoller(poller: Poller): Unit = poller.close()

def poll(poller: Poller, nanos: Long): Boolean =
def poll(poller: Poller, nanos: Long): PollResult =
poller.poll(nanos)

def processReadyEvents(poller: Poller): Boolean =
Expand Down Expand Up @@ -144,7 +143,8 @@ object KqueueSystem extends PollingSystem {
private[this] val buffer = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents)
@inline private[this] def eventlist =
buffer.atUnsafe(0).asInstanceOf[Ptr[kevent64_s]]
private[this] var eventCount = 0
private[this] var changeCount = 0
private[this] var readyEventCount = 0

private[this] val callbacks = new LongMap[Either[Throwable, Unit] => Unit]()

Expand All @@ -154,15 +154,15 @@ object KqueueSystem extends PollingSystem {
flags: CUnsignedShort,
cb: Either[Throwable, Unit] => Unit
): Unit = {
val event = eventlist + eventCount.toLong
val event = eventlist + changeCount.toLong

event.ident = ident.toULong
event.filter = filter
event.flags = (flags.toInt | EV_ONESHOT).toUShort

callbacks.update(encodeKevent(ident, filter), cb)

eventCount += 1
changeCount += 1
}

private[KqueueSystem] def removeCallback(ident: Int, filter: Short): Unit = {
Expand All @@ -174,7 +174,7 @@ object KqueueSystem extends PollingSystem {
if (unistd.close(kqfd) != 0)
throw new IOException(fromCString(strerror(errno)))

private[KqueueSystem] def poll(timeout: Long): Boolean = {
private[KqueueSystem] def poll(timeout: Long): PollResult = {

val timeoutSpec =
if (timeout <= 0) null
Expand All @@ -190,28 +190,30 @@ object KqueueSystem extends PollingSystem {
val rtn = kevent64(
kqfd,
eventlist,
eventCount,
changeCount,
eventlist,
MaxEvents,
flags.toUInt,
timeoutSpec
)
changeCount = 0

if (rtn >= 0) {
eventCount = rtn
rtn > 0
readyEventCount = rtn
if (rtn > 0) {
if (rtn < MaxEvents) PollResult.Complete else PollResult.Incomplete
} else PollResult.Interrupted
} else if (errno == EINTR) { // spurious wake-up by signal
false
PollResult.Interrupted
} else {
throw new IOException(fromCString(strerror(errno)))
}
}

@tailrec
private[KqueueSystem] def processReadyEvents(): Boolean = {
var i = 0
var event = eventlist
while (i < eventCount) {
while (i < readyEventCount) {
val kevent = encodeKevent(event.ident.toInt, event.filter)
val cb = callbacks.getOrNull(kevent)
callbacks -= kevent
Expand All @@ -227,22 +229,11 @@ object KqueueSystem extends PollingSystem {
event += 1
}

if (eventCount >= MaxEvents) { // drain the ready list
val rtn =
kevent64(kqfd, null, 0, eventlist, MaxEvents, KEVENT_FLAG_IMMEDIATE.toUInt, null)
if (rtn >= 0) {
eventCount = rtn
processReadyEvents()
} else {
throw new IOException(fromCString(strerror(errno)))
}
} else {
eventCount = 0
true
}
readyEventCount = 0
true
}

private[KqueueSystem] def needsPoll(): Boolean = eventCount > 0 || callbacks.nonEmpty
private[KqueueSystem] def needsPoll(): Boolean = changeCount > 0 || callbacks.nonEmpty
}

@nowarn212
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ abstract class PollingExecutorScheduler(pollEvery: Int)
def makeApi(ctx: PollingContext[Poller]): Api = outer
def makePoller(): Poller = outer
def closePoller(poller: Poller): Unit = ()
def poll(poller: Poller, nanos: Long): Boolean = {
def poll(poller: Poller, nanos: Long): PollResult = {
needsPoll =
if (nanos == -1)
poller.poll(Duration.Inf)
else
poller.poll(nanos.nanos)
true
PollResult.Complete
}
def processReadyEvents(poller: Poller): Boolean = true
def needsPoll(poller: Poller) = needsPoll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ object SleepSystem extends PollingSystem {

def closePoller(poller: Poller): Unit = ()

def poll(poller: Poller, nanos: Long): Boolean = {
def poll(poller: Poller, nanos: Long): PollResult = {
if (nanos > 0)
Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt)
false
PollResult.Interrupted
}

def processReadyEvents(poller: Poller): Boolean = false
Expand Down
Loading

0 comments on commit 60204b6

Please sign in to comment.