a2dp HAL: add write thread to avoid audio skips

The a2dp stack is throttling the audio throughtput
to accommodate headsets that can accept audio too fast.
This makes that very limited audio buffering exists in the
A2DP output path. We are therefore vulnerable to irregular scheduling of
audioflinger mixer thread when CPU clock speed or system activity changes.

The fix consists in adding an intermediate audio buffer between audioflinger
mixer thread and the A2DP socket.

Issue 5682206.

Change-Id: I6f2387fb694a75e62c0805b7f134f7cb88eaedd6
diff --git a/audio/android_audio_hw.c b/audio/android_audio_hw.c
index 30e1a22..52d6066 100644
--- a/audio/android_audio_hw.c
+++ b/audio/android_audio_hw.c
@@ -44,8 +44,29 @@
 
 #define OUT_SINK_ADDR_PARM             "a2dp_sink_address"
 
-/* NOTE: If you need to hold the adev_a2dp->lock AND the astream_out->lock,
-   you MUST take adev_a2dp lock first!!
+/* number of periods in pcm buffer (one period corresponds to buffer size reported to audio flinger
+ * by out_get_buffer_size() */
+#define BUF_NUM_PERIODS 6
+/* maximum time allowed by out_standby_stream_locked() for 2dp_write() to complete */
+#define BUF_WRITE_COMPLETION_TIMEOUT_MS 5000
+/* maximum time allowed by out_write() for frames to be available in in write thread buffer */
+#define BUF_WRITE_AVAILABILITY_TIMEOUT_MS 1000
+/* maximum number of attempts to wait for a write completion in out_standby_stream_locked() */
+#define MAX_WRITE_COMPLETION_ATTEMPTS 5
+
+/* NOTE: there are 2 mutexes used by the a2dp output stream.
+ *  - lock: protects all calls to a2dp lib functions (a2dp_stop(), a2dp_cleanup()...).
+ *    One exception is a2dp_write() which is also protected by the flag write_busy. This is because
+ *    out_write() cannot block waiting for a2dp_write() to complete because this function
+ *    can sleep to throttle the A2DP bit rate.
+ *    This flag is always set/reset and tested with "lock" mutex held.
+ *  - buf_lock: protects access to pcm buffer read and write indexes.
+ *
+ *  The locking order is always as follows:
+ *      buf_lock -> lock
+ *
+ * If you need to hold the adev_a2dp->lock AND the astream_out->lock or astream_out->buf_lock,
+ * you MUST take adev_a2dp lock first!!
  */
 
 struct astream_out;
@@ -75,7 +96,7 @@
     int                     retry_count;
     void*                   data;
 
-    pthread_mutex_t         lock;
+    pthread_mutex_t         lock;   /* see NOTE on mutex locking order above */
 
     audio_devices_t         device;
     uint64_t                 last_write_time;
@@ -84,6 +105,20 @@
     bool                    bt_enabled;
     bool                    suspended;
     char                    a2dp_addr[20];
+
+    uint32_t *buf;              /* pcm buffer between audioflinger thread and write thread*/
+    size_t buf_size;            /* size of pcm buffer in frames */
+    size_t buf_rd_idx;          /* read index in pcm buffer, in frames*/
+    size_t buf_wr_idx;          /* write index in pcm buffer, in frames */
+    size_t buf_frames_ready;    /* number of frames ready for writing to a2dp sink */
+    pthread_mutex_t buf_lock;   /* mutex protecting read and write indexes */
+                                /* see NOTE on mutex locking order above */
+    pthread_cond_t buf_cond;    /* condition signaling data write/read to/from pcm buffer */
+    pthread_t buf_thread;       /* thread reading data from buffer and writing to a2dp sink*/
+    bool buf_thread_exit;       /* flag requesting write thread exit */
+    bool write_busy;            /* indicates that a write to a2dp sink is in progress and that
+                                   standby must wait for this flag to be cleared by write thread */
+    pthread_cond_t write_cond;  /* condition associated with write_busy flag */
 };
 
 static uint64_t system_time(void)
