| #include "ge_backend.h" |
| |
| #define LOG_TAG "TuningFork.GE" |
| #include "Log.h" |
| |
| #include "tuningfork/protobuf_util.h" |
| |
| #include "runnable.h" |
| #include "web.h" |
| #include "tuningfork_utils.h" |
| |
| namespace tuningfork { |
| |
| constexpr Duration kUploadCheckInterval = std::chrono::seconds(10); |
| constexpr Duration kRequestTimeout = std::chrono::seconds(10); |
| |
| const char kRpcName[] = ":uploadTelemetry"; |
| |
| class UltimateUploader : public Runnable { |
| const TFCache* persister_; |
| WebRequest request_; |
| public: |
| UltimateUploader(const TFCache* persister, const WebRequest& request) |
| : Runnable(), persister_(persister), request_(request) {} |
| Duration DoWork() override { |
| CheckUploadPending(); |
| return kUploadCheckInterval; |
| } |
| void Run() override { |
| Runnable::Run(); |
| } |
| bool CheckUploadPending() { |
| CProtobufSerialization uploading_hists_ser; |
| if (persister_->get(HISTOGRAMS_UPLOADING, &uploading_hists_ser, |
| persister_->user_data)==TFERROR_OK) { |
| std::string request_json = ToString(uploading_hists_ser); |
| CProtobufSerialization_Free(&uploading_hists_ser); |
| int response_code = -1; |
| std::string body; |
| ALOGV("Got UPLOADING histograms: %s", request_json.c_str()); |
| TFErrorCode ret = request_.Send(kRpcName, request_json, response_code, body); |
| if (ret==TFERROR_OK) { |
| ALOGI("UPLOAD request returned %d %s", response_code, body.c_str()); |
| if (response_code==200) { |
| persister_->remove(HISTOGRAMS_UPLOADING, persister_->user_data); |
| return true; |
| } |
| } |
| else |
| ALOGW("Error %d when sending UPLOAD request\n%s", ret, request_json.c_str()); |
| } |
| else { |
| ALOGV("No upload pending"); |
| return true; |
| } |
| return false; |
| } |
| }; |
| |
| TFErrorCode GEBackend::Init(const Settings& settings, |
| const ExtraUploadInfo& extra_upload_info) { |
| |
| if (settings.EndpointUri().empty()) { |
| ALOGW("The base URI in Tuning Fork TFSettings is invalid"); |
| return TFERROR_BAD_PARAMETER; |
| } |
| if (settings.api_key.empty()) { |
| ALOGW("The API key in Tuning Fork TFSettings is invalid"); |
| return TFERROR_BAD_PARAMETER; |
| } |
| |
| Request rq(extra_upload_info, settings.EndpointUri(), settings.api_key, kRequestTimeout); |
| WebRequest web_request(rq); |
| |
| persister_ = settings.c_settings.persistent_cache; |
| |
| // TODO(b/140367226): Initialize a Java JobScheduler if we can |
| |
| if( ultimate_uploader_.get() == nullptr) { |
| ultimate_uploader_ = std::make_shared<UltimateUploader>(persister_, web_request); |
| ultimate_uploader_->Start(); |
| } |
| |
| return TFERROR_OK; |
| } |
| |
| GEBackend::~GEBackend() {} |
| |
| TFErrorCode GEBackend::Process(const std::string &evt_ser) { |
| |
| ALOGV("GEBackend::Process %s",evt_ser.c_str()); |
| |
| // Save event to file |
| CProtobufSerialization uploading_hists_ser; |
| ToCProtobufSerialization(evt_ser, uploading_hists_ser); |
| auto ret = persister_->set(HISTOGRAMS_UPLOADING, &uploading_hists_ser, |
| persister_->user_data); |
| |
| CProtobufSerialization_Free(&uploading_hists_ser); |
| |
| return ret; |
| } |
| |
| void GEBackend::KillThreads() { |
| if (ultimate_uploader_) |
| ultimate_uploader_->Stop(); |
| } |
| |
| } //namespace tuningfork { |