blob: 07eaff6d55407a87e6097b0e2421c0763bdd810d [file] [log] [blame]
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "OutputStreamImpl.h"
#include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
#include <aidl/device/google/atv/audio_proxy/PresentationPosition.h>
#include <android-base/logging.h>
#include <time.h>
#include "AudioProxyClientError.h"
#include "AudioProxyStreamOut.h"
using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
using aidl::device::google::atv::audio_proxy::PresentationPosition;
using android::status_t;
namespace audio_proxy {
namespace {
// 1GB
constexpr uint32_t kMaxBufferSize = 1 << 30;
void deleteEventFlag(EventFlag* obj) {
if (!obj) {
return;
}
status_t status = EventFlag::deleteEventFlag(&obj);
if (status) {
LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
}
}
class WriteThread : public Thread {
public:
// WriteThread's lifespan never exceeds StreamOut's lifespan.
WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
OutputStreamImpl::DataMQ* dataMQ,
OutputStreamImpl::StatusMQ* statusMQ, EventFlag* eventFlag);
~WriteThread() override;
private:
bool threadLoop() override;
PresentationPosition doGetPresentationPosition();
int64_t doWrite();
std::atomic<bool>* const mStop;
AudioProxyStreamOut* mStream;
OutputStreamImpl::DataMQ* const mDataMQ;
OutputStreamImpl::StatusMQ* const mStatusMQ;
EventFlag* const mEventFlag;
const std::unique_ptr<int8_t[]> mBuffer;
};
WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
OutputStreamImpl::DataMQ* dataMQ,
OutputStreamImpl::StatusMQ* statusMQ,
EventFlag* eventFlag)
: Thread(false /*canCallJava*/),
mStop(stop),
mStream(stream),
mDataMQ(dataMQ),
mStatusMQ(statusMQ),
mEventFlag(eventFlag),
mBuffer(new int8_t[mDataMQ->getQuantumCount()]) {}
WriteThread::~WriteThread() = default;
int64_t WriteThread::doWrite() {
const size_t availToRead = mDataMQ->availableToRead();
if (availToRead == 0) {
return 0;
}
if (!mDataMQ->read(&mBuffer[0], availToRead)) {
return 0;
}
return mStream->write(&mBuffer[0], availToRead);
}
PresentationPosition WriteThread::doGetPresentationPosition() {
PresentationPosition position;
mStream->getPresentationPosition(&position.frames, &position.timestamp);
return position;
}
bool WriteThread::threadLoop() {
// This implementation doesn't return control back to the Thread until the
// parent thread decides to stop, as the Thread uses mutexes, and this can
// lead to priority inversion.
while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
uint32_t efState = 0;
mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY),
&efState);
if (!(efState & static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY))) {
continue; // Nothing to do.
}
WriteStatus status;
status.written = doWrite();
status.position = doGetPresentationPosition();
if (!mStatusMQ->write(&status)) {
LOG(ERROR) << "status message queue write failed.";
}
mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_FULL));
}
return false;
}
} // namespace
OutputStreamImpl::OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)
: mStream(std::move(stream)), mEventFlag(nullptr, deleteEventFlag) {}
OutputStreamImpl::~OutputStreamImpl() {
closeImpl();
if (mWriteThread) {
status_t status = mWriteThread->join();
if (status) {
LOG(ERROR) << "write thread exit error: " << strerror(-status);
}
}
mEventFlag.reset();
}
ndk::ScopedAStatus OutputStreamImpl::standby() {
mStream->standby();
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::close() { return closeImpl(); }
ndk::ScopedAStatus OutputStreamImpl::closeImpl() {
if (mStopWriteThread.load(
std::memory_order_relaxed)) { // only this thread writes
return ndk::ScopedAStatus::ok();
}
mStopWriteThread.store(true, std::memory_order_release);
if (mEventFlag) {
mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
}
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::prepareForWriting(
int32_t frameSize, int32_t framesCount, DataMQDesc* dataMQDesc,
StatusMQDesc* statusMQDesc) {
if (mDataMQ) {
LOG(ERROR) << "the client attempted to call prepareForWriting twice.";
return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
}
if (frameSize == 0 || framesCount == 0) {
LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
<< framesCount << ")";
return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
}
if (frameSize > kMaxBufferSize / framesCount) {
LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
<< " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
}
auto dataMQ =
std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
if (!dataMQ->isValid()) {
LOG(ERROR) << "data MQ is invalid";
return ndk::ScopedAStatus::fromServiceSpecificError(
ERROR_FMQ_CREATION_FAILURE);
}
auto statusMQ = std::make_unique<StatusMQ>(1);
if (!statusMQ->isValid()) {
LOG(ERROR) << "status MQ is invalid";
return ndk::ScopedAStatus::fromServiceSpecificError(
ERROR_FMQ_CREATION_FAILURE);
}
EventFlag* rawEventFlag = nullptr;
status_t status =
EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
deleteEventFlag);
if (status != ::android::OK || !eventFlag) {
LOG(ERROR) << "failed creating event flag for data MQ: "
<< strerror(-status);
return ndk::ScopedAStatus::fromServiceSpecificError(
ERROR_FMQ_CREATION_FAILURE);
}
sp<WriteThread> writeThread =
new WriteThread(&mStopWriteThread, mStream.get(), dataMQ.get(),
statusMQ.get(), eventFlag.get());
status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
if (status != ::android::OK) {
LOG(ERROR) << "failed to start writer thread: " << strerror(-status);
return ndk::ScopedAStatus::fromServiceSpecificError(
ERROR_FMQ_CREATION_FAILURE);
}
mDataMQ = std::move(dataMQ);
mStatusMQ = std::move(statusMQ);
mEventFlag = std::move(eventFlag);
mWriteThread = std::move(writeThread);
*dataMQDesc = mDataMQ->dupeDesc();
*statusMQDesc = mStatusMQ->dupeDesc();
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::pause() {
mStream->pause();
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::resume() {
mStream->resume();
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::drain(AudioDrain type) {
mStream->drain(type);
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::flush() {
mStream->flush();
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus OutputStreamImpl::setVolume(float left, float right) {
mStream->setVolume(left, right);
return ndk::ScopedAStatus::ok();
}
} // namespace audio_proxy