blob: f63e315f966fed01bc56018713b927158fe678b0 [file] [log] [blame]
/*
*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// For each client that connects initially a header is sent with the following,
// in this order, all as uint16_t in network-byte-order:
// number of channels, frame rate
//
// Following, audio packets are sent as a uint32_t length (network byte order)
// indicating the number of bytes
// followed by the (opus) frame_size as a uint32_t
// followed by <length> bytes.
#include "common/libs/tcp_socket/tcp_socket.h"
#include "common/vsoc/lib/audio_data_region_view.h"
#include "common/vsoc/lib/circqueue_impl.h"
#include "common/vsoc/lib/vsoc_audio_message.h"
#include "host/frontend/stream_audio/opuscpp/opus_wrapper.h"
#include "host/libs/config/cuttlefish_config.h"
#include <android-base/logging.h>
#include <gflags/gflags.h>
#include <arpa/inet.h>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <vector>
using vsoc::audio_data::AudioDataRegionView;
DEFINE_int32(port, 0, "port on which to serve audio.");
namespace {
// Read audio frames from the AudioDataRegionView
class AudioStreamer {
public:
cvd::Message MakeAudioDescriptionHeader() const {
std::unique_lock guard(buffer_lock_);
while (!audio_buffer_) {
buffer_cv_.wait(guard);
}
const size_t num_channels = header_.frame_size / sizeof(opus_int16);
return cvd::CreateMessage(static_cast<std::uint16_t>(num_channels),
static_cast<std::uint16_t>(header_.frame_rate));
}
std::uint32_t frame_rate() const {
std::unique_lock guard(buffer_lock_);
while (!audio_buffer_) {
buffer_cv_.wait(guard);
}
return header_.frame_rate;
}
std::uint32_t num_channels() const {
std::unique_lock guard(buffer_lock_);
while (!audio_buffer_) {
buffer_cv_.wait(guard);
}
return header_.frame_size / sizeof(opus_int16);
}
// Returns the frame id and audio frame
std::tuple<std::int64_t, std::shared_ptr<const cvd::Message>> audio_buffer(
std::int64_t previous_frame_num) const {
std::unique_lock guard(buffer_lock_);
while (header_.frame_num <= previous_frame_num) {
buffer_cv_.wait(guard);
}
return {header_.frame_num, audio_buffer_};
}
void Update() {
auto audio_data_rv =
AudioDataRegionView::GetInstance(vsoc::GetDomain().c_str());
auto worker = audio_data_rv->StartWorker();
std::vector<char> new_buffer;
while (true) {
new_buffer.resize(new_buffer.capacity());
auto [new_header, payload_size, audio_data] =
NextAudioMessage(audio_data_rv, &new_buffer);
LOG(DEBUG) << "stream " << new_header.stream_number << ", frame "
<< new_header.frame_num << ", rate " << new_header.frame_rate
<< ", channel_mask " << new_header.channel_mask << ", format "
<< new_header.format << ", payload_size " << payload_size
<< '\n';
{
std::lock_guard guard(buffer_lock_);
CheckAudioConfigurationIsSame(new_header);
header_ = new_header;
audio_buffer_ = std::make_shared<const cvd::Message>(
audio_data, audio_data + payload_size);
}
buffer_cv_.notify_all();
}
}
private:
struct AudioMessage {
gce_audio_message header;
std::size_t payload_size;
const std::uint8_t* payload_data;
};
void ReadAudioMessage(AudioDataRegionView* audio_data_rv,
std::vector<char>* buffer) const {
while (true) {
auto read_size = audio_data_rv->data()->audio_queue.Read(
audio_data_rv, buffer->data(), buffer->size());
if (read_size == -ENOSPC) {
DoubleSize(buffer);
} else if (read_size < 0) {
LOG(ERROR) << "CircularPacketQueue::Read returned " << read_size;
} else {
buffer->resize(read_size);
return;
}
}
}
void DoubleSize(std::vector<char>* buffer) const {
if (buffer->empty()) {
buffer->resize(1);
} else {
buffer->resize(buffer->size() * 2);
}
}
gce_audio_message GetHeaderFromBuffer(const std::vector<char>& buffer) const {
gce_audio_message new_header{};
CHECK_GE(buffer.size(), sizeof new_header);
std::memcpy(&new_header, buffer.data(), sizeof new_header);
CHECK_GT(new_header.stream_number, 0u);
return new_header;
}
std::tuple<std::size_t, const std::uint8_t*> GetPayloadFromBuffer(
const std::vector<char>& buffer) const {
const auto payload_size = buffer.size() - sizeof(gce_audio_message);
const auto* audio_data =
reinterpret_cast<const std::uint8_t*>(buffer.data()) +
sizeof(gce_audio_message);
return {payload_size, audio_data};
}
AudioMessage NextAudioMessage(AudioDataRegionView* audio_data_rv,
std::vector<char>* buffer) const {
while (true) {
ReadAudioMessage(audio_data_rv, buffer);
auto header = GetHeaderFromBuffer(*buffer);
if (header.message_type == gce_audio_message::DATA_SAMPLES) {
auto [payload_size, payload_data] = GetPayloadFromBuffer(*buffer);
return {header, payload_size, payload_data};
}
}
}
void CheckAudioConfigurationIsSame(
const gce_audio_message& new_header) const {
if (audio_buffer_) {
CHECK_EQ(header_.frame_size, new_header.frame_size)
<< "audio frame_size changed";
CHECK_EQ(header_.frame_rate, new_header.frame_rate)
<< "audio frame_rate changed";
CHECK_EQ(header_.stream_number, new_header.stream_number)
<< "audio stream_number changed";
}
}
std::shared_ptr<const cvd::Message> audio_buffer_{};
gce_audio_message header_{};
mutable std::mutex buffer_lock_;
mutable std::condition_variable buffer_cv_;
};
void HandleClient(AudioStreamer* audio_streamer,
cvd::ClientSocket client_socket) {
auto num_channels = audio_streamer->num_channels();
opus::Encoder enc(audio_streamer->frame_rate(),
audio_streamer->num_channels(), OPUS_APPLICATION_AUDIO);
CHECK(enc.valid()) << "Could not construct Encoder. Maybe bad frame_rate ("
<< audio_streamer->frame_rate() <<") or num_channels ("
<< audio_streamer->num_channels() << ")?";
auto header = audio_streamer->MakeAudioDescriptionHeader();
client_socket.SendNoSignal(header);
std::int64_t previous_frame_num = 0;
while (!client_socket.closed()) {
CHECK(enc.valid()) << "encoder in invalid state";
auto [frame_num, audio_data] =
audio_streamer->audio_buffer(previous_frame_num);
previous_frame_num = frame_num;
std::vector<opus_int16> pcm(audio_data->size() / sizeof(opus_int16));
std::memcpy(pcm.data(), audio_data->data(), audio_data->size());
// in opus terms "frame_size" is the number of unencoded samples per frame
const std::uint32_t frame_size = pcm.size() / num_channels;
auto encoded = enc.Encode(pcm, frame_size);
for (auto&& p : encoded) {
auto length_message =
cvd::CreateMessage(static_cast<std::uint32_t>(p.size()));
client_socket.SendNoSignal(length_message);
client_socket.SendNoSignal(cvd::CreateMessage(frame_size));
client_socket.SendNoSignal(p);
}
}
}
[[noreturn]] void AudioStreamerUpdateLoop(AudioStreamer* audio_streamer) {
while (true) {
audio_streamer->Update();
}
}
[[noreturn]] void MainLoop() {
AudioStreamer audio_streamer;
std::thread audio_streamer_update_thread;
auto server = cvd::ServerSocket(FLAGS_port);
while (true) {
LOG(INFO) << "waiting for client connection";
auto client = server.Accept();
LOG(INFO) << "client socket accepted";
if (!audio_streamer_update_thread.joinable()) {
audio_streamer_update_thread =
std::thread{AudioStreamerUpdateLoop, &audio_streamer};
}
std::thread(HandleClient, &audio_streamer, std::move(client)).detach();
}
}
} // namespace
int main(int argc, char* argv[]) {
::android::base::InitLogging(argv, android::base::StderrLogger);
gflags::SetUsageMessage(" ");
google::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_port <= 0) {
std::cerr << "--port must be specified.\n";
return 1;
}
MainLoop();
}