blob: 5722c61392be0b7539d99945c5b0c7a569e8ed79 [file] [log] [blame]
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "os/reactor.h"
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <cinttypes>
#include <cstring>
#include "os/log.h"
namespace {

// Maximum number of events fetched by a single epoll_wait() call; it also sizes the
// local epoll_event buffer used by Reactor::Run().
constexpr int kEpollMaxEvents = 64;

// Bit flags written to the reactor's control eventfd and tested by Reactor::Run().
// Bit 0: ask the event loop to return.
constexpr uint64_t kStopReactor = uint64_t{1} << 0;
// Bit 1: ask the event loop to report when it has gone idle.
constexpr uint64_t kWaitForIdle = uint64_t{1} << 1;

}  // namespace
namespace bluetooth {
namespace os {
using common::Closure;
// Per-registration bookkeeping for one file descriptor watched by the Reactor.
// A pointer to this object is stored in epoll_event.data.ptr when the fd is registered.
class Reactor::Reactable {
 public:
  Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
      : fd_(fd), on_read_ready_(std::move(on_read_ready)), on_write_ready_(std::move(on_write_ready)) {}

  // The watched file descriptor.
  const int fd_;
  // Run by the event loop on EPOLLIN / EPOLLHUP / EPOLLRDHUP / EPOLLERR; may be null.
  Closure on_read_ready_;
  // Run by the event loop on EPOLLOUT; may be null.
  Closure on_write_ready_;
  // True while the event loop is running this reactable's callbacks.
  bool is_executing_ = false;
  // Set by Unregister() when it is called while is_executing_ is true; the event loop
  // then deletes the object once the callbacks return.
  bool removed_ = false;
  // Taken by the reactor around accesses to is_executing_, removed_ and callback replacement.
  std::mutex mutex_;
  // Created by Unregister() for a deferred delete and fulfilled by the event loop
  // just before it deletes this object.
  std::unique_ptr<std::promise<void>> finished_promise_;
};
// Creates the epoll instance and the control eventfd, and registers the control fd with
// epoll using a null data pointer. Run() recognises control events by that null pointer.
// Any failure aborts via ASSERT_LOG with the errno description.
Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
  RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
  ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno));

  // EFD_CLOEXEC matches the epoll fd above so that neither descriptor leaks across exec().
  control_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  ASSERT_LOG(control_fd_ != -1, "could not create control eventfd: %s", strerror(errno));

  epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
  int result;
  RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
  ASSERT_LOG(result != -1, "could not register control fd with epoll: %s", strerror(errno));
}
// Tears down what the constructor set up, in reverse: removes the control fd from the
// epoll set, then closes the control fd and the epoll fd. Each step asserts on failure.
// NOTE(review): RUN_NO_INTR is defined outside this file; if it retries on EINTR,
// confirm that retrying close() is intended on the target platform.
Reactor::~Reactor() {
  int del_result;
  RUN_NO_INTR(del_result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
  ASSERT(del_result != -1);

  int control_close_result;
  RUN_NO_INTR(control_close_result = close(control_fd_));
  ASSERT(control_close_result != -1);

  int epoll_close_result;
  RUN_NO_INTR(epoll_close_result = close(epoll_fd_));
  ASSERT(epoll_close_result != -1);
}
void Reactor::Run() {
bool already_running = is_running_.exchange(true);
ASSERT(!already_running);
int timeout_ms = -1;
bool waiting_for_idle = false;
for (;;) {
{
std::unique_lock<std::mutex> lock(mutex_);
invalidation_list_.clear();
}
epoll_event events[kEpollMaxEvents];
int count;
RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
ASSERT(count != -1);
if (waiting_for_idle && count == 0) {
timeout_ms = -1;
waiting_for_idle = false;
idle_promise_->set_value();
idle_promise_ = nullptr;
}
for (int i = 0; i < count; ++i) {
auto event = events[i];
ASSERT(event.events != 0u);
// If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
if (event.data.ptr == nullptr) {
uint64_t value;
eventfd_read(control_fd_, &value);
if ((value & kStopReactor) != 0) {
is_running_ = false;
return;
} else if ((value & kWaitForIdle) != 0) {
timeout_ms = 30;
waiting_for_idle = true;
continue;
} else {
LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value);
continue;
}
}
auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
std::unique_lock<std::mutex> lock(mutex_);
executing_reactable_finished_ = nullptr;
// See if this reactable has been removed in the meantime.
if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
continue;
}
{
std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
lock.unlock();
reactable->is_executing_ = true;
}
if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
reactable->on_read_ready_.Run();
}
if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
reactable->on_write_ready_.Run();
}
{
std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
reactable->is_executing_ = false;
if (reactable->removed_) {
reactable->finished_promise_->set_value();
reactable_lock.unlock();
delete reactable;
}
}
}
}
}
void Reactor::Stop() {
if (!is_running_) {
LOG_WARN("not running, will stop once it's started");
}
auto control = eventfd_write(control_fd_, kStopReactor);
ASSERT(control != -1);
}
// Starts watching |fd| and returns the heap-allocated Reactable that tracks the registration.
// The returned pointer is later passed to Unregister(), which deletes it (possibly deferred).
//
// |on_read_ready|: if non-null, the fd is watched for EPOLLIN | EPOLLRDHUP.
// |on_write_ready|: if non-null, the fd is watched for EPOLLOUT.
// Asserts if the fd cannot be added to the epoll set.
Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
  // Derive the event mask before the closures are moved into the Reactable.
  uint32_t poll_event_type = 0;
  if (!on_read_ready.is_null()) {
    poll_event_type |= (EPOLLIN | EPOLLRDHUP);
  }
  if (!on_write_ready.is_null()) {
    poll_event_type |= EPOLLOUT;
  }
  // The parameters are by-value sinks: move them instead of copying, as ModifyRegistration() does.
  auto* reactable = new Reactable(fd, std::move(on_read_ready), std::move(on_write_ready));
  epoll_event event = {
      .events = poll_event_type,
      .data = {.ptr = reactable},
  };
  int register_fd;
  RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
  ASSERT(register_fd != -1);
  return reactable;
}
// Stops watching |reactable|'s fd and releases the Reactable.
//
// The reactable is first appended to invalidation_list_ so that the event loop skips any
// event for it that is still queued in the current batch. If the event loop is running this
// reactable's callbacks right now, deletion is handed over to the event loop and a future is
// stored in executing_reactable_finished_ for WaitForUnregisteredReactable(); otherwise the
// reactable is deleted here.
void Reactor::Unregister(Reactor::Reactable* reactable) {
  ASSERT(reactable != nullptr);
  {
    std::lock_guard<std::mutex> reactor_guard(mutex_);
    invalidation_list_.push_back(reactable);
  }

  bool deferred_to_event_loop = false;
  {
    std::lock_guard<std::mutex> reactable_guard(reactable->mutex_);
    int ctl_result;
    RUN_NO_INTR(ctl_result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
    if (ctl_result != -1 || errno != ENOENT) {
      // Any failure other than "fd is not in the epoll set" is fatal.
      ASSERT(ctl_result != -1);
    } else {
      LOG_INFO("reactable is invalid or unregistered");
    }

    // is_executing_ is guarded by reactable->mutex_, which is held here. When it is set, the
    // event loop is inside this reactable's callbacks and will delete it once they return.
    // NOTE(review): executing_reactable_finished_ is written here without mutex_, while the
    // other accesses in this file hold mutex_ — confirm this is intended.
    if (reactable->is_executing_) {
      reactable->removed_ = true;
      reactable->finished_promise_ = std::make_unique<std::promise<void>>();
      auto finished = reactable->finished_promise_->get_future();
      executing_reactable_finished_ = std::make_shared<std::future<void>>(std::move(finished));
      deferred_to_event_loop = true;
    }
  }

  if (deferred_to_event_loop) {
    return;
  }
  // No callback is running for this reactable, so it can be destroyed immediately.
  delete reactable;
}
// Waits up to |timeout| for the reactable whose deletion Unregister() deferred to the event loop.
// Returns true if there is nothing to wait for or the wait completed in time; logs an error
// and returns false on timeout. mutex_ is held for the whole wait.
bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
  std::lock_guard<std::mutex> guard(mutex_);
  if (executing_reactable_finished_ != nullptr) {
    const bool finished = executing_reactable_finished_->wait_for(timeout) == std::future_status::ready;
    if (!finished) {
      LOG_ERROR("Unregister reactable timed out");
    }
    return finished;
  }
  return true;
}
// Asks the event loop to report when it is idle and waits up to |timeout| for that report.
// Publishes a fresh promise in idle_promise_ (under mutex_), writes kWaitForIdle to the
// control eventfd, then blocks on the matching future.
// Returns true if the event loop fulfilled the promise within |timeout|, false otherwise.
bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
  auto promise = std::make_shared<std::promise<void>>();
  // The future is only used locally, so it does not need a heap allocation.
  std::future<void> future = promise->get_future();
  {
    std::lock_guard<std::mutex> lock(mutex_);
    // Copy rather than move: the local reference keeps this promise alive even if
    // idle_promise_ is replaced by another caller before the event loop fulfils it.
    idle_promise_ = promise;
  }
  auto control = eventfd_write(control_fd_, kWaitForIdle);
  ASSERT(control != -1);
  return future.wait_for(timeout) == std::future_status::ready;
}
// Replaces the callbacks of an already registered |reactable| and updates its epoll event mask
// to match: EPOLLIN | EPOLLRDHUP when |on_read_ready| is non-null, EPOLLOUT when
// |on_write_ready| is non-null. Asserts on a null |reactable| or if epoll_ctl fails.
void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) {
  ASSERT(reactable != nullptr);

  // Compute the mask first; the closures are moved away below.
  const uint32_t read_mask = on_read_ready.is_null() ? 0u : static_cast<uint32_t>(EPOLLIN | EPOLLRDHUP);
  const uint32_t write_mask = on_write_ready.is_null() ? 0u : static_cast<uint32_t>(EPOLLOUT);

  {
    std::lock_guard<std::mutex> guard(reactable->mutex_);
    reactable->on_read_ready_ = std::move(on_read_ready);
    reactable->on_write_ready_ = std::move(on_write_ready);
  }

  epoll_event updated_event = {};
  updated_event.events = read_mask | write_mask;
  updated_event.data.ptr = reactable;

  int ctl_result;
  RUN_NO_INTR(ctl_result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &updated_event));
  ASSERT(ctl_result != -1);
}
} // namespace os
} // namespace bluetooth