// Source blob: 796f5416f6aa5f9173d3a04844faf5169f4b4b39
syntax = "proto3";
package tensorflow;
import "tensorflow/compiler/xla/pjrt/distributed/protocol.proto";
import "tensorflow/core/framework/device_attributes.proto";
option go_package = "github.com/tensorflow/tensorflow/tensorflow/go/core/protobuf/for_core_protos_go_proto";
// Payload attached to the status of every coordination service error.
// Note: the payload may be an empty proto when the error was triggered by the
// task's own agent calls (i.e. it was not propagated by the service from
// another remote task).
message CoordinationServiceError {
  // Together with `task`, denotes which worker hit the error. If unset, the
  // error originated from the same task that is processing this error.
  string job = 1;

  // Together with `job`, denotes which worker hit the error.
  int32 task = 2;

  // True when the error was reported by the user via the agent API, as
  // opposed to being an internal service error.
  bool is_reported_error = 3;
}
// Device information as represented by different runtimes.
// This variant describes devices with DeviceAttributes messages.
message TfDeviceList {
  // Devices, each described by a DeviceAttributes message.
  repeated DeviceAttributes devices = 1;
}
// Device information expressed as an xla.GlobalTopologyProto.
message XlaDeviceList {
  // Topology message holding the devices.
  // NOTE(review): presumably declared in the imported XLA PjRt distributed
  // protocol.proto -- confirm.
  xla.GlobalTopologyProto devices = 1;
}
// Device information in one of the supported runtime-specific forms. Used for
// both the local devices of a task and the devices of the whole cluster (see
// WaitForAllTasksRequest and WaitForAllTasksResponse).
message CoordinationServiceDeviceInfo {
  // At most one of the following representations is set.
  oneof type {
    // Devices described as a TfDeviceList.
    TfDeviceList tf = 1;

    // Devices described as an XlaDeviceList.
    XlaDeviceList xla = 2;
  }
}
// Request and response messages for registering a worker to the cluster leader.
// `job` and `task` represent the role of the worker, and `incarnation`
// uniquely identifies a worker process. The leader responds with its own
// `incarnation` to identify the leader process.
message RegisterWorkerRequest {
  // Job of the registering worker; part of the worker's role.
  string job = 1;

  // Task of the registering worker; part of the worker's role.
  int32 task = 2;

  // Uniquely identifies the worker process.
  fixed64 incarnation = 3;

  // The field `local_device_attributes` was moved from this request message to
  // WaitForAllTasksRequest. Both its number and its name are reserved so that
  // neither can be reused in this message with a different meaning.
  reserved 4;
  reserved "local_device_attributes";
}
// Response returned by the RegisterWorker RPC.
message RegisterWorkerResponse {
  // Incarnation of the leader, identifying the leader process.
  fixed64 leader_incarnation = 1;
}
// Request and response messages for sending heartbeats.
// NOTE(review): `job`, `task` and `incarnation` presumably carry the same
// meaning as in RegisterWorkerRequest -- confirm.
message HeartbeatRequest {
  // Job of the task sending the heartbeat.
  string job = 1;

  // Task sending the heartbeat.
  int32 task = 2;

  // Incarnation of the sending process.
  fixed64 incarnation = 3;
}
// Response returned by the Heartbeat RPC.
message HeartbeatResponse {
  // Incarnation of the leader that answered the heartbeat.
  fixed64 leader_incarnation = 1;

  // If there are failures in the cluster, additional metadata in the response
  // is used to broadcast the error code and message to other workers.
}
// Request and response messages for waiting for all tasks.
message WaitForAllTasksRequest {
  // Job of the request sender.
  string job = 1;

  // Task of the request sender.
  int32 task = 2;

  // Number of a removed field that specifically used TF device info; kept
  // reserved so it is not reused.
  reserved 3;

  // All local device attributes on the request sender.
  CoordinationServiceDeviceInfo local_device_info = 4;
}
// Response returned by the WaitForAllTasks RPC.
message WaitForAllTasksResponse {
  // Incarnation of the leader.
  fixed64 leader_incarnation = 1;

  // Number of a removed field that specifically used TF device info; kept
  // reserved so it is not reused.
  reserved 2;

  // All devices in the cluster.
  CoordinationServiceDeviceInfo cluster_device_info = 3;
}
// Request and response messages for reporting errors to task.
message ReportErrorToAgentRequest {
  // Code of the reported error.
  // NOTE(review): presumably a status/error code enum value -- confirm.
  int32 error_code = 1;

  // Message describing the reported error.
  string error_message = 2;

  // Numbers of removed fields whose content is now embedded in
  // `error_payload`.
  reserved 3, 4;

  // Coordination service payload that accompanies the error.
  CoordinationServiceError error_payload = 5;
}
// Response returned by the ReportErrorToAgent RPC; it declares no fields.
message ReportErrorToAgentResponse {}
// Request and response messages for reporting errors to service instance.
message ReportErrorToServiceRequest {
  // Code of the reported error.
  // NOTE(review): presumably a status/error code enum value -- confirm.
  int32 error_code = 1;

  // Message describing the reported error.
  string error_message = 2;

  // Job of the error source.
  // NOTE(review): presumably the task reporting the error -- confirm.
  string source_job = 3;

  // Task of the error source.
  int32 source_task = 4;
}
// Response returned by the ReportErrorToService RPC; it declares no fields.
message ReportErrorToServiceResponse {}
// A single configuration key-value entry.
// The key is structured like a Unix file system path: multiple levels of
// directory names separated by slash ('/') characters.
message KeyValueEntry {
  // Slash-separated, path-like key.
  string key = 1;

  // Value stored under `key`, as raw bytes.
  bytes value = 2;
}
// Request and response messages for inserting configuration key-value data.
message InsertKeyValueRequest {
  // Entry to insert.
  KeyValueEntry kv = 1;
}
// Response returned by the InsertKeyValue RPC; it declares no fields.
message InsertKeyValueResponse {}
// Request and response messages for getting configuration key-value data.
message GetKeyValueRequest {
  // Key of the entry to look up.
  string key = 1;
}
// Response returned by the GetKeyValue RPC.
message GetKeyValueResponse {
  // Key-value entry returned for the request.
  KeyValueEntry kv = 1;
}
// Request and response messages for deleting configuration key-value data.
message DeleteKeyValueRequest {
  // Key (or directory path, see `is_directory`) to delete.
  string key = 1;

  // When true, key-values under `key` are deleted recursively.
  bool is_directory = 2;
}
// Response returned by the DeleteKeyValue RPC; it declares no fields.
message DeleteKeyValueResponse {}
// CoordinationService is a TensorFlow service that controls and coordinates
// distributed execution in a cluster of multiple workers.
//
// Depending on the role of the current worker, the service keeps track of the
// cluster configuration and the state of the cluster members or of the leader.
// The distributed runtime uses this service to coordinate and perform cluster
// initialization, to check the health of workers, and to propagate error
// messages to the cluster.
service CoordinationService {
  // Registers a task with the coordination service so that the service starts
  // tracking the liveness of the task. The RPC blocks and returns only once
  // the task has registered successfully or an error has occurred during
  // registration.
  rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerResponse);

  // Heartbeat from a task to the coordination service. It refreshes the
  // task's timestamp on the leader so that the task does not become stale.
  // The RPC responds immediately after refreshing the timestamp on the leader.
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

  // Waits for all tasks in the cluster to be up and running. The response is
  // sent only once all workers are registered, or when some error occurs.
  rpc WaitForAllTasks(WaitForAllTasksRequest) returns (WaitForAllTasksResponse);

  // Reports an error to the task. The RPC permanently sets the receiving
  // coordination service agent instance to the error state.
  // TODO(b/195990880): Consider splitting this into a different RPC service.
  rpc ReportErrorToAgent(ReportErrorToAgentRequest)
      returns (ReportErrorToAgentResponse);

  // Reports a task error to the coordination service. The RPC sets the
  // service-side task state to error and propagates the error to the other
  // tasks in the cluster.
  rpc ReportErrorToService(ReportErrorToServiceRequest)
      returns (ReportErrorToServiceResponse);

  // Inserts a configuration key-value that will be accessible to all cluster
  // workers. The key can be formatted as a hierarchical Unix file path. The
  // coordination service key-value store should only be used for cluster
  // configuration data.
  rpc InsertKeyValue(InsertKeyValueRequest) returns (InsertKeyValueResponse);

  // Gets a configuration key-value. The request blocks until the key-value
  // data becomes available (i.e., has been set by a worker in the cluster).
  rpc GetKeyValue(GetKeyValueRequest) returns (GetKeyValueResponse);

  // Deletes a configuration key-value. If `is_directory` is set in the
  // request, all key-values under the path given by `key` are cleaned up
  // recursively.
  rpc DeleteKeyValue(DeleteKeyValueRequest) returns (DeleteKeyValueResponse);
}