Implement ObservableValue<T>.asFlow() (#1789)

* Implement ObservableValue<T>.asFlow()

Fixes #1695
diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api
index 24c5b70..620e904 100644
--- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api
+++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api
@@ -1,3 +1,7 @@
+public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
+	public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
+}
+
 public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
 	public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
new file mode 100644
index 0000000..903b60a
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
@@ -0,0 +1,41 @@
+/*
+ *  Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.javafx
+
+import javafx.beans.value.ChangeListener
+import javafx.beans.value.ObservableValue
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.flow.*
+
+/**
+ * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
+ * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
+ * succession, only the last one will be emitted.
+ * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
+ * supports lazy evaluation, eager computation will be enforced while the flow is being collected.
+ * All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
+ * This flow emits at least the initial value.
+ *
+ * ### Operator fusion
+ *
+ * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
+ * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
+ */
+@ExperimentalCoroutinesApi
+public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
+    val listener = ChangeListener<T> { _, _, newValue ->
+        try {
+            offer(newValue)
+        } catch (e: CancellationException) {
+            // In case the event fires after the channel is closed
+        }
+    }
+    addListener(listener)
+    send(value)
+    awaitClose {
+        removeListener(listener)
+    }
+}.flowOn(Dispatchers.JavaFx).conflate()
diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
index 8f5be31..ed74ad6 100644
--- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
+++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
@@ -116,7 +116,7 @@
     }
 }
 
-/** @return [true] if initialized successfully, and [false] if no display is detected */
+/** @return true if initialized successfully, and false if no display is detected */
 internal fun initPlatform(): Boolean = PlatformInitializer.success
 
 // Lazily try to initialize JavaFx platform just once
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt
similarity index 97%
rename from ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt
rename to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt
index e6a1ddb..724be6d 100644
--- a/ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt
@@ -8,7 +8,7 @@
 import kotlinx.coroutines.*
 import org.junit.*
 
