pw_transfer: Add typescript library
The current test suite ensures feature parity with the python transfer
implementation.
Read and write failed transfers will retry a specified number of times
before failing completely.
Tested against flakes by running
`bazel test //pw_transfer/ts:test -t- --test_output=errors
--test_strategy=exclusive --runs_per_test=5`
Bug: b/204818208
Change-Id: I3087ea124c8233fbd56cecdf2773384e20a4a3e6
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/66866
Reviewed-by: Paul Mathieu <paulmathieu@google.com>
Pigweed-Auto-Submit: Jared Weinstein <jaredweinstein@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/pw_presubmit/py/pw_presubmit/pigweed_presubmit.py b/pw_presubmit/py/pw_presubmit/pigweed_presubmit.py
index 5765b39..d0ada62 100755
--- a/pw_presubmit/py/pw_presubmit/pigweed_presubmit.py
+++ b/pw_presubmit/py/pw_presubmit/pigweed_presubmit.py
@@ -315,8 +315,8 @@
'//pw_multisink/...',
'//pw_polyfill/...',
'//pw_preprocessor/...',
- '//pw_protobuf_compiler/...',
'//pw_protobuf/...',
+ '//pw_protobuf_compiler/...',
'//pw_random/...',
'//pw_result/...',
'//pw_rpc/...',
@@ -336,6 +336,7 @@
'//pw_thread_stl/...',
'//pw_tool/...',
'//pw_toolchain/...',
+ '//pw_transfer/...',
'//pw_unit_test/...',
'//pw_varint/...',
'//pw_web_ui/...',
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
index f4e648d..0e44b4d 100644
--- a/pw_rpc/ts/client.ts
+++ b/pw_rpc/ts/client.ts
@@ -30,7 +30,7 @@
/**
* Object for managing RPC service and contained methods.
*/
-class ServiceClient {
+export class ServiceClient {
private service: Service;
private methods: MethodStub[] = [];
private methodsByName = new Map<string, MethodStub>();
@@ -48,12 +48,16 @@
method(methodName: string): MethodStub | undefined {
return this.methodsByName.get(methodName);
}
+
+ get id(): number {
+ return this.service.id;
+ }
}
/**
* Object for managing RPC channel and contained services.
*/
-class ChannelClient {
+export class ChannelClient {
readonly channel: Channel;
private services = new Map<string, ServiceClient>();
@@ -65,7 +69,13 @@
});
}
- private service(serviceName: string): ServiceClient | undefined {
+ /**
+ * Find a service client via its full name.
+ *
+ * For example:
+ * `service = client.channel().service('the.package.FooService');`
+ */
+ service(serviceName: string): ServiceClient | undefined {
return this.services.get(serviceName);
}
@@ -74,7 +84,6 @@
*
* For example:
* `method = client.channel().methodStub('the.package.AService.AMethod');`
- *
*/
methodStub(name: string): MethodStub | undefined {
const index = name.lastIndexOf('.');
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
index b363df9..7f10287 100644
--- a/pw_rpc/ts/method.ts
+++ b/pw_rpc/ts/method.ts
@@ -55,6 +55,10 @@
this.channel = channel;
this.rpc = new Rpc(channel, method.service, method);
}
+
+ get id(): number {
+ return this.method.id;
+ }
}
export class UnaryMethodStub extends MethodStub {
diff --git a/pw_transfer/BUILD.bazel b/pw_transfer/BUILD.bazel
index 5868f09..fca5fe1 100644
--- a/pw_transfer/BUILD.bazel
+++ b/pw_transfer/BUILD.bazel
@@ -16,6 +16,7 @@
load("//pw_protobuf_compiler:proto.bzl", "pw_proto_library")
load("@com_google_protobuf//:protobuf.bzl", "py_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@rules_proto_grpc//js:defs.bzl", "js_proto_library")
package(default_visibility = ["//visibility:public"])
@@ -168,3 +169,8 @@
name = "transfer_proto_pb2",
srcs = ["transfer.proto"],
)
+
+js_proto_library(
+ name = "transfer_proto_tspb",
+ protos = [":transfer_proto"],
+)
diff --git a/pw_transfer/docs.rst b/pw_transfer/docs.rst
index c9f6da9..0cffdf4 100644
--- a/pw_transfer/docs.rst
+++ b/pw_transfer/docs.rst
@@ -108,6 +108,33 @@
except pw_transfer.Error as err:
print('Failed to write:', err.status)
+Typescript
+==========
+
+Provides a simple interface for transferring bulk data over pw_rpc.
+
+**Example**
+
+.. code-block:: typescript
+
+ import {Manager} from '@pigweed/pw_transfer'
+
+ const client = new CustomRpcClient();
+ service = client.channel()!.service('pw.transfer.Transfer')!;
+
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ manager.read(3).then((data: Uint8Array) => {
+ console.log(data);
+ }).catch(error => {
+ console.log(`Failed to read: ${error.status}`);
+ });
+
+ manager.write(2, textEncoder.encode('hello world'))
+ .catch(error => {
+ console.log(`Failed to read: ${error.status}`);
+ });
+
--------
Protocol
--------
diff --git a/pw_transfer/ts/BUILD.bazel b/pw_transfer/ts/BUILD.bazel
new file mode 100644
index 0000000..c84824f
--- /dev/null
+++ b/pw_transfer/ts/BUILD.bazel
@@ -0,0 +1,74 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+load("@build_bazel_rules_nodejs//:index.bzl", "js_library")
+load("@npm//@bazel/typescript:index.bzl", "ts_library", "ts_project")
+load("@npm//@bazel/jasmine:index.bzl", "jasmine_node_test")
+load("//pw_protobuf_compiler/ts:ts_proto_collection.bzl", "ts_proto_collection")
+
+package(default_visibility = ["//visibility:public"])
+
+ts_proto_collection(
+ name = "transfer_proto_collection",
+ js_proto_library = "@//pw_transfer:transfer_proto_tspb",
+ proto_library = "@//pw_transfer:transfer_proto",
+)
+
+ts_project(
+ name = "lib",
+ srcs = [
+ "client.ts",
+ "transfer.ts",
+ ],
+ declaration = True,
+ source_map = True,
+ deps = [
+ "//pw_rpc/ts:pw_rpc",
+ "//pw_status/ts:pw_status",
+ "//pw_transfer:transfer_proto_tspb",
+ "@npm//:node_modules", # can't use fine-grained deps
+ ],
+)
+
+js_library(
+ name = "pw_transfer",
+ package_name = "@pigweed/pw_transfer",
+ srcs = ["package.json"],
+ deps = [
+ ":lib",
+ ],
+)
+
+ts_library(
+ name = "test_lib",
+ srcs = [
+ "transfer_test.ts",
+ ],
+ deps = [
+ ":lib",
+ ":transfer_proto_collection",
+ "//pw_rpc:packet_proto_tspb",
+ "//pw_rpc/ts:lib",
+ "//pw_status/ts:pw_status",
+ "//pw_transfer:transfer_proto_tspb",
+ "@npm//@types/jasmine",
+ ],
+)
+
+jasmine_node_test(
+ name = "test",
+ srcs = [
+ ":test_lib",
+ ],
+)
diff --git a/pw_transfer/ts/BUILD.gn b/pw_transfer/ts/BUILD.gn
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/pw_transfer/ts/BUILD.gn
diff --git a/pw_transfer/ts/client.ts b/pw_transfer/ts/client.ts
new file mode 100644
index 0000000..5cce046
--- /dev/null
+++ b/pw_transfer/ts/client.ts
@@ -0,0 +1,264 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/** Client for the pw_transfer service, which transmits data over pw_rpc. */
+
+import {
+ BidirectionalStreamingCall,
+ BidirectionalStreamingMethodStub,
+ ServiceClient,
+} from '@pigweed/pw_rpc';
+import {Status} from '@pigweed/pw_status';
+import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';
+
+import {ReadTransfer, Transfer, WriteTransfer} from './transfer';
+
+type TransferDict = {
+ [key: number]: Transfer;
+};
+
+const DEFAULT_MAX_RETRIES = 3;
+const DEFAULT_RESPONSE_TIMEOUT_S = 2;
+const DEFAULT_INITIAL_RESPONSE_TIMEOUT = 4;
+
+/**
+ * A manager for transmitting data through an RPC TransferService.
+ *
+ * This should be initialized with an active Manager over an RPC channel. Only
+ * one instance of this class should exist for a configured RPC TransferService
+ * -- the Manager supports multiple simultaneous transfers.
+ *
+ * When created, a Manager starts a separate thread in which transfer
+ * communications and events are handled.
+ */
+export class Manager {
+ // Ongoing transfers in the service by ID
+ private readTransfers: TransferDict = {};
+ private writeTransfers: TransferDict = {};
+
+ // RPC streams for read and write transfers. These are shareable by
+ // multiple transfers of the same type.
+ private readStream?: BidirectionalStreamingCall;
+ private writeStream?: BidirectionalStreamingCall;
+
+ /**
+ * Initializes a Manager on top of a TransferService.
+ *
+ * Args:
+ * @param{ServiceClient} service: the pw_rpc transfer service
+ * client
+ * @param{number} defaultResponseTimeoutS: max time to wait between receiving
+ * packets
+ * @param{number} initialResponseTimeoutS: timeout for the first packet; may
+ * be longer to account for transfer handler initialization
+ * @param{number} maxRetries: number of times to retry after a timeout
+ */
+ constructor(
+ private service: ServiceClient,
+ private defaultResponseTimeoutS = DEFAULT_RESPONSE_TIMEOUT_S,
+ private initialResponseTimeoutS = DEFAULT_INITIAL_RESPONSE_TIMEOUT,
+ private maxRetries = DEFAULT_MAX_RETRIES
+ ) {}
+
+ /**
+ * Receives ("downloads") data from the server.
+ *
+ * @throws Throws an error when the transfer fails to complete.
+ */
+ async read(transferId: number): Promise<Uint8Array> {
+ if (transferId in this.readTransfers) {
+ throw new Error(`Read transfer ${transferId} already exists`);
+ }
+ const transfer = new ReadTransfer(
+ transferId,
+ this.sendReadChunkCallback,
+ this.defaultResponseTimeoutS,
+ this.maxRetries
+ );
+
+ this.startReadTransfer(transfer);
+
+ const status = await transfer.done;
+
+ delete this.readTransfers[transfer.id];
+ if (status !== Status.OK) {
+ throw new TransferError(transfer.id, transfer.status);
+ }
+ return transfer.data;
+ }
+
+ /** Begins a new read transfer, opening the stream if it isn't. */
+ startReadTransfer(transfer: Transfer): void {
+ this.readTransfers[transfer.id] = transfer;
+
+ if (this.readStream === undefined) {
+ this.openReadStream();
+ }
+ console.debug(`Starting new read transfer ${transfer.id}`);
+ transfer.begin();
+ }
+
+ /**
+ Transmits (uploads) data to the server.
+ *
+ * @param{number} transferId: ID of the write transfer
+ * @param{Uint8Array} data: Data to send to the server.
+ */
+ async write(transferId: number, data: Uint8Array): Promise<void> {
+ const transfer = new WriteTransfer(
+ transferId,
+ data,
+ this.sendWriteChunkCallback,
+ this.defaultResponseTimeoutS,
+ this.initialResponseTimeoutS,
+ this.maxRetries
+ );
+ this.startWriteTransfer(transfer);
+
+ const status = await transfer.done;
+
+ delete this.writeTransfers[transfer.id];
+ if (transfer.status !== Status.OK) {
+ throw new TransferError(transfer.id, transfer.status);
+ }
+ }
+
+ sendReadChunkCallback = (chunk: Chunk) => {
+ this.readStream!.send(chunk);
+ };
+
+ sendWriteChunkCallback = (chunk: Chunk) => {
+ this.writeStream!.send(chunk);
+ };
+
+ /** Begins a new write transfer, opening the stream if it isn't */
+ startWriteTransfer(transfer: Transfer): void {
+ this.writeTransfers[transfer.id] = transfer;
+
+ if (!this.writeStream) {
+ this.openWriteStream();
+ }
+
+ console.debug(`Starting new write transfer ${transfer.id}`);
+ transfer.begin();
+ }
+
+ private openReadStream(): void {
+ const readRpc = this.service.method(
+ 'Read'
+ )! as BidirectionalStreamingMethodStub;
+ this.readStream = readRpc.invoke(
+ (chunk: Chunk) => {
+ this.handleChunk(this.readTransfers, chunk);
+ },
+ () => {},
+ this.onReadError
+ );
+ }
+
+ private openWriteStream(): void {
+ const writeRpc = this.service.method(
+ 'Write'
+ )! as BidirectionalStreamingMethodStub;
+ this.writeStream = writeRpc.invoke(
+ (chunk: Chunk) => {
+ this.handleChunk(this.writeTransfers, chunk);
+ },
+ () => {},
+ this.onWriteError
+ );
+ }
+
+ /**
+ * Callback for an RPC error in the read stream.
+ */
+ private onReadError = (status: Status) => {
+ if (status === Status.FAILED_PRECONDITION) {
+ // FAILED_PRECONDITION indicates that the stream packet was not
+ // recognized as the stream is not open. This could occur if the
+ // server resets during an active transfer. Re-open the stream to
+ // allow pending transfers to continue.
+ this.openReadStream();
+ return;
+ }
+
+ // Other errors are unrecoverable. Clear the stream and cancel any
+ // pending transfers with an INTERNAL status as this is a system
+ // error.
+ this.readStream = undefined;
+
+ for (const key in this.readTransfers) {
+ const transfer = this.readTransfers[key];
+ transfer.finish(Status.INTERNAL);
+ }
+ this.readTransfers = {};
+ console.error(`Read stream shut down ${Status[status]}`);
+ };
+
+ private onWriteError = (status: Status) => {
+ if (status === Status.FAILED_PRECONDITION) {
+ // FAILED_PRECONDITION indicates that the stream packet was not
+ // recognized as the stream is not open. This could occur if the
+ // server resets during an active transfer. Re-open the stream to
+ // allow pending transfers to continue.
+ this.openWriteStream();
+ } else {
+ // Other errors are unrecoverable. Clear the stream and cancel any
+ // pending transfers with an INTERNAL status as this is a system
+ // error.
+ this.writeStream = undefined;
+
+ for (const key in this.writeTransfers) {
+ const transfer = this.writeTransfers[key];
+ transfer.finish(Status.INTERNAL);
+ }
+ this.writeTransfers = {};
+ console.error(`Write stream shut down: ${Status[status]}`);
+ }
+ };
+
+ /**
+ * Processes an incoming chunk from a stream.
+ *
+ * The chunk is dispatched to an active transfer based on its ID. If the
+ * transfer indicates that it is complete, the provided completion callback
+ * is invoked.
+ */
+ private async handleChunk(transfers: TransferDict, chunk: Chunk) {
+ const transfer = transfers[chunk.getTransferId()];
+ if (transfer === undefined) {
+ console.error(
+ `TransferManager received chunk for unknown transfer ${chunk.getTransferId()}`
+ );
+ return;
+ }
+ transfer.handleChunk(chunk);
+ }
+}
+
+/**
+ * Exception raised when a transfer fails.
+ *
+ * Stores the ID of the failed transfer and the error that occured.
+ */
+class TransferError extends Error {
+ id: number;
+ status: Status;
+
+ constructor(id: number, status: Status) {
+ super(`Transfer ${id} failed with status ${Status[status]}`);
+ this.status = status;
+ this.id = id;
+ }
+}
diff --git a/pw_transfer/ts/index.ts b/pw_transfer/ts/index.ts
new file mode 100644
index 0000000..5ba7e83
--- /dev/null
+++ b/pw_transfer/ts/index.ts
@@ -0,0 +1,15 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+export {Manager} from './client';
diff --git a/pw_transfer/ts/package.json b/pw_transfer/ts/package.json
new file mode 100644
index 0000000..81ddd21
--- /dev/null
+++ b/pw_transfer/ts/package.json
@@ -0,0 +1,14 @@
+{
+ "name": "@pigweed/pw_transfer",
+ "version": "1.0.0",
+ "main": "index.js",
+ "license": "Apache-2.0",
+ "dependencies": {
+ "@bazel/jasmine": "^4.1.0",
+ "@pigweed/pw_rpc": "link:../../pw_status/pw_rpc",
+ "@pigweed/pw_status": "link:../../pw_status/ts",
+ "@types/jasmine": "^3.9.0",
+ "jasmine": "^3.9.0",
+ "jasmine-core": "^3.9.0"
+ }
+}
diff --git a/pw_transfer/ts/transfer.ts b/pw_transfer/ts/transfer.ts
new file mode 100644
index 0000000..3ed7b89
--- /dev/null
+++ b/pw_transfer/ts/transfer.ts
@@ -0,0 +1,406 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+import {
+ BidirectionalStreamingCall,
+ BidirectionalStreamingMethodStub,
+ ServiceClient,
+} from '@pigweed/pw_rpc';
+import {Status} from '@pigweed/pw_status';
+import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';
+
+/** A Timer which invokes a callback after a certain timeout. */
+class Timer {
+ private task?: ReturnType<typeof setTimeout>;
+
+ constructor(
+ readonly timeoutS: number,
+ private readonly callback: () => any
+ ) {}
+
+ /**
+ * Starts a new timer.
+ *
+ * If a timer is already running, it is stopped and a new timer started.
+ * This can be used to implement watchdog-like behavior, where a callback
+ * is invoked after some time without a kick.
+ */
+ start() {
+ this.stop();
+ this.task = setTimeout(this.callback, this.timeoutS * 1000);
+ }
+
+ /** Terminates a running timer. */
+ stop() {
+ if (this.task !== undefined) {
+ clearTimeout(this.task);
+ this.task = undefined;
+ }
+ }
+}
+
+/**
+ * A client-side data transfer through a Manager.
+ *
+ * Subclasses are responsible for implementing all of the logic for their type
+ * of transfer, receiving messages from the server and sending the appropriate
+ * messages in response.
+ */
+export abstract class Transfer {
+ status: Status = Status.OK;
+
+ protected data = new Uint8Array();
+
+ private retries = 0;
+ private responseTimer?: Timer;
+
+ done: Promise<Status>;
+ private resolve?: (value: Status | PromiseLike<Status>) => void;
+
+ constructor(
+ public id: number,
+ protected sendChunk: (chunk: Chunk) => void,
+ responseTimeoutS: number,
+ private maxRetries: number
+ ) {
+ this.responseTimer = new Timer(responseTimeoutS, this.onTimeout);
+ this.done = new Promise<Status>(resolve => {
+ this.resolve = resolve!;
+ });
+ }
+
+ /** Returns the initial chunk to notify the server of the transfer. */
+ protected abstract get initialChunk(): Chunk;
+
+ /** Handles a chunk that contains or requests data. */
+ protected abstract handleDataChunk(chunk: Chunk): void;
+
+ /** Retries after a timeout occurs. */
+ protected abstract retryAfterTimeout(): void;
+
+ /** Handles a timeout while waiting for a chunk. */
+ private onTimeout = () => {
+ this.retries += 1;
+ if (this.retries > this.maxRetries) {
+ this.finish(Status.DEADLINE_EXCEEDED);
+ return;
+ }
+
+ console.debug(
+ `Received no responses for ${this.responseTimer?.timeoutS}; retrying ${this.retries}/${this.maxRetries}`
+ );
+
+ this.retryAfterTimeout();
+ this.responseTimer?.start();
+ };
+
+ /** Sends an error chunk to the server and finishes the transfer. */
+ protected sendError(error: Status): void {
+ const chunk = new Chunk();
+ chunk.setStatus(error);
+ chunk.setTransferId(this.id);
+ this.sendChunk(chunk);
+ this.finish(error);
+ }
+
+ /** Sends the initial chunk of the transfer. */
+ begin(): void {
+ this.sendChunk(this.initialChunk);
+ this.responseTimer?.start();
+ }
+
+ /** Ends the transfer with the specified status. */
+ finish(status: Status): void {
+ this.responseTimer?.stop();
+ this.responseTimer = undefined;
+ this.status = status;
+
+ if (status === Status.OK) {
+ const totalSize = this.data.length;
+ }
+
+ this.resolve!(this.status);
+ }
+
+ /**
+ * Processes an incoming chunk from the server.
+ *
+ * Handles terminating chunks (i.e. those with a status) and forwards
+ * non-terminating chunks to handle_data_chunk.
+ */
+ handleChunk(chunk: Chunk): void {
+ this.responseTimer?.stop();
+ this.retries = 0; // Received data from service, so reset the retries.
+
+ console.debug(`Received chunk:(${chunk})`);
+
+ // Status chunks are only used to terminate a transfer. They do not
+ // contain any data that requires processing.
+ if (chunk.hasStatus()) {
+ this.finish(chunk.getStatus());
+ return;
+ }
+
+ this.handleDataChunk(chunk);
+
+ // Start the timeout for the server to send a chunk in response.
+ this.responseTimer?.start();
+ }
+}
+
+/**
+ * A client <= server read transfer.
+ *
+ * Although typescript can effectively handle an unlimited transfer window, this
+ * client sets a conservative window and chunk size to avoid overloading the
+ * device. These are configurable in the constructor.
+ */
+export class ReadTransfer extends Transfer {
+ private maxBytesToReceive: number;
+ private maxChunkSize: number;
+ private chunkDelayMicroS?: number; // Microseconds
+ private remainingTransferSize?: number;
+ private offset = 0;
+ private pendingBytes: number;
+
+ data = new Uint8Array();
+
+ constructor(
+ id: number,
+ sendChunk: (chunk: Chunk) => void,
+ responseTimeoutS: number,
+ maxRetries: number,
+ maxBytesToReceive = 8192,
+ maxChunkSize = 1024,
+ chunkDelayMicroS?: number
+ ) {
+ super(id, sendChunk, responseTimeoutS, maxRetries);
+ this.maxBytesToReceive = maxBytesToReceive;
+ this.maxChunkSize = maxChunkSize;
+ this.chunkDelayMicroS = chunkDelayMicroS;
+ this.pendingBytes = maxBytesToReceive;
+ }
+
+ protected get initialChunk(): Chunk {
+ return this.transferParameters();
+ }
+
+ /** Builds an updated transfer parameters chunk to send the server. */
+ private transferParameters(): Chunk {
+ this.pendingBytes = this.maxBytesToReceive;
+
+ const chunk = new Chunk();
+ chunk.setTransferId(this.id);
+ chunk.setPendingBytes(this.pendingBytes);
+ chunk.setMaxChunkSizeBytes(this.maxChunkSize);
+ chunk.setOffset(this.offset);
+
+ if (this.chunkDelayMicroS !== 0) {
+ chunk.setMinDelayMicroseconds(this.chunkDelayMicroS!);
+ }
+ return chunk;
+ }
+
+ /**
+ * Processes an incoming chunk from the server.
+ *
+ * In a read transfer, the client receives data chunks from the server.
+ * Once all pending data is received, the transfer parameters are updated.
+ */
+ protected handleDataChunk(chunk: Chunk): void {
+ if (chunk.getOffset() != this.offset) {
+ // Initially, the transfer service only supports in-order transfers.
+ // If data is received out of order, request that the server
+ // retransmit from the previous offset.
+ this.pendingBytes = 0;
+ this.sendChunk(this.transferParameters());
+ return;
+ }
+
+ const oldData = this.data;
+ const chunkData = chunk.getData() as Uint8Array;
+ this.data = new Uint8Array(chunkData.length + oldData.length);
+ this.data.set(oldData);
+ this.data.set(chunkData, oldData.length);
+
+ this.pendingBytes -= chunk.getData().length;
+ this.offset += chunk.getData().length;
+
+ if (chunk.hasRemainingBytes()) {
+ if (chunk.getRemainingBytes() === 0) {
+ // No more data to read. Aknowledge receipt and finish.
+ const endChunk = new Chunk();
+ endChunk.setTransferId(this.id);
+ endChunk.setStatus(Status.OK);
+ this.sendChunk(endChunk);
+ this.finish(Status.OK);
+ return;
+ }
+
+ this.remainingTransferSize = chunk.getRemainingBytes();
+ } else if (this.remainingTransferSize !== undefined) {
+ // Update the remaining transfer size, if it is known.
+ this.remainingTransferSize -= chunk.getData().length;
+
+ if (this.remainingTransferSize <= 0) {
+ this.remainingTransferSize = undefined;
+ }
+ }
+
+ if (this.pendingBytes === 0) {
+ // All pending data was received. Send out a new parameters chunk
+ // for the next block.
+ this.sendChunk(this.transferParameters());
+ }
+ }
+
+ protected retryAfterTimeout(): void {
+ this.sendChunk(this.transferParameters());
+ }
+}
+
+/**
+ * A client => server write transfer.
+ */
+export class WriteTransfer extends Transfer {
+ readonly data: Uint8Array;
+ private windowId = 0;
+ offset = 0;
+ maxChunkSize = 0;
+ chunkDelayMicroS?: number;
+ maxBytesToSend = 0;
+ lastChunk: Chunk;
+
+ constructor(
+ id: number,
+ data: Uint8Array,
+ sendChunk: (chunk: Chunk) => void,
+ responseTimeoutS: number,
+ initialResponseTimeoutS: number,
+ maxRetries: number
+ ) {
+ super(id, sendChunk, responseTimeoutS, maxRetries);
+ this.data = data;
+ this.lastChunk = this.initialChunk;
+ }
+
+ protected get initialChunk(): Chunk {
+ const chunk = new Chunk();
+ chunk.setTransferId(this.id);
+ return chunk;
+ }
+
+ /**
+ * Processes an incoming chunk from the server.
+ *
+ * In a write transfer, the server only sends transfer parameter updates
+ * to the client. When a message is received, update local parameters and
+ * send data accordingly.
+ */
+ protected handleDataChunk(chunk: Chunk): void {
+ this.windowId += 1;
+ const initialWindowId = this.windowId;
+
+ if (!this.handleParametersUpdate(chunk)) {
+ return;
+ }
+
+ const bytesAknowledged = chunk.getOffset();
+
+ let writeChunk: Chunk;
+ while (true) {
+ writeChunk = this.nextChunk();
+ this.offset += writeChunk.getData().length;
+ this.maxBytesToSend -= writeChunk.getData().length;
+ const sentRequestedBytes = this.maxBytesToSend === 0;
+
+ this.sendChunk(writeChunk);
+ if (sentRequestedBytes) {
+ break;
+ }
+ }
+
+ this.lastChunk = writeChunk;
+ }
+
+ /** Updates transfer state base on a transfer parameters update. */
+ private handleParametersUpdate(chunk: Chunk): boolean {
+ if (chunk.getOffset() > this.data.length) {
+ // Bad offset; terminate the transfer.
+ console.error(
+ `Transfer ${
+ this.id
+ }: server requested invalid offset ${chunk.getOffset()} (size ${
+ this.data.length
+ })`
+ );
+
+ this.sendError(Status.OUT_OF_RANGE);
+ return false;
+ }
+
+ if (chunk.getPendingBytes() === 0) {
+ console.error(
+ `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`
+ );
+ this.sendError(Status.INTERNAL);
+ return false;
+ }
+
+ // Check whether the client has sent a previous data offset, which
+ // indicates that some chunks were lost in transmission.
+ if (chunk.getOffset() < this.offset) {
+ console.debug(
+ `Write transfer ${
+ this.id
+ } rolling back to offset ${chunk.getOffset()} from ${this.offset}`
+ );
+ }
+
+ this.offset = chunk.getOffset();
+ this.maxBytesToSend = Math.min(
+ chunk.getPendingBytes(),
+ this.data.length - this.offset
+ );
+
+ if (chunk.hasMaxChunkSizeBytes()) {
+ this.maxChunkSize = chunk.getMaxChunkSizeBytes();
+ }
+
+ if (chunk.hasMinDelayMicroseconds()) {
+ this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
+ }
+ return true;
+ }
+
+ /** Returns the next Chunk message to send in the data transfer. */
+ private nextChunk(): Chunk {
+ const chunk = new Chunk();
+ chunk.setTransferId(this.id);
+ chunk.setOffset(this.offset);
+ const maxBytesInChunk = Math.min(this.maxChunkSize, this.maxBytesToSend);
+
+ chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));
+
+ // Mark the final chunk of the transfer.
+ if (this.data.length - this.offset <= maxBytesInChunk) {
+ chunk.setRemainingBytes(0);
+ }
+ return chunk;
+ }
+
+ protected retryAfterTimeout(): void {
+ this.sendChunk(this.lastChunk);
+ }
+}
diff --git a/pw_transfer/ts/transfer_test.ts b/pw_transfer/ts/transfer_test.ts
new file mode 100644
index 0000000..53fac33
--- /dev/null
+++ b/pw_transfer/ts/transfer_test.ts
@@ -0,0 +1,521 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+/* eslint-env browser, jasmine */
+import 'jasmine';
+
+import {
+ Channel,
+ Client,
+ decode,
+ MethodStub,
+ ServiceClient,
+} from '@pigweed/pw_rpc';
+import {Status} from '@pigweed/pw_status';
+import {
+ PacketType,
+ RpcPacket,
+} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb';
+import {ProtoCollection} from 'transfer_proto_collection/generated/ts_proto_collection';
+import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';
+
+import {Manager} from './client';
+
+const DEFAULT_TIMEOUT_S = 0.3;
+
+describe('Encoder', () => {
+ const textEncoder = new TextEncoder();
+ const textDecoder = new TextDecoder();
+ let client: Client;
+ let service: ServiceClient;
+ let sentChunks: Chunk[];
+ let packetsToSend: Uint8Array[][];
+
+ beforeEach(() => {
+ const lib = new ProtoCollection();
+ const channels: Channel[] = [new Channel(1, handleRequest)];
+ client = Client.fromProtoSet(channels, lib);
+ service = client.channel(1)!.service('pw.transfer.Transfer')!;
+
+ sentChunks = [];
+ packetsToSend = [];
+ });
+
+ function handleRequest(data: Uint8Array): void {
+ const packet = decode(data);
+ if (packet.getType() !== PacketType.CLIENT_STREAM) {
+ return;
+ }
+
+ const chunk = Chunk.deserializeBinary(packet.getPayload_asU8());
+ sentChunks.push(chunk);
+
+ if (packetsToSend.length > 0) {
+ const responses = packetsToSend.shift()!;
+ for (const response of responses) {
+ client.processPacket(response);
+ }
+ }
+ }
+
+ function receivedData(): Uint8Array {
+ let length = 0;
+ sentChunks.forEach((chunk: Chunk) => {
+ length += chunk.getData().length;
+ });
+ const data = new Uint8Array(length);
+ let offset = 0;
+ sentChunks.forEach((chunk: Chunk) => {
+ data.set(chunk.getData() as Uint8Array, offset);
+ offset += chunk.getData().length;
+ });
+ return data;
+ }
+
+ function enqueueServerError(method: MethodStub, error: Status): void {
+ const packet = new RpcPacket();
+ packet.setType(PacketType.SERVER_ERROR);
+ packet.setChannelId(1);
+ packet.setServiceId(service.id);
+ packet.setMethodId(method.id);
+ packet.setStatus(error);
+ packetsToSend.push([packet.serializeBinary()]);
+ }
+
+ function enqueueServerResponses(method: MethodStub, responses: Chunk[][]) {
+ for (const responseGroup of responses) {
+ const serializedGroup = [];
+ for (const response of responseGroup) {
+ const packet = new RpcPacket();
+ packet.setType(PacketType.SERVER_STREAM);
+ packet.setChannelId(1);
+ packet.setServiceId(service.id);
+ packet.setMethodId(method.id);
+ packet.setStatus(Status.OK);
+ packet.setPayload(response.serializeBinary());
+ serializedGroup.push(packet.serializeBinary());
+ }
+ packetsToSend.push(serializedGroup);
+ }
+ }
+
+ function buildChunk(
+ transferId: number,
+ offset: number,
+ data: string,
+ remainingBytes: number
+ ): Chunk {
+ const chunk = new Chunk();
+ chunk.setTransferId(transferId);
+ chunk.setOffset(offset);
+ chunk.setData(textEncoder.encode(data));
+ chunk.setRemainingBytes(remainingBytes);
+ return chunk;
+ }
+
+ it('read transfer basic', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk1 = buildChunk(3, 0, 'abc', 0);
+ enqueueServerResponses(service.method('Read')!, [[chunk1]]);
+
+ const data = await manager.read(3);
+ expect(textDecoder.decode(data)).toEqual('abc');
+ expect(sentChunks).toHaveSize(2);
+ expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
+ expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
+ });
+
+ it('read transfer multichunk', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk1 = buildChunk(3, 0, 'abc', 3);
+ const chunk2 = buildChunk(3, 3, 'def', 0);
+ enqueueServerResponses(service.method('Read')!, [[chunk1, chunk2]]);
+
+ const data = await manager.read(3);
+ expect(data).toEqual(textEncoder.encode('abcdef'));
+ expect(sentChunks).toHaveSize(2);
+ expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
+ expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
+ });
+
+ // it('read transfer progress callback', async () => {});
+
+ it('read transfer retry bad offset', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk1 = buildChunk(3, 0, '123', 6);
+ const chunk2 = buildChunk(3, 1, '456', 3); // Incorrect offset; expecting 3
+ const chunk3 = buildChunk(3, 3, '456', 3);
+ const chunk4 = buildChunk(3, 6, '789', 0);
+
+ enqueueServerResponses(service.method('Read')!, [
+ [chunk1, chunk2],
+ [chunk3, chunk4],
+ ]);
+
+ const data = await manager.read(3);
+ expect(data).toEqual(textEncoder.encode('123456789'));
+ expect(sentChunks).toHaveSize(3);
+ expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
+ expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
+ });
+
+ it('read transfer retry timeout', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = buildChunk(3, 0, 'xyz', 0);
+ enqueueServerResponses(service.method('Read')!, [[], [chunk]]);
+
+ const data = await manager.read(3);
+ expect(textDecoder.decode(data)).toEqual('xyz');
+
+ // Two transfer parameter requests should have been sent.
+ expect(sentChunks).toHaveSize(3);
+ expect(sentChunks[sentChunks.length - 1].hasStatus()).toBeTrue();
+ expect(sentChunks[sentChunks.length - 1].getStatus()).toEqual(Status.OK);
+ });
+
+ it('read transfer timeout', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ await manager
+ .read(27)
+ .then(() => {
+ fail('Unexpected completed promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(27);
+ expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
+ expect(sentChunks).toHaveSize(4);
+ });
+ });
+
+ it('read transfer error', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setStatus(Status.NOT_FOUND);
+ chunk.setTransferId(31);
+ enqueueServerResponses(service.method('Read')!, [[chunk]]);
+
+ await manager
+ .read(31)
+ .then(() => {
+ fail('Unexpected completed promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(31);
+ expect(Status[error.status]).toEqual(Status[Status.NOT_FOUND]);
+ });
+ });
+
+ it('read transfer server error', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ enqueueServerError(service.method('Read')!, Status.NOT_FOUND);
+ await manager
+ .read(31)
+ .then(data => {
+ fail('Unexpected completed promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(31);
+ expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
+ });
+ });
+
+ it('write transfer basic', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(4);
+ chunk.setOffset(0);
+ chunk.setPendingBytes(32);
+ chunk.setMaxChunkSizeBytes(8);
+
+ const completeChunk = new Chunk();
+ completeChunk.setTransferId(4);
+ completeChunk.setStatus(Status.OK);
+
+ enqueueServerResponses(service.method('Write')!, [
+ [chunk],
+ [completeChunk],
+ ]);
+
+ await manager.write(4, textEncoder.encode('hello'));
+ expect(sentChunks).toHaveSize(2);
+ expect(receivedData()).toEqual(textEncoder.encode('hello'));
+ });
+
+ it('write transfer max chunk size', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(4);
+ chunk.setOffset(0);
+ chunk.setPendingBytes(32);
+ chunk.setMaxChunkSizeBytes(8);
+
+ const completeChunk = new Chunk();
+ completeChunk.setTransferId(4);
+ completeChunk.setStatus(Status.OK);
+
+ enqueueServerResponses(service.method('Write')!, [
+ [chunk],
+ [completeChunk],
+ ]);
+
+ await manager.write(4, textEncoder.encode('hello world'));
+ expect(sentChunks).toHaveSize(3);
+ expect(receivedData()).toEqual(textEncoder.encode('hello world'));
+ expect(sentChunks[1].getData()).toEqual(textEncoder.encode('hello wo'));
+ expect(sentChunks[2].getData()).toEqual(textEncoder.encode('rld'));
+ });
+
+ it('write transfer multiple parameters', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(4);
+ chunk.setOffset(0);
+ chunk.setPendingBytes(8);
+ chunk.setMaxChunkSizeBytes(8);
+
+ const chunk2 = new Chunk();
+ chunk2.setTransferId(4);
+ chunk2.setOffset(8);
+ chunk2.setPendingBytes(8);
+ chunk2.setMaxChunkSizeBytes(8);
+
+ const completeChunk = new Chunk();
+ completeChunk.setTransferId(4);
+ completeChunk.setStatus(Status.OK);
+
+ enqueueServerResponses(service.method('Write')!, [
+ [chunk],
+ [chunk2],
+ [completeChunk],
+ ]);
+
+ await manager.write(4, textEncoder.encode('data to write'));
+ expect(sentChunks).toHaveSize(3);
+ expect(receivedData()).toEqual(textEncoder.encode('data to write'));
+ expect(sentChunks[1].getData()).toEqual(textEncoder.encode('data to '));
+ expect(sentChunks[2].getData()).toEqual(textEncoder.encode('write'));
+ });
+
+ // it('write transfer progress callback', () => {});
+
+ it('write transfer rewind', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk1 = new Chunk();
+ chunk1.setTransferId(4);
+ chunk1.setOffset(0);
+ chunk1.setPendingBytes(8);
+ chunk1.setMaxChunkSizeBytes(8);
+
+ const chunk2 = new Chunk();
+ chunk2.setTransferId(4);
+ chunk2.setOffset(8);
+ chunk2.setPendingBytes(8);
+ chunk2.setMaxChunkSizeBytes(8);
+
+ const chunk3 = new Chunk();
+ chunk3.setTransferId(4);
+ chunk3.setOffset(4); // Rewind
+ chunk3.setPendingBytes(8);
+ chunk3.setMaxChunkSizeBytes(8);
+
+ const chunk4 = new Chunk();
+ chunk4.setTransferId(4);
+ chunk4.setOffset(12); // Rewind
+ chunk4.setPendingBytes(16);
+ chunk4.setMaxChunkSizeBytes(16);
+
+ const completeChunk = new Chunk();
+ completeChunk.setTransferId(4);
+ completeChunk.setStatus(Status.OK);
+
+ enqueueServerResponses(service.method('Write')!, [
+ [chunk1],
+ [chunk2],
+ [chunk3],
+ [chunk4],
+ [completeChunk],
+ ]);
+
+ await manager.write(4, textEncoder.encode('pigweed data transfer'));
+ expect(sentChunks).toHaveSize(5);
+ expect(sentChunks[1].getData()).toEqual(textEncoder.encode('pigweed '));
+ expect(sentChunks[2].getData()).toEqual(textEncoder.encode('data tra'));
+ expect(sentChunks[3].getData()).toEqual(textEncoder.encode('eed data'));
+ expect(sentChunks[4].getData()).toEqual(textEncoder.encode(' transfer'));
+ });
+
+ it('write transfer bad offset', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk1 = new Chunk();
+ chunk1.setTransferId(4);
+ chunk1.setOffset(0);
+ chunk1.setPendingBytes(8);
+ chunk1.setMaxChunkSizeBytes(8);
+
+ const chunk2 = new Chunk();
+ chunk2.setTransferId(4);
+ chunk2.setOffset(100); // larger offset than data
+ chunk2.setPendingBytes(8);
+ chunk2.setMaxChunkSizeBytes(8);
+
+ const completeChunk = new Chunk();
+ completeChunk.setTransferId(4);
+ completeChunk.setStatus(Status.OK);
+
+ enqueueServerResponses(service.method('Write')!, [
+ [chunk1],
+ [chunk2],
+ [completeChunk],
+ ]);
+
+ await manager
+ .write(4, textEncoder.encode('small data'))
+ .then(() => {
+ fail('Unexpected succesful promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(4);
+ expect(Status[error.status]).toEqual(Status[Status.OUT_OF_RANGE]);
+ });
+ });
+
+ it('write transfer error', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(21);
+ chunk.setStatus(Status.UNAVAILABLE);
+
+ enqueueServerResponses(service.method('Write')!, [[chunk]]);
+
+ await manager
+ .write(21, textEncoder.encode('no write'))
+ .then(() => {
+ fail('Unexpected succesful promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(21);
+ expect(Status[error.status]).toEqual(Status[Status.UNAVAILABLE]);
+ });
+ });
+
+ it('write transfer server error', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(21);
+ chunk.setStatus(Status.NOT_FOUND);
+
+ enqueueServerError(service.method('Write')!, Status.NOT_FOUND);
+
+ await manager
+ .write(21, textEncoder.encode('server error'))
+ .then(() => {
+ fail('Unexpected succesful promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(21);
+ expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
+ });
+ });
+
+ it('write transfer timeout after initial chunk', async () => {
+ const manager = new Manager(service, 0.001, 4, 2);
+
+ await manager
+ .write(22, textEncoder.encode('no server response!'))
+ .then(() => {
+ fail('unexpected succesful write');
+ })
+ .catch(error => {
+ expect(sentChunks).toHaveSize(3); // Initial chunk + two retries.
+ expect(error.id).toEqual(22);
+ expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
+ });
+ });
+
+ it('write transfer timeout after intermediate chunk', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S, 4, 2);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(22);
+ chunk.setPendingBytes(10);
+ chunk.setMaxChunkSizeBytes(5);
+
+ enqueueServerResponses(service.method('Write')!, [[chunk]]);
+
+ await manager
+ .write(22, textEncoder.encode('0123456789'))
+ .then(() => {
+ fail('unexpected succesful write');
+ })
+ .catch(error => {
+ const expectedChunk1 = new Chunk();
+ expectedChunk1.setTransferId(22);
+ const expectedChunk2 = new Chunk();
+ expectedChunk2.setTransferId(22);
+ expectedChunk2.setData(textEncoder.encode('01234'));
+ const lastChunk = new Chunk();
+ lastChunk.setTransferId(22);
+ lastChunk.setData(textEncoder.encode('56789'));
+ lastChunk.setOffset(5);
+ lastChunk.setRemainingBytes(0);
+
+ const expectedChunks = [
+ expectedChunk1,
+ expectedChunk2,
+ lastChunk,
+ lastChunk, // retry 1
+ lastChunk, // retry 2
+ ];
+
+ expect(sentChunks).toEqual(expectedChunks);
+
+ expect(error.id).toEqual(22);
+ expect(Status[error.status]).toEqual(Status[Status.DEADLINE_EXCEEDED]);
+ });
+ });
+
+ it('write zero pending bytes is internal error', async () => {
+ const manager = new Manager(service, DEFAULT_TIMEOUT_S);
+
+ const chunk = new Chunk();
+ chunk.setTransferId(23);
+ chunk.setPendingBytes(0);
+
+ enqueueServerResponses(service.method('Write')!, [[chunk]]);
+
+ await manager
+ .write(23, textEncoder.encode('no write'))
+ .then(() => {
+ fail('Unexpected succesful promise');
+ })
+ .catch(error => {
+ expect(error.id).toEqual(23);
+ expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
+ });
+ });
+});
diff --git a/pw_transfer/ts/tsconfig.json b/pw_transfer/ts/tsconfig.json
new file mode 100644
index 0000000..0fab6a4
--- /dev/null
+++ b/pw_transfer/ts/tsconfig.json
@@ -0,0 +1,35 @@
+{
+ "compilerOptions": {
+ "allowUnreachableCode": false,
+ "allowUnusedLabels": false,
+ "declaration": true,
+ "forceConsistentCasingInFileNames": true,
+ "lib": [
+ "es2018",
+ "dom",
+ "dom.iterable",
+ "esnext"
+ ],
+ "module": "commonjs",
+ "noEmitOnError": true,
+ "noFallthroughCasesInSwitch": true,
+ "noImplicitReturns": true,
+ "pretty": true,
+ "sourceMap": true,
+ "strict": true,
+ "target": "es2018",
+ "jsx": "react",
+ "plugins": [
+ {
+ "name": "@bazel/tsetse",
+ "disabledRules": [
+ "must-use-promises"
+ ]
+ }
+ ]
+ },
+ "exclude": [
+ "node_modules"
+ ]
+}
+