@@ -196,15 +231,26 @@
 static int out_standby_stream_locked(struct astream_out *out)
 {
     int ret = 0;
+    int attempts = MAX_WRITE_COMPLETION_ATTEMPTS;
 
     if (out->standby || !out->data)
         return 0;
 
-    LOGV_IF(!out->bt_enabled, "Standby skip stop: enabled %d", out->bt_enabled);
-    if (out->bt_enabled)
-        ret = a2dp_stop(out->data);
-    release_wake_lock(A2DP_WAKE_LOCK_NAME);
     out->standby = true;
+    /* wait for write completion if needed */
+    while (out->write_busy && attempts--) {
+        ret = pthread_cond_timeout_np(&out->write_cond,
+                                &out->lock,
+                                BUF_WRITE_COMPLETION_TIMEOUT_MS);
+        LOGE_IF(ret != 0, "out_standby_stream_locked() wait cond error %d", ret);
+    }
+    LOGE_IF(attempts == 0, "out_standby_stream_locked() a2dp_write() would not stop!!!");
+
+    LOGV_IF(!out->bt_enabled, "Standby skip stop: enabled %d", out->bt_enabled);
+    if (out->bt_enabled) {
+        ret = a2dp_stop(out->data);
+    }
+    release_wake_lock(A2DP_WAKE_LOCK_NAME);
 
     return ret;
 }
@@ -309,20 +355,59 @@
     return str;
 }
 
