Emit from file observer when the connection is established

There is a race condition in our IPC design where if the
IPC needs some initialization, DataStore has no way of waiting
for that initialization.

So when DataStore wants to run logic similar to:
1) start observing IPC
2) read the file
3) wait for IPC upadtes

Ideally, DataStore would want to do this to ensure it never misses
an update when it is observed, yet it does not have the mechanism
wait for IPC to be ready. The only API it has is `Flow<Unit>` from
the IPC but it rather needs either:
a) Flow<Event>, enum Event{STARTED, CHANGE_DETECTED}
or
b) suspend fun observe():Flow<Unit> which could return after
initialization is complete.

b is not super practical because it runs into another problem where
the Flow may never be collected. a is the better solution as it
provides the state as events and DataStore can wait for STARTED
before reading to avoid missing events.

Note that Flow.onStart is not an option here because it is called before
the contents of the Flow computation starts, hence does not wait for
the FileSystem communication.

We did in fact hit this problem when writing tests for MP-IPC, hence
the internals of it emitted after establishing the connection to the
Filesystem but dropped that initial emission in the public API's
implementation.

This CL makes MP-IPC also dispatch that initial Unit in its public
API implementation, which in practice means it will be dispatching
a Change notification after it connects to the FileSystem.
This does not ensure (2) happens before (1) but it
ensures (3) does not miss an event if (2) happens before (1) starts.

Note that this does not cause an extra read on the FS if the file
didn't change after (2) because we have the version in shared counter
so the change should not have a noticable perf impact.

Fixes: 326141553
Test: did run it 200 times in P71532587/targets/androidx_device_tests
and also 2K times on a local emulator and firebase (matrix-2dop3ihikvsfy).
(cherry picked from https://android-review.googlesource.com/q/commit:94002239d3b000874d9dcf11eb6594a5613dd5c9)
Merged-In: I38b3f041b09f1332fb6f1bbf0e3bd266750465be
Change-Id: I38b3f041b09f1332fb6f1bbf0e3bd266750465be
diff --git a/datastore/datastore-core/src/androidMain/kotlin/androidx/datastore/core/MultiProcessCoordinator.android.kt b/datastore/datastore-core/src/androidMain/kotlin/androidx/datastore/core/MultiProcessCoordinator.android.kt
index 30f06ef..3d3edff 100644
--- a/datastore/datastore-core/src/androidMain/kotlin/androidx/datastore/core/MultiProcessCoordinator.android.kt
+++ b/datastore/datastore-core/src/androidMain/kotlin/androidx/datastore/core/MultiProcessCoordinator.android.kt
@@ -26,7 +26,6 @@
 import kotlin.contracts.ExperimentalContracts
 import kotlin.coroutines.CoroutineContext
 import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.drop
 import kotlinx.coroutines.sync.Mutex
 import kotlinx.coroutines.sync.withLock
 import kotlinx.coroutines.withContext
@@ -37,9 +36,6 @@
 ) : InterProcessCoordinator {
     // TODO(b/269375542): the flow should `flowOn` the provided [context]
     override val updateNotifications: Flow<Unit> = MulticastFileObserver.observe(file)
-        // MulticastFileObserver dispatches 1 value upon connecting to the FileSystem, which
-        // is useful for its tests but not necessary here.
-        .drop(1)
 
     // run block with the exclusive lock
     override suspend fun <T> lock(block: suspend () -> T): T {