| syntax = "proto3"; |
| |
| package tensorflow; |
| |
| import "tensorflow/compiler/xla/pjrt/distributed/protocol.proto"; |
| import "tensorflow/core/framework/device_attributes.proto"; |
| |
| option go_package = "github.com/tensorflow/tensorflow/tensorflow/go/core/protobuf/for_core_protos_go_proto"; |
| |
| // Status payload for all coordination service errors. |
| // Note: an empty proto may be set if the error is triggered by the task's own |
| // agent calls (i.e. not propagated by the service from another remote task). |
| message CoordinationServiceError { |
| // Denotes which worker hit the error. If unset, the error originated from the |
| // same task that is processing this error. |
| string job = 1; |
| int32 task = 2; |
| // If true, error is reported via the agent API by the user (and not an |
| // internal service error). |
| bool is_reported_error = 3; |
| } |
| |
| // Represent device information from different runtimes. |
| message TfDeviceList { |
| repeated DeviceAttributes devices = 1; |
| } |
| message XlaDeviceList { |
| xla.GlobalTopologyProto devices = 1; |
| } |
| message CoordinationServiceDeviceInfo { |
| oneof type { |
| TfDeviceList tf = 1; |
| XlaDeviceList xla = 2; |
| } |
| } |
| |
| // Request and response messages for registering a worker to the cluster leader. |
| // Use `job` and `task` to represent the role of the worker, and use |
| // `incarnation` to uniquely identify a worker process. Leader responds with its |
| // `incarnation` to identify a leader process. |
| message RegisterWorkerRequest { |
| string job = 1; |
| int32 task = 2; |
| fixed64 incarnation = 3; |
| // Moved the field `local_device_attributes` from this request message to |
| // WaitForAllTasksRequest defined below. |
| reserved 4; |
| } |
| |
| message RegisterWorkerResponse { |
| fixed64 leader_incarnation = 1; |
| } |
| |
| // Request and response messages for sending heartbeats. |
| message HeartbeatRequest { |
| string job = 1; |
| int32 task = 2; |
| fixed64 incarnation = 3; |
| } |
| |
| message HeartbeatResponse { |
| fixed64 leader_incarnation = 1; |
| // If there are failures in cluster, use additional metadata in response to |
| // broadcast error code and message to other workers. |
| } |
| |
| // Request and response messages for waiting for all tasks. |
| message WaitForAllTasksRequest { |
| string job = 1; |
| int32 task = 2; |
| // Removed field that specifically used TF device info. |
| reserved 3; |
| // All local device attributes on the request sender. |
| CoordinationServiceDeviceInfo local_device_info = 4; |
| } |
| |
| message WaitForAllTasksResponse { |
| fixed64 leader_incarnation = 1; |
| // Removed field that specifically used TF device info. |
| reserved 2; |
| // All devices in the cluster. |
| CoordinationServiceDeviceInfo cluster_device_info = 3; |
| } |
| |
| // Request and response messages for reporting errors to task. |
| message ReportErrorToAgentRequest { |
| int32 error_code = 1; |
| string error_message = 2; |
| // Removed fields that are embedded in payload. |
| reserved 3, 4; |
| CoordinationServiceError error_payload = 5; |
| } |
| |
| message ReportErrorToAgentResponse {} |
| |
| // Request and response messages for reporting errors to service instance. |
| message ReportErrorToServiceRequest { |
| int32 error_code = 1; |
| string error_message = 2; |
| string source_job = 3; |
| int32 source_task = 4; |
| } |
| |
| message ReportErrorToServiceResponse {} |
| |
| // Message for configuration key value. |
| // Key is structured like Unix file system, with multiple levels of directory |
| // names separated by the slash ('/') characters. |
| message KeyValueEntry { |
| string key = 1; |
| bytes value = 2; |
| } |
| |
| // Request and response messages for inserting configuration key-value data. |
| message InsertKeyValueRequest { |
| KeyValueEntry kv = 1; |
| } |
| |
| message InsertKeyValueResponse {} |
| |
| // Request and response messages for getting configuration key-value data. |
| message GetKeyValueRequest { |
| string key = 1; |
| } |
| |
| message GetKeyValueResponse { |
| KeyValueEntry kv = 1; |
| } |
| |
| // Request and response messages for deleting configuration key-value data. |
| // When is_directory is true, delete key-values recursively under `key`. |
| message DeleteKeyValueRequest { |
| string key = 1; |
| bool is_directory = 2; |
| } |
| |
| message DeleteKeyValueResponse {} |
| |
| // Coordination Service defines a TensorFlow service that controls and |
| // coordinates distributed execution in a cluster of multiple workers. |
| // |
| // The service keeps track of the cluster configuration and the state of cluster |
| // members or the leader depending on the role of the current worker. The |
| // distributed runtime leverages this service to coordinate and perform cluster |
| // initialization, check the healthiness of workers, and propagate error |
| // messages to the cluster. |
| service CoordinationService { |
| // Register task to coordination service so that the service starts to track |
| // liveness of the task. RPC blocks and returns only when it registers to |
| // the service successfully, or error happens in the registering process. |
| rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerResponse); |
| |
| // Heartbeat message from task to coordination service. Heartbeat is sent from |
| // a task to refresh its timestamp on leader to avoid it becoming stale. |
| // RPC responds immediately after refreshing the timestamp on leader. |
| rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); |
| |
| // Wait for all tasks in the cluster to be up and running. The RPC request |
| // only gets responded when all workers are registered, or some error occurs. |
| rpc WaitForAllTasks(WaitForAllTasksRequest) returns (WaitForAllTasksResponse); |
| |
| // Report error to the task. RPC sets the receiving instance of coordination |
| // service agent to error state permanently. |
| // TODO(b/195990880): Consider splitting this into a different RPC service. |
| rpc ReportErrorToAgent(ReportErrorToAgentRequest) |
| returns (ReportErrorToAgentResponse); |
| |
| // Report task error to coordination service. RPC sets the service-side task |
| // state to error, and propagate the error to other tasks in the cluster. |
| rpc ReportErrorToService(ReportErrorToServiceRequest) |
| returns (ReportErrorToServiceResponse); |
| |
| // Insert configuration key-value that will be accessible to all cluster |
| // workers. The key can be formatted as Unix file path with hierarchy. The |
| // coordination service key-value store should only be used for cluster |
| // configuration data. |
| rpc InsertKeyValue(InsertKeyValueRequest) returns (InsertKeyValueResponse); |
| |
| // Get configuration key-value. The request blocks until the key-value data |
| // becomes available (i.e., set by a worker in the cluster). |
| rpc GetKeyValue(GetKeyValueRequest) returns (GetKeyValueResponse); |
| |
| // Delete configuration key-value. If is_directory is set in request, |
| // recursively clean up all key-values under the path specified by `key`. |
| rpc DeleteKeyValue(DeleteKeyValueRequest) returns (DeleteKeyValueResponse); |
| } |