+size_t _out_frames_available_locked(struct astream_out *out)
+{
+
+    size_t frames = out->buf_size - out->buf_frames_ready;
+    if (frames > out->buf_size - out->buf_wr_idx) {
+        frames = out->buf_size - out->buf_wr_idx;
+    }
+    return frames;
+}
+
+size_t _out_frames_ready_locked(struct astream_out *out)
+{
+    size_t frames = out->buf_frames_ready;
+
+    if (frames > out->buf_size - out->buf_rd_idx) {
+        frames = out->buf_size - out->buf_rd_idx;
+    }
+    return frames;
+}
+
+void _out_inc_wr_idx_locked(struct astream_out *out, size_t frames)
+{
+    out->buf_wr_idx += frames;
+    out->buf_frames_ready += frames;
+    if (out->buf_wr_idx == out->buf_size) {
+        out->buf_wr_idx = 0;
+    }
+    pthread_cond_signal(&out->buf_cond);
+}
+
+void _out_inc_rd_idx_locked(struct astream_out *out, size_t frames)
+{
+    out->buf_rd_idx += frames;
+    out->buf_frames_ready -= frames;
+    if (out->buf_rd_idx == out->buf_size) {
+        out->buf_rd_idx = 0;
+    }
+    pthread_cond_signal(&out->buf_cond);
+}
+
 static ssize_t out_write(struct audio_stream_out *stream, const void* buffer,
                          size_t bytes)
 {
     struct astream_out *out = (struct astream_out *)stream;
     int ret;
-    int cnt = bytes;
-    int retries = MAX_WRITE_RETRIES;
-    uint64_t now;
-    uint32_t elapsed_us;
-    const uint8_t *buf = buffer;
+    size_t frames_total = bytes / sizeof(uint32_t); // always stereo 16 bit
+    uint32_t *buf = (uint32_t *)buffer;
+    size_t frames_written = 0;
 
+    pthread_mutex_lock(&out->buf_lock);
     pthread_mutex_lock(&out->lock);
     if (!out->bt_enabled || out->suspended) {
-        LOGV("a2dp %s: bluetooth disabled bt_en %d, suspended %d",
+        LOGV("a2dp write: bluetooth disabled bt_en %d, suspended %d",
              out->bt_enabled, out->suspended);
         ret = -1;
         goto err_bt_disabled;
@@ -332,46 +417,52 @@
         acquire_wake_lock(PARTIAL_WAKE_LOCK, A2DP_WAKE_LOCK_NAME);
         out->standby = false;
         out->last_write_time = system_time();
+        out->buf_rd_idx = 0;
+        out->buf_wr_idx = 0;
+        out->buf_frames_ready = 0;
     }
 
     ret = _out_init_locked(out, NULL);
-    if (ret < 0)
+    if (ret < 0) {
         goto err_init;
-
-    while (cnt > 0 && retries > 0) {
-        ret = a2dp_write(out->data, buf, cnt);
-        if (ret < 0) {
-            LOGE("%s: a2dp_write failed (%d)\n", __func__, ret);
-            goto err_write;
-        } else if (ret == 0) {
-            retries--;
-            continue;
-        }
-
-        cnt -= ret;
-        buf += ret;
     }
 
-    /* XXX: PLEASE FIX ME!!!! */
-
-    /* if A2DP sink runs abnormally fast, sleep a little so that
-     * audioflinger mixer thread does no spin and starve other threads. */
-    /* NOTE: It is likely that the A2DP headset is being disconnected */
-    now = system_time();
-    elapsed_us = (now - out->last_write_time) / 1000UL;
-    if (elapsed_us < (out->buffer_duration_us / 4)) {
-        LOGV("A2DP sink runs too fast");
-        usleep(out->buffer_duration_us - elapsed_us);
-    }
-    out->last_write_time = now;
-
     pthread_mutex_unlock(&out->lock);
 
+    while (frames_written < frames_total) {
+        size_t frames = _out_frames_available_locked(out);
+        if (frames == 0) {
+            int ret = pthread_cond_timeout_np(&out->buf_cond,
+                                              &out->buf_lock,
+                                              BUF_WRITE_AVAILABILITY_TIMEOUT_MS);
+            if (ret != 0) {
+                pthread_mutex_lock(&out->lock);
+                goto err_write;
+            }
+            frames  = _out_frames_available_locked(out);
+        }
+        if (frames > frames_total - frames_written) {
+            frames = frames_total - frames_written;
+        }
+        memcpy(out->buf + out->buf_wr_idx, buf + frames_written, frames * sizeof(uint32_t));
+        frames_written += frames;
+        _out_inc_wr_idx_locked(out, frames);
+        pthread_mutex_lock(&out->lock);
+        if (out->standby) {
+            goto err_write;
+        }
+        pthread_mutex_unlock(&out->lock);
+    }
+    pthread_mutex_unlock(&out->buf_lock);
+
     return bytes;
 
+/* out->lock must be locked and out->buf_lock unlocked when jumping here */
 err_write:
 err_init:
 err_bt_disabled:
+    pthread_mutex_unlock(&out->buf_lock);
+    LOGV("!!!! write error");
     out_standby_stream_locked(out);
     pthread_mutex_unlock(&out->lock);
 
@@ -380,6 +471,99 @@
     return ret;
 }
 
+
+static void *_out_buf_thread_func(void *context)
+{
+    struct astream_out *out = (struct astream_out *)context;
+
+    pthread_mutex_lock(&out->buf_lock);
+
+    while(!out->buf_thread_exit) {
+        size_t frames;
+
+        frames = _out_frames_ready_locked(out);
+        while (frames && !out->buf_thread_exit) {
+            int retries = MAX_WRITE_RETRIES;
+            uint64_t now;
+            uint32_t elapsed_us;
+
+            while (frames > 0 && !out->buf_thread_exit) {
+                int ret;
+                uint32_t buffer_duration_us;
+                /* PCM format is always 16bit stereo */
+                size_t bytes = frames * sizeof(uint32_t);
+                if (bytes > out->buffer_size) {
+                    bytes = out->buffer_size;
+                }
+
+                pthread_mutex_lock(&out->lock);
+                if (out->standby) {
+                    /* abort and clear all pending frames if standby requested */
+                    pthread_mutex_unlock(&out->lock);
+                    frames = _out_frames_ready_locked(out);
+                    _out_inc_rd_idx_locked(out, frames);
+                    goto wait;
+                }
+                /* indicate to out_standby_stream_locked() that a2dp_write() is active */
+                out->write_busy = true;
+                pthread_mutex_unlock(&out->lock);
+                pthread_mutex_unlock(&out->buf_lock);
+
+                ret = a2dp_write(out->data, out->buf + out->buf_rd_idx, bytes);
+
+                /* clear write_busy condition */
+                pthread_mutex_lock(&out->buf_lock);
+                pthread_mutex_lock(&out->lock);
+                out->write_busy = false;
+                pthread_cond_signal(&out->write_cond);
+                pthread_mutex_unlock(&out->lock);
+
+                if (ret < 0) {
+                    LOGE("%s: a2dp_write failed (%d)\n", __func__, ret);
+                    /* skip pending frames in case of write error */
+                    _out_inc_rd_idx_locked(out, frames);
+                    break;
+                } else if (ret == 0) {
+                    if (retries-- == 0) {
+                        /* skip pending frames in case of multiple time out */
+                        _out_inc_rd_idx_locked(out, frames);
+                        break;
+                    }
+                    continue;
+                }
+                ret /= sizeof(uint32_t);
+                _out_inc_rd_idx_locked(out, ret);
+                frames -= ret;
+
+                /* XXX: PLEASE FIX ME!!!! */
+
+                /* if A2DP sink runs abnormally fast, sleep a little so that
+                 * audioflinger mixer thread does no spin and starve other threads. */
+                /* NOTE: It is likely that the A2DP headset is being disconnected */
+                now = system_time();
+                elapsed_us = (now - out->last_write_time) / 1000UL;
+                buffer_duration_us = ((ret * 1000) / out->sample_rate) * 1000;
+
+                if (elapsed_us < (buffer_duration_us / 4)) {
+                    LOGV("A2DP sink runs too fast");
+                    usleep(buffer_duration_us - elapsed_us);
+                }
+                out->last_write_time = now;
+
+            }
+            frames = _out_frames_ready_locked(out);
+        }
+wait:
+        if (!out->buf_thread_exit) {
+            pthread_cond_wait(&out->buf_cond, &out->buf_lock);
+        }
+    }
+    pthread_mutex_unlock(&out->buf_lock);
+    return NULL;
+}
+
+
+
 static int out_add_audio_effect(const struct audio_stream *stream, effect_handle_t effect)
 {
     return 0;
@@ -482,6 +666,18 @@
         goto err_validate_parms;
     }
 
+    int err = pthread_create(&out->buf_thread, (const pthread_attr_t *) NULL, _out_buf_thread_func, out);
+    if (err != 0) {
+        goto err_validate_parms;
+    }
+
+    /* PCM format is always 16bit, stereo */
+    out->buf_size = (out->buffer_size * BUF_NUM_PERIODS) / sizeof(int32_t);
+    out->buf = (uint32_t *)malloc(out->buf_size * sizeof(int32_t));
+    if (!out->buf) {
+        goto err_validate_parms;
+    }
+
     /* XXX: check return code? */
     if (adev->bt_enabled)
         _out_init_locked(out, "00:00:00:00:00:00");
@@ -524,8 +720,22 @@
     }
 
     pthread_mutex_lock(&out->lock);
+    /* out_write() must not be executed from now on */
+    out->bt_enabled = false;
     out_close_stream_locked(out);
     pthread_mutex_unlock(&out->lock);
+    if (out->buf_thread) {
+        pthread_mutex_lock(&out->buf_lock);
+        out->buf_thread_exit = true;
+        pthread_cond_broadcast(&out->buf_cond);
+        pthread_mutex_unlock(&out->buf_lock);
+        pthread_join(out->buf_thread, (void **) NULL);
+        pthread_cond_destroy(&out->buf_cond);
+        pthread_mutex_destroy(&out->buf_lock);
+    }
+    if (out->buf) {
+        free(out->buf);
+    }
 
     adev->output = NULL;
     free(out);