Add more logs to http protocol
Bug: 306242121
Test: atest
Change-Id: Ic09549860bdc05a61e91d42058f8ff10e16ff5c4
diff --git a/federatedcompute/src/com/android/federatedcompute/services/http/HttpFederatedProtocol.java b/federatedcompute/src/com/android/federatedcompute/services/http/HttpFederatedProtocol.java
index a6b53e6..a55fcf4 100644
--- a/federatedcompute/src/com/android/federatedcompute/services/http/HttpFederatedProtocol.java
+++ b/federatedcompute/src/com/android/federatedcompute/services/http/HttpFederatedProtocol.java
@@ -50,7 +50,6 @@
/** Implements a single session of HTTP-based federated compute protocol. */
public final class HttpFederatedProtocol {
public static final String TAG = "HttpFederatedProtocol";
-
private final String mClientVersion;
private final String mPopulationName;
private final HttpClient mHttpClient;
@@ -83,20 +82,17 @@
getTaskAssignmentHttpResponse ->
getTaskAssignment(getTaskAssignmentHttpResponse),
getLightweightExecutor());
-
ListenableFuture<FederatedComputeHttpResponse> planDataResponseFuture =
FluentFuture.from(taskAssignmentFuture)
.transformAsync(
taskAssignment -> fetchTaskResource(taskAssignment.getPlan()),
getBackgroundExecutor());
-
ListenableFuture<FederatedComputeHttpResponse> checkpointDataResponseFuture =
FluentFuture.from(taskAssignmentFuture)
.transformAsync(
taskAssignment ->
fetchTaskResource(taskAssignment.getInitCheckpoint()),
getBackgroundExecutor());
-
return Futures.whenAllSucceed(
taskAssignmentFuture, planDataResponseFuture, checkpointDataResponseFuture)
.callAsync(
@@ -114,7 +110,7 @@
/** Helper functions to reporting result and upload result. */
public FluentFuture<Void> reportResult(ComputationResult computationResult) {
- if (computationResult.isResultSuccess()) {
+ if (computationResult != null && computationResult.isResultSuccess()) {
return FluentFuture.from(performReportResult(computationResult))
.transformAsync(
reportResp ->
@@ -165,7 +161,6 @@
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse StartTaskAssignmentResponse proto", e);
}
- LogUtil.i(TAG, "start task assignment response: %s", taskAssignmentResponse);
if (taskAssignmentResponse.hasRejectionInfo()) {
throw new IllegalStateException("Device rejected by server.");
}
@@ -174,7 +169,13 @@
"Could not find both task assignment and rejection info.");
}
validateTaskAssignment(taskAssignmentResponse.getTaskAssignment());
- return taskAssignmentResponse.getTaskAssignment();
+ TaskAssignment taskAssignment = taskAssignmentResponse.getTaskAssignment();
+ LogUtil.d(
+ TAG,
+ "Receive CreateTaskAssignmentResponse: task name %s assignment id %s",
+ taskAssignment.getTaskName(),
+ taskAssignment.getAssignmentId());
+ return taskAssignment;
}
private void validateTaskAssignment(TaskAssignment taskAssignment) {
@@ -207,7 +208,6 @@
ClientOnlyPlan clientOnlyPlan;
try {
clientOnlyPlan = ClientOnlyPlan.parseFrom(planDataResponse.getPayload());
-
} catch (InvalidProtocolBufferException e) {
LogUtil.e(TAG, e, "Could not parse ClientOnlyPlan proto");
return Futures.immediateFailedFuture(
@@ -217,7 +217,6 @@
writeToFile(inputCheckpointFile, checkpointDataResponse.getPayload());
return Futures.immediateFuture(
new CheckinResult(inputCheckpointFile, clientOnlyPlan, taskAssignment));
-
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
@@ -225,7 +224,10 @@
private ListenableFuture<FederatedComputeHttpResponse> performReportResult(
ComputationResult computationResult) {
- Result result = computationResult.isResultSuccess() ? Result.COMPLETED : Result.FAILED;
+ Result result =
+ (computationResult != null && computationResult.isResultSuccess())
+ ? Result.COMPLETED
+ : Result.FAILED;
ReportResultRequest startDataUploadRequest =
ReportResultRequest.newBuilder().setResult(result).build();
String startDataUploadUri =
@@ -233,6 +235,14 @@
"/taskassignment/v1/population/%1$s/task/%2$s/aggregation"
+ "/%3$s/task-assignment/%4$s:report-result",
mPopulationName, mTaskId, mAggregationId, mAssignmentId);
+ LogUtil.d(
+ TAG,
+ "send ReportResultRequest: population name %s, task name %s,"
+ + " assignment id %s, result %s",
+ mPopulationName,
+ mTaskId,
+ mAssignmentId,
+ result.toString());
FederatedComputeHttpRequest httpRequest =
mTaskAssignmentRequestCreator.createProtoRequest(
startDataUploadUri,
@@ -250,7 +260,6 @@
validateHttpResponseStatus("ReportResult", httpResponse);
ReportResultResponse reportResultResponse =
ReportResultResponse.parseFrom(httpResponse.getPayload());
-
// TODO(b/297605806): better handle rejection info.
if (reportResultResponse.hasRejectionInfo()) {
return Futures.immediateFailedFuture(
@@ -261,7 +270,6 @@
!computationResult.getOutputCheckpointFile().isEmpty(),
"Output checkpoint file should not be empty");
byte[] outputBytes = readFileAsByteArray(computationResult.getOutputCheckpointFile());
-
UploadInstruction uploadInstruction = reportResultResponse.getUploadInstruction();
Preconditions.checkArgument(
!uploadInstruction.getUploadLocation().isEmpty(),
@@ -273,6 +281,13 @@
(key, value) -> {
requestHeader.put(key, value);
});
+ LogUtil.d(
+ TAG,
+ "Start upload training result: population name %s, task name %s,"
+ + " assignment id %s",
+ mPopulationName,
+ mTaskId,
+ mAssignmentId);
FederatedComputeHttpRequest httpUploadRequest =
FederatedComputeHttpRequest.create(
uploadInstruction.getUploadLocation(),
@@ -290,6 +305,8 @@
String stage, FederatedComputeHttpResponse httpResponse) {
if (!HTTP_OK_STATUS.contains(httpResponse.getStatusCode())) {
throw new IllegalStateException(stage + " failed: " + httpResponse.getStatusCode());
+ } else {
+ LogUtil.d(TAG, stage + " success.");
}
}