ui: Fetch websocket info from the device

Fetch the API version from the device.

Add functionality to the websocket connection to support shell commands.

Manage syncronisation between a websocket's `onclose` and `onmessage`.
Otherwise, if `onmessage` is async `onclose` will close the stream
and the result won't be returned.

Bug: 209947455
Change-Id: Ifeaa8a99a8150aa1d7c82e70e89603b33a728bdc
diff --git a/ui/src/common/recordingV2/adb_connection_over_websocket.ts b/ui/src/common/recordingV2/adb_connection_over_websocket.ts
index aed3dd1..62284ba 100644
--- a/ui/src/common/recordingV2/adb_connection_over_websocket.ts
+++ b/ui/src/common/recordingV2/adb_connection_over_websocket.ts
@@ -14,10 +14,12 @@
 
 import {_TextDecoder} from 'custom_utils';
 
-import {defer} from '../../base/deferred';
+import {defer, Deferred} from '../../base/deferred';
+import {ArrayBufferBuilder} from '../array_buffer_builder';
 
 import {buildAbdWebsocketCommand} from './adb_over_websocket_utils';
 import {ALLOW_USB_DEBUGGING} from './adb_targets_utils';
+import {RecordingError} from './recording_error_handling';
 import {
   AdbConnection,
   ByteStream,
@@ -44,45 +46,38 @@
     throw new Error('Not implemented yet');
   }
 
