aaudio: lock transport methods

The start/pause/stop/flush/close and other binder methods
need to be thread safe. They do not need to run
in parallel. So a lock was added for each.

Where virtual methods are needed, the locked method calls
a corresponding _l submethod, eg. stop() calls stop_l().

The close logic was also simplified because the "pending"
technique is not needed now that we have the locks.
It was only needed because a close could have occured
while in the middle of another method.

Bug: 153358911
Test: adb logcat *:F
Test: in another window:  test_binder_start_storm
Test: There should be no fatal error in the logcat.
Test: atest CtsNativeMediaAAudioTestCases
Change-Id: I5920cf78af4501856756c5c2fc8e77758232508a
Merged-In: I5920cf78af4501856756c5c2fc8e77758232508a
diff --git a/services/oboeservice/AAudioService.cpp b/services/oboeservice/AAudioService.cpp
index e6a8375..86e26a9 100644
--- a/services/oboeservice/AAudioService.cpp
+++ b/services/oboeservice/AAudioService.cpp
@@ -23,7 +23,6 @@
 #include <sstream>
 
 #include <aaudio/AAudio.h>
-#include <mediautils/SchedulingPolicyService.h>
 #include <mediautils/ServiceUtilities.h>
 #include <utils/String16.h>
 
@@ -144,28 +143,6 @@
     }
 }
 
