Native implementation of synchronized object: a thin/fat lock built on CAS state transitions and pooled POSIX mutexes, with JVM (monitor-based) and JS (no-op) actuals.
Fixes #80
diff --git a/atomicfu/build.gradle b/atomicfu/build.gradle
index db3352b..aec75f9 100644
--- a/atomicfu/build.gradle
+++ b/atomicfu/build.gradle
@@ -8,12 +8,14 @@
project.ext.nativeMainSets = []
project.ext.nativeTestSets = []
+project.ext.nativeCompilations = []
kotlin {
targets.metaClass.addTarget = { preset ->
def target = delegate.fromPreset(preset, preset.name)
project.ext.nativeMainSets.add(target.compilations['main'].kotlinSourceSets.first())
project.ext.nativeTestSets.add(target.compilations['test'].kotlinSourceSets.first())
+ project.ext.nativeCompilations.add(target.compilations['main'])
}
targets {
@@ -74,10 +76,24 @@
}
nativeMain { dependsOn commonMain }
+ nativeTest {}
+
if (!project.ext.ideaActive) {
configure(nativeMainSets) {
dependsOn nativeMain
}
+
+ configure(nativeTestSets) {
+ dependsOn nativeTest
+ }
+ }
+ }
+
+    configure(project.ext.nativeCompilations) { // register the cinterop bindings for every native 'main' compilation
+        cinterops {
+            interop {
+                defFile 'src/nativeInterop/cinterop/interop.def' // C mutex/condvar helpers backing SynchronizedObject
+            }
}
}
}
diff --git a/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/Synchronized.common.kt b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/Synchronized.common.kt
new file mode 100644
index 0000000..e3cc37b
--- /dev/null
+++ b/atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/Synchronized.common.kt
@@ -0,0 +1,6 @@
+package kotlinx.atomicfu
+
+public expect open class SynchronizedObject() // marker class: platform actuals add locking state where needed
+
+public expect inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T // runs block while holding lock; returns its result
+
diff --git a/atomicfu/src/jsMain/kotlin/kotlinx/atomicfu/Synchronized.kt b/atomicfu/src/jsMain/kotlin/kotlinx/atomicfu/Synchronized.kt
new file mode 100644
index 0000000..f5a119c
--- /dev/null
+++ b/atomicfu/src/jsMain/kotlin/kotlinx/atomicfu/Synchronized.kt
@@ -0,0 +1,6 @@
+package kotlinx.atomicfu
+
+public actual typealias SynchronizedObject = Any // any object can serve as the lock marker on JS
+
+public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
+    block() // single-threaded JS: no locking needed, just run the block
\ No newline at end of file
diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/Synchronized.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/Synchronized.kt
new file mode 100644
index 0000000..f100340
--- /dev/null
+++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/Synchronized.kt
@@ -0,0 +1,6 @@
+package kotlinx.atomicfu
+
+public actual typealias SynchronizedObject = Any // every JVM object carries a monitor usable as the lock
+
+public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
+    kotlin.synchronized(lock, block) // delegate to the built-in monitor-based synchronized
\ No newline at end of file
diff --git a/atomicfu/src/nativeInterop/cinterop/interop.def b/atomicfu/src/nativeInterop/cinterop/interop.def
new file mode 100644
index 0000000..ea5f59d
--- /dev/null
+++ b/atomicfu/src/nativeInterop/cinterop/interop.def
@@ -0,0 +1,45 @@
+---
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+typedef struct lock_support {
+    volatile int locked;      /* 0 = permit available, 1 = permit taken */
+    pthread_mutex_t mutex;    /* protects 'locked' */
+    pthread_cond_t cond;      /* signalled when 'locked' is cleared */
+} lock_support_t;
+
+typedef struct mutex_node {
+    lock_support_t* mutex;    /* the lock this pool node owns */
+    struct mutex_node* next;  /* intrusive link for the pool's free-list */
+} mutex_node_t;
+
+lock_support_t* lock_support_init() {
+    lock_support_t * ls = (lock_support_t *) malloc(sizeof(lock_support_t)); /* NOTE(review): never freed -- confirm nodes are meant to live for the process lifetime */
+    ls->locked = 0;
+    pthread_mutex_init(&ls->mutex, NULL);
+    pthread_cond_init(&ls->cond, NULL);
+    return ls;
+}
+
+mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) {
+    mutexNode->mutex = lock_support_init();
+    mutexNode->next = NULL;
+    return mutexNode;
+}
+
+void lock(lock_support_t* ls) {
+    pthread_mutex_lock(&ls->mutex);
+    while (ls->locked == 1) { // wait until the holder clears 'locked'; loop guards against spurious wakeups
+        pthread_cond_wait(&ls->cond, &ls->mutex);
+    }
+    ls->locked = 1;
+    pthread_mutex_unlock(&ls->mutex);
+}
+
+void unlock(lock_support_t* ls) {
+    pthread_mutex_lock(&ls->mutex);
+    ls->locked = 0;
+    pthread_cond_broadcast(&ls->cond); // wake all waiters; each re-checks 'locked' under the mutex
+    pthread_mutex_unlock(&ls->mutex);
+}
\ No newline at end of file
diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/Synchronized.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/Synchronized.kt
new file mode 100644
index 0000000..d32a27f
--- /dev/null
+++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/Synchronized.kt
@@ -0,0 +1,200 @@
+package kotlinx.atomicfu
+
+import platform.posix.*
+import interop.*
+import kotlinx.cinterop.*
+import kotlin.native.concurrent.AtomicNativePtr
+import kotlin.native.concurrent.AtomicReference
+import kotlin.native.concurrent.SharedImmutable
+import kotlin.native.concurrent.freeze
+import kotlin.native.internal.NativePtr
+import kotlinx.atomicfu.Status.*
+
+public actual open class SynchronizedObject { // reentrant thin/fat lock; inflates to a pooled native mutex under contention
+
+    private val lock = AtomicReference(LockState(UNLOCKED, 0, 0)) // immutable lock word, replaced wholesale via CAS
+
+    public fun lock() {
+        val currentThreadId = pthread_self()!! // identifies the owner for reentrancy checks
+        while (true) { // retry loop: every transition is a single CAS on the lock word
+            val state = lock.value
+            when (state.status) {
+                UNLOCKED -> {
+                    val thinLock = LockState(THIN, 1, 0, currentThreadId)
+                    if (lock.compareAndSet(state, thinLock)) // uncontended fast path
+                        return
+                }
+                THIN -> {
+                    if (currentThreadId == state.ownerThreadId) {
+                        // reentrant lock: same owner, just bump the nesting count
+                        val thinNested = LockState(THIN, state.nestedLocks + 1, state.waiters, currentThreadId)
+                        if (lock.compareAndSet(state, thinNested))
+                            return
+                    } else {
+                        // another thread is trying to take this lock -> allocate native mutex
+                        val mutex = mutexPool.allocate()
+                        mutex.lock() // take the permit on behalf of the current owner
+                        val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, mutex)
+                        if (lock.compareAndSet(state, fatLock)) {
+                            //block the current thread waiting for the owner thread to release the permit
+                            mutex.lock()
+                            tryLockAfterResume(currentThreadId)
+                            return
+                        } else {
+                            // return permit taken for the owner thread and release mutex back to the pool
+                            mutex.unlock()
+                            mutexPool.release(mutex)
+                        }
+                    }
+                }
+                FAT -> {
+                    if (currentThreadId == state.ownerThreadId) {
+                        // nested lock: owner re-enters an already-inflated lock
+                        val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
+                        if (lock.compareAndSet(state, nestedFatLock)) return
+                    } else if (state.ownerThreadId != null) { // held by someone else: register as a waiter, then park on the mutex
+                        val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
+                        if (lock.compareAndSet(state, fatLock)) {
+                            fatLock.mutex!!.lock() // blocks until the owner's unlock() releases the permit
+                            tryLockAfterResume(currentThreadId)
+                            return
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public fun unlock() {
+        val currentThreadId = pthread_self()!!
+        while (true) {
+            val state = lock.value
+            require(currentThreadId == state.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${state.ownerThreadId}, real: $currentThreadId" } // also rejects unlocking an UNLOCKED lock (owner is null there)
+            when (state.status) {
+                THIN -> {
+                    // nested unlock: drop to UNLOCKED on the last level, otherwise decrement the nesting count
+                    if (state.nestedLocks == 1) {
+                        val unlocked = LockState(UNLOCKED, 0, 0)
+                        if (lock.compareAndSet(state, unlocked))
+                            return
+                    } else {
+                        val releasedNestedLock =
+                            LockState(THIN, state.nestedLocks - 1, state.waiters, state.ownerThreadId)
+                        if (lock.compareAndSet(state, releasedNestedLock))
+                            return
+                    }
+                }
+                FAT -> {
+                    if (state.nestedLocks == 1) {
+                        // last nested unlock -> release completely, resume some waiter
+                        val releasedLock = LockState(FAT, 0, state.waiters - 1, null, state.mutex)
+                        if (lock.compareAndSet(state, releasedLock)) {
+                            releasedLock.mutex!!.unlock() // hand the permit to exactly one parked waiter
+                            return
+                        }
+                    } else {
+                        // lock is still owned by the current thread
+                        val releasedLock =
+                            LockState(FAT, state.nestedLocks - 1, state.waiters, state.ownerThreadId, state.mutex)
+                        if (lock.compareAndSet(state, releasedLock))
+                            return
+                    }
+                }
+                else -> error("It is not possible to unlock the mutex that is not obtained")
+            }
+        }
+    }
+
+    private fun tryLockAfterResume(threadId: pthread_t) { // called by a waiter that just acquired the mutex permit
+        while (true) {
+            val state = lock.value
+            val newState = if (state.waiters == 0) // deflate back to THIN when nobody else is parked
+                LockState(THIN, 1, 0, threadId)
+            else
+                LockState(FAT, 1, state.waiters, threadId, state.mutex)
+            if (lock.compareAndSet(state, newState)) {
+                if (state.waiters == 0) {
+                    state.mutex!!.unlock() // give the permit back before recycling the node
+                    mutexPool.release(state.mutex)
+                }
+                return
+            }
+        }
+    }
+
+    private class LockState(
+        val status: Status,
+        val nestedLocks: Int, // reentrancy depth held by the owner
+        val waiters: Int, // threads parked (or about to park) on the fat mutex
+        val ownerThreadId: pthread_t? = null,
+        val mutex: CPointer<mutex_node_t>? = null
+    ) {
+        init { freeze() } // lock word is shared across threads, so it must be frozen
+    }
+
+    private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex) // C helper: blocks until the permit is available
+
+    private fun CPointer<mutex_node_t>.unlock() = unlock(this.pointed.mutex) // C helper: releases the permit and wakes waiters
+}
+
+private enum class Status { UNLOCKED, THIN, FAT } // lock word state: free / owner-only CAS lock / inflated with a native mutex
+
+public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T { // runs block holding lock; unlock guaranteed via finally
+    lock.lock()
+    try {
+        return block()
+    } finally {
+        lock.unlock()
+    }
+}
+
+private const val INITIAL_POOL_CAPACITY = 64 // mutex nodes pre-allocated when the pool is first touched
+
+@SharedImmutable
+private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) } // process-wide pool, created lazily on first contention
+
+class MutexPool(capacity: Int) { // lock-free Treiber stack of reusable native mutex nodes
+    private val top = AtomicNativePtr(NativePtr.NULL) // head of the free-list
+
+    private val mutexes = memScoped { // NOTE(review): memScoped looks redundant -- allocation goes to nativeHeap, not the scope; confirm
+        nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) }
+    }
+
+    init {
+        for (i in 0 until capacity) { // thread the pre-allocated array onto the free-list
+            val mutexPtr = interpretCPointer<mutex_node_t>(mutexes.rawValue.plus(i * mutex_node_t.size))
+                ?: error ("Cast of the mutex_node NativePtr to CPointer failed")
+            push(mutexPtr)
+        }
+    }
+
+    private fun allocMutexNode() = memScoped { // fallback when the pool is empty; node is heap-allocated
+        nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr
+    }
+
+    fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode() // reuse a pooled node or mint a fresh one
+
+    fun release(mutexNode: CPointer<mutex_node_t>) {
+        if ((0..20).random() == 0) push(mutexNode) // NOTE(review): ~20/21 of released nodes are dropped and never freed -- confirm this is intentional and not a leak
+    }
+
+    private fun push(mutexNode: CPointer<mutex_node_t>) { // CAS-loop push onto the Treiber stack
+        while (true) {
+            val oldTop = interpretCPointer<mutex_node_t>(top.value)
+            mutexNode.pointed.next = oldTop
+            if (top.compareAndSet(oldTop.rawValue, mutexNode.rawValue))
+                return
+        }
+    }
+
+    private fun pop(): CPointer<mutex_node_t>? { // CAS-loop pop; returns null when the pool is empty
+        while (true) {
+            val oldTop = interpretCPointer<mutex_node_t>(top.value)
+            if (oldTop.rawValue === NativePtr.NULL)
+                return null
+            val newHead = oldTop!!.pointed.next
+            if (top.compareAndSet(oldTop.rawValue, newHead.rawValue))
+                return oldTop
+        }
+    }
+}
\ No newline at end of file
diff --git a/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/test/SynchronizedTest.kt b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/test/SynchronizedTest.kt
new file mode 100644
index 0000000..b4d33a4
--- /dev/null
+++ b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/test/SynchronizedTest.kt
@@ -0,0 +1,68 @@
+package kotlinx.atomicfu.test
+
+import kotlinx.atomicfu.*
+import kotlin.native.concurrent.*
+import kotlin.native.concurrent.AtomicInt
+import kotlin.test.*
+
+private const val iterations = 100 // full repetitions of each stress test
+private const val nWorkers = 4 // concurrent workers per iteration
+private const val increments = 500 // increments performed by each worker
+private const val nLocks = 5 // distinct lock/counter pairs in manyLocksTest
+
+class SynchronizedTest { // stress tests for the native SynchronizedObject implementation
+
+    @Test
+    fun stressCounterTest() { // hammers one lock from several workers; the count is exact only if mutual exclusion holds
+        repeat(iterations) {
+            val workers = Array(nWorkers) { Worker.start() }
+            val counter = AtomicInt(0).freeze() // frozen so it can be shared across workers
+            val so = SynchronizedObject().freeze()
+            workers.forEach { worker ->
+                worker.execute(TransferMode.SAFE, {
+                    counter to so
+                }) { (count, lock) ->
+                    repeat(increments) {
+                        val nestedLocks = (1..3).random() // random depth also exercises reentrancy
+                        repeat(nestedLocks) { lock.lock() }
+                        val oldValue = count.value // non-atomic read-modify-write: safe only under the lock
+                        count.value = oldValue + 1
+                        repeat(nestedLocks) { lock.unlock() }
+                    }
+                }
+            }
+            workers.forEach {
+                it.requestTermination().result // block until the worker has drained its queue
+            }
+            assertEquals(nWorkers * increments, counter.value)
+        }
+    }
+
+
+    @Test
+    fun manyLocksTest() { // independent lock/counter pairs must not interfere with each other
+        repeat(iterations) {
+            val workers = Array(nWorkers) { Worker.start() }
+            val counters = Array(nLocks) { AtomicInt(0) }.freeze()
+            val locks = Array(nLocks) { SynchronizedObject() }.freeze()
+            workers.forEach { worker ->
+                worker.execute(TransferMode.SAFE, {
+                    counters to locks
+                }) { (counters, locks) ->
+                    locks.forEachIndexed { i, lock ->
+                        repeat(increments) {
+                            synchronized(lock) { // public API entry point under test
+                                val oldValue = counters[i].value
+                                counters[i].value = oldValue + 1
+                            }
+                        }
+                    }
+                }
+            }
+            workers.forEach {
+                it.requestTermination().result
+            }
+            assertEquals(nWorkers * nLocks * increments, counters.sumBy { it.value })
+        }
+    }
+}
\ No newline at end of file