-  shell(_cmd: string): Promise<AdbOverWebsocketStream> {
-    // TODO(octaviant) implement
-    throw new Error('Not implemented yet');
+  // Starts a shell command, then gathers all its output and returns it as
+  // a string.
+  async shellAndGetOutput(cmd: string): Promise<string> {
+    const adbStream = await this.shell(cmd);
+    const commandOutput = new ArrayBufferBuilder();
+    const onStreamingEnded = defer<string>();
+
+    adbStream.addOnStreamData((data: Uint8Array) => {
+      commandOutput.append(data);
+    });
+    adbStream.addOnStreamClose(() => {
+      onStreamingEnded.resolve(
+          textDecoder.decode(commandOutput.toArrayBuffer()));
+    });
+    return onStreamingEnded;
   }
 
-  async connectSocket(path: string): Promise<ByteStream> {
-    const webSocket = new WebSocket(this.websocketUrl);
-    const byteStreamConnected = defer<AdbOverWebsocketStream>();
-    const byteStream = new AdbOverWebsocketStream(webSocket);
-    byteStream.addOnStreamClose(() => this.closeStream(byteStream));
+  shell(cmd: string): Promise<AdbOverWebsocketStream> {
+    return this.openStream('shell:' + cmd);
+  }
 
-    webSocket.onopen = () => webSocket.send(
-        buildAbdWebsocketCommand(`host:transport:${this.deviceSerialNumber}`));
+  connectSocket(path: string): Promise<AdbOverWebsocketStream> {
+    return this.openStream(path);
+  }
 
-    webSocket.onmessage = async (evt) => {
-      if (!byteStream.isOpen()) {
-        const txt = await evt.data.text();
-        const prefix = txt.substr(0, 4);
-        if (prefix === 'OKAY') {
-          byteStream.setStreamOpen();
-          webSocket.send(buildAbdWebsocketCommand(path));
-          byteStreamConnected.resolve(byteStream);
-        } else if (prefix === 'FAIL' && txt.includes('device unauthorized')) {
-          byteStreamConnected.reject(ALLOW_USB_DEBUGGING);
-        } else {
-          byteStreamConnected.reject(WEBSOCKET_UNABLE_TO_CONNECT);
-        }
-        return;
-      }
-
-      // Upon a successful connection we first receive an 'OKAY' message.
-      // After that, we receive messages with traced binary payloads.
-      const arrayBufferResponse = await evt.data.arrayBuffer();
-      if (textDecoder.decode(arrayBufferResponse) !== 'OKAY') {
-        byteStream.signalStreamData(new Uint8Array(arrayBufferResponse));
-      }
-    };
-
-    return byteStreamConnected;
+  private async openStream(destination: string):
+      Promise<AdbOverWebsocketStream> {
+    return AdbOverWebsocketStream.create(
+        this.websocketUrl,
+        destination,
+        this.deviceSerialNumber,
+        this.closeStream.bind(this));
   }
 
   disconnect(): void {
@@ -106,16 +101,36 @@
   }
 }
 
-// An AdbOverWebsocketStream is instantiated after the creation of a socket to
-// the device. Thanks to this, we can send commands and receive their output.
-// Messages are received in the main adb class, and are forwarded to an instance
-// of this class based on a stream id match.
+// An AdbOverWebsocketStream instantiates a websocket connection to the device.
+// It exposes an API to write commands to this websocket and read its output.
 export class AdbOverWebsocketStream implements ByteStream {
-  private _isOpen = false;
+  private websocket: WebSocket;
+  // commandSentSignal gets resolved if we successfully connect to the device
+  // and send the command this socket wraps. commandSentSignal gets rejected if
+  // we fail to connect to the device.
+  private commandSentSignal = defer<AdbOverWebsocketStream>();
+  // We store a promise for each messge while the message is processed.
+  // This way, if the websocket server closes the connection, we first process
+  // all previously received messages and only afterwards disconnect.
+  // An application is when the stream wraps a shell command. The websocket
+  // server will reply and then immediately disconnect.
+  private messageProcessedSignals: Set<Deferred<void>> = new Set();
+
+  private _isConnected = false;
   private onStreamDataCallbacks: OnStreamDataCallback[] = [];
   private onStreamCloseCallbacks: OnStreamCloseCallback[] = [];
 
-  constructor(private websocket: WebSocket) {}
+  private constructor(
+      websocketUrl: string, destination: string, deviceSerialNumber: string,
+      private removeFromConnection: (stream: AdbOverWebsocketStream) => void) {
+    this.websocket = new WebSocket(websocketUrl);
+
+    this.websocket.onopen = this.onOpen.bind(this, deviceSerialNumber);
+    this.websocket.onmessage = this.onMessage.bind(this, destination);
+    // The websocket may be closed by the websocket server. This happens
+    // for instance when we get the full result of a shell command.
+    this.websocket.onclose = this.onClose.bind(this);
+  }
 
   addOnStreamData(onStreamData: OnStreamDataCallback) {
     this.onStreamDataCallbacks.push(onStreamData);
@@ -145,8 +160,16 @@
 
   // We close the websocket and notify the AdbConnection to remove this stream.
   close(): void {
-    this.websocket.close();
-    this._isOpen = false;
+    // If the websocket connection is still open (ie. the close did not
+    // originate from the server), we close the websocket connection.
+    if (this.websocket.readyState === this.websocket.OPEN) {
+      this.websocket.close();
+      // We remove the 'onclose' callback so the 'close' method doesn't get
+      // executed twice.
+      this.websocket.onclose = null;
+    }
+    this._isConnected = false;
+    this.removeFromConnection(this);
     this.signalStreamClosed();
   }
 
@@ -154,11 +177,65 @@
     this.websocket.send(msg);
   }
 
-  isOpen(): boolean {
-    return this._isOpen;
+  isConnected(): boolean {
+    return this._isConnected;
   }
 
-  setStreamOpen(): void {
-    this._isOpen = true;
+  private async onOpen(deviceSerialNumber: string): Promise<void> {
+    this.websocket.send(
+        buildAbdWebsocketCommand(`host:transport:${deviceSerialNumber}`));
+  }
+
+  private async onMessage(destination: string, evt: MessageEvent):
+      Promise<void> {
+    const messageProcessed = defer<void>();
+    this.messageProcessedSignals.add(messageProcessed);
+    try {
+      if (!this._isConnected) {
+        const txt = await evt.data.text();
+        const prefix = txt.substr(0, 4);
+        if (prefix === 'OKAY') {
+          this._isConnected = true;
+          this.websocket.send(buildAbdWebsocketCommand(destination));
+          this.commandSentSignal.resolve(this);
+        } else if (prefix === 'FAIL' && txt.includes('device unauthorized')) {
+          this.commandSentSignal.reject(
+              new RecordingError(ALLOW_USB_DEBUGGING));
+          this.close();
+        } else {
+          this.commandSentSignal.reject(
+              new RecordingError(WEBSOCKET_UNABLE_TO_CONNECT));
+          this.close();
+        }
+      } else {
+        // Upon a successful connection we first receive an 'OKAY' message.
+        // After that, we receive messages with traced binary payloads.
+        const arrayBufferResponse = await evt.data.arrayBuffer();
+        if (textDecoder.decode(arrayBufferResponse) !== 'OKAY') {
+          this.signalStreamData(new Uint8Array(arrayBufferResponse));
+        }
+      }
+      messageProcessed.resolve();
+    } finally {
+      this.messageProcessedSignals.delete(messageProcessed);
+    }
+  }
+
+  private async onClose(): Promise<void> {
+    // Wait for all messages to be processed before closing the connection.
+    await Promise.allSettled(this.messageProcessedSignals);
+    this.close();
+  }
+
+  static create(
+      websocketUrl: string, destination: string, deviceSerialNumber: string,
+      removeFromConnection: (stream: AdbOverWebsocketStream) => void):
+      Promise<AdbOverWebsocketStream> {
+    return (new AdbOverWebsocketStream(
+                websocketUrl,
+                destination,
+                deviceSerialNumber,
+                removeFromConnection))
+        .commandSentSignal;
   }
 }
diff --git a/ui/src/common/recordingV2/adb_connection_over_webusb.ts b/ui/src/common/recordingV2/adb_connection_over_webusb.ts
index 98f8b34..493ca6d 100644
--- a/ui/src/common/recordingV2/adb_connection_over_webusb.ts
+++ b/ui/src/common/recordingV2/adb_connection_over_webusb.ts
@@ -498,7 +498,7 @@
 // of this class based on a stream id match.
 export class AdbOverWebusbStream implements ByteStream {
   private adbConnection: AdbConnectionOverWebusb;
-  private _isOpen: boolean;
+  private _isConnected: boolean;
   private onStreamDataCallbacks: OnStreamDataCallback[] = [];
   private onStreamCloseCallbacks: OnStreamCloseCallback[] = [];
   localStreamId: number;
@@ -511,7 +511,7 @@
     this.localStreamId = localStreamId;
     this.remoteStreamId = remoteStreamId;
     // When the stream is created, the connection has been already established.
-    this._isOpen = true;
+    this._isConnected = true;
   }
 
   addOnStreamData(onStreamData: OnStreamDataCallback): void {
@@ -540,12 +540,13 @@
     this.onStreamCloseCallbacks = [];
   }
 
+
   close(): void {
     this.closeAndWaitForTeardown();
   }
 
   async closeAndWaitForTeardown(): Promise<void> {
-    this._isOpen = false;
+    this._isConnected = false;
     await this.adbConnection.streamClose(this);
   }
 
@@ -553,8 +554,8 @@
     this.adbConnection.streamWrite(msg, this);
   }
 
-  isOpen(): boolean {
-    return this._isOpen;
+  isConnected(): boolean {
+    return this._isConnected;
   }
 }
 
diff --git a/ui/src/common/recordingV2/auth/credentials_interfaces.d.ts b/ui/src/common/recordingV2/auth/credentials_interfaces.d.ts
index 2ee28a1..f127ad5 100644
--- a/ui/src/common/recordingV2/auth/credentials_interfaces.d.ts
+++ b/ui/src/common/recordingV2/auth/credentials_interfaces.d.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 // Typescript interfaces for PasswordCredential don't exist as of
-// lib.dom es2018 (see tsconfig.json), so we had to define them here.
+// lib.dom es2020 (see tsconfig.json), so we had to define them here.
 declare global {
   export interface PasswordCredentialData {
     readonly id: string;
diff --git a/ui/src/common/recordingV2/recording_interfaces_v2.ts b/ui/src/common/recordingV2/recording_interfaces_v2.ts
index fd9b609..43b399b 100644
--- a/ui/src/common/recordingV2/recording_interfaces_v2.ts
+++ b/ui/src/common/recordingV2/recording_interfaces_v2.ts
@@ -164,7 +164,7 @@
   addOnStreamData(onStreamData: OnStreamDataCallback): void;
   addOnStreamClose(onStreamClose: OnStreamCloseCallback): void;
 
-  isOpen(): boolean;
+  isConnected(): boolean;
   write(data: string|Uint8Array): void;
   close(): void;
 }
diff --git a/ui/src/common/recordingV2/target_factories/android_websocket_target_factory.ts b/ui/src/common/recordingV2/target_factories/android_websocket_target_factory.ts
index 87e2250..2e9f027 100644
--- a/ui/src/common/recordingV2/target_factories/android_websocket_target_factory.ts
+++ b/ui/src/common/recordingV2/target_factories/android_websocket_target_factory.ts
@@ -106,7 +106,7 @@
   constructor(
       private websocket: WebSocket,
       private maybeClearConnection: (connection: WebsocketConnection) => void,
-      private onTargetChange?: OnTargetChangeCallback) {
+      private onTargetChange: OnTargetChangeCallback) {
     this.initWebsocket();
   }
 
@@ -181,13 +181,15 @@
         this.targets.set(
             listedDevice.serialNumber,
             new AndroidWebsocketTarget(
-                listedDevice.serialNumber, this.websocket.url));
+                listedDevice.serialNumber,
+                this.websocket.url,
+                this.onTargetChange));
         targetsUpdated = true;
       }
     }
 
     // Notify the calling code that the list of targets has been updated.
