Implement ObservableValue<T>.asFlow() (#1789)
* Implement ObservableValue<T>.asFlow()
Fixes #1695
diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api
index 24c5b70..620e904 100644
--- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api
+++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api
@@ -1,3 +1,7 @@
+public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
+ public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
+}
+
public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
new file mode 100644
index 0000000..903b60a
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.javafx
+
+import javafx.beans.value.ChangeListener
+import javafx.beans.value.ObservableValue
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.flow.*
+
+/**
+ * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
+ * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
+ * succession, only the last one will be emitted.
+ * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
+ * supports lazy evaluation, eager computation will be enforced while the flow is being collected.
+ * All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
+ * This flow emits at least the initial value.
+ *
+ * ### Operator fusion
+ *
+ * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
+ * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
+ */
+@ExperimentalCoroutinesApi
+public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
+ val listener = ChangeListener<T> { _, _, newValue ->
+ try {
+ offer(newValue)
+ } catch (e: CancellationException) {
+ // In case the event fires after the channel is closed
+ }
+ }
+ addListener(listener)
+ send(value)
+ awaitClose {
+ removeListener(listener)
+ }
+}.flowOn(Dispatchers.JavaFx).conflate()
diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
index 8f5be31..ed74ad6 100644
--- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
+++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
@@ -116,7 +116,7 @@
}
}
-/** @return [true] if initialized successfully, and [false] if no display is detected */
+/** @return true if initialized successfully, and false if no display is detected */
internal fun initPlatform(): Boolean = PlatformInitializer.success
// Lazily try to initialize JavaFx platform just once
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt
similarity index 97%
rename from ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt
rename to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt
index e6a1ddb..724be6d 100644
--- a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt
@@ -8,7 +8,7 @@
import kotlinx.coroutines.*
import org.junit.*
-class JavaFxTest : TestBase() {
+class JavaFxDispatcherTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
new file mode 100644
index 0000000..6964050
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
@@ -0,0 +1,86 @@
+package kotlinx.coroutines.javafx
+
+import javafx.beans.property.SimpleIntegerProperty
+import kotlinx.coroutines.TestBase
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Before
+import org.junit.Test
+import kotlin.test.*
+
+
+class JavaFxObservableAsFlowTest : TestBase() {
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
+ }
+
+ @Test
+ fun testFlowOrder() = runTest {
+ if (!initPlatform()) {
+ println("Skipping JavaFxTest in headless environment")
+ return@runTest // ignore test in headless environments
+ }
+
+ val integerProperty = SimpleIntegerProperty(0)
+ val n = 1000
+ val flow = integerProperty.asFlow().takeWhile { j -> j != n }
+ newSingleThreadContext("setter").use { pool ->
+ launch(pool) {
+ for (i in 1..n) {
+ launch(Dispatchers.JavaFx) {
+ integerProperty.set(i)
+ }
+ }
+ }
+ var i = -1
+ flow.collect { j ->
+ assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
+ i = j
+ }
+ }
+ }
+
+ @Test
+ fun testConflation() = runTest {
+ if (!initPlatform()) {
+ println("Skipping JavaFxTest in headless environment")
+ return@runTest // ignore test in headless environments
+ }
+
+ withContext(Dispatchers.JavaFx) {
+ val END_MARKER = -1
+ val integerProperty = SimpleIntegerProperty(0)
+ val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
+ launch {
+ yield() // to subscribe to [integerProperty]
+ yield() // send 0
+ integerProperty.set(1)
+ expect(3)
+ yield() // send 1
+ expect(5)
+ integerProperty.set(2)
+ for (i in (-100..-2)) {
+ integerProperty.set(i) // should be skipped due to conflation
+ }
+ integerProperty.set(3)
+ expect(6)
+ yield() // send 2 and 3
+ integerProperty.set(-1)
+ }
+ expect(1)
+ flow.collect { i ->
+ when (i) {
+ 0 -> expect(2)
+ 1 -> expect(4)
+ 2 -> expect(7)
+ 3 -> expect(8)
+ else -> fail("i is $i")
+ }
+ }
+ finish(9)
+ }
+ }
+
+}
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
new file mode 100644
index 0000000..5338835
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.javafx
+
+import javafx.beans.property.SimpleIntegerProperty
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.first
+import org.junit.*
+
+class JavaFxStressTest : TestBase() {
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
+ }
+
+ @get:Rule
+ val pool = ExecutorRule(1)
+
+ @Test
+ fun testCancellationRace() = runTest {
+ if (!initPlatform()) {
+ println("Skipping JavaFxTest in headless environment")
+ return@runTest // ignore test in headless environments
+ }
+
+ val integerProperty = SimpleIntegerProperty(0)
+ val flow = integerProperty.asFlow()
+ var i = 1
+ val n = 1000 * stressTestMultiplier
+ repeat (n) {
+ launch(pool) {
+ flow.first()
+ }
+ withContext(Dispatchers.JavaFx) {
+ integerProperty.set(i)
+ }
+ i += 1
+ }
+ }
+}
\ No newline at end of file
diff --git a/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt b/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt
new file mode 100644
index 0000000..00003f7
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt
@@ -0,0 +1,101 @@
+package examples
+
+import javafx.application.Application
+import javafx.scene.Scene
+import javafx.scene.control.*
+import javafx.scene.layout.GridPane
+import javafx.stage.Stage
+import javafx.beans.property.SimpleStringProperty
+import javafx.event.EventHandler
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.javafx.*
+import kotlin.coroutines.CoroutineContext
+
+fun main(args: Array<String>) {
+ Application.launch(FxAsFlowApp::class.java, *args)
+}
+
+/**
+ * Adapted from
+ * https://github.com/ReactiveX/RxJavaFX/blob/a78ca7d15f7d82d201df8fafb6eba732ec17e327/src/test/java/io/reactivex/rxjavafx/RxJavaFXTest.java
+ */
+class FxAsFlowApp: Application(), CoroutineScope {
+
+ private var job = Job()
+ override val coroutineContext: CoroutineContext
+ get() = JavaFx + job
+
+ private val incrementButton = Button("Increment")
+ private val incrementLabel = Label("")
+ private val textInput = TextField()
+ private val flippedTextLabel = Label()
+ private val spinner = Spinner<Int>()
+ private val spinnerChangesLabel = Label()
+
+ public override fun start( primaryStage: Stage) {
+ val gridPane = GridPane()
+ gridPane.apply {
+ hgap = 10.0
+ vgap = 10.0
+ add(incrementButton, 0, 0)
+ add(incrementLabel, 1, 0)
+ add(textInput, 0, 1)
+ add(flippedTextLabel, 1, 1)
+ add(spinner, 0, 2)
+ add(spinnerChangesLabel, 1, 2)
+ }
+ val scene = Scene(gridPane)
+ primaryStage.apply {
+ width = 275.0
+ setScene(scene)
+ show()
+ }
+ }
+
+ public override fun stop() {
+ super.stop()
+ job.cancel()
+ job = Job()
+ }
+
+ init {
+ // Initializing the "Increment" button
+ val stringProperty = SimpleStringProperty()
+ var i = 0
+ incrementButton.onAction = EventHandler {
+ i += 1
+ stringProperty.set(i.toString())
+ }
+ launch {
+ stringProperty.asFlow().collect {
+ if (it != null) {
+ stringProperty.set(it)
+ }
+ }
+ }
+ incrementLabel.textProperty().bind(stringProperty)
+ // Initializing the reversed text field
+ val stringProperty2 = SimpleStringProperty()
+ launch {
+ textInput.textProperty().asFlow().collect {
+ if (it != null) {
+ stringProperty2.set(it.reversed())
+ }
+ }
+ }
+ flippedTextLabel.textProperty().bind(stringProperty2)
+ // Initializing the spinner
+ spinner.valueFactory = SpinnerValueFactory.IntegerSpinnerValueFactory(0, 100)
+ spinner.isEditable = true
+ val stringProperty3 = SimpleStringProperty()
+ launch {
+ spinner.valueProperty().asFlow().collect {
+ if (it != null) {
+ stringProperty3.set("NEW: $it")
+ }
+ }
+ }
+ spinnerChangesLabel.textProperty().bind(stringProperty3)
+ }
+}