blob: d037478515cb822905bddecd0572e52fa243a37d [file] [log] [blame]
//
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <curl/curl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <optional>
#include <string>
#include <unordered_map>
#include <android-base/logging.h>
#include <android-base/result.h>
#include <android-base/strings.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/text_format.h>
#include <google/protobuf/util/delimited_message_util.h>
#include "common/libs/fs/shared_buf.h"
#include "common/libs/fs/shared_fd.h"
#include "common/libs/fs/shared_select.h"
#include "common/libs/utils/archive.h"
#include "common/libs/utils/files.h"
#include "common/libs/utils/flag_parser.h"
#include "common/libs/utils/subprocess.h"
#include "host/commands/test_gce_driver/gce_api.h"
#include "host/commands/test_gce_driver/key_pair.h"
#include "host/commands/test_gce_driver/scoped_instance.h"
#include "host/libs/web/build_api.h"
#include "host/libs/web/credential_source.h"
#include "host/libs/web/curl_wrapper.h"
#include "host/libs/web/install_zip.h"
#include "test_gce_driver.pb.h"
using android::base::Error;
using android::base::Result;
using google::protobuf::util::ParseDelimitedFromZeroCopyStream;
using google::protobuf::util::SerializeDelimitedToFileDescriptor;
namespace cuttlefish {
namespace {
Result<Json::Value> ReadJsonFromFile(const std::string& path) {
Json::CharReaderBuilder builder;
std::ifstream ifs(path);
Json::Value content;
std::string errorMessage;
CF_EXPECT(Json::parseFromStream(builder, ifs, &content, &errorMessage),
"Could not read config file \"" << path << "\": " << errorMessage);
return content;
}
class ReadEvalPrintLoop {
public:
ReadEvalPrintLoop(GceApi& gce, BuildApi& build, int in_fd, int out_fd,
bool internal_addresses)
: gce_(gce),
build_(build),
in_(in_fd),
out_(out_fd),
internal_addresses_(internal_addresses) {}
Result<void> Process() {
while (true) {
test_gce_driver::TestMessage msg;
bool clean_eof;
LOG(DEBUG) << "Waiting for message";
bool parsed = ParseDelimitedFromZeroCopyStream(&msg, &in_, &clean_eof);
LOG(DEBUG) << "Received message";
if (clean_eof) {
return {};
} else if (!parsed) {
return Error() << "Failed to parse input message";
}
Result<void> handler_result;
switch (msg.contents_case()) {
case test_gce_driver::TestMessage::ContentsCase::kExit: {
test_gce_driver::TestMessage stream_end_msg;
stream_end_msg.mutable_exit(); // Set this in the oneof
if (!SerializeDelimitedToFileDescriptor(stream_end_msg, out_)) {
return Error() << "Failure while writing stream end message";
}
return {};
}
case test_gce_driver::TestMessage::ContentsCase::kStreamEnd:
continue;
case test_gce_driver::TestMessage::ContentsCase::kCreateInstance:
handler_result = NewInstance(msg.create_instance());
break;
case test_gce_driver::TestMessage::ContentsCase::kSshCommand:
handler_result = SshCommand(msg.ssh_command());
break;
case test_gce_driver::TestMessage::ContentsCase::kUploadBuildArtifact:
handler_result = UploadBuildArtifact(msg.upload_build_artifact());
break;
case test_gce_driver::TestMessage::ContentsCase::kUploadFile:
handler_result = UploadFile(msg.upload_file());
break;
default: {
std::string msg_txt;
if (google::protobuf::TextFormat::PrintToString(msg, &msg_txt)) {
handler_result = Error()
<< "Unexpected message: \"" << msg_txt << "\"";
} else {
handler_result = Error() << "Unexpected message: (unprintable)";
}
}
}
if (!handler_result.ok()) {
test_gce_driver::TestMessage error_msg;
error_msg.mutable_error()->set_text(handler_result.error().message());
CF_EXPECT(SerializeDelimitedToFileDescriptor(error_msg, out_),
"Failure while writing error message: (\n"
<< handler_result.error() << "\n)");
}
test_gce_driver::TestMessage stream_end_msg;
stream_end_msg.mutable_stream_end(); // Set this in the oneof
CF_EXPECT(SerializeDelimitedToFileDescriptor(stream_end_msg, out_));
}
return {};
}
private:
Result<void> NewInstance(const test_gce_driver::CreateInstance& request) {
CF_EXPECT(request.id().name() != "", "Instance name must be specified");
CF_EXPECT(request.id().zone() != "", "Instance zone must be specified");
auto instance = CF_EXPECT(ScopedGceInstance::CreateDefault(
gce_, request.id().zone(), request.id().name(), internal_addresses_));
instances_.emplace(request.id().name(), std::move(instance));
return {};
}
Result<void> SshCommand(const test_gce_driver::SshCommand& request) {
auto instance = instances_.find(request.instance().name());
CF_EXPECT(instance != instances_.end(),
"Instance \"" << request.instance().name() << "\" not found");
auto ssh = CF_EXPECT(instance->second->Ssh());
for (auto argument : request.arguments()) {
ssh.RemoteParameter(argument);
}
std::optional<Subprocess> ssh_proc;
SharedFD stdout_read;
SharedFD stderr_read;
{ // Things created here need to be closed early
auto cmd = ssh.Build();
SharedFD stdout_write;
CF_EXPECT(SharedFD::Pipe(&stdout_read, &stdout_write));
cmd.RedirectStdIO(Subprocess::StdIOChannel::kStdOut, stdout_write);
SharedFD stderr_write;
CF_EXPECT(SharedFD::Pipe(&stderr_read, &stderr_write));
cmd.RedirectStdIO(Subprocess::StdIOChannel::kStdErr, stderr_write);
ssh_proc = cmd.Start();
}
while (stdout_read->IsOpen() || stderr_read->IsOpen()) {
SharedFDSet read_set;
if (stdout_read->IsOpen()) {
read_set.Set(stdout_read);
}
if (stderr_read->IsOpen()) {
read_set.Set(stderr_read);
}
Select(&read_set, nullptr, nullptr, nullptr);
if (read_set.IsSet(stdout_read)) {
char buffer[1 << 14];
auto read = stdout_read->Read(buffer, sizeof(buffer));
CF_EXPECT(read >= 0,
"Failure in reading ssh stdout: " << stdout_read->StrError());
if (read == 0) { // EOF
stdout_read = SharedFD();
} else {
test_gce_driver::TestMessage stdout_chunk;
stdout_chunk.mutable_data()->set_type(
test_gce_driver::DataType::DATA_TYPE_STDOUT);
stdout_chunk.mutable_data()->set_contents(buffer, read);
CF_EXPECT(SerializeDelimitedToFileDescriptor(stdout_chunk, out_));
}
}
if (read_set.IsSet(stderr_read)) {
char buffer[1 << 14];
auto read = stderr_read->Read(buffer, sizeof(buffer));
CF_EXPECT(read >= 0,
"Failure in reading ssh stderr: " << stdout_read->StrError());
if (read == 0) { // EOF
stderr_read = SharedFD();
} else {
test_gce_driver::TestMessage stderr_chunk;
stderr_chunk.mutable_data()->set_type(
test_gce_driver::DataType::DATA_TYPE_STDERR);
stderr_chunk.mutable_data()->set_contents(buffer, read);
CF_EXPECT(SerializeDelimitedToFileDescriptor(stderr_chunk, out_));
}
}
}
auto ret = ssh_proc->Wait();
test_gce_driver::TestMessage retcode_chunk;
retcode_chunk.mutable_data()->set_type(
test_gce_driver::DataType::DATA_TYPE_RETURN_CODE);
retcode_chunk.mutable_data()->set_contents(std::to_string(ret));
CF_EXPECT(SerializeDelimitedToFileDescriptor(retcode_chunk, out_));
return {};
}
Result<void> UploadBuildArtifact(
const test_gce_driver::UploadBuildArtifact& request) {
auto instance = instances_.find(request.instance().name());
CF_EXPECT(instance != instances_.end(),
"Instance \"" << request.instance().name() << "\" not found");
struct {
ScopedGceInstance* instance;
SharedFD ssh_in;
std::optional<Subprocess> ssh_proc;
Result<void> result;
} callback_state;
callback_state.instance = instance->second.get();
auto callback = [&request, &callback_state](char* data, size_t size) {
if (data == nullptr) {
auto ssh = callback_state.instance->Ssh();
if (!ssh.ok()) {
callback_state.result = CF_ERR("ssh command failed\n" << ssh.error());
return false;
}
SharedFD ssh_stdin_out;
if (!SharedFD::Pipe(&ssh_stdin_out, &callback_state.ssh_in)) {
callback_state.result = CF_ERRNO("pipe failed");
return false;
}
ssh->RemoteParameter("cat >" + request.remote_path());
auto command = ssh->Build();
command.RedirectStdIO(Subprocess::StdIOChannel::kStdIn, ssh_stdin_out);
callback_state.ssh_proc = command.Start();
} else if (WriteAll(callback_state.ssh_in, data, size) != size) {
callback_state.ssh_proc->Stop();
callback_state.result = CF_ERR("Failed to write contents\n"
<< callback_state.ssh_in->StrError());
return false;
}
return true;
};
DeviceBuild build(request.build().id(), request.build().target());
CF_EXPECT(
build_.ArtifactToCallback(build, request.artifact_name(), callback),
"Failed to send file: (\n"
<< (callback_state.result.ok()
? "Unknown failure"
: callback_state.result.error().message() + "\n)"));
callback_state.ssh_in->Close();
if (callback_state.ssh_proc) {
auto ssh_ret = callback_state.ssh_proc->Wait();
CF_EXPECT(ssh_ret == 0, "SSH command failed with code: " << ssh_ret);
}
return {};
}
Result<void> UploadFile(const test_gce_driver::UploadFile& request) {
auto instance = instances_.find(request.instance().name());
CF_EXPECT(instance != instances_.end(),
"Instance \"" << request.instance().name() << "\" not found");
auto ssh = CF_EXPECT(instance->second->Ssh());
ssh.RemoteParameter("cat >" + request.remote_path());
auto command = ssh.Build();
SharedFD ssh_stdin_out, ssh_stdin_in;
CF_EXPECT(SharedFD::Pipe(&ssh_stdin_out, &ssh_stdin_in), strerror(errno));
command.RedirectStdIO(Subprocess::StdIOChannel::kStdIn, ssh_stdin_out);
auto ssh_proc = command.Start();
ssh_stdin_out->Close();
{
Command unused = std::move(command); // force deletion
}
while (true) {
LOG(INFO) << "upload data loop";
test_gce_driver::TestMessage data_msg;
bool clean_eof;
LOG(DEBUG) << "Waiting for message";
bool parsed =
ParseDelimitedFromZeroCopyStream(&data_msg, &in_, &clean_eof);
if (clean_eof) {
ssh_proc.Stop();
return CF_ERR("Received EOF");
} else if (!parsed) {
ssh_proc.Stop();
return CF_ERR("Failed to parse message");
} else if (data_msg.contents_case() ==
test_gce_driver::TestMessage::ContentsCase::kStreamEnd) {
break;
} else if (data_msg.contents_case() !=
test_gce_driver::TestMessage::ContentsCase::kData) {
ssh_proc.Stop();
return CF_ERR(
"Received wrong type of message: " << data_msg.contents_case());
} else if (data_msg.data().type() !=
test_gce_driver::DataType::DATA_TYPE_FILE_CONTENTS) {
ssh_proc.Stop();
return CF_ERR(
"Received unexpected data type: " << data_msg.data().type());
}
LOG(INFO) << "going to write message of size "
<< data_msg.data().contents().size();
if (WriteAll(ssh_stdin_in, data_msg.data().contents()) !=
data_msg.data().contents().size()) {
ssh_proc.Stop();
return CF_ERR("Failed to write contents: " << ssh_stdin_in->StrError());
}
LOG(INFO) << "successfully wrote message?";
}
ssh_stdin_in->Close();
auto ssh_ret = ssh_proc.Wait();
CF_EXPECT(ssh_ret == 0, "SSH command failed with code: " << ssh_ret);
return {};
}
GceApi& gce_;
BuildApi& build_;
google::protobuf::io::FileInputStream in_;
int out_;
bool internal_addresses_;
std::unordered_map<std::string, std::unique_ptr<ScopedGceInstance>>
instances_;
};
} // namespace
Result<void> TestGceDriverMain(int argc, char** argv) {
std::vector<Flag> flags;
std::string service_account_json_private_key_path = "";
flags.emplace_back(GflagsCompatFlag("service-account-json-private-key-path",
service_account_json_private_key_path));
std::string cloud_project = "";
flags.emplace_back(GflagsCompatFlag("cloud-project", cloud_project));
bool internal_addresses = false;
flags.emplace_back(
GflagsCompatFlag("internal-addresses", internal_addresses));
std::vector<std::string> args =
ArgsToVec(argc - 1, argv + 1); // Skip argv[0]
CF_EXPECT(ParseFlags(flags, args), "Could not process command line flags.");
auto service_json =
CF_EXPECT(ReadJsonFromFile(service_account_json_private_key_path));
static constexpr char COMPUTE_SCOPE[] =
"https://www.googleapis.com/auth/compute";
auto curl = CurlWrapper::Create();
auto gce_creds = CF_EXPECT(ServiceAccountOauthCredentialSource::FromJson(
*curl, service_json, COMPUTE_SCOPE));
GceApi gce(*curl, gce_creds, cloud_project);
static constexpr char BUILD_SCOPE[] =
"https://www.googleapis.com/auth/androidbuild.internal";
auto build_creds = CF_EXPECT(ServiceAccountOauthCredentialSource::FromJson(
*curl, service_json, BUILD_SCOPE));
BuildApi build(*curl, &build_creds);
ReadEvalPrintLoop executor(gce, build, STDIN_FILENO, STDOUT_FILENO,
internal_addresses);
LOG(INFO) << "Starting processing";
CF_EXPECT(executor.Process());
return {};
}
} // namespace cuttlefish
int main(int argc, char** argv) {
auto res = cuttlefish::TestGceDriverMain(argc, argv);
CHECK(res.ok()) << "cvd_test_gce_driver failed: " << res.error();
}