-class JavaFxTest : TestBase() {
+class JavaFxDispatcherTest : TestBase() {
     @Before
     fun setup() {
         ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
new file mode 100644
index 0000000..6964050
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
@@ -0,0 +1,86 @@
+package kotlinx.coroutines.javafx
+
+import javafx.beans.property.SimpleIntegerProperty
+import kotlinx.coroutines.TestBase
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Before
+import org.junit.Test
+import kotlin.test.*
+
+
+class JavaFxObservableAsFlowTest : TestBase() {
+
+    @Before
+    fun setup() {
+        ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
+    }
+
+    @Test
+    fun testFlowOrder() = runTest {
+        if (!initPlatform()) {
+            println("Skipping JavaFxTest in headless environment")
+            return@runTest // ignore test in headless environments
+        }
+
+        val integerProperty = SimpleIntegerProperty(0)
+        val n = 1000
+        val flow = integerProperty.asFlow().takeWhile { j -> j != n }
+        newSingleThreadContext("setter").use { pool ->
+            launch(pool) {
+                for (i in 1..n) {
+                    launch(Dispatchers.JavaFx) {
+                        integerProperty.set(i)
+                    }
+                }
+            }
+            var i = -1
+            flow.collect { j ->
+                assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
+                i = j
+            }
+        }
+    }
+
+    @Test
+    fun testConflation() = runTest {
+        if (!initPlatform()) {
+            println("Skipping JavaFxTest in headless environment")
+            return@runTest // ignore test in headless environments
+        }
+
+        withContext(Dispatchers.JavaFx) {
+            val END_MARKER = -1
+            val integerProperty = SimpleIntegerProperty(0)
+            val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
+            launch {
+                yield() // to subscribe to [integerProperty]
+                yield() // send 0
+                integerProperty.set(1)
+                expect(3)
+                yield() // send 1
+                expect(5)
+                integerProperty.set(2)
+                for (i in (-100..-2)) {
+                    integerProperty.set(i) // should be skipped due to conflation
+                }
+                integerProperty.set(3)
+                expect(6)
+                yield() // send 2 and 3
+                integerProperty.set(-1)
+            }
+            expect(1)
+            flow.collect { i ->
+                when (i) {
+                    0 -> expect(2)
+                    1 -> expect(4)
+                    2 -> expect(7)
+                    3 -> expect(8)
+                    else -> fail("i is $i")
+                }
+            }
+            finish(9)
+        }
+    }
+
+}
diff --git a/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt b/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
new file mode 100644
index 0000000..5338835
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.javafx
+
+import javafx.beans.property.SimpleIntegerProperty
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.first
+import org.junit.*
+
+class JavaFxStressTest : TestBase() {
+
+    @Before
+    fun setup() {
+        ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
+    }
+
+    @get:Rule
+    val pool = ExecutorRule(1)
+
+    @Test
+    fun testCancellationRace() = runTest {
+        if (!initPlatform()) {
+            println("Skipping JavaFxTest in headless environment")
+            return@runTest // ignore test in headless environments
+        }
+
+        val integerProperty = SimpleIntegerProperty(0)
+        val flow = integerProperty.asFlow()
+        var i = 1
+        val n = 1000 * stressTestMultiplier
+        repeat (n) {
+            launch(pool) {
+                flow.first()
+            }
+            withContext(Dispatchers.JavaFx) {
+                integerProperty.set(i)
+            }
+            i += 1
+        }
+    }
+}
\ No newline at end of file
diff --git a/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt b/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt
new file mode 100644
index 0000000..00003f7
--- /dev/null
+++ b/ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt
@@ -0,0 +1,101 @@
+package examples
+
+import javafx.application.Application
+import javafx.scene.Scene
+import javafx.scene.control.*
+import javafx.scene.layout.GridPane
+import javafx.stage.Stage
+import javafx.beans.property.SimpleStringProperty
+import javafx.event.EventHandler
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.javafx.*
+import kotlin.coroutines.CoroutineContext
+
+fun main(args: Array<String>) {
+    Application.launch(FxAsFlowApp::class.java, *args)
+}
+
+/**
+ * Adapted from
+ * https://github.com/ReactiveX/RxJavaFX/blob/a78ca7d15f7d82d201df8fafb6eba732ec17e327/src/test/java/io/reactivex/rxjavafx/RxJavaFXTest.java
+ */
+class FxAsFlowApp: Application(), CoroutineScope {
+
+    private var job = Job()
+    override val coroutineContext: CoroutineContext
+        get() = JavaFx + job
+
+    private val incrementButton = Button("Increment")
+    private val incrementLabel = Label("")
+    private val textInput = TextField()
+    private val flippedTextLabel = Label()
+    private val spinner = Spinner<Int>()
+    private val spinnerChangesLabel = Label()
+
+    public override fun start(  primaryStage: Stage) {
+        val gridPane = GridPane()
+        gridPane.apply {
+            hgap = 10.0
+            vgap = 10.0
+            add(incrementButton, 0, 0)
+            add(incrementLabel, 1, 0)
+            add(textInput, 0, 1)
+            add(flippedTextLabel, 1, 1)
+            add(spinner, 0, 2)
+            add(spinnerChangesLabel, 1, 2)
+        }
+        val scene = Scene(gridPane)
+        primaryStage.apply {
+            width = 275.0
+            setScene(scene)
+            show()
+        }
+    }
+
+    public override fun stop() {
+        super.stop()
+        job.cancel()
+        job = Job()
+    }
+
+    init {
+        // Initializing the "Increment" button
+        val stringProperty = SimpleStringProperty()
+        var i = 0
+        incrementButton.onAction = EventHandler {
+            i += 1
+            stringProperty.set(i.toString())
+        }
+        launch {
+            stringProperty.asFlow().collect {
+                if (it != null) {
+                    stringProperty.set(it)
+                }
+            }
+        }
+        incrementLabel.textProperty().bind(stringProperty)
+        // Initializing the reversed text field
+        val stringProperty2 = SimpleStringProperty()
+        launch {
+            textInput.textProperty().asFlow().collect {
+                if (it != null) {
+                    stringProperty2.set(it.reversed())
+                }
+            }
+        }
+        flippedTextLabel.textProperty().bind(stringProperty2)
+        // Initializing the spinner
+        spinner.valueFactory = SpinnerValueFactory.IntegerSpinnerValueFactory(0, 100)
+        spinner.isEditable = true
+        val stringProperty3 = SimpleStringProperty()
+        launch {
+            spinner.valueProperty().asFlow().collect {
+                if (it != null) {
+                    stringProperty3.set("NEW: $it")
+                }
+            }
+        }
+        spinnerChangesLabel.textProperty().bind(stringProperty3)
+    }
+}