diff --git a/atomicfu/api/atomicfu.api b/atomicfu/api/atomicfu.api index e1428174..f5902f9a 100644 --- a/atomicfu/api/atomicfu.api +++ b/atomicfu/api/atomicfu.api @@ -135,3 +135,20 @@ public final class kotlinx/atomicfu/TraceKt { public static final fun named (Lkotlinx/atomicfu/TraceBase;Ljava/lang/String;)Lkotlinx/atomicfu/TraceBase; } +public final class kotlinx/atomicfu/locks/Mutex { + public fun ()V + public final fun getReentrantLock ()Ljava/util/concurrent/locks/ReentrantLock; + public final fun isLocked ()Z + public final fun lock ()V + public final fun tryLock ()Z + public final fun unlock ()V +} + +public final class kotlinx/atomicfu/locks/MutexKt { + public static final fun withLock (Lkotlinx/atomicfu/locks/Mutex;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object; +} + +public final class kotlinx/atomicfu/locks/ThreadIdentifier_jvmKt { + public static final fun currentThreadId ()J +} + diff --git a/atomicfu/build.gradle.kts b/atomicfu/build.gradle.kts index 5e18fa6d..f21da281 100644 --- a/atomicfu/build.gradle.kts +++ b/atomicfu/build.gradle.kts @@ -90,6 +90,7 @@ kotlin { implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-test") implementation("org.jetbrains.kotlin:kotlin-test-junit") + implementation("org.jetbrains.kotlinx:lincheck:2.34") implementation(libs.junit.junit) } } @@ -98,82 +99,113 @@ kotlin { // Support of all non-deprecated targets from the official tier list: https://kotlinlang.org/docs/native-target-support.html kotlin { - // Tier 1 - macosX64() - macosArm64() - iosSimulatorArm64() - iosX64() - - // Tier 2 - linuxX64() - linuxArm64() - watchosSimulatorArm64() - watchosX64() - watchosArm32() - watchosArm64() - tvosSimulatorArm64() - tvosX64() - tvosArm64() - iosArm64() - - // Tier 3 - androidNativeArm32() - androidNativeArm64() - androidNativeX86() - androidNativeX64() - mingwX64() - watchosDeviceArm64() + val appleTargets = listOf( + // Tier 1 + macosX64(), + macosArm64(), + iosSimulatorArm64(), + iosX64(), + + // Tier 2, + watchosSimulatorArm64(), + watchosX64(), + watchosArm32(), + watchosArm64(), + tvosSimulatorArm64(), + tvosX64(), + tvosArm64(), + iosArm64(), + + // Tier 3 + watchosDeviceArm64(), + ) - @Suppress("DEPRECATION") //https://github.com/Kotlin/kotlinx-atomicfu/issues/207 - linuxArm32Hfp() - - @OptIn(ExperimentalKotlinGradlePluginApi::class) - applyDefaultHierarchyTemplate { - group("native") { - group("nativeUnixLike") { - withLinux() - withApple() + val linuxTargets = listOf( + // Tier 2, + linuxX64(), + linuxArm64(), + ) + + val androidNativeTargets = listOf( + // Tier 3 + androidNativeArm32(), + androidNativeArm64(), + androidNativeX86(), + androidNativeX64(), + ) + + val windowsTargets = listOf( + mingwX64(), + ) + + (linuxTargets + androidNativeTargets).forEach { + it.compilations.getByName("main").cinterops { + // This is a hack to fix commonization bug: KT-73136 + val ulock by creating { + defFile(project.file("src/nativeInterop/cinterop/stub.def")) + packageName = "stub" } - } - group("androidNative32Bit") { - withAndroidNativeX86() - withCompilations { compilation -> - (compilation.target as? KotlinNativeTarget)?.konanTarget?.name == "android_arm32" + val posixparking by creating { + defFile(project.file("src/nativeInterop/cinterop/posixparking.def")) + packageName = "platform.posix" } } - group("androidNative64Bit") { - withAndroidNativeArm64() - withAndroidNativeX64() - } - } - - sourceSets { - val nativeUnixLikeMain by getting { - kotlin.srcDir("src/nativeUnixLikeMain/kotlin") - dependsOn(nativeMain.get()) + + appleTargets.forEach { + it.compilations.getByName("main").cinterops { + val ulock by creating { + defFile(project.file("src/nativeInterop/cinterop/ulock.def")) + packageName = "platform.darwin.ulock" + includeDirs("${project.rootDir}/atomicfu/src/nativeInterop/cinterop") + } + val posixparking by creating { + defFile(project.file("src/nativeInterop/cinterop/posixparking.def")) + packageName = "platform.posix" + } } - - val androidNative32BitMain by getting { - kotlin.srcDir("src/androidNative32BitMain/kotlin") - dependsOn(nativeMain.get()) + } + + windowsTargets.forEach { + it.binaries.all { + linkerOpts += "-lSynchronization" } - - val androidNative64BitMain by getting { - kotlin.srcDir("src/androidNative64BitMain/kotlin") - dependsOn(nativeMain.get()) + it.compilations.getByName("main").cinterops { + // This is a hack to fix commonization bug: KT-73136 + val ulock by creating { + defFile(project.file("src/nativeInterop/cinterop/stub.def")) + packageName = "stub" + } + val posixparking by creating { + defFile(project.file("src/nativeInterop/cinterop/posixparking.def")) + packageName = "platform.posix" + } } + } - val androidNative32BitTest by getting { - kotlin.srcDir("src/androidNative32BitTest/kotlin") - dependsOn(nativeTest.get()) + @Suppress("DEPRECATION") //https://github.com/Kotlin/kotlinx-atomicfu/issues/207 + linuxArm32Hfp { + compilations.getByName("main").cinterops { + // This is a hack to fix commonization bug: KT-73136 + val ulock by creating { + defFile(project.file("src/nativeInterop/cinterop/stub.def")) + packageName = "stub" + } + val posixparking by creating { + defFile(project.file("src/nativeInterop/cinterop/posixparking.def")) + packageName = "platform.posix" + } } + } - val androidNative64BitTest by getting { - kotlin.srcDir("src/androidNative64BitTest/kotlin") - dependsOn(nativeTest.get()) - } + applyDefaultHierarchyTemplate() + sourceSets { + val linux64Main by creating { dependsOn(nativeMain.get()) } + linuxX64Main.get().dependsOn(linux64Main) + linuxArm64Main.get().dependsOn(linux64Main) + val linux32Main by creating { dependsOn(nativeMain.get()) } + linuxArm32HfpMain.get().dependsOn(linux32Main) } // atomicfu-cinterop-interop.klib with an empty interop.def file will still be published for compatibility reasons (see KT-68411) @@ -378,4 +410,3 @@ val jvmTest by tasks.getting(Test::class) { ) // run them only for transformed code } - diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.linux64.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.linux64.kt new file mode 100644 index 00000000..259cc5fd --- /dev/null +++ b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.linux64.kt @@ -0,0 +1,8 @@ +package kotlinx.atomicfu.locks + +internal actual object FutexParkingDelegator: ParkingDelegator { + actual override fun createFutexPtr(): Long = PosixParkingDelegator.createFutexPtr() + actual override fun wait(futexPrt: Long): Boolean = PosixParkingDelegator.wait(futexPrt) + actual override fun wake(futexPrt: Long): Int = PosixParkingDelegator.wake(futexPrt) + actual override fun manualDeallocate(futexPrt: Long) = PosixParkingDelegator.manualDeallocate(futexPrt) +} diff --git a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt similarity index 94% rename from atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt index 032e6768..f166afc7 100644 --- a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt @@ -1,5 +1,7 @@ package kotlinx.atomicfu.locks +import kotlinx.cinterop.ptr + import kotlinx.cinterop.* import platform.posix.* import kotlin.concurrent.* @@ -22,7 +24,7 @@ public actual class NativeMutexNode { pthread_mutex_unlock(pMutex.ptr) } - actual fun unlock() { + actual fun unlock() { pthread_mutex_lock(pMutex.ptr) isLocked = false pthread_cond_broadcast(pCond.ptr) diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.apple.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.apple.kt new file mode 100644 index 00000000..1be061a8 --- /dev/null +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.apple.kt @@ -0,0 +1,36 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import platform.darwin.UInt32 +import platform.darwin.UInt64Var +import platform.darwin.ulock.__ulock_wait +import platform.darwin.ulock.__ulock_wake + +@OptIn(ExperimentalForeignApi::class) +internal actual object FutexParkingDelegator: ParkingDelegator { + actual override fun createFutexPtr(): Long { + val signal = nativeHeap.alloc() + signal.value = 0u + return signal.ptr.toLong() + } + + actual override fun wait(futexPrt: Long): Boolean { + val cPointer = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + val result = __ulock_wait(UL_COMPARE_AND_WAIT, cPointer, 0u, 0u) + nativeHeap.free(cPointer) + // THere is very little information about ulock so not sure what returned int stands for an interrupt + // In any case it should be 0 + return result != 0 + } + + actual override fun wake(futexPrt: Long): Int { + return __ulock_wake(UL_COMPARE_AND_WAIT, futexPrt.toCPointer(), 0u) + } + + actual override fun manualDeallocate(futexPrt: Long) { + val cPointer = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + nativeHeap.free(cPointer) + } + + private const val UL_COMPARE_AND_WAIT: UInt32 = 1u +} diff --git a/atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt similarity index 91% rename from atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt index 5b6ff7cf..f166afc7 100644 --- a/atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt @@ -1,8 +1,10 @@ package kotlinx.atomicfu.locks +import kotlinx.cinterop.ptr + import kotlinx.cinterop.* import platform.posix.* -import kotlin.concurrent.Volatile +import kotlin.concurrent.* public actual class NativeMutexNode { @@ -22,7 +24,7 @@ public actual class NativeMutexNode { pthread_mutex_unlock(pMutex.ptr) } - actual fun unlock() { + actual fun unlock() { pthread_mutex_lock(pMutex.ptr) isLocked = false pthread_cond_broadcast(pCond.ptr) diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt new file mode 100644 index 00000000..09ccc8e4 --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt @@ -0,0 +1,30 @@ +package kotlinx.atomicfu.locks + +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract + +/** + * Multiplatform mutex. + * On native based on futex(-like) system calls. + * On JVM delegates to ReentrantLock. + */ +expect class Mutex() { + fun isLocked(): Boolean + fun tryLock(): Boolean + fun lock() + fun unlock() +} +@OptIn(ExperimentalContracts::class) +fun Mutex.withLock(block: () -> T): T { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } + lock() + return try { + block() + } finally { + unlock() + } +} + diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt new file mode 100644 index 00000000..528ae8cb --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/NativeMutex.kt @@ -0,0 +1,148 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.AtomicRef +import kotlinx.atomicfu.atomic + +/** + * Mutex implementation for Kotlin/Native. + * It is placed in common sourceSet such that it can be verified by LinCheck (which only runs on JVM) in jvmTest. + * + * The [state] variable stands for: 0 -> Lock is free + * 1 -> Lock is locked but no waiters + * 4 -> Lock is locked with 3 waiters + * + * The state.incrementAndGet() call makes my claim on the lock. + * The returned value either means I acquired it (when it is 1). + * Or I need to enqueue and park (when it is > 1). + * + * The [holdCount] variable is to enable reentrancy. + * + * Works by using a [parkingQueue]. + * When a thread tries to acquire the lock, but finds it is already locked it enqueues by appending to the [parkingQueue]. + * On enqueue the parking queue provides the second last node, this node is used to park on. + * When our thread is woken up that means that the thread parked on the thrid last node called unpark on the second last node. + * Since a woken up thread is first inline it means that it's node is the head and can therefore dequeue. + * + * Unlocking happens by calling state.decrementAndGet(). + * When the returned value is 0 it means the lock is free and we can simply return. + * If the new state is > 0, then there are waiters. We wake up the first by unparking the head of the queue. + * This even works when a thread is not parked yet, + * since the ThreadParker can be pre-unparked resulting in the parking call to return immediately. + */ +internal class NativeMutex(private val createDelegator: () -> ParkingDelegator) { + private val parkingQueue = ParkingQueue() + private val owningThread = atomic(-1L) + private val state = atomic(0) + private val holdCount = atomic(0) + + + fun lock() { + val currentThreadId = currentThreadId() + + // Has to be checked in this order! + if (holdCount.value > 0 && currentThreadId == owningThread.value) { + // Is reentring thread + holdCount.incrementAndGet() + return + } + + // Otherwise try acquire lock + val newState = state.incrementAndGet() + // If new state 1 than I have acquired lock skipping queue. + if (newState == 1) { + owningThread.value = currentThreadId + holdCount.incrementAndGet() + return + } + + // If state larger than 1 -> enqueue and park + // When woken up thread has acquired lock and his node in the queue is therefore at the head. + // Remove head + if (newState > 1) { + val prevNode = parkingQueue.enqueue() + prevNode.parker.park() + parkingQueue.dequeue() + owningThread.value = currentThreadId + holdCount.incrementAndGet() + return + } + } + + fun unlock() { + val currentThreadId = currentThreadId() + val currentOwnerId = owningThread.value + if (currentThreadId != currentOwnerId) throw IllegalStateException("Thread is not holding the lock") + + // dec hold count + val newHoldCount = holdCount.decrementAndGet() + if (newHoldCount > 0) return + if (newHoldCount < 0) throw IllegalStateException("Thread unlocked more than it locked") + + // Lock is released by decrementing (only if decremented to 0) + val currentState = state.decrementAndGet() + if (currentState == 0) return + + // If waiters wake up the first in line. The woken up thread will dequeue the node. + if (currentState > 0) { + val nextParker = parkingQueue.getHead() + nextParker.parker.unpark() + return + } + } + + fun isLocked(): Boolean { + return state.value > 0 + } + + fun tryLock(): Boolean { + val currentThreadId = currentThreadId() + if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) { + owningThread.value = currentThreadId + holdCount.incrementAndGet() + return true + } + return false + } + + // Based on Micheal-Scott Queue + internal inner class ParkingQueue { + private val head: AtomicRef + private val tail: AtomicRef + + init { + val first = Node() + head = atomic(first) + tail = atomic(first) + } + + fun getHead(): Node { + return head.value + } + + fun enqueue(): Node { + while (true) { + val node = Node() + val curTail = tail.value + if (curTail.next.compareAndSet(null, node)) { + tail.compareAndSet(curTail, node) + return curTail + } + else tail.compareAndSet(curTail, curTail.next.value!!) + } + } + + fun dequeue() { + while (true) { + val currentHead = head.value + val currentHeadNext = currentHead.next.value ?: throw IllegalStateException("Dequeing parker but already empty, should not be possible") + if (head.compareAndSet(currentHead, currentHeadNext)) return + } + } + + } + + internal inner class Node { + val parker = ThreadParker(createDelegator()) + val next = atomic(null) + } +} \ No newline at end of file diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt new file mode 100644 index 00000000..04d37eb3 --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt @@ -0,0 +1,12 @@ +package kotlinx.atomicfu.locks + +/** + * Internal utility that delegates the futex or posix calls in the platform specific way (darwin, linux, windows). + * On jvm delegates to LockSupport.Park. (The reason we need this on jvm is to verify the mutex with lincheck) + */ +internal interface ParkingDelegator { + fun createFutexPtr(): Long + fun wait(futexPrt: Long): Boolean + fun wake(futexPrt: Long): Int + fun manualDeallocate(futexPrt: Long) +} \ No newline at end of file diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.kt new file mode 100644 index 00000000..83af3b4b --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.kt @@ -0,0 +1,3 @@ +package kotlinx.atomicfu.locks + +internal expect fun currentThreadId(): Long \ No newline at end of file diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt new file mode 100644 index 00000000..2578fde9 --- /dev/null +++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt @@ -0,0 +1,80 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic + +/** + * This is defined in common to be testable with lincheck. + * Should in practice never be used on jvm. + * + * [park] Parks suspends the calling thread. Unless [unpark] is already called. + * [unpark] resumes the suspended thread, or prevents the next call to [park] from suspending the caller. + * [state] Keeps track of whether the parker is in the normnal state ([STATE_FREE]), parked state ([STATE_PARKED]), + * or pre unparked state ([STATE_UNPARKED]). + * [atomicPtr] points to the memory address of the futex or posix struct used for signalling. + */ +internal class ThreadParker(private val delegator: ParkingDelegator) { + private val state = atomic(STATE_FREE) + private val atomicPtr = atomic(-1L) + + fun park() { + while (true) { + when (val currentState = state.value) { + + STATE_FREE -> { + if (!state.compareAndSet(currentState, STATE_PARKED)) continue + initPtrIfAbsent() + val interrupted = delegator.wait(atomicPtr.value) + // Interrupted exception does not exist on native + if (interrupted) throw IllegalStateException("Thread was interrupted") + state.value = STATE_FREE + atomicPtr.value = -1L + return + } + + STATE_UNPARKED -> { + if (!state.compareAndSet(currentState, STATE_FREE)) continue + return + } + + STATE_PARKED -> + throw IllegalStateException("Thread should not be able to call park when it is already parked") + + } + } + } + + // Avoid calling this multiple times, not thread safe. + // Enough for mutex impl. + fun unpark() { + while (true) { + when (val currentState = state.value) { + + STATE_UNPARKED -> return + + STATE_FREE -> { + if (!state.compareAndSet(currentState, STATE_UNPARKED)) continue + return + } + + STATE_PARKED -> { + initPtrIfAbsent() + val result = delegator.wake(atomicPtr.value) + if (result == 0) return + } + } + } + } + + private fun initPtrIfAbsent() { + val ptrVal = atomicPtr.value + if (ptrVal == -1L) { + val currentPtr = delegator.createFutexPtr() + if (atomicPtr.compareAndSet(ptrVal, currentPtr)) return + delegator.manualDeallocate(currentPtr) + } + } +} + +internal const val STATE_UNPARKED = 0 +internal const val STATE_FREE = 1 +internal const val STATE_PARKED = 2 diff --git a/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt new file mode 100644 index 00000000..a359afab --- /dev/null +++ b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt @@ -0,0 +1,9 @@ +package kotlinx.atomicfu.locks + +actual class Mutex { + private var state = 0 + actual fun isLocked(): Boolean = state != 0 + actual fun tryLock(): Boolean = true + actual fun lock(): Unit { state++ } + actual fun unlock(): Unit { if (state-- < 0) throw IllegalStateException("Mutex already unlocked") } +} \ No newline at end of file diff --git a/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.kt b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.kt new file mode 100644 index 00000000..6f9b8170 --- /dev/null +++ b/atomicfu/src/jsAndWasmSharedMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.kt @@ -0,0 +1,4 @@ +package kotlinx.atomicfu.locks + +internal actual fun currentThreadId(): Long = 42 + diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt new file mode 100644 index 00000000..d02e643c --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt @@ -0,0 +1,31 @@ +package kotlinx.atomicfu.locks +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.LockSupport + +// Only for testing purposes +internal class JvmParkingDelegator: ParkingDelegator { + private var thread: Thread? = null + private val atomicLong: AtomicLong = AtomicLong(0L) + + override fun createFutexPtr(): Long { + thread = Thread.currentThread() + return 0L + } + + override fun wait(futexPrt: Long): Boolean { + while (atomicLong.get() == 0L) { + LockSupport.park() + } + thread = null + return false + } + + override fun wake(futexPrt: Long): Int { + if (atomicLong.compareAndSet(0L, 1L)) { + LockSupport.unpark(thread) + } + return 0 + } + + override fun manualDeallocate(futexPrt: Long) {} +} diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/Mutex.jvm.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/Mutex.jvm.kt new file mode 100644 index 00000000..1f80b51e --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/Mutex.jvm.kt @@ -0,0 +1,11 @@ +package kotlinx.atomicfu.locks + +import java.util.concurrent.locks.ReentrantLock + +actual class Mutex actual constructor() { + val reentrantLock: ReentrantLock = ReentrantLock() + actual fun isLocked(): Boolean = reentrantLock.isLocked + actual fun tryLock(): Boolean = reentrantLock.tryLock() + actual fun lock() = reentrantLock.lock() + actual fun unlock() = reentrantLock.unlock() +} diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.jvm.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.jvm.kt new file mode 100644 index 00000000..3e47b135 --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.jvm.kt @@ -0,0 +1,3 @@ +package kotlinx.atomicfu.locks + +actual fun currentThreadId(): Long = Thread.currentThread().id diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/test/NativeMutexLincheckReentrantTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/test/NativeMutexLincheckReentrantTest.kt new file mode 100644 index 00000000..7f402950 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/test/NativeMutexLincheckReentrantTest.kt @@ -0,0 +1,57 @@ +package kotlinx.atomicfu.test +import kotlinx.atomicfu.locks.JvmParkingDelegator +import kotlinx.atomicfu.locks.NativeMutex +import org.jetbrains.kotlinx.lincheck.LoggingLevel +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.check +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions +import kotlin.test.Test +import kotlin.test.Ignore + +class NativeMutexLincheckReentrantTest { + + private val lock = NativeMutex { JvmParkingDelegator() } + private val mutableList = mutableListOf(0) + + @Ignore + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(300) + .invocationsPerIteration(10_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun add(n: Int) { + lock.lock() + lock.lock() + mutableList.add(n) + lock.unlock() + lock.unlock() + } + + @Operation + fun removeFirst(): Int? { + lock.lock() + lock.lock() + val bla = mutableList.removeFirstOrNull() + lock.unlock() + lock.unlock() + return bla + } + + @Operation + fun removeLast(): Int? { + lock.lock() + lock.lock() + val bla = mutableList.removeLastOrNull() + lock.unlock() + lock.unlock() + return bla + } +} diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/test/NativeMutexLincheckTest.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/test/NativeMutexLincheckTest.kt new file mode 100644 index 00000000..8331c05a --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/test/NativeMutexLincheckTest.kt @@ -0,0 +1,53 @@ +package kotlinx.atomicfu.test + +import kotlinx.atomicfu.locks.JvmParkingDelegator +import kotlinx.atomicfu.locks.NativeMutex +import org.jetbrains.kotlinx.lincheck.LoggingLevel +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.check +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions +import kotlin.test.Ignore +import kotlin.test.Test + +class NativeMutexLincheckTest { + + private val lock = NativeMutex { JvmParkingDelegator() } + private val mutableList = mutableListOf(0) + + // Ignored because takes a lot of time + @Ignore + @Test + fun modelCheckingTest(): Unit = ModelCheckingOptions() + .iterations(300) + .invocationsPerIteration(10_000) + .actorsBefore(1) + .threads(3) + .actorsPerThread(3) + .actorsAfter(0) + .hangingDetectionThreshold(100) + .logLevel(LoggingLevel.INFO) + .check(this::class.java) + + @Operation + fun add(n: Int) { + lock.lock() + mutableList.add(n) + lock.unlock() + } + + @Operation + fun removeFirst(): Int? { + lock.lock() + val bla = mutableList.removeFirstOrNull() + lock.unlock() + return bla + } + + @Operation + fun removeLast(): Int? { + lock.lock() + val bla = mutableList.removeLastOrNull() + lock.unlock() + return bla + } +} diff --git a/atomicfu/src/linux32Main/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.linux32.kt b/atomicfu/src/linux32Main/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.linux32.kt new file mode 100644 index 00000000..fba6675d --- /dev/null +++ b/atomicfu/src/linux32Main/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.linux32.kt @@ -0,0 +1,36 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import platform.linux.SYS_futex +import platform.posix.* + +const val FUTEX_WAIT = 0 +const val FUTEX_WAKE = 1 + +@OptIn(ExperimentalForeignApi::class) +internal actual object FutexParkingDelegator: ParkingDelegator { + actual override fun createFutexPtr(): Long { + val signal = nativeHeap.alloc() + signal.value = 0u + return signal.ptr.toLong() + } + + actual override fun wait(futexPrt: Long): Boolean { + val cPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + val result = syscall(SYS_futex.toInt(), futexPrt, FUTEX_WAIT, 0u, NULL) + val interrupted = result.toInt() == EINTR + nativeHeap.free(cPtr) + return interrupted + } + + actual override fun wake(futexPrt: Long): Int { + //Returns n threads woken up (needs to be 1) + val result = syscall(SYS_futex.toInt(), futexPrt, FUTEX_WAKE, 1u, NULL).toInt() + return if (result == 1) 0 else -1 + } + + actual override fun manualDeallocate(futexPrt: Long) { + val cPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + nativeHeap.free(cPtr) + } +} diff --git a/atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/linux32Main/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt similarity index 91% rename from atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/linux32Main/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt index 5b6ff7cf..f166afc7 100644 --- a/atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ b/atomicfu/src/linux32Main/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt @@ -1,8 +1,10 @@ package kotlinx.atomicfu.locks +import kotlinx.cinterop.ptr + import kotlinx.cinterop.* import platform.posix.* -import kotlin.concurrent.Volatile +import kotlin.concurrent.* public actual class NativeMutexNode { @@ -22,7 +24,7 @@ public actual class NativeMutexNode { pthread_mutex_unlock(pMutex.ptr) } - actual fun unlock() { + actual fun unlock() { pthread_mutex_lock(pMutex.ptr) isLocked = false pthread_cond_broadcast(pCond.ptr) diff --git a/atomicfu/src/linux64Main/kotlin/kotlinx.atomicfu.locks/FutexParkingDelegator.linux64.kt b/atomicfu/src/linux64Main/kotlin/kotlinx.atomicfu.locks/FutexParkingDelegator.linux64.kt new file mode 100644 index 00000000..6fbe4160 --- /dev/null +++ b/atomicfu/src/linux64Main/kotlin/kotlinx.atomicfu.locks/FutexParkingDelegator.linux64.kt @@ -0,0 +1,36 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import platform.linux.SYS_futex +import platform.posix.* + +const val FUTEX_WAIT = 0 +const val FUTEX_WAKE = 1 + +@OptIn(ExperimentalForeignApi::class) +internal actual object FutexParkingDelegator: ParkingDelegator { + actual override fun createFutexPtr(): Long { + val signal = nativeHeap.alloc() + signal.value = 0u + return signal.ptr.toLong() + } + + actual override fun wait(futexPrt: Long): Boolean { + val cPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + val result = syscall(SYS_futex.toLong(), futexPrt, FUTEX_WAIT, 0u, NULL) + val interrupted = result.toInt() == EINTR + nativeHeap.free(cPtr) + return interrupted + } + + actual override fun wake(futexPrt: Long): Int { + //Returns n threads woken up (needs to be 1) + val result = syscall(SYS_futex.toLong(), futexPrt, FUTEX_WAKE, 1u, NULL).toInt() + return if (result == 1) 0 else -1 + } + + actual override fun manualDeallocate(futexPrt: Long) { + val cPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + nativeHeap.free(cPtr) + } +} diff --git a/atomicfu/src/linux64Main/kotlin/kotlinx.atomicfu.locks/NativeMutexNode.kt b/atomicfu/src/linux64Main/kotlin/kotlinx.atomicfu.locks/NativeMutexNode.kt new file mode 100644 index 00000000..f166afc7 --- /dev/null +++ b/atomicfu/src/linux64Main/kotlin/kotlinx.atomicfu.locks/NativeMutexNode.kt @@ -0,0 +1,33 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.ptr + +import kotlinx.cinterop.* +import platform.posix.* +import kotlin.concurrent.* + +public actual class NativeMutexNode { + + @Volatile + private var isLocked = false + private val pMutex = nativeHeap.alloc().apply { pthread_mutex_init(ptr, null) } + private val pCond = nativeHeap.alloc().apply { pthread_cond_init(ptr, null) } + + internal actual var next: NativeMutexNode? = null + + actual fun lock() { + pthread_mutex_lock(pMutex.ptr) + while (isLocked) { // wait till locked are available + pthread_cond_wait(pCond.ptr, pMutex.ptr) + } + isLocked = true + pthread_mutex_unlock(pMutex.ptr) + } + + actual fun unlock() { + pthread_mutex_lock(pMutex.ptr) + isLocked = false + pthread_cond_broadcast(pCond.ptr) + pthread_mutex_unlock(pMutex.ptr) + } +} \ No newline at end of file diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.mingw.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.mingw.kt new file mode 100644 index 00000000..60bc4324 --- /dev/null +++ b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.mingw.kt @@ -0,0 +1,46 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import platform.windows.INFINITE +import platform.windows.UINT64Var +import platform.windows.WaitOnAddress +import platform.windows.WakeByAddressSingle + +@OptIn(ExperimentalForeignApi::class) +internal actual object FutexParkingDelegator: ParkingDelegator { + actual override fun createFutexPtr(): Long { + val signal = nativeHeap.alloc() + signal.value = 0u + return signal.ptr.toLong() + } + + // From https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitonaddress + actual override fun wait(futexPrt: Long): Boolean { + val cPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + + val undesiredValue = nativeHeap.alloc() + undesiredValue.value = 0u + var capturedValue = cPtr.pointed.value + var result = 0 + while (capturedValue == 0uL) { + result = WaitOnAddress(cPtr, undesiredValue.ptr, 8u, INFINITE) + capturedValue = cPtr.pointed.value + } + + nativeHeap.free(undesiredValue) + nativeHeap.free(cPtr) + // TODO check thread interrupts on windows + return false + } + + actual override fun wake(futexPrt: Long): Int { + WakeByAddressSingle(futexPrt.toCPointer()) + // windows doesn't return a success or fail status. + return 0 + } + + actual override fun manualDeallocate(futexPrt: Long) { + val cPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from futex ref") + nativeHeap.free(cPtr) + } +} diff --git a/atomicfu/src/nativeInterop/cinterop/posixparking.def b/atomicfu/src/nativeInterop/cinterop/posixparking.def new file mode 100644 index 00000000..3b3d2304 --- /dev/null +++ b/atomicfu/src/nativeInterop/cinterop/posixparking.def @@ -0,0 +1,40 @@ +headers = stdlib.h stdint.h pthread.h +--- +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + uint64_t wake; +} posix_combo_t; + +posix_combo_t* posixParkInit() { + posix_combo_t* pc = malloc(sizeof(posix_combo_t)); + pthread_mutex_init(&pc->mutex, NULL); + pthread_cond_init(&pc->cond, NULL); + pc->wake = 0; + return pc; +} + +void posixWait(posix_combo_t* pc) { + pthread_mutex_lock(&pc->mutex); + while(!pc->wake) { + pthread_cond_wait(&pc->cond, &pc->mutex); + } + pthread_mutex_unlock(&pc->mutex); + pthread_mutex_destroy(&pc->mutex); + pthread_cond_destroy(&pc->cond); + free(pc); +} + +void posixWake(posix_combo_t* pc) { + pthread_mutex_lock(&pc->mutex); + pc->wake = 1; + pthread_cond_signal(&pc->cond); + pthread_mutex_unlock(&pc->mutex); +} + +void posixDestroy(posix_combo_t* pc) { + pthread_mutex_destroy(&pc->mutex); + pthread_cond_destroy(&pc->cond); + free(pc); +} + diff --git a/atomicfu/src/nativeInterop/cinterop/stub.def b/atomicfu/src/nativeInterop/cinterop/stub.def new file mode 100644 index 00000000..e69de29b diff --git a/atomicfu/src/nativeInterop/cinterop/ulock.def b/atomicfu/src/nativeInterop/cinterop/ulock.def new file mode 100644 index 00000000..484efec6 --- /dev/null +++ b/atomicfu/src/nativeInterop/cinterop/ulock.def @@ -0,0 +1 @@ +headers = stdint.h pthread.h ulock.h \ No newline at end of file diff --git a/atomicfu/src/nativeInterop/cinterop/ulock.h b/atomicfu/src/nativeInterop/cinterop/ulock.h new file mode 100644 index 00000000..e6bfa8ad --- /dev/null +++ b/atomicfu/src/nativeInterop/cinterop/ulock.h @@ -0,0 +1,3 @@ +int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout_us); +int __ulock_wait2(uint32_t operation, void *addr, uint64_t value, uint64_t timeout_ns, uint64_t value2); +int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.kt new file mode 100644 index 00000000..ae8a7ad4 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/FutexParkingDelegator.kt @@ -0,0 +1,12 @@ +package kotlinx.atomicfu.locks + +/** + * Experimental! Might be unwise to use for apple targets. + * + */ +internal expect object FutexParkingDelegator: ParkingDelegator { + override fun createFutexPtr(): Long + override fun wait(futexPrt: Long): Boolean + override fun wake(futexPrt: Long): Int + override fun manualDeallocate(futexPrt: Long) +} diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Mutex.native.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Mutex.native.kt new file mode 100644 index 00000000..b0f948e4 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Mutex.native.kt @@ -0,0 +1,9 @@ +package kotlinx.atomicfu.locks + +actual class Mutex { + private val mutex = NativeMutex { PosixParkingDelegator } + actual fun isLocked(): Boolean = mutex.isLocked() + actual fun tryLock(): Boolean = mutex.tryLock() + actual fun lock() = mutex.lock() + actual fun unlock() = mutex.unlock() +} diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNodeOld.kt similarity index 100% rename from atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNodeOld.kt diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt new file mode 100644 index 00000000..08a505c9 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -0,0 +1,31 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.toCPointer +import kotlinx.cinterop.toLong +import platform.posix.* + +@OptIn(ExperimentalForeignApi::class) +internal object PosixParkingDelegator: ParkingDelegator { + override fun createFutexPtr(): Long { + val combo = posixParkInit() + return combo.toLong() + } + + override fun wait(futexPrt: Long): Boolean { + val comboPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from pthread_cond ref") + posixWait(comboPtr) + return false + } + + override fun wake(futexPrt: Long): Int { + val comboPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from pthread_cond ref") + posixWake(comboPtr) + return 0 + } + + override fun manualDeallocate(futexPrt: Long) { + val comboPtr = futexPrt.toCPointer() ?: throw IllegalStateException("Could not create C Pointer from pthread_cond ref") + posixDestroy(comboPtr) + } +} diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt index 3fdfc38c..8f19835f 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt @@ -1,159 +1,10 @@ package kotlinx.atomicfu.locks -import platform.posix.* -import kotlinx.atomicfu.locks.SynchronizedObject.Status.* -import kotlinx.cinterop.UnsafeNumber -import kotlin.concurrent.AtomicReference - -@OptIn(UnsafeNumber::class) // required for KT-60572 public actual open class SynchronizedObject { - - protected val lock = AtomicReference(LockState(UNLOCKED, 0, 0)) - - public fun lock() { - val currentThreadId = pthread_self()!! - while (true) { - val state = lock.value - when (state.status) { - UNLOCKED -> { - val thinLock = LockState(THIN, 1, 0, currentThreadId) - if (lock.compareAndSet(state, thinLock)) - return - } - - THIN -> { - if (currentThreadId == state.ownerThreadId) { - // reentrant lock - val thinNested = LockState(THIN, state.nestedLocks + 1, state.waiters, currentThreadId) - if (lock.compareAndSet(state, thinNested)) - return - } else { - // another thread is trying to take this lock -> allocate native mutex - val mutex = mutexPool.allocate() - mutex.lock() - val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, mutex) - if (lock.compareAndSet(state, fatLock)) { - //block the current thread waiting for the owner thread to release the permit - mutex.lock() - tryLockAfterResume(currentThreadId) - return - } else { - // return permit taken for the owner thread and release mutex back to the pool - mutex.unlock() - mutexPool.release(mutex) - } - } - } - - FAT -> { - if (currentThreadId == state.ownerThreadId) { - // reentrant lock - val nestedFatLock = - LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex) - if (lock.compareAndSet(state, nestedFatLock)) return - } else if (state.ownerThreadId != null) { - val fatLock = - LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex) - if (lock.compareAndSet(state, fatLock)) { - fatLock.mutex!!.lock() - tryLockAfterResume(currentThreadId) - return - } - } - } - } - } - } - - public fun tryLock(): Boolean { - val currentThreadId = pthread_self()!! - while (true) { - val state = lock.value - if (state.status == UNLOCKED) { - val thinLock = LockState(THIN, 1, 0, currentThreadId) - if (lock.compareAndSet(state, thinLock)) - return true - } else { - if (currentThreadId == state.ownerThreadId) { - val nestedLock = - LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex) - if (lock.compareAndSet(state, nestedLock)) - return true - } else { - return false - } - } - } - } - - public fun unlock() { - val currentThreadId = pthread_self()!! - while (true) { - val state = lock.value - require(currentThreadId == state.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${state.ownerThreadId}, real: $currentThreadId" } - when (state.status) { - THIN -> { - // nested unlock - if (state.nestedLocks == 1) { - val unlocked = LockState(UNLOCKED, 0, 0) - if (lock.compareAndSet(state, unlocked)) - return - } else { - val releasedNestedLock = - LockState(THIN, state.nestedLocks - 1, state.waiters, state.ownerThreadId) - if (lock.compareAndSet(state, releasedNestedLock)) - return - } - } - - FAT -> { - if (state.nestedLocks == 1) { - // last nested unlock -> release completely, resume some waiter - val releasedLock = LockState(FAT, 0, state.waiters - 1, null, state.mutex) - if (lock.compareAndSet(state, releasedLock)) { - releasedLock.mutex!!.unlock() - return - } - } else { - // lock is still owned by the current thread - val releasedLock = - LockState(FAT, state.nestedLocks - 1, state.waiters, state.ownerThreadId, state.mutex) - if (lock.compareAndSet(state, releasedLock)) - return - } - } - - else -> error("It is not possible to unlock the mutex that is not obtained") - } - } - } - - private fun tryLockAfterResume(threadId: pthread_t) { - while (true) { - val state = lock.value - val newState = if (state.waiters == 0) // deflate - LockState(THIN, 1, 0, threadId) - else - LockState(FAT, 1, state.waiters, threadId, state.mutex) - if (lock.compareAndSet(state, newState)) { - if (state.waiters == 0) { - state.mutex!!.unlock() - mutexPool.release(state.mutex) - } - return - } - } - } - - protected class LockState( - val status: Status, - val nestedLocks: Int, - val waiters: Int, - val ownerThreadId: pthread_t? = null, - val mutex: NativeMutexNode? = null - ) - - protected enum class Status { UNLOCKED, THIN, FAT } + private val lock = NativeMutex { PosixParkingDelegator } + public fun lock() = lock.lock() + public fun tryLock() = lock.tryLock() + public fun unlock() = lock.unlock() } public actual fun reentrantLock() = ReentrantLock() @@ -177,46 +28,3 @@ public actual inline fun synchronized(lock: SynchronizedObject, block: () -> lock.unlock() } } - -private const val INITIAL_POOL_CAPACITY = 64 - -private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) } - -class MutexPool(capacity: Int) { - private val top = AtomicReference(null) - - private val mutexes = Array(capacity) { NativeMutexNode() } - - init { - // Immediately form a stack - for (mutex in mutexes) { - release(mutex) - } - } - - private fun allocMutexNode() = NativeMutexNode() - - fun allocate(): NativeMutexNode = pop() ?: allocMutexNode() - - fun release(mutexNode: NativeMutexNode) { - while (true) { - val oldTop = top.value - mutexNode.next = oldTop - if (top.compareAndSet(oldTop, mutexNode)) { - return - } - } - } - - private fun pop(): NativeMutexNode? { - while (true) { - val oldTop = top.value - if (oldTop == null) - return null - val newHead = oldTop.next - if (top.compareAndSet(oldTop, newHead)) { - return oldTop - } - } - } -} diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronizedOld.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronizedOld.kt new file mode 100644 index 00000000..73ff342e --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/SynchronizedOld.kt @@ -0,0 +1,201 @@ +package kotlinx.atomicfu.locks + +import platform.posix.* +import kotlinx.atomicfu.locks.SynchronizedObjectOld.Status.* +import kotlinx.cinterop.UnsafeNumber +import kotlin.concurrent.AtomicReference + +@OptIn(UnsafeNumber::class) // required for KT-60572 +public open class SynchronizedObjectOld { + + protected val lock = AtomicReference(LockState(UNLOCKED, 0, 0)) + + public fun lock() { + val currentThreadId = pthread_self()!! + while (true) { + val state = lock.value + when (state.status) { + UNLOCKED -> { + val thinLock = LockState(THIN, 1, 0, currentThreadId) + if (lock.compareAndSet(state, thinLock)) + return + } + + THIN -> { + if (currentThreadId == state.ownerThreadId) { + // reentrant lock + val thinNested = LockState(THIN, state.nestedLocks + 1, state.waiters, currentThreadId) + if (lock.compareAndSet(state, thinNested)) + return + } else { + // another thread is trying to take this lock -> allocate native mutex + val mutex = mutexPool.allocate() + mutex.lock() + val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, mutex) + if (lock.compareAndSet(state, fatLock)) { + //block the current thread waiting for the owner thread to release the permit + mutex.lock() + tryLockAfterResume(currentThreadId) + return + } else { + // return permit taken for the owner thread and release mutex back to the pool + mutex.unlock() + mutexPool.release(mutex) + } + } + } + + FAT -> { + if (currentThreadId == state.ownerThreadId) { + // reentrant lock + val nestedFatLock = + LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex) + if (lock.compareAndSet(state, nestedFatLock)) return + } else if (state.ownerThreadId != null) { + val fatLock = + LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex) + if (lock.compareAndSet(state, fatLock)) { + fatLock.mutex!!.lock() + tryLockAfterResume(currentThreadId) + return + } + } + } + } + } + } + + public fun tryLock(): Boolean { + val currentThreadId = pthread_self()!! + while (true) { + val state = lock.value + if (state.status == UNLOCKED) { + val thinLock = LockState(THIN, 1, 0, currentThreadId) + if (lock.compareAndSet(state, thinLock)) + return true + } else { + if (currentThreadId == state.ownerThreadId) { + val nestedLock = + LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex) + if (lock.compareAndSet(state, nestedLock)) + return true + } else { + return false + } + } + } + } + + public fun unlock() { + val currentThreadId = pthread_self()!! + while (true) { + val state = lock.value + require(currentThreadId == state.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${state.ownerThreadId}, real: $currentThreadId" } + when (state.status) { + THIN -> { + // nested unlock + if (state.nestedLocks == 1) { + val unlocked = LockState(UNLOCKED, 0, 0) + if (lock.compareAndSet(state, unlocked)) + return + } else { + val releasedNestedLock = + LockState(THIN, state.nestedLocks - 1, state.waiters, state.ownerThreadId) + if (lock.compareAndSet(state, releasedNestedLock)) + return + } + } + + FAT -> { + if (state.nestedLocks == 1) { + // last nested unlock -> release completely, resume some waiter + val releasedLock = LockState(FAT, 0, state.waiters - 1, null, state.mutex) + if (lock.compareAndSet(state, releasedLock)) { + releasedLock.mutex!!.unlock() + return + } + } else { + // lock is still owned by the current thread + val releasedLock = + LockState(FAT, state.nestedLocks - 1, state.waiters, state.ownerThreadId, state.mutex) + if (lock.compareAndSet(state, releasedLock)) + return + } + } + + else -> error("It is not possible to unlock the mutex that is not obtained") + } + } + } + + private fun tryLockAfterResume(threadId: pthread_t) { + while (true) { + val state = lock.value + val newState = if (state.waiters == 0) // deflate + LockState(THIN, 1, 0, threadId) + else + LockState(FAT, 1, state.waiters, threadId, state.mutex) + if (lock.compareAndSet(state, newState)) { + if (state.waiters == 0) { + state.mutex!!.unlock() + mutexPool.release(state.mutex) + } + return + } + } + } + + protected class LockState( + val status: Status, + val nestedLocks: Int, + val waiters: Int, + val ownerThreadId: pthread_t? = null, + val mutex: NativeMutexNode? = null + ) + + protected enum class Status { UNLOCKED, THIN, FAT } +} + + +private const val INITIAL_POOL_CAPACITY = 64 + +private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) } + +class MutexPool(capacity: Int) { + private val top = AtomicReference(null) + + private val mutexes = Array(capacity) { NativeMutexNode() } + + init { + // Immediately form a stack + for (mutex in mutexes) { + release(mutex) + } + } + + private fun allocMutexNode() = NativeMutexNode() + + fun allocate(): NativeMutexNode = pop() ?: allocMutexNode() + + fun release(mutexNode: NativeMutexNode) { + while (true) { + val oldTop = top.value + mutexNode.next = oldTop + if (top.compareAndSet(oldTop, mutexNode)) { + return + } + } + } + + private fun pop(): NativeMutexNode? { + while (true) { + val oldTop = top.value + if (oldTop == null) + return null + val newHead = oldTop.next + if (top.compareAndSet(oldTop, newHead)) { + return oldTop + } + } + } +} \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.native.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.native.kt new file mode 100644 index 00000000..f39877cd --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ThreadIdentifier.native.kt @@ -0,0 +1,10 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic + +private val threadCounter = atomic(0L) + +@kotlin.native.concurrent.ThreadLocal +private var threadId: Long = threadCounter.addAndGet(1) + +internal actual fun currentThreadId(): Long = threadId diff --git a/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/CompareToAtomicFU.kt b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/CompareToAtomicFU.kt new file mode 100644 index 00000000..f0448795 --- /dev/null +++ b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/CompareToAtomicFU.kt @@ -0,0 +1,155 @@ +package kotlinx.atomicfu.locks + +import kotlin.native.concurrent.Future +import kotlin.native.concurrent.TransferMode +import kotlin.native.concurrent.Worker +import kotlin.test.Test +import kotlin.test.assertTrue +import kotlin.time.DurationUnit +import kotlin.time.measureTime + +/** + * Compares to the atomicfu implementation. + * + * Fair tests: + * Each thread has a number id, n threads have numbers 0 until n. + * A counter protected by mutex needs to be incremented unttil 10.000 + * A thread can only increment when the counter is counter mod id. This tests fariness and progress. For each thread. + * + * Random tests: + * Are like fair tests but after each increment the thread choses randomly which thread should be next. + * This loses fairness but still requires progress for the test to complete. + */ +class CompareToAtomicFU { + @Test + fun compareWithAtomicFUSingleThread() { + var accumulatedDifference = 0L + repeat(3) { + val time1 = measureTime { + singleTOld() + } + println("Old $time1") + val time2 = measureTime { + singleTNew() + } + println("New $time2") + accumulatedDifference += time1.toLong(DurationUnit.MILLISECONDS) - time2.toLong(DurationUnit.MILLISECONDS) + } + assertTrue(accumulatedDifference > 0) + } + + @Test + fun compareAtomicFU3ThreadsFair() = compareAtomicFUMultiThread(3, true) + + @Test + fun compareAtomicFU5ThreadsFair() = compareAtomicFUMultiThread(5, true) + + @Test + fun compareAtomicFU7ThreadsFair() = compareAtomicFUMultiThread(7, true) + + @Test + fun compareAtomicFU3ThreadsRandom() = compareAtomicFUMultiThread(3, false) + + @Test + fun compareAtomicFU5ThreadsRandom() = compareAtomicFUMultiThread(5, false) + + @Test + fun compareAtomicFU7ThreadsRandom() = compareAtomicFUMultiThread(7, false) + + fun compareAtomicFUMultiThread(nThreads: Int, fair: Boolean) { + var accumulatedDifference = 0L + repeat(3) { + val timeNew = measureTime { + val newLock = NewLockInt() + mulitTestLock(newLock, nThreads, fair) + } + println("New $timeNew") + + val timeOld = measureTime { + val oldLock = OldLockInt() + mulitTestLock(oldLock, nThreads, fair) + } + println("Old $timeOld") + accumulatedDifference += timeOld.toLong(DurationUnit.MILLISECONDS) - timeNew.toLong(DurationUnit.MILLISECONDS) + } + assertTrue(accumulatedDifference > 0) + } + + fun singleTNew() { + val nativeMutex = NativeMutex { PosixParkingDelegator } + repeat(1000000) { + nativeMutex.lock() + nativeMutex.unlock() + } + } + + fun singleTOld() { + val reentrantLock = SynchronizedObjectOld() + repeat(1000000) { + reentrantLock.lock() + reentrantLock.unlock() + } + } + + fun mulitTestLock(lockInt: LockInt, nThreads: Int, fair: Boolean) { + val countTo = 100000 + val futureList = mutableListOf>() + repeat(nThreads) { i -> + val test = LockIntTest(lockInt, countTo, nThreads, i, fair) + futureList.add(testWithWorker(test)) + } + futureList.forEach { + it.result + } + } + + fun testWithWorker(test: LockIntTest): Future { + val worker = Worker.start() + return worker.execute(TransferMode.UNSAFE, { test }) { t -> + while (true) { + t.lockInt.lock() + if (t.fair && t.lockInt.n % t.mod == t.id) t.lockInt.n++ + if (!t.fair && t.lockInt.rand == t.id) { + t.lockInt.n++ + t.lockInt.rand = (0..< t.mod).random() + } + if (t.lockInt.n >= t.max) { + t.lockInt.unlock() + break + } + t.lockInt.unlock() + } + } + } + + data class LockIntTest( + val lockInt: LockInt, + val max: Int, + val mod: Int, + val id: Int, + val fair: Boolean + ) + + class NewLockInt: LockInt{ + private val lock = NativeMutex { PosixParkingDelegator } + override var n = 0 + override var rand = 0 + override fun lock() = lock.lock() + override fun unlock() = lock.unlock() + } + + class OldLockInt: LockInt { + private val lock = SynchronizedObjectOld() + override var n = 0 + override var rand = 0 + override fun lock() = lock.lock() + override fun unlock() = lock.unlock() + } + + interface LockInt { + fun lock() + fun unlock() + var n: Int + var rand: Int + } +} diff --git a/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt new file mode 100644 index 00000000..d7a940e1 --- /dev/null +++ b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/NativeMutexTest.kt @@ -0,0 +1,103 @@ +package kotlinx.atomicfu.locks +import platform.posix.usleep +import kotlin.native.concurrent.TransferMode +import kotlin.native.concurrent.Worker +import kotlin.test.Test +import kotlin.test.assertEquals + +class NativeMutexTest { + + + @Test + fun testNativeMutexSlow() { + val mutex = NativeMutex { PosixParkingDelegator } + val resultList = mutableListOf() + + val worker1 = Worker.start() + val fut1 = worker1.execute(TransferMode.UNSAFE, { mutex }) { mutex -> + repeat(30) { i -> + mutex.lock() + println("Locked : A $i") + usleep(100000u) + println("Unlocked: A $i") + mutex.unlock() + } + } + + val worker2 = Worker.start() + val fut2 = worker2.execute(TransferMode.UNSAFE, { mutex }) { mutex -> + repeat(30) { i -> + mutex.lock() + println("Locked : B $i") + usleep(100000u) + println("Unlocked: B $i") + mutex.unlock() + } + } + + repeat(30) { i -> + mutex.lock() + println("Locked : C $i") + usleep(100000u) + println("Unlocked: C $i") + mutex.unlock() + } + fut1.result + fut2.result + + resultList.filterIndexed { i, _ -> i % 2 == 0 } + .zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b -> + assertEquals(a, b) + } + } + + @Test + fun testNativeMutexFast() { + val mutex = NativeMutex { PosixParkingDelegator } + val resultList = mutableListOf() + + val worker1 = Worker.start() + val fut1 = worker1.execute(TransferMode.UNSAFE, { Pair(resultList, mutex) }) { (rl, mutex) -> + repeat(30000) { i -> + mutex.lock() + rl.add("a$i") + println("Locked : A $i") + println("Unlocked: A $i") + rl.add("a$i") + mutex.unlock() + } + println("A DONE") + } + + val worker2 = Worker.start() + val fut2= worker2.execute(TransferMode.UNSAFE, { Pair(resultList, mutex) }) { (rl, mutex) -> + repeat(30000) { i -> + mutex.lock() + rl.add("b$i") + println("Locked : B $i") + println("Unlocked: B $i") + rl.add("b$i") + mutex.unlock() + } + println("B DONE") + } + + repeat(30000) { i -> + mutex.lock() + resultList.add("c$i") + println("Locked : C $i") + println("Unlocked: C $i") + resultList.add("c$i") + mutex.unlock() + } + println("C DONE") + fut1.result + fut2.result + + resultList + .filterIndexed { i, _ -> i % 2 == 0 } + .zip(resultList.filterIndexed {i, _ -> i % 2 == 1}) { a, b -> + assertEquals(a, b) + } + } +} diff --git a/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt new file mode 100644 index 00000000..838a9c98 --- /dev/null +++ b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/VaryingContentionTest.kt @@ -0,0 +1,76 @@ +package kotlinx.atomicfu.locks + +import kotlin.concurrent.AtomicInt +import kotlin.native.concurrent.Future +import kotlin.native.concurrent.TransferMode +import kotlin.native.concurrent.Worker +import kotlin.test.Test +import kotlin.test.assertTrue + +class VaryingContentionTest { + + @Test + fun compareAtomicFUMultiThread() { + val lockInt = NewLockInt2() + mulitTestLock(lockInt, 10, 100000) + println("1") + mulitTestLock(lockInt, 1, 200000) + println("2") + mulitTestLock(lockInt, 20, 300000) + println("3") + mulitTestLock(lockInt, 1, 400000) + println("4") + mulitTestLock(lockInt, 2, 1000000) + println("done") + } + + + fun mulitTestLock(lockInt: NewLockInt2, nThreads: Int, countTo: Int) { + val futureList = mutableListOf>() + repeat(nThreads) { i -> + val test = LockIntTest(lockInt, countTo, nThreads, i) + futureList.add(testWithWorker(test)) + } + futureList.forEach { + it.result + } + } + + fun testWithWorker(test: LockIntTest): Future { + val worker = Worker.start() + return worker.execute(TransferMode.UNSAFE, { test }) { t -> + while (true) { + t.lockInt.lock() + if (t.lockInt.n % t.mod == t.id) t.lockInt.n++ + if (t.lockInt.n >= t.max) { + t.lockInt.unlock() + break + } + t.lockInt.unlock() + } + } + } + + data class LockIntTest( + val lockInt: NewLockInt2, + val max: Int, + val mod: Int, + val id: Int, + ) + + class NewLockInt2{ + private val lock = NativeMutex { PosixParkingDelegator } + private val check = AtomicInt(0) + var n = 0 + fun lock() { + lock.lock() + assertTrue(check.incrementAndGet() == 1) + } + + fun unlock() { + assertTrue(check.decrementAndGet() == 0) + lock.unlock() + } + + } +}