-// If a close request is pending then close the stream
-bool AAudioService::releaseStream(const sp<AAudioServiceStreamBase> &serviceStream) {
-    bool closed = false;
-    // decrementAndRemoveStreamByHandle() uses a lock so that if there are two simultaneous closes
-    // then only one will get the pointer and do the close.
-    sp<AAudioServiceStreamBase> foundStream = mStreamTracker.decrementAndRemoveStreamByHandle(
-            serviceStream->getHandle());
-    if (foundStream.get() != nullptr) {
-        foundStream->close();
-        pid_t pid = foundStream->getOwnerProcessId();
-        AAudioClientTracker::getInstance().unregisterClientStream(pid, foundStream);
-        closed = true;
-    }
-    return closed;
-}
-
-aaudio_result_t AAudioService::checkForPendingClose(
-        const sp<AAudioServiceStreamBase> &serviceStream,
-        aaudio_result_t defaultResult) {
-    return releaseStream(serviceStream) ? AAUDIO_ERROR_INVALID_STATE : defaultResult;
-}
-
 aaudio_result_t AAudioService::closeStream(aaudio_handle_t streamHandle) {
     // Check permission and ownership first.
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
@@ -174,17 +151,20 @@
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
 
+    // This is protected by a lock in AAudioClientTracker.
+    // It is safe to unregister the same stream twice.
     pid_t pid = serviceStream->getOwnerProcessId();
     AAudioClientTracker::getInstance().unregisterClientStream(pid, serviceStream);
+    // This is protected by a lock in mStreamTracker.
+    // It is safe to remove the same stream twice.
+    mStreamTracker.removeStreamByHandle(streamHandle);
 
-    serviceStream->markCloseNeeded();
-    (void) releaseStream(serviceStream);
-    return AAUDIO_OK;
+    return serviceStream->close();
 }
 
 sp<AAudioServiceStreamBase> AAudioService::convertHandleToServiceStream(
         aaudio_handle_t streamHandle) {
-    sp<AAudioServiceStreamBase> serviceStream = mStreamTracker.getStreamByHandleAndIncrement(
+    sp<AAudioServiceStreamBase> serviceStream = mStreamTracker.getStreamByHandle(
             streamHandle);
     if (serviceStream.get() != nullptr) {
         // Only allow owner or the aaudio service to access the stream.
@@ -197,8 +177,6 @@
         if (!allowed) {
             ALOGE("AAudioService: calling uid %d cannot access stream 0x%08X owned by %d",
                   callingUserId, streamHandle, ownerUserId);
-            // We incremented the reference count so we must check if it needs to be closed.
-            checkForPendingClose(serviceStream, AAUDIO_OK);
             serviceStream.clear();
         }
     }
@@ -213,94 +191,64 @@
         ALOGE("getStreamDescription(), illegal stream handle = 0x%0x", streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-
-    aaudio_result_t result = serviceStream->getDescription(parcelable);
-    // parcelable.dump();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->getDescription(parcelable);
 }
 
 aaudio_result_t AAudioService::startStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("startStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-
-    aaudio_result_t result = serviceStream->start();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->start();
 }
 
 aaudio_result_t AAudioService::pauseStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("pauseStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->pause();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->pause();
 }
 
 aaudio_result_t AAudioService::stopStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("stopStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->stop();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->stop();
 }
 
 aaudio_result_t AAudioService::flushStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("flushStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->flush();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->flush();
 }
 
 aaudio_result_t AAudioService::registerAudioThread(aaudio_handle_t streamHandle,
                                                    pid_t clientThreadId,
-                                                   int64_t periodNanoseconds) {
-    aaudio_result_t result = AAUDIO_OK;
+                                                   int64_t /* periodNanoseconds */) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("registerAudioThread(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    if (serviceStream->getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
-        ALOGE("AAudioService::registerAudioThread(), thread already registered");
-        result = AAUDIO_ERROR_INVALID_STATE;
-    } else {
-        const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
-        serviceStream->setRegisteredThread(clientThreadId);
-        int err = android::requestPriority(ownerPid, clientThreadId,
-                                           DEFAULT_AUDIO_PRIORITY, true /* isForApp */);
-        if (err != 0) {
-            ALOGE("AAudioService::registerAudioThread(%d) failed, errno = %d, priority = %d",
-                  clientThreadId, errno, DEFAULT_AUDIO_PRIORITY);
-            result = AAUDIO_ERROR_INTERNAL;
-        }
-    }
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->registerAudioThread(clientThreadId, DEFAULT_AUDIO_PRIORITY);
 }
 
 aaudio_result_t AAudioService::unregisterAudioThread(aaudio_handle_t streamHandle,
                                                      pid_t clientThreadId) {
-    aaudio_result_t result = AAUDIO_OK;
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("%s(), illegal stream handle = 0x%0x", __func__, streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    if (serviceStream->getRegisteredThread() != clientThreadId) {
-        ALOGE("%s(), wrong thread", __func__);
-        result = AAUDIO_ERROR_ILLEGAL_ARGUMENT;
-    } else {
-        serviceStream->setRegisteredThread(0);
-    }
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->unregisterAudioThread(clientThreadId);
 }
 
 aaudio_result_t AAudioService::startClient(aaudio_handle_t streamHandle,
@@ -308,22 +256,20 @@
                                   audio_port_handle_t *clientHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("%s(), illegal stream handle = 0x%0x", __func__, streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->startClient(client, clientHandle);
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->startClient(client, clientHandle);
 }
 
 aaudio_result_t AAudioService::stopClient(aaudio_handle_t streamHandle,
                                           audio_port_handle_t portHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("%s(), illegal stream handle = 0x%0x", __func__, streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->stopClient(portHandle);
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->stopClient(portHandle);
 }
 
 // This is only called internally when AudioFlinger wants to tear down a stream.
@@ -331,12 +277,13 @@
 aaudio_result_t AAudioService::disconnectStreamByPortHandle(audio_port_handle_t portHandle) {
     ALOGD("%s(%d) called", __func__, portHandle);
     sp<AAudioServiceStreamBase> serviceStream =
-            mStreamTracker.findStreamByPortHandleAndIncrement(portHandle);
+            mStreamTracker.findStreamByPortHandle(portHandle);
     if (serviceStream.get() == nullptr) {
         ALOGE("%s(), could not find stream with portHandle = %d", __func__, portHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
+    // This is protected by a lock and will just return if already stopped.
     aaudio_result_t result = serviceStream->stop();
     serviceStream->disconnect();
-    return checkForPendingClose(serviceStream, result);
+    return result;
 }
diff --git a/services/oboeservice/AAudioService.h b/services/oboeservice/AAudioService.h
index d21b1cd..18bc33f 100644
--- a/services/oboeservice/AAudioService.h
+++ b/services/oboeservice/AAudioService.h
@@ -95,13 +95,6 @@
     sp<aaudio::AAudioServiceStreamBase> convertHandleToServiceStream(
             aaudio::aaudio_handle_t streamHandle);
 
-
-
-    bool releaseStream(const sp<aaudio::AAudioServiceStreamBase> &serviceStream);
-
-    aaudio_result_t checkForPendingClose(const sp<aaudio::AAudioServiceStreamBase> &serviceStream,
-                                         aaudio_result_t defaultResult);
-
     android::AudioClient            mAudioClient;
 
     aaudio::AAudioStreamTracker     mStreamTracker;
diff --git a/services/oboeservice/AAudioServiceEndpoint.cpp b/services/oboeservice/AAudioServiceEndpoint.cpp
index 553754e..d9e9db1 100644
--- a/services/oboeservice/AAudioServiceEndpoint.cpp
+++ b/services/oboeservice/AAudioServiceEndpoint.cpp
@@ -87,15 +87,20 @@
 }
 
 void AAudioServiceEndpoint::disconnectRegisteredStreams() {
-    std::lock_guard<std::mutex> lock(mLockStreams);
+    std::vector<android::sp<AAudioServiceStreamBase>> streamsToDisconnect;
+    {
+        std::lock_guard<std::mutex> lock(mLockStreams);
+        mRegisteredStreams.swap(streamsToDisconnect);
+    }
+    // Stop and disconnect outside mLockStreams to avoid reverse
+    // ordering of AAudioServiceStreamBase::mLock and mLockStreams
     mConnected.store(false);
-    for (const auto& stream : mRegisteredStreams) {
+    for (const auto& stream : streamsToDisconnect) {
         ALOGD("disconnectRegisteredStreams() stop and disconnect port %d",
               stream->getPortHandle());
         stream->stop();
         stream->disconnect();
     }
-    mRegisteredStreams.clear();
 }
 
 aaudio_result_t AAudioServiceEndpoint::registerStream(sp<AAudioServiceStreamBase>stream) {
diff --git a/services/oboeservice/AAudioServiceEndpointShared.cpp b/services/oboeservice/AAudioServiceEndpointShared.cpp
index 0a415fd..0d79cdd 100644
--- a/services/oboeservice/AAudioServiceEndpointShared.cpp
+++ b/services/oboeservice/AAudioServiceEndpointShared.cpp
@@ -166,13 +166,11 @@
 
 aaudio_result_t AAudioServiceEndpointShared::stopStream(sp<AAudioServiceStreamBase> sharedStream,
                                                         audio_port_handle_t clientHandle) {
-    // Don't lock here because the disconnectRegisteredStreams also uses the lock.
-
     // Ignore result.
     (void) getStreamInternal()->stopClient(clientHandle);
 
     if (--mRunningStreamCount == 0) { // atomic
-        stopSharingThread();
+        stopSharingThread(); // the sharing thread locks mLockStreams
         getStreamInternal()->requestStop();
     }
     return AAUDIO_OK;
diff --git a/services/oboeservice/AAudioServiceStreamBase.cpp b/services/oboeservice/AAudioServiceStreamBase.cpp
index 880a3d7..4a99ae0 100644
--- a/services/oboeservice/AAudioServiceStreamBase.cpp
+++ b/services/oboeservice/AAudioServiceStreamBase.cpp
@@ -22,6 +22,8 @@
 #include <iostream>
 #include <mutex>
 
+#include <mediautils/SchedulingPolicyService.h>
+
 #include "binding/IAAudioService.h"
 #include "binding/AAudioServiceMessage.h"
 #include "utility/AudioClock.h"
@@ -126,12 +128,17 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::close() {
+    std::lock_guard<std::mutex> lock(mLock);
+    return close_l();
+}
+
+aaudio_result_t AAudioServiceStreamBase::close_l() {
     aaudio_result_t result = AAUDIO_OK;
     if (getState() == AAUDIO_STREAM_STATE_CLOSED) {
         return AAUDIO_OK;
     }
 
-    stop();
+    stop_l();
 
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
     if (endpoint == nullptr) {
@@ -142,7 +149,7 @@
         endpointManager.closeEndpoint(endpoint);
 
         // AAudioService::closeStream() prevents two threads from closing at the same time.
-        mServiceEndpoint.clear(); // endpoint will hold the pointer until this method returns.
+        mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns.
     }
 
     {
@@ -172,7 +179,14 @@
  * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
  */
 aaudio_result_t AAudioServiceStreamBase::start() {
-    aaudio_result_t result = AAUDIO_OK;
+    std::lock_guard<std::mutex> lock(mLock);
+
+    if (auto state = getState();
+        state == AAUDIO_STREAM_STATE_CLOSED || state == AAUDIO_STREAM_STATE_DISCONNECTED) {
+        ALOGW("%s() already CLOSED, returns INVALID_STATE, handle = %d",
+                __func__, getHandle());
+        return AAUDIO_ERROR_INVALID_STATE;
+    }
 
     if (isRunning()) {
         return AAUDIO_OK;
@@ -185,7 +199,7 @@
     mAtomicStreamTimestamp.clear();
 
     mClientHandle = AUDIO_PORT_HANDLE_NONE;
-    result = startDevice();
+    aaudio_result_t result = startDevice();
     if (result != AAUDIO_OK) goto error;
 
     // This should happen at the end of the start.
@@ -198,11 +212,16 @@
     return result;
 
 error:
-    disconnect();
+    disconnect_l();
     return result;
 }
 
 aaudio_result_t AAudioServiceStreamBase::pause() {
+    std::lock_guard<std::mutex> lock(mLock);
+    return pause_l();
+}
+
+aaudio_result_t AAudioServiceStreamBase::pause_l() {
     aaudio_result_t result = AAUDIO_OK;
     if (!isRunning()) {
         return result;
@@ -214,7 +233,7 @@
 
     result = stopTimestampThread();
     if (result != AAUDIO_OK) {
-        disconnect();
+        disconnect_l();
         return result;
     }
 
@@ -226,7 +245,7 @@
     result = endpoint->stopStream(this, mClientHandle);
     if (result != AAUDIO_OK) {
         ALOGE("%s() mServiceEndpoint returned %d, %s", __func__, result, getTypeText());
-        disconnect(); // TODO should we return or pause Base first?
+        disconnect_l(); // TODO should we return or pause Base first?
     }
 
     sendServiceEvent(AAUDIO_SERVICE_EVENT_PAUSED);
@@ -235,6 +254,11 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::stop() {
+    std::lock_guard<std::mutex> lock(mLock);
+    return stop_l();
+}
+
+aaudio_result_t AAudioServiceStreamBase::stop_l() {
     aaudio_result_t result = AAUDIO_OK;
     if (!isRunning()) {
         return result;
@@ -247,7 +271,7 @@
     sendCurrentTimestamp(); // warning - this calls a virtual function
     result = stopTimestampThread();
     if (result != AAUDIO_OK) {
-        disconnect();
+        disconnect_l();
         return result;
     }
 
@@ -260,7 +284,7 @@
     result = endpoint->stopStream(this, mClientHandle);
     if (result != AAUDIO_OK) {
         ALOGE("%s() stopStream returned %d, %s", __func__, result, getTypeText());
-        disconnect();
+        disconnect_l();
         // TODO what to do with result here?
     }
 
@@ -279,6 +303,7 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::flush() {
+    std::lock_guard<std::mutex> lock(mLock);
     aaudio_result_t result = AAudio_isFlushAllowed(getState());
     if (result != AAUDIO_OK) {
         return result;
@@ -319,12 +344,60 @@
 }
 
 void AAudioServiceStreamBase::disconnect() {
-    if (getState() != AAUDIO_STREAM_STATE_DISCONNECTED) {
+    std::lock_guard<std::mutex> lock(mLock);
+    disconnect_l();
+}
+
+void AAudioServiceStreamBase::disconnect_l() {
+    if (getState() != AAUDIO_STREAM_STATE_DISCONNECTED
+        && getState() != AAUDIO_STREAM_STATE_CLOSED) {
         sendServiceEvent(AAUDIO_SERVICE_EVENT_DISCONNECTED);
         setState(AAUDIO_STREAM_STATE_DISCONNECTED);
     }
 }
 
+aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId,
+        int priority) {
+    std::lock_guard<std::mutex> lock(mLock);
+    aaudio_result_t result = AAUDIO_OK;
+    if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
+        ALOGE("AAudioService::registerAudioThread(), thread already registered");
+        result = AAUDIO_ERROR_INVALID_STATE;
+    } else {
+        const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
+        setRegisteredThread(clientThreadId);
+        int err = android::requestPriority(ownerPid, clientThreadId,
+                                           priority, true /* isForApp */);
+        if (err != 0) {
+            ALOGE("AAudioService::registerAudioThread(%d) failed, errno = %d, priority = %d",
+                  clientThreadId, errno, priority);
+            result = AAUDIO_ERROR_INTERNAL;
+        }
+    }
+    return result;
+}
+
+aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) {
+    std::lock_guard<std::mutex> lock(mLock);
+    aaudio_result_t result = AAUDIO_OK;
+    if (getRegisteredThread() != clientThreadId) {
+        ALOGE("%s(), wrong thread", __func__);
+        result = AAUDIO_ERROR_ILLEGAL_ARGUMENT;
+    } else {
+        setRegisteredThread(0);
+    }
+    return result;
+}
+
+void AAudioServiceStreamBase::setState(aaudio_stream_state_t state) {
+    // CLOSED is a final state.
+    if (mState != AAUDIO_STREAM_STATE_CLOSED) {
+        mState = state;
+    } else {
+        ALOGW_IF(mState != state, "%s(%d) when already CLOSED", __func__, state);
+    }
+}
+
 aaudio_result_t AAudioServiceStreamBase::sendServiceEvent(aaudio_service_event_t event,
                                                           double  dataDouble) {
     AAudioServiceMessage command;
@@ -422,6 +495,7 @@
  * used to communicate with the underlying HAL or Service.
  */
 aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) {
+    std::lock_guard<std::mutex> lock(mLock);
     {
         std::lock_guard<std::mutex> lock(mUpMessageQueueLock);
         if (mUpMessageQueue == nullptr) {
diff --git a/services/oboeservice/AAudioServiceStreamBase.h b/services/oboeservice/AAudioServiceStreamBase.h
index 097bc64..32db728 100644
--- a/services/oboeservice/AAudioServiceStreamBase.h
+++ b/services/oboeservice/AAudioServiceStreamBase.h
@@ -74,7 +74,7 @@
      */
     virtual aaudio_result_t open(const aaudio::AAudioStreamRequest &request) = 0;
 
-    virtual aaudio_result_t close();
+    aaudio_result_t close();
 
     /**
      * Start the flow of audio data.
@@ -82,7 +82,7 @@
      * This is not guaranteed to be synchronous but it currently is.
      * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
      */
-    virtual aaudio_result_t start();
+    aaudio_result_t start();
 
     /**
      * Stop the flow of data so that start() can resume without loss of data.
@@ -90,7 +90,7 @@
      * This is not guaranteed to be synchronous but it currently is.
      * An AAUDIO_SERVICE_EVENT_PAUSED will be sent to the client when complete.
     */
-    virtual aaudio_result_t pause();
+    aaudio_result_t pause();
 
     /**
      * Stop the flow of data after the currently queued data has finished playing.
@@ -99,17 +99,14 @@
      * An AAUDIO_SERVICE_EVENT_STOPPED will be sent to the client when complete.
      *
      */
-    virtual aaudio_result_t stop();
-
-    aaudio_result_t stopTimestampThread();
+    aaudio_result_t stop();
 
     /**
      * Discard any data held by the underlying HAL or Service.
      *
      * An AAUDIO_SERVICE_EVENT_FLUSHED will be sent to the client when complete.
      */
-    virtual aaudio_result_t flush();
-
+    aaudio_result_t flush();
 
     virtual aaudio_result_t startClient(const android::AudioClient& client __unused,
                                         audio_port_handle_t *clientHandle __unused) {
@@ -122,29 +119,19 @@
         return AAUDIO_ERROR_UNAVAILABLE;
     }
 
+    aaudio_result_t registerAudioThread(pid_t clientThreadId, int priority);
+
+    aaudio_result_t unregisterAudioThread(pid_t clientThreadId);
+
     bool isRunning() const {
         return mState == AAUDIO_STREAM_STATE_STARTED;
     }
 
-    // -------------------------------------------------------------------
-
-    /**
-     * Send a message to the client with an int64_t data value.
-     */
-    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
-                                     int64_t dataLong = 0);
-    /**
-     * Send a message to the client with an double data value.
-     */
-    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
-                                     double  dataDouble);
-
     /**
      * Fill in a parcelable description of stream.
      */
     aaudio_result_t getDescription(AudioEndpointParcelable &parcelable);
 
-
     void setRegisteredThread(pid_t pid) {
         mRegisteredClientThread = pid;
     }
@@ -258,9 +245,13 @@
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request,
                          aaudio_sharing_mode_t sharingMode);
 
-    void setState(aaudio_stream_state_t state) {
-        mState = state;
-    }
+    // These must be called under mLock
+    virtual aaudio_result_t close_l();
+    virtual aaudio_result_t pause_l();
+    virtual aaudio_result_t stop_l();
+    void disconnect_l();
+
+    void setState(aaudio_stream_state_t state);
 
     /**
      * Device specific startup.
@@ -292,6 +283,10 @@
     SharedRingBuffer*       mUpMessageQueue;
     std::mutex              mUpMessageQueueLock;
 
+    // Locking order is important.
+    // Always acquire mLock before acquiring AAudioServiceEndpoint::mLockStreams
+    std::mutex              mLock; // Prevent start/stop/close etcetera from colliding
+
     AAudioThread            mTimestampThread;
     // This is used by one thread to tell another thread to exit. So it must be atomic.
     std::atomic<bool>       mThreadEnabled{false};
@@ -313,6 +308,19 @@
 
 private:
 
+    aaudio_result_t stopTimestampThread();
+
+    /**
+     * Send a message to the client with an int64_t data value.
+     */
+    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
+                                     int64_t dataLong = 0);
+    /**
+     * Send a message to the client with a double data value.
+     */
+    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
+                                     double dataDouble);
+
     /**
      * @return true if the queue is getting full.
      */
diff --git a/services/oboeservice/AAudioServiceStreamMMAP.cpp b/services/oboeservice/AAudioServiceStreamMMAP.cpp
index 837b080..8e4217a 100644
--- a/services/oboeservice/AAudioServiceStreamMMAP.cpp
+++ b/services/oboeservice/AAudioServiceStreamMMAP.cpp
@@ -49,16 +49,6 @@
         , mInService(inService) {
 }
 
-aaudio_result_t AAudioServiceStreamMMAP::close() {
-    if (getState() == AAUDIO_STREAM_STATE_CLOSED) {
-        return AAUDIO_OK;
-    }
-
-    stop();
-
-    return AAudioServiceStreamBase::close();
-}
-
 // Open stream on HAL and pass information about the shared memory buffer back to the client.
 aaudio_result_t AAudioServiceStreamMMAP::open(const aaudio::AAudioStreamRequest &request) {
 
@@ -102,11 +92,11 @@
 }
 
 // Stop the flow of data such that start() can resume with loss of data.
-aaudio_result_t AAudioServiceStreamMMAP::pause() {
+aaudio_result_t AAudioServiceStreamMMAP::pause_l() {
     if (!isRunning()) {
         return AAUDIO_OK;
     }
-    aaudio_result_t result = AAudioServiceStreamBase::pause();
+    aaudio_result_t result = AAudioServiceStreamBase::pause_l();
     // TODO put before base::pause()?
     if (!mInService) {
         (void) stopClient(mClientHandle);
@@ -114,11 +104,11 @@
     return result;
 }
 
-aaudio_result_t AAudioServiceStreamMMAP::stop() {
+aaudio_result_t AAudioServiceStreamMMAP::stop_l() {
     if (!isRunning()) {
         return AAUDIO_OK;
     }
-    aaudio_result_t result = AAudioServiceStreamBase::stop();
+    aaudio_result_t result = AAudioServiceStreamBase::stop_l();
     // TODO put before base::stop()?
     if (!mInService) {
         (void) stopClient(mClientHandle);
diff --git a/services/oboeservice/AAudioServiceStreamMMAP.h b/services/oboeservice/AAudioServiceStreamMMAP.h
index 1509f7d..141174e 100644
--- a/services/oboeservice/AAudioServiceStreamMMAP.h
+++ b/services/oboeservice/AAudioServiceStreamMMAP.h
@@ -52,26 +52,24 @@
 
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request) override;
 
+    aaudio_result_t startClient(const android::AudioClient& client,
+                                audio_port_handle_t *clientHandle) override;
+
+    aaudio_result_t stopClient(audio_port_handle_t clientHandle) override;
+
+    const char *getTypeText() const override { return "MMAP"; }
+
+protected:
+
     /**
      * Stop the flow of data so that start() can resume without loss of data.
      *
      * This is not guaranteed to be synchronous but it currently is.
      * An AAUDIO_SERVICE_EVENT_PAUSED will be sent to the client when complete.
     */
-    aaudio_result_t pause() override;
+    aaudio_result_t pause_l() override;
 
-    aaudio_result_t stop() override;
-
-    aaudio_result_t startClient(const android::AudioClient& client,
-                                audio_port_handle_t *clientHandle) override;
-
-    aaudio_result_t stopClient(audio_port_handle_t clientHandle) override;
-
-    aaudio_result_t close() override;
-
-    const char *getTypeText() const override { return "MMAP"; }
-
-protected:
+    aaudio_result_t stop_l() override;
 
     aaudio_result_t getAudioDataDescription(AudioEndpointParcelable &parcelable) override;
 
diff --git a/services/oboeservice/AAudioServiceStreamShared.cpp b/services/oboeservice/AAudioServiceStreamShared.cpp
index 2ca847a..01b1c2e 100644
--- a/services/oboeservice/AAudioServiceStreamShared.cpp
+++ b/services/oboeservice/AAudioServiceStreamShared.cpp
@@ -203,9 +203,8 @@
     return result;
 }
 
-
-aaudio_result_t AAudioServiceStreamShared::close()  {
-    aaudio_result_t result = AAudioServiceStreamBase::close();
+aaudio_result_t AAudioServiceStreamShared::close_l()  {
+    aaudio_result_t result = AAudioServiceStreamBase::close_l();
 
     {
         std::lock_guard<std::mutex> lock(mAudioDataQueueLock);
diff --git a/services/oboeservice/AAudioServiceStreamShared.h b/services/oboeservice/AAudioServiceStreamShared.h
index 61769b5..abcb782 100644
--- a/services/oboeservice/AAudioServiceStreamShared.h
+++ b/services/oboeservice/AAudioServiceStreamShared.h
@@ -52,7 +52,7 @@
 
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request) override;
 
-    aaudio_result_t close() override;
+    aaudio_result_t close_l() override;
 
     /**
      * This must be locked when calling getAudioDataFifoBuffer_l() and while
diff --git a/services/oboeservice/AAudioStreamTracker.cpp b/services/oboeservice/AAudioStreamTracker.cpp
index 3328159..8e66b94 100644
--- a/services/oboeservice/AAudioStreamTracker.cpp
+++ b/services/oboeservice/AAudioStreamTracker.cpp
@@ -30,32 +30,20 @@
 using namespace android;
 using namespace aaudio;
 
-sp<AAudioServiceStreamBase> AAudioStreamTracker::decrementAndRemoveStreamByHandle(
+int32_t AAudioStreamTracker::removeStreamByHandle(
         aaudio_handle_t streamHandle) {
     std::lock_guard<std::mutex> lock(mHandleLock);
-    sp<AAudioServiceStreamBase> serviceStream;
-    auto it = mStreamsByHandle.find(streamHandle);
-    if (it != mStreamsByHandle.end()) {
-        sp<AAudioServiceStreamBase> tempStream = it->second;
-        // Does the caller need to close the stream?
-        // The reference count should never be negative.
-        // But it is safer to check for <= 0 than == 0.
-        if ((tempStream->decrementServiceReferenceCount_l() <= 0) && tempStream->isCloseNeeded()) {
-            serviceStream = tempStream; // Only return stream if ready to be closed.
-            mStreamsByHandle.erase(it);
-        }
-    }
-    return serviceStream;
+    auto count = mStreamsByHandle.erase(streamHandle);
+    return static_cast<int32_t>(count);
 }
 
-sp<AAudioServiceStreamBase> AAudioStreamTracker::getStreamByHandleAndIncrement(
+sp<AAudioServiceStreamBase> AAudioStreamTracker::getStreamByHandle(
         aaudio_handle_t streamHandle) {
     std::lock_guard<std::mutex> lock(mHandleLock);
     sp<AAudioServiceStreamBase> serviceStream;
     auto it = mStreamsByHandle.find(streamHandle);
     if (it != mStreamsByHandle.end()) {
         serviceStream = it->second;
-        serviceStream->incrementServiceReferenceCount_l();
     }
     return serviceStream;
 }
@@ -63,7 +51,7 @@
 // The port handle is only available when the stream is started.
 // So we have to iterate over all the streams.
 // Luckily this rarely happens.
-sp<AAudioServiceStreamBase> AAudioStreamTracker::findStreamByPortHandleAndIncrement(
+sp<AAudioServiceStreamBase> AAudioStreamTracker::findStreamByPortHandle(
         audio_port_handle_t portHandle) {
     std::lock_guard<std::mutex> lock(mHandleLock);
     sp<AAudioServiceStreamBase> serviceStream;
@@ -72,7 +60,6 @@
         auto candidate = it->second;
         if (candidate->getPortHandle() == portHandle) {
             serviceStream = candidate;
-            serviceStream->incrementServiceReferenceCount_l();
             break;
         }
         it++;
diff --git a/services/oboeservice/AAudioStreamTracker.h b/services/oboeservice/AAudioStreamTracker.h
index 57ec426..d1301a2 100644
--- a/services/oboeservice/AAudioStreamTracker.h
+++ b/services/oboeservice/AAudioStreamTracker.h
@@ -32,25 +32,20 @@
 
 public:
     /**
-     * Find the stream associated with the handle.
-     * Decrement its reference counter. If zero and the stream needs
-     * to be closed then remove the stream and return a pointer to the stream.
-     * Otherwise return null if it does not need to be closed.
+     * Remove any streams with the matching handle.
      *
      * @param streamHandle
-     * @return strong pointer to the stream if it needs to be closed, or nullptr
+     * @return number of streams removed
      */
-    android::sp<AAudioServiceStreamBase> decrementAndRemoveStreamByHandle(
-            aaudio_handle_t streamHandle);
+    int32_t removeStreamByHandle(aaudio_handle_t streamHandle);
 
     /**
      * Look up a stream based on the handle.
-     * Increment its service reference count if found.
      *
      * @param streamHandle
      * @return strong pointer to the stream if found, or nullptr
      */
-    android::sp<aaudio::AAudioServiceStreamBase> getStreamByHandleAndIncrement(
+    android::sp<aaudio::AAudioServiceStreamBase> getStreamByHandle(
             aaudio_handle_t streamHandle);
 
     /**
@@ -60,7 +55,7 @@
      * @param portHandle
      * @return strong pointer to the stream if found, or nullptr
      */
-    android::sp<aaudio::AAudioServiceStreamBase> findStreamByPortHandleAndIncrement(
+    android::sp<aaudio::AAudioServiceStreamBase> findStreamByPortHandle(
             audio_port_handle_t portHandle);
 
     /**