Consolidate stream logic under DeviceMessageStream

Bug: 162361398
Test: Association and reconnect succeed
Change-Id: Ia3b251aa1b4369f667183a1afe5317196eb6ac55
diff --git a/connected-device-lib/Android.bp b/connected-device-lib/Android.bp
index 55ee640..adbdf0c 100644
--- a/connected-device-lib/Android.bp
+++ b/connected-device-lib/Android.bp
@@ -21,8 +21,6 @@
 
     manifest: "AndroidManifest.xml",
 
-    resource_dirs: ["res"],
-
     optimize: {
         enabled: false,
     },
diff --git a/connected-device-lib/proto/ble_device_message.proto b/connected-device-lib/proto/device_message.proto
similarity index 93%
rename from connected-device-lib/proto/ble_device_message.proto
rename to connected-device-lib/proto/device_message.proto
index b83ed42..79b98df 100644
--- a/connected-device-lib/proto/ble_device_message.proto
+++ b/connected-device-lib/proto/device_message.proto
@@ -24,8 +24,7 @@
 option java_outer_classname = "DeviceMessageProto";
 
 // A message between devices.
-// TODO(b/160163265): Rename the proto to a more generic name.
-message BleDeviceMessage {
+message Message {
   // The operation that this message represents.
   OperationType operation = 1;
 
diff --git a/connected-device-lib/proto/ble_packet.proto b/connected-device-lib/proto/packet.proto
similarity index 85%
rename from connected-device-lib/proto/ble_packet.proto
rename to connected-device-lib/proto/packet.proto
index c2ce262..f761622 100644
--- a/connected-device-lib/proto/ble_packet.proto
+++ b/connected-device-lib/proto/packet.proto
@@ -18,11 +18,11 @@
 
 package com.android.car.connecteddevice.proto;
 
-option java_package = "com.android.car.connecteddevice.BleStreamProtos";
-option java_outer_classname = "BlePacketProto";
+option java_package = "com.android.car.connecteddevice.StreamProtos";
+option java_outer_classname = "PacketProto";
 
-// A packet across a BLE channel.
-message BlePacket {
+// A packet across a stream.
+message Packet {
   // A 1-based packet number. The first message will have a value of "1" rather
   // than "0".
   fixed32 packet_number = 1;
diff --git a/connected-device-lib/proto/ble_version_exchange.proto b/connected-device-lib/proto/version_exchange.proto
similarity index 97%
rename from connected-device-lib/proto/ble_version_exchange.proto
rename to connected-device-lib/proto/version_exchange.proto
index f2b80a6..a7c3493 100644
--- a/connected-device-lib/proto/ble_version_exchange.proto
+++ b/connected-device-lib/proto/version_exchange.proto
@@ -21,7 +21,7 @@
 option java_package = "com.android.car.connecteddevice.StreamProtos";
 option java_outer_classname = "VersionExchangeProto";
 
-message BleVersionExchange {
+message VersionExchange {
   // Minimum supported protobuf version.
   int32 minSupportedMessagingVersion = 1;
 
diff --git a/connected-device-lib/res/values/config.xml b/connected-device-lib/res/values/config.xml
index 2653a98..c719462 100644
--- a/connected-device-lib/res/values/config.xml
+++ b/connected-device-lib/res/values/config.xml
@@ -16,27 +16,5 @@
   -->
 
 <resources xmlns:xliff="urn:oasis:names:tc:xliff:document:1.2">
-    <!-- These values must be modified for the connected device lib to function properly.-->
-    <string name="car_service_uuid" translatable="false">00000000-0000-0000-0000-000000000000</string>
-    <string name="car_association_service_uuid" translatable="false">00000000-0000-0000-0000-000000000000</string>
-    <string name="car_reconnect_service_uuid" translatable="false">00000000-0000-0000-0000-000000000000</string>
-    <string name="car_reconnect_data_uuid" translatable="false">00000000-0000-0000-0000-000000000000</string>
-    <string name="car_bg_mask" translatable="false">00000000000000000000000000000000</string>
-
-    <string name="car_secure_read_uuid" translatable="false">5e2a68a6-27be-43f9-8d1e-4546976fabd7</string>
-    <string name="car_secure_write_uuid" translatable="false">5e2a68a5-27be-43f9-8d1e-4546976fabd7</string>
-
     <string name="connected_device_shared_preferences" translatable="false">com.android.car.connecteddevice</string>
-
-    <string name="car_spp_service_uuid_secure" translatable="false">00000000-0000-0000-0000-000000000000</string>
-    <string name="car_spp_service_uuid_insecure" translatable="false">00000000-0000-0000-0000-000000000000</string>
-
-    <bool name="enable_secure_rfcomm_channel">true</bool>
-    <bool name="enable_spp_support">false</bool>
-    <!--
-    This value must be between 23 and 185. 23 is the default MTU size for Android, and 185 is
-    the max MTU size supported for iOS. Verify your device and target companion devices support a
-    larger MTU prior to modifying.
-    -->
-    <integer name="car_default_mtu_size">23</integer>
 </resources>
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/CarBluetoothManager.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/CarBluetoothManager.java
index 7671787..eec53c1 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/CarBluetoothManager.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/CarBluetoothManager.java
@@ -43,7 +43,7 @@
  */
 public abstract class CarBluetoothManager {
 
-    private static final String TAG = "CarConnectionManager";
+    private static final String TAG = "CarBluetoothManager";
 
     protected final ConnectedDeviceStorage mStorage;
 
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessage.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessage.java
index af487d6..5b2b1b1 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessage.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessage.java
@@ -16,7 +16,7 @@
 
 package com.android.car.connecteddevice.connection;
 
-import static com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.BleDeviceMessage;
+import static com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.Message;
 
 import android.annotation.NonNull;
 import android.annotation.Nullable;
@@ -25,7 +25,7 @@
 import java.util.Objects;
 import java.util.UUID;
 
-/** Holds the needed data from a {@link BleDeviceMessage}. */
+/** Holds the needed data from a {@link Message}. */
 public class DeviceMessage {
 
     private static final String TAG = "DeviceMessage";
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessageStream.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessageStream.java
index 9e37a7f..d2cd59f 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessageStream.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/DeviceMessageStream.java
@@ -16,26 +16,77 @@
 
 package com.android.car.connecteddevice.connection;
 
+import static com.android.car.connecteddevice.util.SafeLog.logd;
+import static com.android.car.connecteddevice.util.SafeLog.loge;
+import static com.android.car.connecteddevice.util.SafeLog.logw;
+
+import android.annotation.IntRange;
 import android.annotation.NonNull;
 import android.annotation.Nullable;
 
-import com.android.car.connecteddevice.StreamProtos.OperationProto;
+import com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.Message;
+import com.android.car.connecteddevice.StreamProtos.OperationProto.OperationType;
+import com.android.car.connecteddevice.StreamProtos.PacketProto.Packet;
+import com.android.car.connecteddevice.StreamProtos.VersionExchangeProto.VersionExchange;
+import com.android.car.connecteddevice.util.ByteUtils;
+import com.android.car.protobuf.ByteString;
+import com.android.car.protobuf.InvalidProtocolBufferException;
+import com.android.internal.annotations.VisibleForTesting;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Abstract class which includes common logic of different types of {@link DeviceMessageStream}.
  */
 public abstract class DeviceMessageStream {
 
-    /**
-     * Listener which will be notified when there is new {@link DeviceMessage} received.
-     */
+    private static final String TAG = "DeviceMessageStream";
+
+    // Only version 2 of the messaging and version 2 of the security supported.
+    @VisibleForTesting
+    public static final int MESSAGING_VERSION = 2;
+    @VisibleForTesting
+    public static final int SECURITY_VERSION = 2;
+
+    private final ArrayDeque<Packet> mPacketQueue = new ArrayDeque<>();
+
+    private final Map<Integer, ByteArrayOutputStream> mPendingData = new HashMap<>();
+
+    // messageId -> nextExpectedPacketNumber
+    private final Map<Integer, Integer> mPendingPacketNumber = new HashMap<>();
+
+    private final MessageIdGenerator mMessageIdGenerator = new MessageIdGenerator();
+
+    private final AtomicBoolean mIsVersionExchanged = new AtomicBoolean(false);
+
+    private final AtomicBoolean mIsSendingInProgress = new AtomicBoolean(false);
+
+    private int mMaxWriteSize;
+
+    /** Listener which will be notified when there is new {@link DeviceMessage} received. */
     private MessageReceivedListener mMessageReceivedListener;
 
-    /**
-     * Listener which will be notified when there is error parsing the received message.
-     */
+    /** Listener which will be notified when there is error parsing the received message. */
     private MessageReceivedErrorListener mMessageReceivedErrorListener;
 
+    public DeviceMessageStream(int defaultMaxWriteSize) {
+        mMaxWriteSize = defaultMaxWriteSize;
+    }
+
+    /**
+     * Send data to the connected device. Note: {@link #sendCompleted()} must be called when the
+     * bytes have successfully been sent to indicate the stream is ready to send more data.
+     */
+    protected abstract void send(byte[] data);
+
     /**
      * Set the given listener to be notified when a new message was received from the client. If
      * listener is {@code null}, clear.
@@ -61,7 +112,7 @@
      * @param operationType The operation type of the message.
      */
     protected final void notifyMessageReceivedListener(@NonNull DeviceMessage deviceMessage,
-            OperationProto.OperationType operationType) {
+            OperationType operationType) {
         if (mMessageReceivedListener != null) {
             mMessageReceivedListener.onMessageReceived(deviceMessage, operationType);
         }
@@ -80,13 +131,207 @@
     }
 
     /**
+     * Writes the given message to the write characteristic of this stream with operation type
+     * {@code CLIENT_MESSAGE}.
+     *
+     * This method will handle the chunking of messages based on the max write size.
+     *
+     * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message.
+     */
+    public final void writeMessage(@NonNull DeviceMessage deviceMessage) {
+        writeMessage(deviceMessage, OperationType.CLIENT_MESSAGE);
+    }
+
+    /**
      * Send {@link DeviceMessage} to remote connected devices.
      *
      * @param deviceMessage The message which need to be sent
      * @param operationType The operation type of current message
      */
-    public abstract void writeMessage(@NonNull DeviceMessage deviceMessage,
-            OperationProto.OperationType operationType);
+    public final void writeMessage(@NonNull DeviceMessage deviceMessage,
+            OperationType operationType) {
+        Message.Builder builder = Message.newBuilder()
+                .setOperation(operationType)
+                .setIsPayloadEncrypted(deviceMessage.isMessageEncrypted())
+                .setPayload(ByteString.copyFrom(deviceMessage.getMessage()));
+
+        UUID recipient = deviceMessage.getRecipient();
+        if (recipient != null) {
+            builder.setRecipient(ByteString.copyFrom(ByteUtils.uuidToBytes(recipient)));
+        }
+
+        Message message = builder.build();
+        byte[] rawBytes = message.toByteArray();
+        List<Packet> packets;
+        try {
+            packets = PacketFactory.makePackets(rawBytes, mMessageIdGenerator.next(),
+                    mMaxWriteSize);
+        } catch (PacketFactoryException e) {
+            loge(TAG, "Error while creating message packets.", e);
+            return;
+        }
+        mPacketQueue.addAll(packets);
+        writeNextMessageInQueue();
+    }
+
+    private void writeNextMessageInQueue() {
+        if (mPacketQueue.isEmpty()) {
+            logd(TAG, "No more packets to send.");
+            return;
+        }
+        boolean isLockAcquired = mIsSendingInProgress.compareAndSet(false, true);
+        if (!isLockAcquired) {
+            logd(TAG, "Unable to send packet at this time.");
+            return;
+        }
+
+        Packet packet = mPacketQueue.remove();
+        logd(TAG, "Writing packet " + packet.getPacketNumber() + " of "
+                + packet.getTotalPackets() + " for " + packet.getMessageId() + ".");
+        send(packet.toByteArray());
+    }
+
+    /** Process incoming data from stream. */
+    protected final void onDataReceived(byte[] data) {
+        if (!hasVersionBeenExchanged()) {
+            processVersionExchange(data);
+            return;
+        }
+
+        Packet packet;
+        try {
+            packet = Packet.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            loge(TAG, "Can not parse packet from client.", e);
+            notifyMessageReceivedErrorListener(e);
+            return;
+        }
+        processPacket(packet);
+    }
+
+    protected final void processPacket(@NonNull Packet packet) {
+        int messageId = packet.getMessageId();
+        int packetNumber = packet.getPacketNumber();
+        int expectedPacket = mPendingPacketNumber.getOrDefault(messageId, 1);
+        if (packetNumber == expectedPacket - 1) {
+            logw(TAG, "Received duplicate packet " + packet.getPacketNumber() + " for message "
+                    + messageId + ". Ignoring.");
+            return;
+        }
+        if (packetNumber != expectedPacket) {
+            loge(TAG, "Received unexpected packet " + packetNumber + " for message "
+                    + messageId + ".");
+            notifyMessageReceivedErrorListener(
+                    new IllegalStateException("Packet received out of order."));
+            return;
+        }
+        mPendingPacketNumber.put(messageId, packetNumber + 1);
+
+        ByteArrayOutputStream currentPayloadStream =
+                mPendingData.getOrDefault(messageId, new ByteArrayOutputStream());
+        mPendingData.putIfAbsent(messageId, currentPayloadStream);
+
+        byte[] payload = packet.getPayload().toByteArray();
+        try {
+            currentPayloadStream.write(payload);
+        } catch (IOException e) {
+            loge(TAG, "Error writing packet to stream.", e);
+            notifyMessageReceivedErrorListener(e);
+            return;
+        }
+        logd(TAG, "Parsed packet " + packet.getPacketNumber() + " of "
+                + packet.getTotalPackets() + " for message " + messageId + ". Writing "
+                + payload.length + ".");
+
+        if (packet.getPacketNumber() != packet.getTotalPackets()) {
+            return;
+        }
+
+        byte[] messageBytes = currentPayloadStream.toByteArray();
+        mPendingData.remove(messageId);
+
+        logd(TAG, "Received complete device message " + messageId + " of " + messageBytes.length
+                + " bytes.");
+        Message message;
+        try {
+            message = Message.parseFrom(messageBytes);
+        } catch (InvalidProtocolBufferException e) {
+            loge(TAG, "Cannot parse device message from client.", e);
+            notifyMessageReceivedErrorListener(e);
+            return;
+        }
+
+        DeviceMessage deviceMessage = new DeviceMessage(
+                ByteUtils.bytesToUUID(message.getRecipient().toByteArray()),
+                message.getIsPayloadEncrypted(), message.getPayload().toByteArray());
+        notifyMessageReceivedListener(deviceMessage, message.getOperation());
+    }
+
+    /** The maximum amount of bytes that can be written in a single packet. */
+    public final void setMaxWriteSize(@IntRange(from = 1) int maxWriteSize) {
+        if (maxWriteSize <= 0) {
+            return;
+        }
+        mMaxWriteSize = maxWriteSize;
+    }
+
+    private boolean hasVersionBeenExchanged() {
+        return mIsVersionExchanged.get();
+    }
+
+    /** Indicate current send operation has completed. */
+    @VisibleForTesting
+    public final void sendCompleted() {
+        mIsSendingInProgress.set(false);
+        writeNextMessageInQueue();
+    }
+
+    private void processVersionExchange(@NonNull byte[] value) {
+        VersionExchange versionExchange;
+        try {
+            versionExchange = VersionExchange.parseFrom(value);
+        } catch (InvalidProtocolBufferException e) {
+            loge(TAG, "Could not parse version exchange message", e);
+            notifyMessageReceivedErrorListener(e);
+
+            return;
+        }
+        int minMessagingVersion = versionExchange.getMinSupportedMessagingVersion();
+        int maxMessagingVersion = versionExchange.getMaxSupportedMessagingVersion();
+        int minSecurityVersion = versionExchange.getMinSupportedSecurityVersion();
+        int maxSecurityVersion = versionExchange.getMaxSupportedSecurityVersion();
+        if (minMessagingVersion > MESSAGING_VERSION || maxMessagingVersion < MESSAGING_VERSION
+                || minSecurityVersion > SECURITY_VERSION || maxSecurityVersion < SECURITY_VERSION) {
+            loge(TAG, "Unsupported message version for min " + minMessagingVersion + " and max "
+                    + maxMessagingVersion + " or security version for " + minSecurityVersion
+                    + " and max " + maxSecurityVersion + ".");
+            notifyMessageReceivedErrorListener(new IllegalStateException("Unsupported version."));
+            return;
+        }
+
+        VersionExchange headunitVersion = VersionExchange.newBuilder()
+                .setMinSupportedMessagingVersion(MESSAGING_VERSION)
+                .setMaxSupportedMessagingVersion(MESSAGING_VERSION)
+                .setMinSupportedSecurityVersion(SECURITY_VERSION)
+                .setMaxSupportedSecurityVersion(SECURITY_VERSION)
+                .build();
+
+        send(headunitVersion.toByteArray());
+        mIsVersionExchanged.set(true);
+        logd(TAG, "Sent supported version to the phone.");
+    }
+
+
+    /** A generator of unique IDs for messages. */
+    private static class MessageIdGenerator {
+        private final AtomicInteger mMessageId = new AtomicInteger(0);
+
+        int next() {
+            int current = mMessageId.getAndIncrement();
+            mMessageId.compareAndSet(Integer.MAX_VALUE, 0);
+            return current;
+        }
+    }
 
     /**
      * Listener to be invoked when a complete message is received from the client.
@@ -97,10 +342,10 @@
          * Called when a complete message is received from the client.
          *
          * @param deviceMessage The message received from the client.
-         * @param operationType The {@link OperationProto.OperationType} of the received message.
+         * @param operationType The {@link OperationType} of the received message.
          */
         void onMessageReceived(@NonNull DeviceMessage deviceMessage,
-                OperationProto.OperationType operationType);
+                OperationType operationType);
     }
 
     /**
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BlePacketFactory.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/PacketFactory.java
similarity index 80%
rename from connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BlePacketFactory.java
rename to connected-device-lib/src/com/android/car/connecteddevice/connection/PacketFactory.java
index 7b1115b..608469b 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BlePacketFactory.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/PacketFactory.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2019 The Android Open Source Project
+ * Copyright (C) 2020 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,11 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.android.car.connecteddevice.connection.ble;
+package com.android.car.connecteddevice.connection;
 
 import static com.android.car.connecteddevice.util.SafeLog.loge;
 
-import com.android.car.connecteddevice.BleStreamProtos.BlePacketProto.BlePacket;
+import com.android.car.connecteddevice.StreamProtos.PacketProto.Packet;
 import com.android.car.protobuf.ByteString;
 import com.android.internal.annotations.VisibleForTesting;
 
@@ -26,10 +26,10 @@
 import java.util.List;
 
 /**
- * Factory for creating {@link BlePacket} protos.
+ * Factory for creating {@link Packet} protos.
  */
-class BlePacketFactory {
-    private static final String TAG = "BlePacketFactory";
+class PacketFactory {
+    private static final String TAG = "PacketFactory";
 
     /**
      * The size in bytes of a {@code fixed32} field in the proto.
@@ -39,7 +39,7 @@
     /**
      * The bytes needed to encode the field number in the proto.
      *
-     * <p>Since the {@link BlePacket} only has 4 fields, it will only take 1 additional byte to
+     * <p>Since the {@link Packet} only has 4 fields, it will only take 1 additional byte to
      * encode.
      */
     private static final int FIELD_NUMBER_ENCODING_SIZE = 1;
@@ -53,15 +53,15 @@
     /**
      * Split given data if necessary to fit within the given {@code maxSize}.
      *
-     * @param payload The payload to potentially split across multiple {@link BlePacket}s.
+     * @param payload The payload to potentially split across multiple {@link Packet}s.
      * @param messageId The unique id for identifying message.
      * @param maxSize The maximum size of each chunk.
-     * @return A list of {@link BlePacket}s.
-     * @throws BlePacketFactoryException if an error occurred during the splitting of data.
+     * @return A list of {@link Packet}s.
+     * @throws PacketFactoryException if an error occurred during the splitting of data.
      */
-    static List<BlePacket> makeBlePackets(byte[] payload, int messageId, int maxSize)
-            throws BlePacketFactoryException {
-        List<BlePacket> blePackets = new ArrayList<>();
+    static List<Packet> makePackets(byte[] payload, int messageId, int maxSize)
+            throws PacketFactoryException {
+        List<Packet> blePackets = new ArrayList<>();
         int payloadSize = payload.length;
         int totalPackets = getTotalPacketNumber(messageId, payloadSize, maxSize);
         int maxPayloadSize = maxSize
@@ -70,7 +70,7 @@
         int start = 0;
         int end = Math.min(payloadSize, maxPayloadSize);
         for (int packetNum = 1; packetNum <= totalPackets; packetNum++) {
-            blePackets.add(BlePacket.newBuilder()
+            blePackets.add(Packet.newBuilder()
                     .setPacketNumber(packetNum)
                     .setTotalPackets(totalPackets)
                     .setMessageId(messageId)
@@ -83,7 +83,7 @@
     }
 
     /**
-     * Compute the header size for the {@link BlePacket} proto in bytes. This method assumes that
+     * Compute the header size for the {@link Packet} proto in bytes. This method assumes that
      * the proto contains a payload.
      */
     @VisibleForTesting
@@ -99,7 +99,7 @@
      */
     @VisibleForTesting
     static int getTotalPacketNumber(int messageId, int payloadSize, int maxSize)
-            throws BlePacketFactoryException {
+            throws PacketFactoryException {
         int headerSizeWithoutTotalPackets = FIXED_32_SIZE + FIELD_NUMBER_ENCODING_SIZE
                 + getEncodedSize(messageId) + FIELD_NUMBER_ENCODING_SIZE
                 + getEncodedSize(Math.min(payloadSize, maxSize)) + FIELD_NUMBER_ENCODING_SIZE;
@@ -109,7 +109,7 @@
                     + FIELD_NUMBER_ENCODING_SIZE;
             int maxPayloadSize = maxSize - packetHeaderSize;
             if (maxPayloadSize < 0) {
-                throw new BlePacketFactoryException("Packet header size too large.");
+                throw new PacketFactoryException("Packet header size too large.");
             }
             int totalPackets = (int) Math.ceil(payloadSize / (double) maxPayloadSize);
             if (getEncodedSize(totalPackets) == value) {
@@ -119,7 +119,7 @@
 
         loge(TAG, "Cannot get valid total packet number for message: messageId: "
                 + messageId + ", payloadSize: " + payloadSize + ", maxSize: " + maxSize);
-        throw new BlePacketFactoryException("No valid total packet number.");
+        throw new PacketFactoryException("No valid total packet number.");
     }
 
     /**
@@ -151,5 +151,5 @@
         return 5;
     }
 
-    private BlePacketFactory() {}
+    private PacketFactory() {}
 }
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BlePacketFactoryException.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/PacketFactoryException.java
similarity index 69%
rename from connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BlePacketFactoryException.java
rename to connected-device-lib/src/com/android/car/connecteddevice/connection/PacketFactoryException.java
index ec3726e..41c11fc 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BlePacketFactoryException.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/PacketFactoryException.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2019 The Android Open Source Project
+ * Copyright (C) 2020 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,13 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.android.car.connecteddevice.connection.ble;
+package com.android.car.connecteddevice.connection;
 
 /**
- * Exception for signaling {@link BlePacketFactory} errors.
+ * Exception for signaling {@link PacketFactory} errors.
  */
-class BlePacketFactoryException extends Exception {
-    BlePacketFactoryException(String message) {
+class PacketFactoryException extends Exception {
+    PacketFactoryException(String message) {
         super(message);
     }
 }
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStream.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStream.java
index 2ae1c0f..e715750 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStream.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStream.java
@@ -16,11 +16,7 @@
 
 package com.android.car.connecteddevice.connection.ble;
 
-import static com.android.car.connecteddevice.BleStreamProtos.BlePacketProto.BlePacket;
-import static com.android.car.connecteddevice.StreamProtos.OperationProto.OperationType;
-import static com.android.car.connecteddevice.StreamProtos.VersionExchangeProto.BleVersionExchange;
 import static com.android.car.connecteddevice.util.SafeLog.logd;
-import static com.android.car.connecteddevice.util.SafeLog.loge;
 import static com.android.car.connecteddevice.util.SafeLog.logw;
 
 import android.annotation.NonNull;
@@ -29,23 +25,8 @@
 import android.os.Handler;
 import android.os.Looper;
 
-import com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.BleDeviceMessage;
-import com.android.car.connecteddevice.connection.DeviceMessage;
 import com.android.car.connecteddevice.connection.DeviceMessageStream;
-import com.android.car.connecteddevice.util.ByteUtils;
-import com.android.car.protobuf.ByteString;
-import com.android.car.protobuf.InvalidProtocolBufferException;
-import com.android.internal.annotations.VisibleForTesting;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /** BLE message stream to a device. */
@@ -53,10 +34,6 @@
 
     private static final String TAG = "BleDeviceMessageStream";
 
-    // Only version 2 of the messaging and version 2 of the security supported.
-    private static final int MESSAGING_VERSION = 2;
-    private static final int SECURITY_VERSION = 2;
-
     /*
      * During bandwidth testing, it was discovered that allowing the stream to send as fast as it
      * can blocked outgoing notifications from being received by the connected device. Adding a
@@ -66,21 +43,8 @@
     private static final long THROTTLE_DEFAULT_MS = 10L;
     private static final long THROTTLE_WAIT_MS = 75L;
 
-    private final ArrayDeque<BlePacket> mPacketQueue = new ArrayDeque<>();
-
-    private final Map<Integer, ByteArrayOutputStream> mPendingData = new HashMap<>();
-
-    // messageId -> nextExpectedPacketNumber
-    private final Map<Integer, Integer> mPendingPacketNumber = new HashMap<>();
-
-    private final MessageIdGenerator mMessageIdGenerator = new MessageIdGenerator();
-
     private final Handler mHandler = new Handler(Looper.getMainLooper());
 
-    private final AtomicBoolean mIsVersionExchanged = new AtomicBoolean(false);
-
-    private final AtomicBoolean mIsSendingInProgress = new AtomicBoolean(false);
-
     private final AtomicLong mThrottleDelay = new AtomicLong(THROTTLE_DEFAULT_MS);
 
     private final BlePeripheralManager mBlePeripheralManager;
@@ -91,91 +55,25 @@
 
     private final BluetoothGattCharacteristic mReadCharacteristic;
 
-    private int mMaxWriteSize;
-
     BleDeviceMessageStream(@NonNull BlePeripheralManager blePeripheralManager,
             @NonNull BluetoothDevice device,
             @NonNull BluetoothGattCharacteristic writeCharacteristic,
             @NonNull BluetoothGattCharacteristic readCharacteristic,
             int defaultMaxWriteSize) {
+        super(defaultMaxWriteSize);
         mBlePeripheralManager = blePeripheralManager;
         mDevice = device;
         mWriteCharacteristic = writeCharacteristic;
         mReadCharacteristic = readCharacteristic;
         mBlePeripheralManager.addOnCharacteristicWriteListener(this::onCharacteristicWrite);
         mBlePeripheralManager.addOnCharacteristicReadListener(this::onCharacteristicRead);
-        mMaxWriteSize = defaultMaxWriteSize;
     }
 
-    /**
-     * Writes the given message to the write characteristic of this stream with operation type
-     * {@code CLIENT_MESSAGE}.
-     *
-     * This method will handle the chunking of messages based on the max write size.
-     *
-     * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message.
-     */
-    void writeMessage(@NonNull DeviceMessage deviceMessage) {
-        writeMessage(deviceMessage, OperationType.CLIENT_MESSAGE);
-    }
-
-    /**
-     * Writes the given message to the write characteristic of this stream.
-     *
-     * This method will handle the chunking of messages based on the max write size. If it is
-     * a handshake message, the message recipient should be {@code null} and it cannot be
-     * encrypted.
-     *
-     * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message.
-     * @param operationType The {@link OperationType} of this message.
-     */
     @Override
-    public void writeMessage(@NonNull DeviceMessage deviceMessage, OperationType operationType) {
-        logd(TAG, "Writing message with " + deviceMessage.getMessage().length + " bytes to device: "
-                + mDevice.getAddress() + ".");
-        BleDeviceMessage.Builder builder = BleDeviceMessage.newBuilder()
-                .setOperation(operationType)
-                .setIsPayloadEncrypted(deviceMessage.isMessageEncrypted())
-                .setPayload(ByteString.copyFrom(deviceMessage.getMessage()));
-
-        UUID recipient = deviceMessage.getRecipient();
-        if (recipient != null) {
-            builder.setRecipient(ByteString.copyFrom(ByteUtils.uuidToBytes(recipient)));
-        }
-
-        BleDeviceMessage bleDeviceMessage = builder.build();
-        byte[] rawBytes = bleDeviceMessage.toByteArray();
-        List<BlePacket> blePackets;
-        try {
-            blePackets = BlePacketFactory.makeBlePackets(rawBytes, mMessageIdGenerator.next(),
-                    mMaxWriteSize);
-        } catch (BlePacketFactoryException e) {
-            loge(TAG, "Error while creating message packets.", e);
-            return;
-        }
-        mPacketQueue.addAll(blePackets);
-        writeNextMessageInQueue();
-    }
-
-    private void writeNextMessageInQueue() {
-        mHandler.postDelayed(() -> {
-            if (mPacketQueue.isEmpty()) {
-                logd(TAG, "No more packets to send.");
-                return;
-            }
-            boolean isLockAcquired = mIsSendingInProgress.compareAndSet(false, true);
-            if (!isLockAcquired) {
-                logd(TAG, "Unable to send packet at this time.");
-                return;
-            }
-
-            BlePacket packet = mPacketQueue.remove();
-            logd(TAG, "Writing packet " + packet.getPacketNumber() + " of "
-                    + packet.getTotalPackets() + " for " + packet.getMessageId() + ".");
-            mWriteCharacteristic.setValue(packet.toByteArray());
-            mBlePeripheralManager.notifyCharacteristicChanged(mDevice, mWriteCharacteristic,
-                    /* confirm= */ false);
-        }, mThrottleDelay.get());
+    protected void send(byte[] data) {
+        mWriteCharacteristic.setValue(data);
+        mBlePeripheralManager.notifyCharacteristicChanged(mDevice, mWriteCharacteristic,
+                /* confirm= */ false);
     }
 
     private void onCharacteristicRead(@NonNull BluetoothDevice device) {
@@ -187,8 +85,7 @@
         }
 
         logd(TAG, "Releasing lock on characteristic.");
-        mIsSendingInProgress.set(false);
-        writeNextMessageInQueue();
+        sendCompleted();
     }
 
     private void onCharacteristicWrite(@NonNull BluetoothDevice device,
@@ -207,142 +104,6 @@
                     + "Ignoring.");
             return;
         }
-
-        if (!mIsVersionExchanged.get()) {
-            processVersionExchange(device, value);
-            return;
-        }
-
-        BlePacket packet;
-        try {
-            packet = BlePacket.parseFrom(value);
-        } catch (InvalidProtocolBufferException e) {
-            loge(TAG, "Can not parse Ble packet from client.", e);
-            notifyMessageReceivedErrorListener(e);
-            return;
-        }
-        processPacket(packet);
-    }
-
-    private void processVersionExchange(@NonNull BluetoothDevice device, @NonNull byte[] value) {
-        BleVersionExchange versionExchange;
-        try {
-            versionExchange = BleVersionExchange.parseFrom(value);
-        } catch (InvalidProtocolBufferException e) {
-            loge(TAG, "Could not parse version exchange message", e);
-            notifyMessageReceivedErrorListener(e);
-
-            return;
-        }
-        int minMessagingVersion = versionExchange.getMinSupportedMessagingVersion();
-        int maxMessagingVersion = versionExchange.getMaxSupportedMessagingVersion();
-        int minSecurityVersion = versionExchange.getMinSupportedSecurityVersion();
-        int maxSecurityVersion = versionExchange.getMaxSupportedSecurityVersion();
-        if (minMessagingVersion > MESSAGING_VERSION || maxMessagingVersion < MESSAGING_VERSION
-                || minSecurityVersion > SECURITY_VERSION || maxSecurityVersion < SECURITY_VERSION) {
-            loge(TAG, "Unsupported message version for min " + minMessagingVersion + " and max "
-                    + maxMessagingVersion + " or security version for " + minSecurityVersion
-                    + " and max " + maxSecurityVersion + ".");
-            notifyMessageReceivedErrorListener(new IllegalStateException("Unsupported version."));
-            return;
-        }
-
-        BleVersionExchange headunitVersion = BleVersionExchange.newBuilder()
-                .setMinSupportedMessagingVersion(MESSAGING_VERSION)
-                .setMaxSupportedMessagingVersion(MESSAGING_VERSION)
-                .setMinSupportedSecurityVersion(SECURITY_VERSION)
-                .setMaxSupportedSecurityVersion(SECURITY_VERSION)
-                .build();
-        mWriteCharacteristic.setValue(headunitVersion.toByteArray());
-        mBlePeripheralManager.notifyCharacteristicChanged(device, mWriteCharacteristic,
-                /* confirm= */ false);
-        mIsVersionExchanged.set(true);
-        logd(TAG, "Sent supported version to the phone.");
-    }
-
-    @VisibleForTesting
-    void processPacket(@NonNull BlePacket packet) {
-        // Messages are coming in. Need to throttle outgoing messages to allow outgoing
-        // notifications to make it to the device.
-        mThrottleDelay.set(THROTTLE_WAIT_MS);
-
-        int messageId = packet.getMessageId();
-        int packetNumber = packet.getPacketNumber();
-        int expectedPacket = mPendingPacketNumber.getOrDefault(messageId, 1);
-        if (packetNumber == expectedPacket - 1) {
-            logw(TAG, "Received duplicate packet " + packet.getPacketNumber() + " for message "
-                    + messageId + ". Ignoring.");
-            return;
-        }
-        if (packetNumber != expectedPacket) {
-            loge(TAG, "Received unexpected packet " + packetNumber + " for message "
-                    + messageId + ".");
-            notifyMessageReceivedErrorListener(
-                    new IllegalStateException("Packet received out of order."));
-            return;
-        }
-        mPendingPacketNumber.put(messageId, packetNumber + 1);
-
-        ByteArrayOutputStream currentPayloadStream =
-                mPendingData.getOrDefault(messageId, new ByteArrayOutputStream());
-        mPendingData.putIfAbsent(messageId, currentPayloadStream);
-
-        byte[] payload = packet.getPayload().toByteArray();
-        try {
-            currentPayloadStream.write(payload);
-        } catch (IOException e) {
-            loge(TAG, "Error writing packet to stream.", e);
-            notifyMessageReceivedErrorListener(e);
-            return;
-        }
-        logd(TAG, "Parsed packet " + packet.getPacketNumber() + " of "
-                + packet.getTotalPackets() + " for message " + messageId + ". Writing "
-                + payload.length + ".");
-
-        if (packet.getPacketNumber() != packet.getTotalPackets()) {
-            return;
-        }
-
-        byte[] messageBytes = currentPayloadStream.toByteArray();
-        mPendingData.remove(messageId);
-
-        // All message packets received. Resetting throttle back to default until next message
-        // started.
-        mThrottleDelay.set(THROTTLE_DEFAULT_MS);
-
-        logd(TAG, "Received complete device message " + messageId + " of " + messageBytes.length
-                + " bytes.");
-        BleDeviceMessage message;
-        try {
-            message = BleDeviceMessage.parseFrom(messageBytes);
-        } catch (InvalidProtocolBufferException e) {
-            loge(TAG, "Cannot parse device message from client.", e);
-            notifyMessageReceivedErrorListener(e);
-            return;
-        }
-
-        DeviceMessage deviceMessage = new DeviceMessage(
-                ByteUtils.bytesToUUID(message.getRecipient().toByteArray()),
-                message.getIsPayloadEncrypted(), message.getPayload().toByteArray());
-        notifyMessageReceivedListener(deviceMessage, message.getOperation());
-    }
-
-    /** The maximum amount of bytes that can be written over BLE. */
-    void setMaxWriteSize(int maxWriteSize) {
-        if (maxWriteSize <= 0) {
-            return;
-        }
-        mMaxWriteSize = maxWriteSize;
-    }
-
-    /** A generator of unique IDs for messages. */
-    private static class MessageIdGenerator {
-        private final AtomicInteger mMessageId = new AtomicInteger(0);
-
-        int next() {
-            int current = mMessageId.getAndIncrement();
-            mMessageId.compareAndSet(Integer.MAX_VALUE, 0);
-            return current;
-        }
+        onDataReceived(value);
     }
 }
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/CarSppManager.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/CarSppManager.java
index 56bb13c..c4e9a55 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/CarSppManager.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/CarSppManager.java
@@ -52,11 +52,14 @@
 
     private final UUID mAssociationServiceUuid;
 
+    private final int mPacketMaxBytes;
+
     private String mReconnectDeviceId;
 
     private OobConnectionManager mOobConnectionManager;
 
     private Executor mCallbackExecutor;
+
     private AssociationCallback mAssociationCallback;
 
     /**
@@ -64,14 +67,17 @@
      *
      * @param sppManager             {@link SppManager} for establishing connection.
      * @param connectedDeviceStorage Shared {@link ConnectedDeviceStorage} for companion features.
+     * @param packetMaxBytes         Maximum size in bytes to write in one packet.
      */
     public CarSppManager(@NonNull SppManager sppManager,
             @NonNull ConnectedDeviceStorage connectedDeviceStorage,
-            @NonNull UUID associationServiceUuid) {
+            @NonNull UUID associationServiceUuid,
+            int packetMaxBytes) {
         super(connectedDeviceStorage);
         mSppManager = sppManager;
         mCallbackExecutor = Executors.newSingleThreadExecutor();
         mAssociationServiceUuid = associationServiceUuid;
+        mPacketMaxBytes = packetMaxBytes;
     }
 
     @Override
@@ -176,7 +182,8 @@
         EventLog.onDeviceConnected();
         setClientDeviceAddress(device.getAddress());
         setClientDeviceName(device.getName());
-        DeviceMessageStream secureStream = new SppDeviceMessageStream(mSppManager, device);
+        DeviceMessageStream secureStream = new SppDeviceMessageStream(mSppManager, device,
+                mPacketMaxBytes);
         secureStream.setMessageReceivedErrorListener(
                 exception -> {
                     disconnectWithError("Error occurred in stream: " + exception.getMessage(),
diff --git a/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStream.java b/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStream.java
index 1b31dd5..6a08ed6 100644
--- a/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStream.java
+++ b/connected-device-lib/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStream.java
@@ -16,27 +16,17 @@
 
 package com.android.car.connecteddevice.connection.spp;
 
-import static com.android.car.connecteddevice.StreamProtos.VersionExchangeProto.BleVersionExchange;
 import static com.android.car.connecteddevice.util.SafeLog.logd;
-import static com.android.car.connecteddevice.util.SafeLog.loge;
 import static com.android.car.connecteddevice.util.SafeLog.logw;
 
 import android.annotation.NonNull;
 import android.bluetooth.BluetoothDevice;
 
-import com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.BleDeviceMessage;
-import com.android.car.connecteddevice.StreamProtos.OperationProto;
-import com.android.car.connecteddevice.connection.DeviceMessage;
 import com.android.car.connecteddevice.connection.DeviceMessageStream;
-import com.android.car.connecteddevice.util.ByteUtils;
-import com.android.car.protobuf.ByteString;
-import com.android.car.protobuf.InvalidProtocolBufferException;
 import com.android.internal.annotations.VisibleForTesting;
 
-import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Spp message stream to a device.
@@ -45,45 +35,23 @@
 
     private static final String TAG = "SppDeviceMessageStream";
 
-    // Only version 2 of the messaging and version 2 of the security supported.
-    @VisibleForTesting
-    static final int MESSAGING_VERSION = 2;
-    @VisibleForTesting
-    static final int SECURITY_VERSION = 2;
-
-    private final AtomicBoolean mIsVersionExchanged = new AtomicBoolean(false);
     private final SppManager mSppManager;
     private final BluetoothDevice mDevice;
     private final Executor mCallbackExecutor = Executors.newSingleThreadExecutor();
 
 
     SppDeviceMessageStream(@NonNull SppManager sppManager,
-            @NonNull BluetoothDevice device) {
+            @NonNull BluetoothDevice device, int maxWriteSize) {
+        super(maxWriteSize);
         mSppManager = sppManager;
         mDevice = device;
         mSppManager.addOnMessageReceivedListener(this::onMessageReceived, mCallbackExecutor);
     }
 
-    /**
-     * Send the given message to remote connected bluetooth device.
-     *
-     * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message.
-     */
     @Override
-    public void writeMessage(@NonNull DeviceMessage deviceMessage,
-            OperationProto.OperationType operationType) {
-        BleDeviceMessage.Builder builder = BleDeviceMessage.newBuilder()
-                .setOperation(operationType)
-                .setIsPayloadEncrypted(deviceMessage.isMessageEncrypted())
-                .setPayload(ByteString.copyFrom(deviceMessage.getMessage()));
-
-        UUID recipient = deviceMessage.getRecipient();
-        if (recipient != null) {
-            builder.setRecipient(ByteString.copyFrom(ByteUtils.uuidToBytes(recipient)));
-        }
-
-        BleDeviceMessage bleDeviceMessage = builder.build();
-        mSppManager.write(bleDeviceMessage.toByteArray());
+    protected void send(byte[] data) {
+        mSppManager.write(data);
+        sendCompleted();
     }
 
     @VisibleForTesting
@@ -96,59 +64,6 @@
             return;
         }
 
-        if (!mIsVersionExchanged.get()) {
-            processVersionExchange(device, value);
-            return;
-        }
-
-        logd(TAG, "Received complete device message: " + value.length
-                + " bytes.");
-        BleDeviceMessage message;
-        try {
-            message = BleDeviceMessage.parseFrom(value);
-        } catch (InvalidProtocolBufferException e) {
-            loge(TAG, "Cannot parse device message from client.", e);
-            notifyMessageReceivedErrorListener(e);
-            return;
-        }
-
-        DeviceMessage deviceMessage = new DeviceMessage(
-                ByteUtils.bytesToUUID(message.getRecipient().toByteArray()),
-                message.getIsPayloadEncrypted(), message.getPayload().toByteArray());
-        notifyMessageReceivedListener(deviceMessage, message.getOperation());
-    }
-
-
-    private void processVersionExchange(@NonNull BluetoothDevice device, @NonNull byte[] value) {
-        BleVersionExchange versionExchange;
-        try {
-            versionExchange = BleVersionExchange.parseFrom(value);
-        } catch (InvalidProtocolBufferException e) {
-            loge(TAG, "Could not parse version exchange message", e);
-            notifyMessageReceivedErrorListener(e);
-            return;
-        }
-        int minMessagingVersion = versionExchange.getMinSupportedMessagingVersion();
-        int maxMessagingVersion = versionExchange.getMaxSupportedMessagingVersion();
-        int minSecurityVersion = versionExchange.getMinSupportedSecurityVersion();
-        int maxSecurityVersion = versionExchange.getMaxSupportedSecurityVersion();
-        if (minMessagingVersion > MESSAGING_VERSION || maxMessagingVersion < MESSAGING_VERSION
-                || minSecurityVersion > SECURITY_VERSION || maxSecurityVersion < SECURITY_VERSION) {
-            loge(TAG, "Unsupported message version for min " + minMessagingVersion + " and max "
-                    + maxMessagingVersion + " or security version for " + minSecurityVersion
-                    + " and max " + maxSecurityVersion + ".");
-            notifyMessageReceivedErrorListener(new IllegalStateException("Unsupported version."));
-            return;
-        }
-
-        BleVersionExchange headunitVersion = BleVersionExchange.newBuilder()
-                .setMinSupportedMessagingVersion(MESSAGING_VERSION)
-                .setMaxSupportedMessagingVersion(MESSAGING_VERSION)
-                .setMinSupportedSecurityVersion(SECURITY_VERSION)
-                .setMaxSupportedSecurityVersion(SECURITY_VERSION)
-                .build();
-        mSppManager.write(headunitVersion.toByteArray());
-        mIsVersionExchanged.set(true);
-        logd(TAG, "Sent supported version to the phone.");
+        onDataReceived(value);
     }
 }
diff --git a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStreamTest.java b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/DeviceMessageStreamTest.java
similarity index 75%
rename from connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStreamTest.java
rename to connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/DeviceMessageStreamTest.java
index 8970029..7c74c91 100644
--- a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/ble/BleDeviceMessageStreamTest.java
+++ b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/DeviceMessageStreamTest.java
@@ -14,11 +14,11 @@
  * limitations under the License.
  */
 
-package com.android.car.connecteddevice.connection.ble;
+package com.android.car.connecteddevice.connection;
 
-import static com.android.car.connecteddevice.BleStreamProtos.BlePacketProto.BlePacket;
-import static com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.BleDeviceMessage;
+import static com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.Message;
 import static com.android.car.connecteddevice.StreamProtos.OperationProto.OperationType;
+import static com.android.car.connecteddevice.StreamProtos.PacketProto.Packet;
 import static com.android.car.connecteddevice.connection.DeviceMessageStream.MessageReceivedErrorListener;
 import static com.android.car.connecteddevice.connection.DeviceMessageStream.MessageReceivedListener;
 
@@ -26,28 +26,20 @@
 import static com.google.common.truth.Truth.assertWithMessage;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mockitoSession;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import android.annotation.NonNull;
-import android.bluetooth.BluetoothDevice;
-import android.bluetooth.BluetoothGattCharacteristic;
 
 import androidx.test.ext.junit.runners.AndroidJUnit4;
 
-import com.android.car.connecteddevice.connection.DeviceMessage;
 import com.android.car.connecteddevice.util.ByteUtils;
 import com.android.car.protobuf.ByteString;
 
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.MockitoSession;
-import org.mockito.quality.Strictness;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,42 +49,18 @@
 import java.util.concurrent.TimeUnit;
 
 @RunWith(AndroidJUnit4.class)
-public class BleDeviceMessageStreamTest {
+public class DeviceMessageStreamTest {
 
-    private static final int PACKET_SIZE = 500;
+    private static final int WRITE_SIZE = 500;
 
-    private BleDeviceMessageStream mStream;
-
-    @Mock
-    private BlePeripheralManager mMockBlePeripheralManager;
-
-    @Mock
-    private BluetoothDevice mMockBluetoothDevice;
-
-    @Mock
-    private BluetoothGattCharacteristic mMockWriteCharacteristic;
-
-    @Mock
-    private BluetoothGattCharacteristic mMockReadCharacteristic;
-
-    private MockitoSession mMockingSession;
+    private DeviceMessageStream mStream;
 
     @Before
     public void setup() {
-        mMockingSession = mockitoSession()
-                .initMocks(this)
-                .strictness(Strictness.LENIENT)
-                .startMocking();
-
-        mStream = new BleDeviceMessageStream(mMockBlePeripheralManager, mMockBluetoothDevice,
-                mMockWriteCharacteristic, mMockReadCharacteristic, PACKET_SIZE);
-    }
-
-    @After
-    public void cleanup() {
-        if (mMockingSession != null) {
-            mMockingSession.finishMocking();
-        }
+        mStream = spy(new DeviceMessageStream(WRITE_SIZE) {
+            @Override
+            protected void send(byte[] data) { }
+        });
     }
 
     @Test
@@ -128,9 +96,9 @@
         Semaphore semaphore = new Semaphore(0);
         MessageReceivedListener listener = createMessageReceivedListener(semaphore);
         mStream.setMessageReceivedListener(listener);
-        byte[] data = ByteUtils.randomBytes((int) (PACKET_SIZE * 1.5));
-        List<BlePacket> packets1 = createPackets(data);
-        List<BlePacket> packets2 = createPackets(data);
+        byte[] data = ByteUtils.randomBytes((int) (WRITE_SIZE * 1.5));
+        List<Packet> packets1 = createPackets(data);
+        List<Packet> packets2 = createPackets(data);
 
         for (int i = 0; i < packets1.size(); i++) {
             mStream.processPacket(packets1.get(i));
@@ -158,8 +126,8 @@
         Semaphore semaphore = new Semaphore(0);
         mStream.setMessageReceivedListener(createMessageReceivedListener(semaphore));
         mStream.setMessageReceivedErrorListener(createMessageReceivedErrorListener(semaphore));
-        byte[] data = ByteUtils.randomBytes((int) (PACKET_SIZE * 1.5));
-        List<BlePacket> packets = createPackets(data);
+        byte[] data = ByteUtils.randomBytes((int) (WRITE_SIZE * 1.5));
+        List<Packet> packets = createPackets(data);
         for (int i = 0; i < packets.size() - 1; i++) {
             mStream.processPacket(packets.get(i));
         }
@@ -169,11 +137,11 @@
     @Test
     public void processPacket_ignoresDuplicatePacket() {
         Semaphore semaphore = new Semaphore(0);
-        byte[] data = ByteUtils.randomBytes((int) (PACKET_SIZE * 2.5));
+        byte[] data = ByteUtils.randomBytes((int) (WRITE_SIZE * 2.5));
         MessageReceivedListener listener = createMessageReceivedListener(semaphore);
         mStream.setMessageReceivedListener(listener);
         ArgumentCaptor<DeviceMessage> messageCaptor = ArgumentCaptor.forClass(DeviceMessage.class);
-        List<BlePacket> packets = createPackets(data);
+        List<Packet> packets = createPackets(data);
         for (int i = 0; i < packets.size(); i++) {
             mStream.processPacket(packets.get(i));
             mStream.processPacket(packets.get(i)); // Process each packet twice.
@@ -187,7 +155,7 @@
             throws InterruptedException {
         Semaphore semaphore = new Semaphore(0);
         mStream.setMessageReceivedErrorListener(createMessageReceivedErrorListener(semaphore));
-        List<BlePacket> packets = createPackets(ByteUtils.randomBytes((int) (PACKET_SIZE * 2.5)));
+        List<Packet> packets = createPackets(ByteUtils.randomBytes((int) (WRITE_SIZE * 2.5)));
         mStream.processPacket(packets.get(0));
         mStream.processPacket(packets.get(1));
         mStream.processPacket(packets.get(0));
@@ -199,20 +167,20 @@
             throws InterruptedException {
         Semaphore semaphore = new Semaphore(0);
         mStream.setMessageReceivedErrorListener(createMessageReceivedErrorListener(semaphore));
-        List<BlePacket> packets = createPackets(ByteUtils.randomBytes((int) (PACKET_SIZE * 1.5)));
+        List<Packet> packets = createPackets(ByteUtils.randomBytes((int) (WRITE_SIZE * 1.5)));
         mStream.processPacket(packets.get(1));
         assertThat(tryAcquire(semaphore)).isTrue();
     }
 
     @NonNull
-    private List<BlePacket> createPackets(byte[] data) {
+    private List<Packet> createPackets(byte[] data) {
         try {
-            BleDeviceMessage message = BleDeviceMessage.newBuilder()
+            Message message = Message.newBuilder()
                     .setPayload(ByteString.copyFrom(data))
                     .setOperation(OperationType.CLIENT_MESSAGE)
                     .build();
-            return BlePacketFactory.makeBlePackets(message.toByteArray(),
-                    ThreadLocalRandom.current().nextInt(), PACKET_SIZE);
+            return PacketFactory.makePackets(message.toByteArray(),
+                    ThreadLocalRandom.current().nextInt(), WRITE_SIZE);
         } catch (Exception e) {
             assertWithMessage("Uncaught exception while making packets.").fail();
             return new ArrayList<>();
@@ -220,8 +188,8 @@
     }
 
     private void processMessage(byte[] data) {
-        List<BlePacket> packets = createPackets(data);
-        for (BlePacket packet : packets) {
+        List<Packet> packets = createPackets(data);
+        for (Packet packet : packets) {
             mStream.processPacket(packet);
         }
     }
diff --git a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/ble/BlePacketFactoryTest.java b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/PacketFactoryTest.java
similarity index 81%
rename from connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/ble/BlePacketFactoryTest.java
rename to connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/PacketFactoryTest.java
index a6ee20e..d5fdc03 100644
--- a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/ble/BlePacketFactoryTest.java
+++ b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/PacketFactoryTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2019 The Android Open Source Project
+ * Copyright (C) 2020 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,13 +14,13 @@
  * limitations under the License.
  */
 
-package com.android.car.connecteddevice.connection.ble;
+package com.android.car.connecteddevice.connection;
 
 import static com.google.common.truth.Truth.assertThat;
 
 import androidx.test.ext.junit.runners.AndroidJUnit4;
 
-import com.android.car.connecteddevice.BleStreamProtos.BlePacketProto.BlePacket;
+import com.android.car.connecteddevice.StreamProtos.PacketProto.Packet;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -30,7 +30,7 @@
 import java.util.Random;
 
 @RunWith(AndroidJUnit4.class)
-public class BlePacketFactoryTest {
+public class PacketFactoryTest {
     @Test
     public void testGetHeaderSize() {
         // 1 byte to encode the ID, 1 byte for the field number.
@@ -51,13 +51,13 @@
         int expectedHeaderSize = messageIdEncodingSize + payloadSizeEncodingSize
                 + totalPacketsEncodingSize + packetNumberEncodingSize;
 
-        assertThat(BlePacketFactory.getPacketHeaderSize(totalPackets, messageId, payloadSize))
+        assertThat(PacketFactory.getPacketHeaderSize(totalPackets, messageId, payloadSize))
                 .isEqualTo(expectedHeaderSize);
     }
 
     @Test
     public void testGetTotalPackets_withVarintSize1_returnsCorrectPackets()
-            throws BlePacketFactoryException {
+            throws PacketFactoryException {
         int messageId = 1;
         int maxSize = 49;
         int payloadSize = 100;
@@ -67,13 +67,13 @@
         // payload. ceil(payloadSize/38) gives the total packets.
         int expectedTotalPackets = 3;
 
-        assertThat(BlePacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
+        assertThat(PacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
                 .isEqualTo(expectedTotalPackets);
     }
 
     @Test
     public void testGetTotalPackets_withVarintSize2_returnsCorrectPackets()
-            throws BlePacketFactoryException {
+            throws PacketFactoryException {
         int messageId = 1;
         int maxSize = 49;
         int payloadSize = 6000;
@@ -83,13 +83,13 @@
         // payload. ceil(payloadSize/37) gives the total packets.
         int expectedTotalPackets = 163;
 
-        assertThat(BlePacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
+        assertThat(PacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
                 .isEqualTo(expectedTotalPackets);
     }
 
     @Test
     public void testGetTotalPackets_withVarintSize3_returnsCorrectPackets()
-            throws BlePacketFactoryException {
+            throws PacketFactoryException {
         int messageId = 1;
         int maxSize = 49;
         int payloadSize = 1000000;
@@ -99,13 +99,13 @@
         // payload. ceil(payloadSize/36) gives the total packets.
         int expectedTotalPackets = 27778;
 
-        assertThat(BlePacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
+        assertThat(PacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
                 .isEqualTo(expectedTotalPackets);
     }
 
     @Test
     public void testGetTotalPackets_withVarintSize4_returnsCorrectPackets()
-            throws BlePacketFactoryException {
+            throws PacketFactoryException {
         int messageId = 1;
         int maxSize = 49;
         int payloadSize = 178400320;
@@ -115,7 +115,7 @@
         // payload. ceil(payloadSize/35) gives the total packets.
         int expectedTotalPackets = 5097152;
 
-        assertThat(BlePacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
+        assertThat(PacketFactory.getTotalPacketNumber(messageId, payloadSize, maxSize))
                 .isEqualTo(expectedTotalPackets);
     }
 
@@ -125,15 +125,15 @@
         byte[] payload = makePayload(/* length= */ 100);
         int maxSize = 1000;
 
-        List<BlePacket> packets =
-                BlePacketFactory.makeBlePackets(payload, /* mesageId= */ 1, maxSize);
+        List<Packet> packets =
+                PacketFactory.makePackets(payload, /* messageId= */ 1, maxSize);
 
         assertThat(packets).hasSize(1);
 
         ByteArrayOutputStream reconstructedPayload = new ByteArrayOutputStream();
 
         // Combine together all the payloads within the BlePackets.
-        for (BlePacket packet : packets) {
+        for (Packet packet : packets) {
             reconstructedPayload.write(packet.getPayload().toByteArray());
         }
 
@@ -146,15 +146,15 @@
         byte[] payload = makePayload(/* length= */ 10000);
         int maxSize = 50;
 
-        List<BlePacket> packets =
-                BlePacketFactory.makeBlePackets(payload, /* mesageId= */ 1, maxSize);
+        List<Packet> packets =
+                PacketFactory.makePackets(payload, /* messageId= */ 1, maxSize);
 
         assertThat(packets.size()).isGreaterThan(1);
 
         ByteArrayOutputStream reconstructedPayload = new ByteArrayOutputStream();
 
         // Combine together all the payloads within the BlePackets.
-        for (BlePacket packet : packets) {
+        for (Packet packet : packets) {
             reconstructedPayload.write(packet.getPayload().toByteArray());
         }
 
diff --git a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/CarSppManagerTest.java b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/CarSppManagerTest.java
index eeb58c9..7a17f7c 100644
--- a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/CarSppManagerTest.java
+++ b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/CarSppManagerTest.java
@@ -52,6 +52,7 @@
     private static final UUID TEST_REMOTE_DEVICE_ID = UUID.randomUUID();
     private static final UUID TEST_SERVICE_UUID = UUID.randomUUID();
     private static final String TEST_VERIFICATION_CODE = "000000";
+    private static final int MAX_PACKET_SIZE = 700;
     @Mock
     private SppManager mMockSppManager;
     @Mock
@@ -67,7 +68,8 @@
                 .initMocks(this)
                 .strictness(Strictness.WARN)
                 .startMocking();
-        mCarSppManager = new CarSppManager(mMockSppManager, mMockStorage, TEST_SERVICE_UUID);
+        mCarSppManager = new CarSppManager(mMockSppManager, mMockStorage, TEST_SERVICE_UUID,
+                MAX_PACKET_SIZE);
     }
 
     @After
diff --git a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStreamTest.java b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStreamTest.java
index 7a7c7ee..a126d04 100644
--- a/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStreamTest.java
+++ b/connected-device-lib/tests/unit/src/com/android/car/connecteddevice/connection/spp/SppDeviceMessageStreamTest.java
@@ -16,7 +16,6 @@
 
 package com.android.car.connecteddevice.connection.spp;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mockitoSession;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -26,13 +25,7 @@
 
 import androidx.test.ext.junit.runners.AndroidJUnit4;
 
-import com.android.car.connecteddevice.StreamProtos.DeviceMessageProto.BleDeviceMessage;
-import com.android.car.connecteddevice.StreamProtos.OperationProto;
-import com.android.car.connecteddevice.StreamProtos.VersionExchangeProto;
-import com.android.car.connecteddevice.connection.DeviceMessage;
-import com.android.car.connecteddevice.connection.DeviceMessageStream;
 import com.android.car.connecteddevice.util.ByteUtils;
-import com.android.car.protobuf.ByteString;
 
 import org.junit.After;
 import org.junit.Before;
@@ -42,41 +35,12 @@
 import org.mockito.MockitoSession;
 import org.mockito.quality.Strictness;
 
-import java.util.UUID;
-
 @RunWith(AndroidJUnit4.class)
 public class SppDeviceMessageStreamTest {
-    private final byte[] mTestData = "testData".getBytes();
-    private final UUID mTestUuid = UUID.randomUUID();
-    private final OperationProto.OperationType mTestOperationType =
-            OperationProto.OperationType.CLIENT_MESSAGE;
-    private final boolean mIsEncrypted = false;
-    private final DeviceMessage mTestDeviceMessage = new DeviceMessage(mTestUuid, mIsEncrypted,
-            mTestData);
-    private final BleDeviceMessage mBleDeviceMessage = BleDeviceMessage.newBuilder()
-            .setOperation(mTestOperationType)
-            .setIsPayloadEncrypted(mIsEncrypted)
-            .setPayload(ByteString.copyFrom(mTestData))
-            .setRecipient(ByteString.copyFrom(ByteUtils.uuidToBytes(mTestUuid)))
-            .build();
+    private static final int MAX_WRITE_SIZE = 700;
 
-    private final byte[] mVersionExchangeMessage =
-            VersionExchangeProto.BleVersionExchange.newBuilder()
-                    .setMinSupportedMessagingVersion(SppDeviceMessageStream.MESSAGING_VERSION)
-                    .setMaxSupportedMessagingVersion(SppDeviceMessageStream.MESSAGING_VERSION)
-                    .setMinSupportedSecurityVersion(SppDeviceMessageStream.SECURITY_VERSION)
-                    .setMaxSupportedSecurityVersion(SppDeviceMessageStream.SECURITY_VERSION)
-                    .build()
-                    .toByteArray();
     @Mock
     private SppManager mMockSppManager;
-    private DeviceMessageStream.MessageReceivedListener mMessageReceivedListener = spy(
-            (deviceMessage, operationType) -> {
-            });
-    private DeviceMessageStream.MessageReceivedErrorListener mMessageReceivedErrorListener = spy(
-            exception -> {
-            }
-    );
     private BluetoothDevice mBluetoothDevice = BluetoothAdapter.getDefaultAdapter().getRemoteDevice(
             "00:11:22:33:44:55");
     private MockitoSession mMockingSession;
@@ -88,7 +52,8 @@
                 .initMocks(this)
                 .strictness(Strictness.WARN)
                 .startMocking();
-        mSppDeviceMessageStream = new SppDeviceMessageStream(mMockSppManager, mBluetoothDevice);
+        mSppDeviceMessageStream = spy(
+                new SppDeviceMessageStream(mMockSppManager, mBluetoothDevice, MAX_WRITE_SIZE));
     }
 
     @After
@@ -99,26 +64,10 @@
     }
 
     @Test
-    public void testWriteMessage_CallSppManagerWriteMethod() {
-        mSppDeviceMessageStream.writeMessage(mTestDeviceMessage, mTestOperationType);
-        verify(mMockSppManager).write(mBleDeviceMessage.toByteArray());
+    public void send_callsWriteAndSendCompleted() {
+        byte[] data = ByteUtils.randomBytes(10);
+        mSppDeviceMessageStream.send(data);
+        verify(mMockSppManager).write(data);
+        verify(mSppDeviceMessageStream).sendCompleted();
     }
-
-    @Test
-    public void testOnMessageReceived_InformMessageReceivedListener() {
-        mSppDeviceMessageStream.onMessageReceived(mBluetoothDevice, mVersionExchangeMessage);
-        mSppDeviceMessageStream.setMessageReceivedListener(mMessageReceivedListener);
-        mSppDeviceMessageStream.onMessageReceived(mBluetoothDevice,
-                mBleDeviceMessage.toByteArray());
-        verify(mMessageReceivedListener).onMessageReceived(mTestDeviceMessage, mTestOperationType);
-    }
-
-    @Test
-    public void testOnMessageReceived_InformOnMessageReceivedErrorListener() {
-        mSppDeviceMessageStream.onMessageReceived(mBluetoothDevice, mVersionExchangeMessage);
-        mSppDeviceMessageStream.setMessageReceivedErrorListener(mMessageReceivedErrorListener);
-        mSppDeviceMessageStream.onMessageReceived(mBluetoothDevice, mTestData);
-        verify(mMessageReceivedErrorListener).onMessageReceivedError(any());
-    }
-
 }