Skip to content

Commit

Permalink
Merge pull request #4227 from armanbilge/issue/4225
Browse files Browse the repository at this point in the history
Always report timers/events when worker unparked
  • Loading branch information
djspiewak authored Jan 6, 2025
2 parents 625006b + e372a87 commit e1da97e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 14 deletions.
25 changes: 11 additions & 14 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -474,21 +474,18 @@ private[effect] final class WorkerThread[P <: AnyRef](
pool.shutdown()
false // we know `done` is `true`
} else {
if (parked.get()) {
// we were either awakened spuriously, or we timed out or polled an event
if (polled || (triggerTime - now <= 0)) {
// we timed out or polled an event
if (parked.getAndSet(false)) {
pool.doneSleeping()
}
true
} else {
// awakened spuriously, re-check next sleeper
parkUntilNextSleeper()
// no matter why we woke up, there may be timers or events ready
if (polled || (triggerTime - now <= 0)) {
// we timed out or polled an event
if (parked.getAndSet(false)) {
pool.doneSleeping()
}
} else {
// awakened intentionally
false
true
} else { // we were either awakened spuriously or intentionally
if (parked.get()) // awakened spuriously, re-check next sleeper
parkUntilNextSleeper()
else // awakened intentionally, but not due to a timer or event
false
}
}
} else {
Expand Down
65 changes: 65 additions & 0 deletions tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,71 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala
}
}

"poll punctually on a single-thread runtime with concurrent sleepers" in {

trait DummyPoller {
def poll: IO[Unit]
}

object DummySystem extends PollingSystem {
type Api = DummyPoller
type Poller = AtomicReference[List[Either[Throwable, Unit] => Unit]]

def close() = ()

def makePoller() = new AtomicReference(List.empty[Either[Throwable, Unit] => Unit])
def needsPoll(poller: Poller) = poller.get.nonEmpty
def closePoller(poller: Poller) = ()
def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop

def interrupt(targetThread: Thread, targetPoller: Poller) =
SleepSystem.interrupt(targetThread, SleepSystem.makePoller())

def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit) = {
poller.getAndSet(Nil) match {
case Nil =>
SleepSystem.poll(SleepSystem.makePoller(), nanos, reportFailure)
case cbs =>
cbs.foreach(_.apply(Right(())))
true
}
}

def makeApi(ctx: PollingContext[Poller]): DummySystem.Api =
new DummyPoller {
def poll = IO.async_[Unit] { cb =>
ctx.accessPoller { poller =>
poller.getAndUpdate(cb :: _)
()
}
}
}
}

val (pool, poller, shutdown) = IORuntime.createWorkStealingComputeThreadPool(
threads = 1,
pollingSystem = DummySystem)

implicit val runtime: IORuntime =
IORuntime.builder().setCompute(pool, shutdown).addPoller(poller, () => ()).build()

try {
val test = IO.sleep(1.minute).background.surround {
IO.pollers.map(_.head.asInstanceOf[DummyPoller]).flatMap { poller =>
// in #4225 the fiber rescheduled during this poll does not execute until the next timer fires
poller.poll.as(true)
}
}

// NOTE!!!
// We cannot use a timeout *on* the runtime, because that causes the polling fiber
// to become unstuck sooner and pass the test
test.unsafeRunTimed(1.second) must beSome(beTrue)
} finally {
runtime.shutdown()
}
}

if (javaMajorVersion >= 21)
"block in-place on virtual threads" in real {
val loomExec = classOf[Executors]
Expand Down

0 comments on commit e1da97e

Please sign in to comment.