Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread parking for Kotlin/Common #498

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open

Conversation

bbrockbernd
Copy link
Collaborator

Allows to pause and resume thread execution. On native platforms it is based on pthread_cond_wait, on JVM it uses LockSupport.

Threads can be pre unparked (calling unpark before park cancels the parking operation). And thread can be parker with a timeout.

Suspend the current thread by calling:

Parker.park()

Resume a thread by calling:

Parker.unpark(thread)

Get current thread reference by calling:

val thread = KThread.currentThread()

Parking with timout of 500 nano seconds:

Parker.parkNanos(500)

@bbrockbernd bbrockbernd marked this pull request as ready for review December 19, 2024 11:10
import kotlin.time.DurationUnit
import kotlin.time.TimeSource.Monotonic

// Only for testing purposes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only for testing :)

// Only for testing purposes
internal class JvmParkingDelegator: ParkingDelegator {
private var thread: Thread? = null
private val atomicLong: AtomicLong = AtomicLong(0L)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename it to state, document that these states are "PARKED" and "UNPARKED", and use either AtomicBoolean provided AtomicFU or AtomicIntegerFieldUpdater if this is easier due to the compilation complexity.

Comment on lines +15 to +18
val combo = ParkingData(nativeHeap.alloc<pthread_mutex_t>().ptr, nativeHeap.alloc<pthread_cond_t>().ptr)
pthread_mutex_init(combo.mut, null)
pthread_cond_init(combo.cond, null)
return combo

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, style:

Suggested change
val combo = ParkingData(nativeHeap.alloc<pthread_mutex_t>().ptr, nativeHeap.alloc<pthread_cond_t>().ptr)
pthread_mutex_init(combo.mut, null)
pthread_cond_init(combo.cond, null)
return combo
val mut = nativeHeap.alloc<pthread_mutex_t>().ptr
val cond = nativeHeap.alloc<pthread_cond_t>().ptr
pthread_mutex_init(mut, null)
pthread_cond_init(cond, null)
return ParkingData(mut, cond)

I think it's more straightforward: you don't have to understand that combo.mut is the same thing that's passed to ParkingData as the first argument.

* Internal utility that delegates the thread suspending and resuming calls calls in the platform specific way (darwin, linux, windows).
* On jvm delegates to LockSupport.Park.
*/
internal interface ParkingDelegator {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm missing something, on each platform, there is a single implementation of this interface, so extracting it is premature, I feel.

internal expect class ParkingData

internal expect object ParkingDelegator {
    fun createRef(): ParkingData
    fun wait(ref: ParkingData) 
    fun timedWait(ref: ParkingData, nanos: Long)
    fun wake(ref: ParkingData)
    fun destroyRef(ref: ParkingData)
}

On the JVM, internal actual typealias ParkingData = Unit would achieve the same effect as what we have now.


override fun createRef(): Long {
thread = Thread.currentThread()
atomicLong.set(0L)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me like the atomic long here is a protection against spurious wake-ups. Is that it? I don't understand the utility of this safety mechanism. Code using thread parking is often already structured in a way that takes spurious wake-ups into account.

If we do remove the safety mechanism, however, it means that JvmParkingDelegator can be stateless and use Thread as the reference that's passed around. That, in turn, means that there's no need to create a parking delegator instance per thread parker.

actual override fun wake(ref: Any) {
if (ref !is ParkingData) throw IllegalArgumentException("ParkingDelegator got incompatible parking object")
pthread_mutex_lock(ref.mut)
ref.wake.value = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto: if this is only a protection against spurious wake-ups, it's more efficient to omit it, because it's likely that the calling code can easily add the same logic on top of the provided primitive, and it will usually be able to do that without creating extra mutable state (like the AtomicBoolean here).


actual override fun timedWait(ref: Any, nanos: Long) {
if (ref !is ParkingData) throw IllegalArgumentException("ParkingDelegator got incompatible parking object")
val ts = nativeHeap.alloc<timespec>().ptr
Copy link

@dkhalanskyjb dkhalanskyjb Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: there's the https://kotlinlang.org/api/core/kotlin-stdlib/kotlinx.cinterop/mem-scoped.html function, which allows binding allocations to lexical scopes.

memScoped {
  val ts = alloc<timespec>()
  clock_gettime(CLOCK_REALTIME.convert(), ts.ptr)
  // ...
  pthread_mutex_unlock(ref.mut)
} // `ts` gets deallocated when this lambda exits

var rc = 0
pthread_mutex_lock(ref.mut)
while (!ref.wake.value && rc == 0) {
rc = pthread_cond_timedwait(ref.cond, ref.mut, ts)
Copy link

@dkhalanskyjb dkhalanskyjb Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is notably different from what atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/parking/PosixParkingDelegator.kt has: instead of adding nanos to the existing timespec, this code sets the timespec to nanos. Is this difference intentional? If yes, it should be documented, because it looks like a bug on the surface.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this should be added, thanks! I will try to run the tests on a 32Bit system.

actual override fun destroyRef(ref: Any) {
if (ref !is ParkingData) throw IllegalArgumentException("ParkingDelegator got incompatible parking object")
pthread_mutex_destroy(ref.mut)
pthread_cond_destroy(ref.cond)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's worth checking the error codes returned from native functions and emit at least some diagnostic information. You never know.

}
}
private val thisKthread = KThread()
actual fun currentThreadId(): Long = 1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do decide to avoid spurious wake-ups, this implementation is incorrect. But in any case, I think it's also useless to provide it: I'm not sure what anyone using thread synchronization on single-threaded runtimes even expects. As part of the mutex implementation, this particular implementation makes sense, but not for other primitives.

Proposal: create a source set combining JVM and Native (in kotlinx.coroutines, we call it concurrent), and instead of providing parking in common code, only provide it in concurrent. Also, with this source set, it's easier to share tests between jvmTest and nativeTest: for most of them, it seems possible to put them in concurrentTest.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants