// blob: a661c34ef01b1afba05dedf3d7c6c2aef86245fa [file] [log] [blame]
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <memory>
#include <vector>
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/subprocess.h"
// 1) FYI from m3b@ about fork():
// A danger of calling fork() (as opposed to clone() or vfork()) is that if
// many people have used pthread_atfork() to acquire locks, fork() can deadlock,
// because it's unlikely that the locking order will be correct in a large
// program where different layers are unaware of one another and using
// pthread_atfork() independently.
//
// The danger of not calling fork() is that if libc managed to use
// pthread_atfork() correctly (for example, to lock the environment), you'd
// miss out on that protection. (But as far as I can see most libc's don't get
// that right; certainly glibc doesn't seem to.)
//
// clone() or vfork() are also frustrating because clone() exists only on Linux,
// and both clone(...CLONE_VM...) and vfork() have interesting issues around
// signals being delivered after the fork and before the exec. It may be
// possible to work around the latter by blocking all signals before the fork
// and unblocking them afterwards.
//
// Fortunately, most people haven't heard of pthread_atfork().
//
//
// 2) FYI from m3b@ about execv():
// The execv() call implicitly uses the libc global variable environ, which was
// copied by fork(), and that copy could have raced with a setenv() call in
// another thread, since libc implementations are usually not very careful about
// this. (glibc isn't careful, for example.)
//
// If this were inside libc, we could use locks or memory barriers to avoid the
// race, but as it is, I see nothing you can do. Even if you tried to copy the
// environment before the fork(), the copying could race with other threads
// calling setenv(). The good news is that few people call setenv().
//
// Amusingly, the standard says of fork(): "...to avoid errors, the child
// process may only execute async-signal-safe operations until such time as one
// of the exec functions is called." Notice that execve() is listed as
// async-signal-safe, but execv() is not, and the difference is just the
// handling of the environment.
namespace tensorflow {
// Builds an idle SubProcess: no program configured, no child running, and
// every channel defaulting to ACTION_CLOSE with no pipe descriptors open.
//
// The 'nfds' argument is accepted but currently has no effect; the fixed
// 'kNFds' channels (stdin, stdout, stderr) are always the ones initialized.
SubProcess::SubProcess(int nfds)
    : running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) {
  int chan = 0;
  while (chan < kNFds) {
    parent_pipe_[chan] = -1;
    child_pipe_[chan] = -1;
    action_[chan] = ACTION_CLOSE;
    ++chan;
  }
}
// Tears down the bookkeeping owned by this object while holding both mutexes:
// the process state is reset, any pipe descriptors still open are closed, and
// the saved program path and argument vector are freed. No signal is sent to
// the child and it is not waited for here.
SubProcess::~SubProcess() {
  mutex_lock proc_guard(proc_mu_);
  mutex_lock data_guard(data_mu_);
  running_ = false;
  pid_ = -1;
  ClosePipes();
  FreeArgs();
}
// Frees the saved executable path and the nullptr-terminated argument vector,
// leaving both members null. The strings were allocated with strdup() (hence
// free()), while the vector itself was allocated with new[] (hence delete[]).
void SubProcess::FreeArgs() {
  free(exec_path_);
  exec_path_ = nullptr;

  if (exec_argv_ == nullptr) {
    return;
  }
  for (int idx = 0; exec_argv_[idx] != nullptr; ++idx) {
    free(exec_argv_[idx]);
  }
  delete[] exec_argv_;
  exec_argv_ = nullptr;
}
// Closes every parent-side and child-side pipe descriptor that is still open
// and marks each slot as unused (-1). Slots already at -1 are left alone.
void SubProcess::ClosePipes() {
  for (int chan = 0; chan < kNFds; ++chan) {
    const int parent_fd = parent_pipe_[chan];
    if (parent_fd >= 0) {
      close(parent_fd);
      parent_pipe_[chan] = -1;
    }

    const int child_fd = child_pipe_[chan];
    if (child_fd >= 0) {
      close(child_fd);
      child_pipe_[chan] = -1;
    }
  }
}
void SubProcess::SetProgram(const string& file,
const std::vector<string>& argv) {
mutex_lock procLock(proc_mu_);
mutex_lock dataLock(data_mu_);
if (running_) {
LOG(FATAL) << "SetProgram called after the process was started.";
return;
}
FreeArgs();
exec_path_ = strdup(file.c_str());
if (exec_path_ == nullptr) {
LOG(FATAL) << "SetProgram failed to allocate file string.";
return;
}
int argc = argv.size();
exec_argv_ = new char*[argc + 1];
for (int i = 0; i < argc; i++) {
exec_argv_[i] = strdup(argv[i].c_str());
if (exec_argv_[i] == nullptr) {
LOG(FATAL) << "SetProgram failed to allocate command argument.";
return;
}
}
exec_argv_[argc] = nullptr;
}
// Selects what Start() does with one of the child's standard channels.
//
// 'chan' must satisfy chan_valid() and 'action' must be one of ACTION_CLOSE,
// ACTION_PIPE or ACTION_DUPPARENT. Violating either rule, or calling this
// after the process has been started, is reported with LOG(FATAL).
void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
  mutex_lock proc_guard(proc_mu_);
  mutex_lock data_guard(data_mu_);

  if (running_) {
    LOG(FATAL) << "SetChannelAction called after the process was started.";
    return;
  }
  if (!chan_valid(chan)) {
    LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
    return;
  }

  const bool known_action = (action == ACTION_CLOSE) ||
                            (action == ACTION_PIPE) ||
                            (action == ACTION_DUPPARENT);
  if (!known_action) {
    LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
    return;
  }
  action_[chan] = action;
}
// Launches the configured program as a child process.
//
// Returns false (after logging an error) if a child is already running, if no
// program has been set, or if pipe(), fcntl() or fork() fails; every pipe
// created so far is closed on those failure paths. Returns true in the parent
// once fork() has succeeded. The child side of fork() never returns from this
// function: it either replaces itself through execv() or calls _exit(1).
//
// Both proc_mu_ and data_mu_ are held for the whole call.
bool SubProcess::Start() {
mutex_lock procLock(proc_mu_);
mutex_lock dataLock(data_mu_);
if (running_) {
LOG(ERROR) << "Start called after the process was started.";
return false;
}
if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
LOG(ERROR) << "Start called without setting a program.";
return false;
}
// Create parent/child pipes for the specified channels and make the
// parent-side of the pipes non-blocking.
for (int i = 0; i < kNFds; i++) {
if (action_[i] == ACTION_PIPE) {
int pipe_fds[2];
if (pipe(pipe_fds) < 0) {
LOG(ERROR) << "Start cannot create pipe: " << strerror(errno);
ClosePipes();
return false;
}
// Handle the direction of the pipe (stdin vs stdout/err).
// pipe_fds[0] is the read end and pipe_fds[1] the write end, so for channel
// 0 (stdin) the parent keeps the write end; for the other channels the
// parent keeps the read end.
if (i == 0) {
parent_pipe_[i] = pipe_fds[1];
child_pipe_[i] = pipe_fds[0];
} else {
parent_pipe_[i] = pipe_fds[0];
child_pipe_[i] = pipe_fds[1];
}
// Both descriptors are already stored in the member arrays at this point,
// so the ClosePipes() calls below release them on failure.
if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) {
LOG(ERROR) << "Start cannot make pipe non-blocking: "
<< strerror(errno);
ClosePipes();
return false;
}
// Close-on-exec is set on the parent-side descriptor only.
if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) {
LOG(ERROR) << "Start cannot make pipe close-on-exec: "
<< strerror(errno);
ClosePipes();
return false;
}
}
}
// Start the child process and setup the file descriptors of both processes.
// See comment (1) in the header about issues with the use of fork().
pid_ = fork();
if (pid_ < 0) {
LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno);
ClosePipes();
return false;
}
if (pid_ > 0) {
// Parent process: close the child-side pipes and return.
running_ = true;
for (int i = 0; i < kNFds; i++) {
if (child_pipe_[i] >= 0) {
close(child_pipe_[i]);
child_pipe_[i] = -1;
}
}
return true;
}
// Child process: close parent-side pipes and channels marked for closing.
// For pipe channels, replace their file descriptors with the pipes.
// Everything from here on runs only in the child (fork() returned 0) and ends
// in execv() or _exit(1).
// devnull_fd is opened lazily, at most once, and shared by all closed channels.
int devnull_fd = -1;
for (int i = 0; i < kNFds; i++) {
if (parent_pipe_[i] >= 0) {
close(parent_pipe_[i]);
parent_pipe_[i] = -1;
}
switch (action_[i]) {
case ACTION_DUPPARENT:
// Nothing to do, fork() took care of it.
break;
case ACTION_PIPE:
// retry() is declared outside this file; presumably it reports whether the
// errno value is transient (e.g. EINTR) -- TODO confirm.
// NOTE(review): if child_pipe_[i] already equals i (possible only when the
// parent itself had that standard descriptor closed), dup2() is a no-op and
// the close() below would close the channel -- confirm whether that case
// needs handling.
while (dup2(child_pipe_[i], i) < 0) {
if (!retry(errno)) {
_exit(1);
}
}
close(child_pipe_[i]);
child_pipe_[i] = -1;
break;
case ACTION_CLOSE:
default:
// Do not close stdin/out/err, instead redirect them to /dev/null so
// their file descriptors remain unavailable for reuse by open(), etc.
if (i <= CHAN_STDERR) {
if (devnull_fd < 0) {
while ((devnull_fd = open("/dev/null", O_RDWR, 0)) < 0) {
if (!retry(errno)) {
_exit(1);
}
}
}
while (dup2(devnull_fd, i) < 0) {
if (!retry(errno)) {
_exit(1);
}
}
} else {
close(i);
}
break;
}
}
// The standard descriptors now hold their own duplicates of /dev/null, so the
// original descriptor is no longer needed.
if (devnull_fd >= 0) {
close(devnull_fd);
}
// Execute the child program.
// See comment (2) in the header about issues with the use of execv().
execv(exec_path_, exec_argv_);
// Reached only if execv() failed.
_exit(1);
}
// Blocks until the child process terminates, discarding its wait status.
// Returns whatever WaitInternal() reports: true when the child was reaped.
bool SubProcess::Wait() {
  int discarded_status = 0;
  const bool reaped = WaitInternal(&discarded_status);
  return reaped;
}
bool SubProcess::WaitInternal(int* status) {
// The waiter must release proc_mu_ while waiting in order for Kill() to work.
proc_mu_.lock();
bool running = running_;
pid_t pid = pid_;
proc_mu_.unlock();
bool ret = false;
if (running && (pid > 1)) {
pid_t cpid;
int cstat;
bool done = false;
while (!done) {
cpid = waitpid(pid, &cstat, 0);
if ((cpid < 0) && !retry(errno)) {
done = true;
} else if ((cpid == pid) && (WIFEXITED(cstat) || WIFSIGNALED(cstat))) {
*status = cstat;
ret = true;
done = true;
}
}
}
proc_mu_.lock();
if ((running_ == running) && (pid_ == pid)) {
running_ = false;
pid_ = -1;
}
proc_mu_.unlock();
return ret;
}
bool SubProcess::Kill(int signal) {
proc_mu_.lock();
bool running = running_;
pid_t pid = pid_;
proc_mu_.unlock();
bool ret = false;
if (running && (pid > 1)) {
ret = (kill(pid, signal) == 0);
}
return ret;
}
int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
string* stderr_output) {
struct pollfd fds[kNFds];
size_t nbytes[kNFds];
string* iobufs[kNFds];
int fd_count = 0;
proc_mu_.lock();
bool running = running_;
proc_mu_.unlock();
if (!running) {
LOG(ERROR) << "Communicate called without a running process.";
return 1;
}
// If SIGPIPE handling is the default action, change it to ignore SIGPIPE and
// keep it ignored, don't change it back. This is needed while communicating
// with the child process so the parent process can survive the death of the
// child process while it is writing to its stdin. If the application has
// registered a SIGPIPE handler, then let it deal with any signals generated
// by the premature death of the child process, don't overwrite its handler.
struct sigaction act;
if (sigaction(SIGPIPE, nullptr, &act) < 0) {
LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno);
return 1;
}
if (act.sa_handler == SIG_DFL) {
memset(&act, 0, sizeof(act));
act.sa_handler = SIG_IGN;
sigemptyset(&act.sa_mask);
if (sigaction(SIGPIPE, &act, nullptr) < 0) {
LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno);
return 1;
}
}
// Lock data_mu_ but not proc_mu_ while communicating with the child process
// in order for Kill() to be able to terminate the child from another thread.
data_mu_.lock();
// Initialize the poll() structures and buffer tracking.
for (int i = 0; i < kNFds; i++) {
if (action_[i] == ACTION_PIPE) {
switch (i) {
case CHAN_STDIN:
// Special case: if no data is given to send to the child process,
// close the pipe to unblock the child, and skip the file descriptor.
if (stdin_input == nullptr) {
close(parent_pipe_[i]);
parent_pipe_[i] = -1;
continue;
}
iobufs[fd_count] = const_cast<string*>(stdin_input);
break;
case CHAN_STDOUT:
iobufs[fd_count] = stdout_output;
break;
case CHAN_STDERR:
iobufs[fd_count] = stderr_output;
break;
default:
iobufs[fd_count] = nullptr;
break;
}
nbytes[fd_count] = 0;
fds[fd_count].fd = parent_pipe_[i];
fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT;
fds[fd_count].revents = 0;
fd_count++;
}
}
// Loop communicating with the child process.
int fd_remain = fd_count;
char buf[4096];
while (fd_remain > 0) {
int n = poll(fds, fd_count, -1);
if ((n < 0) && !retry(errno)) {
LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno);
fd_remain = 0;
} else if (n == 0) {
LOG(ERROR) << "Communicate cannot poll(): timeout not possible";
fd_remain = 0;
} else if (n > 0) {
// Handle the pipes ready for I/O.
for (int i = 0; i < fd_count; i++) {
if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) {
// Read from one of the child's outputs.
ssize_t n = read(fds[i].fd, buf, sizeof(buf));
if (n > 0) {
if (iobufs[i] != nullptr) {
iobufs[i]->append(buf, n);
nbytes[i] += n;
}
} else if ((n == 0) || !retry(errno)) {
fds[i].fd = -1;
fd_remain--;
}
} else if ((fds[i].revents & POLLOUT) != 0) {
// Write to the child's stdin.
ssize_t n = iobufs[i]->size() - nbytes[i];
if (n > 0) {
n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n);
}
if (n >= 0) {
nbytes[i] += n;
if (nbytes[i] >= iobufs[i]->size()) {
fds[i].fd = -1;
fd_remain--;
// Close the child's stdin pipe to unblock the process.
close(parent_pipe_[CHAN_STDIN]);
parent_pipe_[CHAN_STDIN] = -1;
}
} else if (!retry(errno)) {
fds[i].fd = -1;
fd_remain--;
}
} else if ((fds[i].revents & (POLLERR | POLLNVAL)) != 0) {
fds[i].fd = -1;
fd_remain--;
}
}
}
}
data_mu_.unlock();
// Wait for the child process to exit and return its status.
int status;
return WaitInternal(&status) ? status : -1;
}
std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
std::unique_ptr<SubProcess> proc(new SubProcess());
proc->SetProgram(argv[0], argv);
proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
return proc;
}
} // namespace tensorflow