-    if (targetsUpdated && this.onTargetChange) {
+    if (targetsUpdated) {
       this.onTargetChange();
     }
   }
@@ -195,7 +197,7 @@
 
 export class AndroidWebsocketTargetFactory implements TargetFactory {
   readonly kind = ANDROID_WEBSOCKET_TARGET_FACTORY;
-  private onTargetChange?: OnTargetChangeCallback;
+  private onTargetChange: OnTargetChangeCallback = () => {};
   private websocketConnection?: WebsocketConnection;
 
   getName() {
diff --git a/ui/src/common/recordingV2/targets/android_websocket_target.ts b/ui/src/common/recordingV2/targets/android_websocket_target.ts
index 2b86fbd..b4621a9 100644
--- a/ui/src/common/recordingV2/targets/android_websocket_target.ts
+++ b/ui/src/common/recordingV2/targets/android_websocket_target.ts
@@ -15,6 +15,7 @@
 import {AdbConnectionOverWebsocket} from '../adb_connection_over_websocket';
 import {DEFAULT_TRACED_CONSUMER_SOCKET_PATH} from '../adb_targets_utils';
 import {
+  OnTargetChangeCallback,
   RecordingTargetV2,
   TargetInfo,
   TracingSession,
@@ -24,8 +25,12 @@
 
 export class AndroidWebsocketTarget implements RecordingTargetV2 {
   private adbConnection: AdbConnectionOverWebsocket;
+  private androidApiLevel?: number;
+  private consumerSocketPath = DEFAULT_TRACED_CONSUMER_SOCKET_PATH;
 
-  constructor(private serialNumber: string, websocketUrl: string) {
+  constructor(
+      private serialNumber: string, websocketUrl: string,
+      private onTargetChange: OnTargetChangeCallback) {
     this.adbConnection =
         new AdbConnectionOverWebsocket(serialNumber, websocketUrl);
   }
@@ -33,9 +38,8 @@
   getInfo(): TargetInfo {
     return {
       targetType: 'ANDROID',
-      // TODO(octaviant): fetch the OS from the adb connection
-      // once aosp/2127460 is in
-      androidApiLevel: undefined,
+      // 'androidApiLevel' will be populated after ADB authorization.
+      androidApiLevel: this.androidApiLevel,
       dataSources: [],
       name: this.serialNumber + ' WebSocket',
     };
@@ -60,9 +64,19 @@
   async createTracingSession(tracingSessionListener: TracingSessionListener):
       Promise<TracingSession> {
     this.adbConnection.onDisconnect = tracingSessionListener.onDisconnect;
-    const adbStream = await this.adbConnection.connectSocket(
-        DEFAULT_TRACED_CONSUMER_SOCKET_PATH);
 
+    if (!this.androidApiLevel) {
+      const version = await this.adbConnection.shellAndGetOutput(
+          'getprop ro.build.version.sdk');
+      this.androidApiLevel = Number(version);
+      this.onTargetChange();
+    }
+
+    // TODO(octaviant): bring the websocket targets at feature parity with the
+    // webusb ones after the chain from aosp/2122732 lands.
+
+    const adbStream =
+        await this.adbConnection.connectSocket(this.consumerSocketPath);
     const tracingSession =
         new TracedTracingSession(adbStream, tracingSessionListener);
     await tracingSession.initConnection();
diff --git a/ui/src/common/recordingV2/targets/android_webusb_target.ts b/ui/src/common/recordingV2/targets/android_webusb_target.ts
index f25b539..405fb28 100644
--- a/ui/src/common/recordingV2/targets/android_webusb_target.ts
+++ b/ui/src/common/recordingV2/targets/android_webusb_target.ts
@@ -52,7 +52,7 @@
         assertExists(this.device.serialNumber) + ' WebUsb';
     return {
       targetType: 'ANDROID',
-      // The method 'fetchInfo' will populate this after ADB authorization.
+      // 'androidApiLevel' will be populated after ADB authorization.
       androidApiLevel: this.androidApiLevel,
       dataSources: this.dataSources || [],
       name,
diff --git a/ui/src/common/recordingV2/traced_tracing_session.ts b/ui/src/common/recordingV2/traced_tracing_session.ts
index 20f6fe3..a2708b6 100644
--- a/ui/src/common/recordingV2/traced_tracing_session.ts
+++ b/ui/src/common/recordingV2/traced_tracing_session.ts
@@ -151,7 +151,9 @@
   }
 
   async getTraceBufferUsage(): Promise<number> {
-    if (!this.byteStream.isOpen()) {
+    if (!this.byteStream.isConnected()) {
+      // TODO(octaviant): make this more in line with the other trace buffer
+      //  error cases.
       return 0;
     }
     const bufferStats = await this.getBufferStats();
@@ -174,7 +176,7 @@
     return percentageUsed;
   }
 
-  async initConnection(): Promise<void> {
+  initConnection(): Promise<void> {
     // bind IPC methods
     const requestId = this.requestId++;
     const frame = new IPCFrame({
@@ -223,7 +225,7 @@
   }
 
   private rpcInvoke(methodName: string, argsProto: Uint8Array): void {
-    if (!this.byteStream.isOpen()) {
+    if (!this.byteStream.isConnected()) {
       return;
     }
     const method = this.availableMethods.find((m) => m.name === methodName);
diff --git a/ui/src/service_worker/tsconfig.json b/ui/src/service_worker/tsconfig.json
index 35ff5b6..1e77d1d 100644
--- a/ui/src/service_worker/tsconfig.json
+++ b/ui/src/service_worker/tsconfig.json
@@ -8,7 +8,7 @@
     "outDir": "../../out/tsc/service_worker",
     "lib": [
       "webworker",
-      "es2018",
+      "es2020",
     ],
     "types" : []
   }
diff --git a/ui/tsconfig.json b/ui/tsconfig.json
index 5867aea..5fd4e3d 100644
--- a/ui/tsconfig.json
+++ b/ui/tsconfig.json
@@ -11,7 +11,7 @@
     "outDir": "./out/tsc",
     "lib": [
       "dom",                               // Need to be explicitly mentioned now since we're overriding default included libs.
-      "es2018",                            // Need this to use Object.values.
+      "es2020",                            // Need this to use Promise.allSettled.
     ],
     "paths": {
       "*" : ["*", "./node_modules/@tsundoku/micromodal_types/*"]