blob: 18cc4b8cc2d0e9cd52ba385ad507bf04b2fe03e0 [file] [log] [blame]
// Copyright 2015 Google Inc. All rights reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build ignore
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <queue>
#include <set>
#include <string>
#include <vector>
#define CHECK(expr) do { \
if (!(expr)) { \
fprintf(stderr, "%s:%d: " #expr, __FILE__, __LINE__); \
exit(1); \
} \
} while(0)
#define PCHECK(expr) do { \
if ((expr) < 0) { \
fprintf(stderr, "%s:%d: ", __FILE__, __LINE__); \
perror(""); \
exit(1); \
} \
} while(0)
using namespace std;
class Para;
struct Task {
Task() : echo(false), ignore_error(false), status(-1), signal(-1) {
stdout_pipe[0] = -1;
stdout_pipe[1] = -1;
stderr_pipe[0] = -1;
stderr_pipe[1] = -1;
}
~Task() {
if (stdout_pipe[0] >= 0)
PCHECK(close(stdout_pipe[0]));
if (stderr_pipe[0] >= 0)
PCHECK(close(stderr_pipe[0]));
}
string output;
string shell;
string cmd;
bool echo;
bool ignore_error;
pid_t pid;
int stdout_pipe[2];
int stderr_pipe[2];
string stdout_buf;
string stderr_buf;
int status;
int signal;
};
class TaskProvider {
public:
virtual ~TaskProvider() {}
virtual int GetFD() = 0;
virtual void PollFD(Para* para, int fd) = 0;
virtual void OnStarted(Task* t) = 0;
virtual void OnFinished(Task* t) = 0;
};
class Para {
public:
Para(TaskProvider* provider, int num_jobs);
~Para();
void AddTask(Task* task);
void Loop();
void Done();
static void WakeUp(int);
private:
void WaitChildren();
void RunCommands();
TaskProvider* provider_;
size_t num_jobs_;
int sig_pipe_[2];
int provider_fd_;
queue<Task*> tasks_;
set<Task*> running_;
bool done_;
};
class StdinTaskProvider : public TaskProvider {
public:
virtual ~StdinTaskProvider() {}
virtual int GetFD() { return STDIN_FILENO; }
virtual void PollFD(Para* para, int fd);
virtual void OnStarted(Task* t);
virtual void OnFinished(Task* t);
private:
string buf_;
};
class KatiTaskProvider : public TaskProvider {
public:
virtual ~KatiTaskProvider() {}
virtual int GetFD() { return STDIN_FILENO; }
virtual void PollFD(Para* para, int fd);
virtual void OnStarted(Task* t);
virtual void OnFinished(Task* t);
private:
string buf_;
};
Para* g_para_;
Para::Para(TaskProvider* provider, int num_jobs)
: provider_(provider),
num_jobs_(num_jobs),
done_(false) {
g_para_ = this;
PCHECK(pipe(sig_pipe_));
sigset_t sigmask;
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGCHLD);
PCHECK(sigprocmask(SIG_BLOCK, &sigmask, NULL));
CHECK(signal(SIGCHLD, &Para::WakeUp) != SIG_ERR);
provider_fd_ = provider_->GetFD();
}
Para::~Para() {
PCHECK(close(sig_pipe_[0]));
PCHECK(close(sig_pipe_[1]));
}
void Para::AddTask(Task* task) {
task->pid = 0;
tasks_.push(task);
}
static void SetFd(int fd, fd_set* fdset, int* nfds) {
if (fd < 0)
return;
FD_SET(fd, fdset);
*nfds = max(*nfds, fd);
}
static void readOutput(int* fd, string* buf) {
char b[4096];
ssize_t r = read(*fd, b, sizeof(b));
if (r < 0 && errno != EINTR)
return;
PCHECK(r);
if (r == 0) {
PCHECK(close(*fd));
*fd = -1;
return;
}
size_t l = buf->size();
buf->resize(l + r);
memcpy(&((*buf)[l]), b, r);
}
void Para::Loop() {
sigset_t sigmask;
sigemptyset(&sigmask);
while (!done_ || !tasks_.empty() || !running_.empty()) {
int nfds = 0;
fd_set rd;
FD_ZERO(&rd);
if (!done_)
SetFd(provider_fd_, &rd, &nfds);
SetFd(sig_pipe_[0], &rd, &nfds);
for (Task* t : running_) {
SetFd(t->stdout_pipe[0], &rd, &nfds);
SetFd(t->stderr_pipe[0], &rd, &nfds);
}
int r = pselect(nfds, &rd, NULL, NULL, NULL, &sigmask);
PCHECK(r && errno != EINTR);
if (FD_ISSET(sig_pipe_[0], &rd)) {
WaitChildren();
RunCommands();
continue;
}
if (FD_ISSET(provider_fd_, &rd)) {
provider_->PollFD(this, provider_fd_);
RunCommands();
continue;
}
for (Task* t : running_) {
if (t->stdout_pipe[0] >= 0 && FD_ISSET(t->stdout_pipe[0], &rd))
readOutput(&t->stdout_pipe[0], &t->stdout_buf);
if (t->stderr_pipe[0] >= 0 && FD_ISSET(t->stderr_pipe[0], &rd))
readOutput(&t->stderr_pipe[0], &t->stderr_buf);
}
}
}
void Para::Done() {
done_ = true;
}
void Para::WakeUp(int) {
char c = 42;
PCHECK(write(g_para_->sig_pipe_[1], &c, 1));
}
void Para::WaitChildren() {
char c = 0;
PCHECK(read(sig_pipe_[0], &c, 1));
CHECK(c == 42);
vector<Task*> finished;
for (Task* task : running_) {
int status;
pid_t pid = waitpid(task->pid, &status, WNOHANG);
if (WIFSIGNALED(status)) {
task->signal = WTERMSIG(status);
} else if (WIFEXITED(status)) {
task->status = WEXITSTATUS(status);
} else {
PCHECK(false);
}
PCHECK(pid);
if (pid == 0) {
continue;
}
CHECK(pid == task->pid);
while (task->stdout_pipe[0] >= 0)
readOutput(&task->stdout_pipe[0], &task->stdout_buf);
while (task->stderr_pipe[0] >= 0)
readOutput(&task->stderr_pipe[0], &task->stderr_buf);
// TODO: Handle error.
finished.push_back(task);
}
for (Task* task : finished) {
running_.erase(task);
provider_->OnFinished(task);
delete task;
}
}
void Para::RunCommands() {
while (!tasks_.empty() && (running_.size() < num_jobs_ || num_jobs_ == 0)) {
Task* task = tasks_.front();
tasks_.pop();
provider_->OnStarted(task);
PCHECK(pipe(task->stdout_pipe));
PCHECK(pipe(task->stderr_pipe));
task->pid = fork();
if (task->pid == 0) {
PCHECK(close(task->stdout_pipe[0]));
PCHECK(close(task->stderr_pipe[0]));
PCHECK(dup2(task->stdout_pipe[1], STDOUT_FILENO));
PCHECK(dup2(task->stderr_pipe[1], STDERR_FILENO));
PCHECK(close(task->stdout_pipe[1]));
PCHECK(close(task->stderr_pipe[1]));
const char* args[] = {
task->shell.c_str(),
"-c",
task->cmd.c_str(),
NULL
};
PCHECK(execvp(args[0], const_cast<char* const*>(args)));
abort();
}
PCHECK(close(task->stdout_pipe[1]));
PCHECK(close(task->stderr_pipe[1]));
running_.insert(task);
}
}
void StdinTaskProvider::PollFD(Para* para, int fd) {
char buf[4096];
ssize_t r = read(fd, buf, sizeof(buf));
PCHECK(r);
if (r == 0) {
para->Done();
return;
}
buf_.append(buf, r);
for (;;) {
size_t index = buf_.find('\n');
if (index == string::npos) {
break;
}
Task* task = new Task();
task->shell = "/bin/sh";
task->cmd = buf_.substr(0, index);
if (task->cmd.empty())
continue;
para->AddTask(task);
buf_ = buf_.substr(index + 1);
}
}
void StdinTaskProvider::OnStarted(Task*) {
}
void StdinTaskProvider::OnFinished(Task* t) {
fprintf(stdout, "%s", t->stdout_buf.c_str());
fprintf(stderr, "%s", t->stderr_buf.c_str());
}
struct Runner {
string output;
string cmd;
string shell;
bool echo;
bool ignore_error;
};
static void recvData(int fd, void* d, size_t sz) {
size_t s = 0;
while (s < sz) {
ssize_t r = read(fd, reinterpret_cast<char*>(d) + s, sz - s);
if (r < 0 && errno == EINTR)
continue;
PCHECK(r);
if (r == 0) {
exit(1);
}
s += r;
}
}
static int recvInt(int fd) {
int v;
recvData(fd, &v, sizeof(v));
return v;
}
static void recvString(int fd, string* s) {
int l = recvInt(fd);
s->resize(l);
recvData(fd, &((*s)[0]), l);
}
static void recvTasks(int fd, vector<Task*>* tasks) {
int l = recvInt(fd);
for (int i = 0; i < l; i++) {
Task* r = new Task();
recvString(fd, &r->output);
recvString(fd, &r->cmd);
recvString(fd, &r->shell);
r->echo = recvInt(fd);
r->ignore_error = recvInt(fd);
tasks->push_back(r);
}
}
void KatiTaskProvider::PollFD(Para* para, int fd) {
vector<Task*> tasks;
recvTasks(fd, &tasks);
#if 0
for (Task* t : tasks) {
para->AddTask(t);
}
#else
Task* task = tasks[0];
for (Task* t : tasks) {
if (task == t)
continue;
task->cmd += " ; ";
task->cmd += t->cmd;
}
para->AddTask(task);
#endif
}
static void sendData(int fd, const void* d, size_t sz) {
size_t s = 0;
while (s < sz) {
ssize_t r = write(fd, reinterpret_cast<const char*>(d) + s, sz - s);
if (r < 0 && errno == EINTR)
continue;
PCHECK(r);
if (r == 0) {
exit(1);
}
s += r;
}
}
static void sendInt(int fd, int v) {
sendData(fd, &v, sizeof(v));
}
static void sendString(int fd, const string& s) {
sendInt(fd, s.size());
sendData(fd, s.data(), s.size());
}
static void sendResult(int fd, Task* t) {
sendString(fd, t->output);
sendString(fd, t->stdout_buf);
sendString(fd, t->stderr_buf);
sendInt(fd, t->status);
sendInt(fd, t->signal);
}
void KatiTaskProvider::OnStarted(Task* t) {
sendResult(STDOUT_FILENO, t);
}
void KatiTaskProvider::OnFinished(Task* t) {
sendResult(STDOUT_FILENO, t);
}
int GetNumCpus() {
// TODO: Implement.
return 4;
}
int main(int argc, char* argv[]) {
int num_jobs = -1;
bool from_kati = false;
for (int i = 1; i < argc; i++) {
char* arg = argv[i];
if (!strncmp(arg, "-j", 2)) {
num_jobs = atoi(arg + 2);
} else if (!strcmp(arg, "--kati")) {
from_kati = true;
}
}
if (num_jobs < 0) {
num_jobs = GetNumCpus();
}
TaskProvider* provider = NULL;
if (from_kati) {
provider = new KatiTaskProvider();
} else {
provider = new StdinTaskProvider();
}
Para para(provider, num_jobs);
para.Loop();
}