blob: c5ecc44aee474352ec98101f4d89a6158c424123 [file] [log] [blame]
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <ctime>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp.h>
#include "gnss_grpc_proxy.grpc.pb.h"
#include <signal.h>
#include <chrono>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
#include <android-base/logging.h>
#include <android-base/strings.h>
#include <gflags/gflags.h>
#include <common/libs/fs/shared_fd.h>
#include <common/libs/fs/shared_buf.h>
#include <common/libs/fs/shared_select.h>
#include <host/libs/config/cuttlefish_config.h>
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using gnss_grpc_proxy::SendNmeaRequest;
using gnss_grpc_proxy::SendNmeaReply;
using gnss_grpc_proxy::GnssGrpcProxy;
DEFINE_int32(gnss_in_fd,
-1,
"File descriptor for the gnss's input channel");
DEFINE_int32(gnss_out_fd,
-1,
"File descriptor for the gnss's output channel");
DEFINE_int32(gnss_grpc_port,
-1,
"Service port for gnss grpc");
DEFINE_string(gnss_file_path,
"",
"NMEA file path for gnss grpc");
constexpr char CMD_GET_LOCATION[] = "CMD_GET_LOCATION";
constexpr char CMD_GET_RAWMEASUREMENT[] = "CMD_GET_RAWMEASUREMENT";
constexpr char END_OF_MSG_MARK[] = "\n\n\n\n";
constexpr uint32_t GNSS_SERIAL_BUFFER_SIZE = 4096;
// Logic and data behind the server's behavior.
class GnssGrpcProxyServiceImpl final : public GnssGrpcProxy::Service {
public:
GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,
cuttlefish::SharedFD gnss_out) : gnss_in_(gnss_in),
gnss_out_(gnss_out) {}
Status SendNmea(ServerContext* context, const SendNmeaRequest* request,
SendNmeaReply* reply) override {
reply->set_reply("Received nmea record.");
auto buffer = request->nmea();
std::lock_guard<std::mutex> lock(cached_nmea_mutex);
cached_nmea = request->nmea();
return Status::OK;
}
void sendToSerial() {
std::lock_guard<std::mutex> lock(cached_nmea_mutex);
if (!isNMEA(cached_nmea)) {
return;
}
ssize_t bytes_written =
cuttlefish::WriteAll(gnss_in_, cached_nmea + END_OF_MSG_MARK);
if (bytes_written < 0) {
LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
}
}
void sendGnssRawToSerial() {
std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
if (!isGnssRawMeasurement(cached_gnss_raw)) {
return;
}
if (previous_cached_gnss_raw == cached_gnss_raw) {
// Skip for same record
return;
} else {
// Update cached data
LOG(DEBUG) << "Skip same record";
previous_cached_gnss_raw = cached_gnss_raw;
}
ssize_t bytes_written =
cuttlefish::WriteAll(gnss_in_, cached_gnss_raw + END_OF_MSG_MARK);
LOG(DEBUG) << "Send Gnss Raw to serial: bytes_written: " << bytes_written;
if (bytes_written < 0) {
LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
}
}
void StartServer() {
// Create a new thread to handle writes to the gnss and to the any client
// connected to the socket.
read_thread_ = std::thread([this]() { ReadLoop(); });
}
void StartReadNmeaFileThread() {
// Create a new thread to read nmea data.
nmea_file_read_thread_ =
std::thread([this]() { ReadNmeaFromLocalFile(); });
}
void StartReadGnssRawMeasurementFileThread() {
// Create a new thread to read raw measurement data.
measurement_file_read_thread_ =
std::thread([this]() { ReadGnssRawMeasurement(); });
}
void ReadNmeaFromLocalFile() {
std::ifstream file(FLAGS_gnss_file_path);
if (file.is_open()) {
std::string line;
std::string lastLine;
int count = 0;
while (std::getline(file, line)) {
count++;
/* Only support a lite version of NEMA format to make it simple.
* Records will only contains $GPGGA, $GPRMC,
* $GPGGA,213204.00,3725.371240,N,12205.589239,W,7,,0.38,-26.75,M,0.0,M,0.0,0000*78
* $GPRMC,213204.00,A,3725.371240,N,12205.589239,W,000.0,000.0,290819,,,A*49
* $GPGGA,....
* $GPRMC,....
* Sending at 1Hz, currently user should
* provide a NMEA file that has one location per second. need some extra work
* to make it more generic, i.e. align with the timestamp in the file.
*/
if (count % 2 == 0) {
{
std::lock_guard<std::mutex> lock(cached_nmea_mutex);
cached_nmea = lastLine + '\n' + line;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
lastLine = line;
}
file.close();
} else {
LOG(ERROR) << "Can not open NMEA file: " << FLAGS_gnss_file_path ;
return;
}
}
void ReadGnssRawMeasurement() {
std::ifstream file(FLAGS_gnss_file_path);
if (file.is_open()) {
std::string line;
std::string cached_line = "";
std::string header = "";
while (!cached_line.empty() || std::getline(file, line)) {
if (!cached_line.empty()) {
line = cached_line;
cached_line = "";
}
// Get data header.
if (header.empty() && android::base::StartsWith(line, "# Raw")) {
header = line;
LOG(DEBUG) << "Header: " << header;
continue;
}
// Ignore not raw measurement data.
if (!android::base::StartsWith(line, "Raw")) {
continue;
}
{
std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
cached_gnss_raw = header + "\n" + line;
std::string new_line = "";
while (std::getline(file, new_line)) {
// Group raw data by TimeNanos.
if (getTimeNanosFromLine(new_line) ==
getTimeNanosFromLine(line)) {
cached_gnss_raw += "\n" + new_line;
} else {
cached_line = new_line;
break;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
file.close();
} else {
LOG(ERROR) << "Can not open GNSS Raw file: " << FLAGS_gnss_file_path;
return;
}
}
~GnssGrpcProxyServiceImpl() {
if (nmea_file_read_thread_.joinable()) {
nmea_file_read_thread_.join();
}
if (measurement_file_read_thread_.joinable()) {
measurement_file_read_thread_.join();
}
if (read_thread_.joinable()) {
read_thread_.join();
}
}
private:
[[noreturn]] void ReadLoop() {
cuttlefish::SharedFDSet read_set;
read_set.Set(gnss_out_);
std::vector<char> buffer(GNSS_SERIAL_BUFFER_SIZE);
int total_read = 0;
std::string gnss_cmd_str;
int flags = gnss_out_->Fcntl(F_GETFL, 0);
gnss_out_->Fcntl(F_SETFL, flags | O_NONBLOCK);
while (true) {
auto bytes_read = gnss_out_->Read(buffer.data(), buffer.size());
if (bytes_read > 0) {
std::string s(buffer.data(), bytes_read);
gnss_cmd_str += s;
// In case random string sent though /dev/gnss0, gnss_cmd_str will auto resize,
// to get rid of first page.
if (gnss_cmd_str.size() > GNSS_SERIAL_BUFFER_SIZE * 2) {
gnss_cmd_str = gnss_cmd_str.substr(gnss_cmd_str.size() - GNSS_SERIAL_BUFFER_SIZE);
}
total_read += bytes_read;
if (gnss_cmd_str.find(CMD_GET_LOCATION) != std::string::npos) {
sendToSerial();
gnss_cmd_str = "";
total_read = 0;
}
if (gnss_cmd_str.find(CMD_GET_RAWMEASUREMENT) != std::string::npos) {
sendGnssRawToSerial();
gnss_cmd_str = "";
total_read = 0;
}
} else {
if (gnss_out_->GetErrno() == EAGAIN|| gnss_out_->GetErrno() == EWOULDBLOCK) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
LOG(ERROR) << "Error reading fd " << FLAGS_gnss_out_fd << ": "
<< " Error code: " << gnss_out_->GetErrno()
<< " Error sg:" << gnss_out_->StrError();
}
}
}
}
std::string getTimeNanosFromLine(const std::string& line) {
// TimeNanos is in column #3.
std::vector<std::string> vals = android::base::Split(line, ",");
return vals.size() >= 3 ? vals[2] : "-1";
}
bool isGnssRawMeasurement(const std::string& inputStr) {
// TODO: add more logic check to by pass invalid data.
return !inputStr.empty() && android::base::StartsWith(inputStr, "# Raw");
}
bool isNMEA(const std::string& inputStr) {
return !inputStr.empty() &&
(android::base::StartsWith(inputStr, "$GPRMC") ||
android::base::StartsWith(inputStr, "$GPRMA"));
}
cuttlefish::SharedFD gnss_in_;
cuttlefish::SharedFD gnss_out_;
std::thread read_thread_;
std::thread nmea_file_read_thread_;
std::thread measurement_file_read_thread_;
std::string cached_nmea;
std::mutex cached_nmea_mutex;
std::string cached_gnss_raw;
std::string previous_cached_gnss_raw;
std::mutex cached_gnss_raw_mutex;
};
void RunServer() {
auto gnss_in = cuttlefish::SharedFD::Dup(FLAGS_gnss_in_fd);
close(FLAGS_gnss_in_fd);
if (!gnss_in->IsOpen()) {
LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_in_fd << ": "
<< gnss_in->StrError();
return;
}
close(FLAGS_gnss_in_fd);
auto gnss_out = cuttlefish::SharedFD::Dup(FLAGS_gnss_out_fd);
close(FLAGS_gnss_out_fd);
if (!gnss_out->IsOpen()) {
LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_out_fd << ": "
<< gnss_out->StrError();
return;
}
auto server_address("0.0.0.0:" + std::to_string(FLAGS_gnss_grpc_port));
GnssGrpcProxyServiceImpl service(gnss_in, gnss_out);
service.StartServer();
if (!FLAGS_gnss_file_path.empty()) {
// TODO: On-demand start the read file threads according to data type.
service.StartReadNmeaFileThread();
service.StartReadGnssRawMeasurementFileThread();
// In the local mode, we are not start a grpc server, use a infinite loop instead
while(true) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
} else {
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
}
int main(int argc, char** argv) {
::android::base::InitLogging(argv, android::base::StderrLogger);
::gflags::ParseCommandLineFlags(&argc, &argv, true);
LOG(DEBUG) << "Starting gnss grpc proxy server...";
RunServer();
return 0;
}