blob: ce13d87ed73cd3ca074bb1641eefc9bd37897e3e [file] [log] [blame]
/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/status.h"
#include <stdio.h>
#include <deque>
#include <functional>
#include "absl/base/call_once.h"
#include "absl/strings/escaping.h"
#include "absl/strings/match.h"
#include "absl/types/optional.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stacktrace.h"
#include "tensorflow/core/platform/str_util.h"
#include "tensorflow/core/platform/strcat.h"
#include "tensorflow/core/platform/stringprintf.h"
#include "tensorflow/core/protobuf/error_codes.pb.h"
#include "tensorflow/core/protobuf/status.pb.h"
namespace tensorflow {
namespace {
// Log sink is used to collect recent warning and error log messages to be
// attached to the error status.
class StatusLogSink : public TFLogSink {
public:
static StatusLogSink* GetInstance() {
static StatusLogSink* sink = new StatusLogSink();
return sink;
}
void enable() {
absl::call_once(flag_, [this] {
num_messages_ = 5; // default to 5 messages
if (const char* num_msgs_str =
getenv("TF_WORKER_NUM_FORWARDED_LOG_MESSAGES")) {
if (!absl::SimpleAtoi(num_msgs_str, &num_messages_)) {
LOG(WARNING) << "Failed to parse env variable "
"TF_WORKER_NUM_WARNING_ERROR_LOG_IN_STATUS="
<< num_msgs_str << " as int. Using the default value "
<< num_messages_ << ".";
}
}
if (num_messages_ > 0) {
TFAddLogSink(this);
}
});
}
void GetMessages(std::vector<std::string>* logs) TF_LOCKS_EXCLUDED(mu_) {
mutex_lock lock(mu_);
for (auto& msg : messages_) {
logs->push_back(msg);
}
}
void Send(const TFLogEntry& entry) override TF_LOCKS_EXCLUDED(mu_) {
if (entry.log_severity() < absl::LogSeverity::kWarning) return;
mutex_lock lock(mu_);
messages_.emplace_back(entry.ToString());
if (messages_.size() > static_cast<size_t>(num_messages_)) {
messages_.pop_front();
}
}
private:
mutex mu_;
// for allowing repeated/concurrent calls to enable()
absl::once_flag flag_;
int num_messages_ = 0;
std::deque<std::string> messages_ TF_GUARDED_BY(mu_);
};
} // namespace
Status::Status(tensorflow::error::Code code, absl::string_view msg,
std::vector<StackFrame>&& stack_trace) {
assert(code != tensorflow::error::OK);
state_ = std::unique_ptr<State>(new State);
state_->code = code;
state_->msg = std::string(msg);
state_->stack_trace = std::move(stack_trace);
VLOG(5) << "Generated non-OK status: \"" << *this << "\". "
<< CurrentStackTrace();
}
void Status::Update(const Status& new_status) {
if (ok()) {
*this = new_status;
}
}
void Status::SlowCopyFrom(const State* src) {
if (src == nullptr) {
state_ = nullptr;
} else {
state_ = std::unique_ptr<State>(new State(*src));
}
}
const std::string& Status::empty_string() {
static string* empty = new string;
return *empty;
}
const std::vector<StackFrame>& Status::empty_stack_trace() {
static std::vector<StackFrame>* empty = new std::vector<StackFrame>();
return *empty;
}
std::string error_name(error::Code code) {
switch (code) {
case tensorflow::error::OK:
return "OK";
break;
case tensorflow::error::CANCELLED:
return "CANCELLED";
break;
case tensorflow::error::UNKNOWN:
return "UNKNOWN";
break;
case tensorflow::error::INVALID_ARGUMENT:
return "INVALID_ARGUMENT";
break;
case tensorflow::error::DEADLINE_EXCEEDED:
return "DEADLINE_EXCEEDED";
break;
case tensorflow::error::NOT_FOUND:
return "NOT_FOUND";
break;
case tensorflow::error::ALREADY_EXISTS:
return "ALREADY_EXISTS";
break;
case tensorflow::error::PERMISSION_DENIED:
return "PERMISSION_DENIED";
break;
case tensorflow::error::UNAUTHENTICATED:
return "UNAUTHENTICATED";
break;
case tensorflow::error::RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED";
break;
case tensorflow::error::FAILED_PRECONDITION:
return "FAILED_PRECONDITION";
break;
case tensorflow::error::ABORTED:
return "ABORTED";
break;
case tensorflow::error::OUT_OF_RANGE:
return "OUT_OF_RANGE";
break;
case tensorflow::error::UNIMPLEMENTED:
return "UNIMPLEMENTED";
break;
case tensorflow::error::INTERNAL:
return "INTERNAL";
break;
case tensorflow::error::UNAVAILABLE:
return "UNAVAILABLE";
break;
case tensorflow::error::DATA_LOSS:
return "DATA_LOSS";
break;
default:
char tmp[30];
snprintf(tmp, sizeof(tmp), "UNKNOWN_CODE(%d)", static_cast<int>(code));
return tmp;
break;
}
}
std::string Status::ToString() const {
if (state_ == nullptr) {
return "OK";
} else {
std::string result(error_name(state_->code));
result += ": ";
result += state_->msg;
for (const std::pair<const std::string, std::string>& element :
state_->payloads) {
absl::StrAppend(&result, " [", element.first, "='",
absl::CHexEscape(element.second), "']");
}
return result;
}
}
void Status::IgnoreError() const {
// no-op
}
void Status::SetPayload(absl::string_view type_url, absl::string_view payload) {
if (ok()) return;
state_->payloads[std::string(type_url)] = std::string(payload);
}
absl::optional<absl::string_view> Status::GetPayload(
absl::string_view type_url) const {
if (ok()) return absl::nullopt;
auto payload_iter = state_->payloads.find(std::string(type_url));
if (payload_iter == state_->payloads.end()) return absl::nullopt;
return absl::string_view(payload_iter->second);
}
bool Status::ErasePayload(absl::string_view type_url) {
if (ok()) return false;
auto payload_iter = state_->payloads.find(std::string(type_url));
if (payload_iter == state_->payloads.end()) return false;
state_->payloads.erase(payload_iter);
return true;
}
void Status::ForEachPayload(
const std::function<void(absl::string_view, absl::string_view)>& visitor)
const {
if (ok()) return;
for (const auto& payload : state_->payloads) {
visitor(payload.first, payload.second);
}
}
std::ostream& operator<<(std::ostream& os, const Status& x) {
os << x.ToString();
return os;
}
std::string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
const char* msg) {
std::string r("Non-OK-status: ");
r += msg;
r += " status: ";
r += v.ToString();
// Leaks string but this is only to be used in a fatal error message
return new std::string(r);
}
StatusGroup::StatusGroup() {}
StatusGroup::StatusGroup(std::initializer_list<Status> statuses) {
for (const Status& s : statuses) {
Update(s);
}
}
static constexpr const char kDerivedStatusProtoUrl[] =
"type.googleapis.com/tensorflow.DerivedStatus";
Status StatusGroup::MakeDerived(const Status& s) {
if (IsDerived(s)) {
return s;
} else {
Status derived(s);
// TODO(b/200167936): Serialize an instance of DerivedStatus proto instead
// of using the string directly. The string is never used so it is not
// causing any issues at the moment.
derived.SetPayload(kDerivedStatusProtoUrl, "");
return derived;
}
}
bool StatusGroup::IsDerived(const Status& s) {
return s.GetPayload(kDerivedStatusProtoUrl).has_value();
}
void StatusGroup::ConfigureLogHistory() {
StatusLogSink::GetInstance()->enable();
}
void StatusGroup::Update(const Status& s) {
if (s.ok()) {
++num_ok_;
} else {
ok_ = false;
if (IsDerived(s)) {
derived_.insert(s);
} else {
non_derived_.insert(s);
}
}
}
static constexpr int kMaxAggregatedStatusMessageSize = 8 * 1024;
static constexpr int kMaxAttachedLogMessageSize = 512;
std::unordered_map<std::string, std::string> StatusGroup::GetPayloads() const {
std::unordered_map<std::string, std::string> payloads;
auto capture_payload = [&payloads](absl::string_view key,
absl::string_view value) {
payloads[std::string(key)] = std::string(value);
};
for (const auto& status : derived_) {
status.ForEachPayload(capture_payload);
}
// If a key appears in both derived_ and non_derived_ payloads, then the
// non_derived_ payload receives priority.
for (const auto& status : non_derived_) {
status.ForEachPayload(capture_payload);
}
payloads.erase(kDerivedStatusProtoUrl);
return payloads;
}
Status MakeStatus(
tensorflow::error::Code code, absl::string_view message,
const std::unordered_map<std::string, std::string>& payloads) {
Status status(code, message);
for (const auto& payload : payloads) {
status.SetPayload(payload.first, payload.second);
}
return status;
}
std::string MakeString(const Status& status) {
return absl::StrCat(error_name(status.code()), ": ", status.error_message());
}
// Summarize all the status objects in the StatusGroup. This is used when
// individual Status objects in the StatusGroup are not already summarized.
Status StatusGroup::as_summary_status() const {
if (ok_) {
return Status::OK();
}
// Gather recent logs as a string
auto get_recent_logs = [this]() -> std::string {
if (!recent_logs_.empty()) {
std::vector<std::string> fmt;
fmt.push_back("\nRecent warning and error logs:");
for (auto& log : recent_logs_) {
// Add an indentation to make it look nicer.
fmt.push_back(" " + log.substr(0, kMaxAttachedLogMessageSize));
}
return absl::StrJoin(fmt, "\n");
} else {
return "";
}
};
// If only one root status is found, do not add summary header and footer.
if (non_derived_.size() == 1) {
return MakeStatus(non_derived_.begin()->code(),
strings::StrCat(non_derived_.begin()->error_message(),
get_recent_logs()),
GetPayloads());
}
if (!non_derived_.empty()) {
std::vector<std::string> fmt;
fmt.push_back(
strings::Printf("%zu root error(s) found.", non_derived_.size()));
int index = 0;
auto code = tensorflow::error::CANCELLED;
for (const auto& s : non_derived_) {
// NOTE: Avoid using CANCELLED as the code of summary status if the group
// contains other error code.
if (code == tensorflow::error::CANCELLED &&
s.code() != tensorflow::error::CANCELLED) {
code = s.code();
}
fmt.emplace_back(strings::StrCat(" (", index, ") ", MakeString(s)));
++index;
}
fmt.push_back(strings::Printf("%zu successful operations.", num_ok_));
fmt.push_back(
strings::Printf("%zu derived errors ignored.", derived_.size()));
std::string error_msg =
absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize);
return MakeStatus(code, strings::StrCat(error_msg, get_recent_logs()),
GetPayloads());
} else {
// All statuses are derived. Pick the first available status to return.
return MakeDerived(MakeStatus(derived_.begin()->code(),
derived_.begin()->error_message(),
GetPayloads()));
}
}
// Concatenate all the status objects in the StatusGroup. This is used when
// individual Status objects in the StatusGroup are already summarized Status.
Status StatusGroup::as_concatenated_status() const {
if (ok_) {
return Status::OK();
}
// If only one root status is found, return it directly.
if (non_derived_.size() == 1) {
return MakeStatus(non_derived_.begin()->code(),
non_derived_.begin()->error_message(), GetPayloads());
}
if (!non_derived_.empty()) {
std::vector<string> fmt;
fmt.emplace_back("\n=====================");
for (const auto& s : non_derived_) {
fmt.emplace_back(MakeString(s));
}
fmt.emplace_back("=====================\n");
return MakeStatus(
non_derived_.begin()->code(),
absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize),
GetPayloads());
} else {
// All statuses are derived. Pick the first available status to return.
// This should not happen in normal execution.
return MakeDerived(MakeStatus(derived_.begin()->code(),
derived_.begin()->error_message(),
GetPayloads()));
}
}
void StatusGroup::AttachLogMessages() {
recent_logs_.clear();
StatusLogSink::GetInstance()->GetMessages(&recent_logs_);
}
} // namespace tensorflow