blob: 1c371055489eeb3a24fc9575424213bd88a015a9 [file] [log] [blame]
// Copyright 2020 The Android Open Source Project
//
// This software is licensed under the terms of the GNU General Public
// License version 2, as published by the Free Software Foundation, and
// may be copied, distributed, and modified under those terms.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// this implements multi-threaded WaitForMultipleObjects on windows
// for more than 64 objects
//
// main idea:
// use thread pools to wait on condition variable that is send out
// whenever there are work to do from calling thread.
// 0. when the number of fds is <= 64, use the old implementation
// in oslib-win32.c where a single call of WaitForMultipleObjects is used,
// that is the fastest; multi-threading does not help, as the fired
// events are very sparse. synchronization cost using mt is over-kill
// 1. use as few threads as possible, and once started keep them persistent
// and wait for new job. std::async is better avoided, as its performance
// is not predictable. based on testing, it is important to use thread pool
// as the alternative is taking 0.5ms on windows, too much for poll,
// translates to 20% to 30% boot time increase
// 2. once the minimum number of threads are decided, the work load should be
// balanced among them.
// 3. need to avoid vector create/destroy as well; now the performance is almost
// on par with singla wait for fewer than 64 fds (about 5% slower)
// 4. G_WIN32_MSG_HANDLE is not handled, as it does not happen on emulator
//
#include <glib.h>
extern "C" {
#include <windows.h>
#include "qemu/osdep.h"
}
#include <stdio.h>
#include <condition_variable>
#include <thread>
#include <vector>
#define WIN32_POLL_DEBUG 0
#if WIN32_POLL_DEBUG
#define DD(fmt, ...) \
fprintf(stderr, "win32poll: %s:%d thread id %llu " fmt "\n", __func__, \
__LINE__, std::this_thread::get_id(), ##__VA_ARGS__);
#else
#define DD(fmt, ...)
#endif
namespace {
constexpr int kNumFdsPerItem = 60;
constexpr int kMaxNumFdsPerItem = MAXIMUM_WAIT_OBJECTS;
struct WorkItem {
GPollFD *pfds{nullptr};
HANDLE handles[kMaxNumFdsPerItem];
HANDLE stopSignal;
HANDLE doneSignal;
int nfds{0};
int fired{0};
int timeout;
};
struct PollWorkerState {
std::vector<std::unique_ptr<std::condition_variable>> s_cv;
std::vector<std::unique_ptr<std::mutex>> s_cv_m;
bool s_should_quit{false};
HANDLE s_stopSignal;
std::vector<HANDLE> s_thread_done_signals;
std::vector<std::thread> s_threads;
std::vector<WorkItem *> s_works;
std::vector<WorkItem> s_real_works;
};
static PollWorkerState sPollWorkerState;
int one_wait_job(WorkItem *pitem) {
auto &item = *pitem;
constexpr int array_size = kMaxNumFdsPerItem;
HANDLE *handles = item.handles;
assert(item.nfds < array_size);
int nfds = 0;
for (int i = 0; i < item.nfds; ++i) {
auto &gfd = item.pfds[i];
handles[nfds++] = reinterpret_cast<HANDLE>(gfd.fd);
gfd.revents = 0;
}
// put the stop signal at the end, so to avoid
// 1 wait in terms of timeout
handles[nfds++] = item.stopSignal;
HANDLE *phandles = handles;
int ret = WaitForMultipleObjects(nfds, phandles, FALSE, item.timeout);
if (ret == WAIT_FAILED) {
item.fired = 0;
SetEvent(item.doneSignal);
return -1;
}
if (ret == WAIT_TIMEOUT) {
item.fired = 0;
SetEvent(item.doneSignal);
return 0;
}
int numReady = 0;
while (ret >= WAIT_OBJECT_0 && ret < WAIT_OBJECT_0 + nfds) {
const int offset = phandles - handles;
const int indexFired = ret - WAIT_OBJECT_0;
const int indexOriginal = indexFired + offset;
if (handles[indexOriginal] != item.stopSignal) {
auto &gfd = item.pfds[indexOriginal];
gfd.revents = gfd.events;
DD("found revents %d handle %llu", gfd.revents, gfd.fd);
if (gfd.revents != 0) {
++numReady;
}
}
// prepare for next check: skip the fired one (1) and the ones before it
// (indexFired)
const int num_handles_to_ignore = (indexFired + 1);
nfds -= num_handles_to_ignore;
phandles += num_handles_to_ignore;
constexpr int timeout = 0; // no wait
if (nfds == 0) {
break;
}
ret = WaitForMultipleObjects(nfds, phandles, FALSE, timeout);
}
SetEvent(item.doneSignal);
DD("this thread returns with %d", numReady);
item.fired = numReady;
return numReady;
}
void single_thread_work(int idx) {
while (!sPollWorkerState.s_should_quit) {
DD("started work thread %d", idx);
WorkItem *pitem = nullptr;
{
std::unique_lock<std::mutex> lk(*(sPollWorkerState.s_cv_m[idx]));
sPollWorkerState.s_cv[idx]->wait(lk, [idx] {
return sPollWorkerState.s_works[idx] != nullptr ||
sPollWorkerState.s_should_quit;
});
pitem = sPollWorkerState.s_works[idx];
sPollWorkerState.s_works[idx] = nullptr;
}
if (sPollWorkerState.s_should_quit) {
return;
}
// get work
DD("really work now work thread %d", idx);
one_wait_job(pitem);
}
}
int wait_async(GPollFD *fds, int nfds, std::vector<WorkItem> &items,
int timeout, bool need_mt) {
if (sPollWorkerState.s_works.empty()) {
// one time only
// note, we dont want to change this vector when more threads are added,
// as existing threads are waiting on it to become non-nullptr
// so just allocate the maximum numbers and done with it
sPollWorkerState.s_works.resize(kMaxNumFdsPerItem, nullptr);
sPollWorkerState.s_cv_m.resize(kMaxNumFdsPerItem);
sPollWorkerState.s_cv.resize(kMaxNumFdsPerItem);
// the one and only one stop signal
sPollWorkerState.s_stopSignal = CreateEventW(NULL, TRUE, FALSE, NULL);
}
int nwork = 1;
int x_kNumFdsPerItem = kNumFdsPerItem;
if (1) {
static_assert(kNumFdsPerItem < kMaxNumFdsPerItem);
static_assert(kMaxNumFdsPerItem <= MAXIMUM_WAIT_OBJECTS);
const int remain = nfds % kNumFdsPerItem;
nwork = nfds / kNumFdsPerItem + ((remain > 0) ? 1 : 0);
x_kNumFdsPerItem = kNumFdsPerItem;
if (nwork > 1) {
// rebalance the work load for each thread;
x_kNumFdsPerItem = nfds / nwork;
if (x_kNumFdsPerItem * nwork != nfds) {
x_kNumFdsPerItem +=
1; // add one so the last one does not overflow
}
assert(x_kNumFdsPerItem < kMaxNumFdsPerItem);
DD("rebalancing: work per item is %d fds", x_kNumFdsPerItem);
}
if (items.size() < nwork) {
items.resize(nwork);
}
DD("size of real_works %d", items.size());
}
// need to reset sPollWorkerState.s_stopSignal each time
ResetEvent(sPollWorkerState.s_stopSignal);
for (int i = sPollWorkerState.s_thread_done_signals.size(); i < nwork;
++i) {
auto handle = CreateEventW(NULL, TRUE, FALSE, NULL);
sPollWorkerState.s_thread_done_signals.push_back(handle);
sPollWorkerState.s_cv_m[i].reset(new std::mutex);
sPollWorkerState.s_cv[i].reset(new std::condition_variable);
sPollWorkerState.s_threads.emplace_back(
std::move(std::thread(single_thread_work, i)));
}
for (int i = 0; i < nwork; ++i) {
DD("prepare for %d th item", i);
auto handle = sPollWorkerState.s_thread_done_signals[i];
ResetEvent(handle);
items[i].stopSignal = sPollWorkerState.s_stopSignal;
items[i].doneSignal = handle;
items[i].timeout = timeout;
GPollFD *pfds = fds + i * x_kNumFdsPerItem;
const int num_fds =
(i == nwork - 1) ? (nfds - i * x_kNumFdsPerItem) : x_kNumFdsPerItem;
assert(num_fds + 1 <= kMaxNumFdsPerItem);
items[i].pfds = pfds;
items[i].nfds = num_fds;
if (need_mt) {
std::lock_guard<std::mutex> lk(*(sPollWorkerState.s_cv_m[i]));
sPollWorkerState.s_works[i] = &(items[i]);
sPollWorkerState.s_cv[i]->notify_all();
} else {
one_wait_job(&(items[i]));
}
}
if (need_mt) {
DD("wait for any thread to complete");
// wait for any threads to signal done
int ret = WaitForMultipleObjects(
nwork, sPollWorkerState.s_thread_done_signals.data(), FALSE,
items[0].timeout);
// if there is any thread signaled, stop all the others
SetEvent(sPollWorkerState.s_stopSignal);
DD("wait for all threads to complete");
// wait for all to signal
ret = WaitForMultipleObjects(
nwork, sPollWorkerState.s_thread_done_signals.data(), TRUE,
INFINITE);
}
int numReady = 0;
for (int i = 0; i < nwork; ++i) {
numReady += items[i].fired;
}
return numReady;
}
int wait_driver(GPollFD *fds, int nfds, int timeout) {
DD("start driver... nfds %d", nfds);
DD("start wait_async");
bool need_mt = timeout > 0;
if (nfds <= kNumFdsPerItem) {
need_mt = false;
}
int numReady =
wait_async(fds, nfds, sPollWorkerState.s_real_works, timeout, need_mt);
DD("done %d", numReady);
return numReady;
}
} // anonymous namespace
extern "C" {
// timeout: -1: wait infinite
// 0: no wait, just check fd status
// >=1: wait 1ms or more
int win32_wait_for_objects(GPollFD *fds, int nfds, int timeout) {
if (timeout == -1) {
timeout = INFINITE;
}
int ret = wait_driver(fds, nfds, timeout);
return ret;
}
}