diff --git a/google/api_core/__init__.py b/google/api_core/__init__.py
index b80ea37..89ce751 100644
--- a/google/api_core/__init__.py
+++ b/google/api_core/__init__.py
@@ -20,3 +20,6 @@
 from google.api_core import version as api_core_version
 
 __version__ = api_core_version.__version__
+
+# for backwards compatibility, expose async unary retries as google.api_core.retry_async
+from .retry import retry_unary_async as retry_async  # noqa: F401
diff --git a/google/api_core/retry.py b/google/api_core/retry.py
deleted file mode 100644
index 08f8209..0000000
--- a/google/api_core/retry.py
+++ /dev/null
@@ -1,477 +0,0 @@
-# Copyright 2017 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Helpers for retrying functions with exponential back-off.
-
-The :class:`Retry` decorator can be used to retry functions that raise
-exceptions using exponential backoff. Because a exponential sleep algorithm is
-used, the retry is limited by a `deadline`. The deadline is the maximum amount
-of time a method can block. This is used instead of total number of retries
-because it is difficult to ascertain the amount of time a function can block
-when using total number of retries and exponential backoff.
-
-By default, this decorator will retry transient
-API errors (see :func:`if_transient_error`). For example:
-
-.. code-block:: python
-
-    @retry.Retry()
-    def call_flaky_rpc():
-        return client.flaky_rpc()
-
-    # Will retry flaky_rpc() if it raises transient API errors.
-    result = call_flaky_rpc()
-
-You can pass a custom predicate to retry on different exceptions, such as
-waiting for an eventually consistent item to be available:
-
-.. code-block:: python
-
-    @retry.Retry(predicate=if_exception_type(exceptions.NotFound))
-    def check_if_exists():
-        return client.does_thing_exist()
-
-    is_available = check_if_exists()
-
-Some client library methods apply retry automatically. These methods can accept
-a ``retry`` parameter that allows you to configure the behavior:
-
-.. code-block:: python
-
-    my_retry = retry.Retry(deadline=60)
-    result = client.some_method(retry=my_retry)
-
-"""
-
-from __future__ import annotations
-
-import datetime
-import functools
-import logging
-import random
-import sys
-import time
-import inspect
-import warnings
-from typing import Any, Callable, TypeVar, TYPE_CHECKING
-
-import requests.exceptions
-
-from google.api_core import datetime_helpers
-from google.api_core import exceptions
-from google.auth import exceptions as auth_exceptions
-
-if TYPE_CHECKING:
-    if sys.version_info >= (3, 10):
-        from typing import ParamSpec
-    else:
-        from typing_extensions import ParamSpec
-
-    _P = ParamSpec("_P")
-    _R = TypeVar("_R")
-
-_LOGGER = logging.getLogger(__name__)
-_DEFAULT_INITIAL_DELAY = 1.0  # seconds
-_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
-_DEFAULT_DELAY_MULTIPLIER = 2.0
-_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds
-_ASYNC_RETRY_WARNING = "Using the synchronous google.api_core.retry.Retry with asynchronous calls may lead to unexpected results. Please use google.api_core.retry_async.AsyncRetry instead."
-
-
-def if_exception_type(
-    *exception_types: type[BaseException],
-) -> Callable[[BaseException], bool]:
-    """Creates a predicate to check if the exception is of a given type.
-
-    Args:
-        exception_types (Sequence[:func:`type`]): The exception types to check
-            for.
-
-    Returns:
-        Callable[Exception]: A predicate that returns True if the provided
-            exception is of the given type(s).
-    """
-
-    def if_exception_type_predicate(exception: BaseException) -> bool:
-        """Bound predicate for checking an exception type."""
-        return isinstance(exception, exception_types)
-
-    return if_exception_type_predicate
-
-
-# pylint: disable=invalid-name
-# Pylint sees this as a constant, but it is also an alias that should be
-# considered a function.
-if_transient_error = if_exception_type(
-    exceptions.InternalServerError,
-    exceptions.TooManyRequests,
-    exceptions.ServiceUnavailable,
-    requests.exceptions.ConnectionError,
-    requests.exceptions.ChunkedEncodingError,
-    auth_exceptions.TransportError,
-)
-"""A predicate that checks if an exception is a transient API error.
-
-The following server errors are considered transient:
-
-- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC
-    ``INTERNAL(13)`` and its subclasses.
-- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429
-- :class:`google.api_core.exceptions.ServiceUnavailable` - HTTP 503
-- :class:`requests.exceptions.ConnectionError`
-- :class:`requests.exceptions.ChunkedEncodingError` - The server declared
-    chunked encoding but sent an invalid chunk.
-- :class:`google.auth.exceptions.TransportError` - Used to indicate an
-    error occurred during an HTTP request.
-"""
-# pylint: enable=invalid-name
-
-
-def exponential_sleep_generator(initial, maximum, multiplier=_DEFAULT_DELAY_MULTIPLIER):
-    """Generates sleep intervals based on the exponential back-off algorithm.
-
-    This implements the `Truncated Exponential Back-off`_ algorithm.
-
-    .. _Truncated Exponential Back-off:
-        https://cloud.google.com/storage/docs/exponential-backoff
-
-    Args:
-        initial (float): The minimum amount of time to delay. This must
-            be greater than 0.
-        maximum (float): The maximum amount of time to delay.
-        multiplier (float): The multiplier applied to the delay.
-
-    Yields:
-        float: successive sleep intervals.
-    """
-    delay = min(initial, maximum)
-    while True:
-        yield random.uniform(0.0, delay)
-        delay = min(delay * multiplier, maximum)
-
-
-def retry_target(
-    target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
-):
-    """Call a function and retry if it fails.
-
-    This is the lowest-level retry helper. Generally, you'll use the
-    higher-level retry helper :class:`Retry`.
-
-    Args:
-        target(Callable): The function to call and retry. This must be a
-            nullary function - apply arguments with `functools.partial`.
-        predicate (Callable[Exception]): A callable used to determine if an
-            exception raised by the target should be considered retryable.
-            It should return True to retry or False otherwise.
-        sleep_generator (Iterable[float]): An infinite iterator that determines
-            how long to sleep between retries.
-        timeout (float): How long to keep retrying the target.
-        on_error (Callable[Exception]): A function to call while processing a
-            retryable exception.  Any error raised by this function will *not*
-            be caught.
-        deadline (float): DEPRECATED: use ``timeout`` instead. For backward
-            compatibility, if specified it will override ``timeout`` parameter.
-
-    Returns:
-        Any: the return value of the target function.
-
-    Raises:
-        google.api_core.RetryError: If the deadline is exceeded while retrying.
-        ValueError: If the sleep generator stops yielding values.
-        Exception: If the target raises a method that isn't retryable.
-    """
-
-    timeout = kwargs.get("deadline", timeout)
-
-    if timeout is not None:
-        deadline = datetime_helpers.utcnow() + datetime.timedelta(seconds=timeout)
-    else:
-        deadline = None
-
-    last_exc = None
-
-    for sleep in sleep_generator:
-        try:
-            result = target()
-            if inspect.isawaitable(result):
-                warnings.warn(_ASYNC_RETRY_WARNING)
-            return result
-
-        # pylint: disable=broad-except
-        # This function explicitly must deal with broad exceptions.
-        except Exception as exc:
-            if not predicate(exc):
-                raise
-            last_exc = exc
-            if on_error is not None:
-                on_error(exc)
-
-        if deadline is not None:
-            next_attempt_time = datetime_helpers.utcnow() + datetime.timedelta(
-                seconds=sleep
-            )
-            if deadline < next_attempt_time:
-                raise exceptions.RetryError(
-                    "Deadline of {:.1f}s exceeded while calling target function".format(
-                        timeout
-                    ),
-                    last_exc,
-                ) from last_exc
-
-        _LOGGER.debug(
-            "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
-        )
-        time.sleep(sleep)
-
-    raise ValueError("Sleep generator stopped yielding sleep values.")
-
-
-class Retry(object):
-    """Exponential retry decorator.
-
-    This class is a decorator used to add retry or polling behavior to an RPC
-    call.
-
-    Although the default behavior is to retry transient API errors, a
-    different predicate can be provided to retry other exceptions.
-
-    There two important concepts that retry/polling behavior may operate on,
-    Deadline and Timeout, which need to be properly defined for the correct
-    usage of this class and the rest of the library.
-
-    Deadline: a fixed point in time by which a certain operation must
-    terminate. For example, if a certain operation has a deadline
-    "2022-10-18T23:30:52.123Z" it must terminate (successfully or with an
-    error) by that time, regardless of when it was started or whether it
-    was started at all.
-
-    Timeout: the maximum duration of time after which a certain operation
-    must terminate (successfully or with an error). The countdown begins right
-    after an operation was started. For example, if an operation was started at
-    09:24:00 with timeout of 75 seconds, it must terminate no later than
-    09:25:15.
-
-    Unfortunately, in the past this class (and the api-core library as a whole) has not been
-    properly distinguishing the concepts of "timeout" and "deadline", and the
-    ``deadline`` parameter has meant ``timeout``. That is why
-    ``deadline`` has been deprecated and ``timeout`` should be used instead. If the
-    ``deadline`` parameter is set, it will override the ``timeout`` parameter. In other words,
-    ``retry.deadline`` should be treated as just a deprecated alias for
-    ``retry.timeout``.
-
-    Said another way, it is safe to assume that this class and the rest of this
-    library operate in terms of timeouts (not deadlines) unless explicitly
-    noted the usage of deadline semantics.
-
-    It is also important to
-    understand the three most common applications of the Timeout concept in the
-    context of this library.
-
-    Usually the generic Timeout term may stand for one of the following actual
-    timeouts: RPC Timeout, Retry Timeout, or Polling Timeout.
-
-    RPC Timeout: a value supplied by the client to the server so
-    that the server side knows the maximum amount of time it is expected to
-    spend handling that specific RPC. For example, in the case of gRPC transport,
-    RPC Timeout is represented by setting "grpc-timeout" header in the HTTP2
-    request. The `timeout` property of this class normally never represents the
-    RPC Timeout as it is handled separately by the ``google.api_core.timeout``
-    module of this library.
-
-    Retry Timeout: this is the most common meaning of the ``timeout`` property
-    of this class, and defines how long a certain RPC may be retried in case
-    the server returns an error.
-
-    Polling Timeout: defines how long the
-    client side is allowed to call the polling RPC repeatedly to check a status of a
-    long-running operation. Each polling RPC is
-    expected to succeed (its errors are supposed to be handled by the retry
-    logic). The decision as to whether a new polling attempt needs to be made is based
-    not on the RPC status code but  on the status of the returned
-    status of an operation. In other words: we will poll a long-running operation until the operation is done or the polling timeout expires. Each poll will inform us of the status of the operation. The poll consists of an RPC to the server that may itself be retried as per the poll-specific retry settings in case of errors. The operation-level retry settings do NOT apply to polling-RPC retries.
-
-    With the actual timeout types being defined above, the client libraries
-    often refer to just Timeout without clarifying which type specifically
-    that is. In that case the actual timeout type (sometimes also referred to as
-    Logical Timeout) can be determined from the context. If it is a unary rpc
-    call (i.e. a regular one) Timeout usually stands for the RPC Timeout (if
-    provided directly as a standalone value) or Retry Timeout (if provided as
-    ``retry.timeout`` property of the unary RPC's retry config). For
-    ``Operation`` or ``PollingFuture`` in general Timeout stands for
-    Polling Timeout.
-
-    Args:
-        predicate (Callable[Exception]): A callable that should return ``True``
-            if the given exception is retryable.
-        initial (float): The minimum amount of time to delay in seconds. This
-            must be greater than 0.
-        maximum (float): The maximum amount of time to delay in seconds.
-        multiplier (float): The multiplier applied to the delay.
-        timeout (float): How long to keep retrying, in seconds.
-        deadline (float): DEPRECATED: use `timeout` instead. For backward
-            compatibility, if specified it will override the ``timeout`` parameter.
-    """
-
-    def __init__(
-        self,
-        predicate: Callable[[BaseException], bool] = if_transient_error,
-        initial: float = _DEFAULT_INITIAL_DELAY,
-        maximum: float = _DEFAULT_MAXIMUM_DELAY,
-        multiplier: float = _DEFAULT_DELAY_MULTIPLIER,
-        timeout: float = _DEFAULT_DEADLINE,
-        on_error: Callable[[BaseException], Any] | None = None,
-        **kwargs: Any,
-    ) -> None:
-        self._predicate = predicate
-        self._initial = initial
-        self._multiplier = multiplier
-        self._maximum = maximum
-        self._timeout = kwargs.get("deadline", timeout)
-        self._deadline = self._timeout
-        self._on_error = on_error
-
-    def __call__(
-        self,
-        func: Callable[_P, _R],
-        on_error: Callable[[BaseException], Any] | None = None,
-    ) -> Callable[_P, _R]:
-        """Wrap a callable with retry behavior.
-
-        Args:
-            func (Callable): The callable to add retry behavior to.
-            on_error (Callable[Exception]): A function to call while processing
-                a retryable exception. Any error raised by this function will
-                *not* be caught.
-
-        Returns:
-            Callable: A callable that will invoke ``func`` with retry
-                behavior.
-        """
-        if self._on_error is not None:
-            on_error = self._on_error
-
-        @functools.wraps(func)
-        def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
-            """A wrapper that calls target function with retry."""
-            target = functools.partial(func, *args, **kwargs)
-            sleep_generator = exponential_sleep_generator(
-                self._initial, self._maximum, multiplier=self._multiplier
-            )
-            return retry_target(
-                target,
-                self._predicate,
-                sleep_generator,
-                self._timeout,
-                on_error=on_error,
-            )
-
-        return retry_wrapped_func
-
-    @property
-    def deadline(self):
-        """
-        DEPRECATED: use ``timeout`` instead.  Refer to the ``Retry`` class
-        documentation for details.
-        """
-        return self._timeout
-
-    @property
-    def timeout(self):
-        return self._timeout
-
-    def with_deadline(self, deadline):
-        """Return a copy of this retry with the given timeout.
-
-        DEPRECATED: use :meth:`with_timeout` instead. Refer to the ``Retry`` class
-        documentation for details.
-
-        Args:
-            deadline (float): How long to keep retrying in seconds.
-
-        Returns:
-            Retry: A new retry instance with the given timeout.
-        """
-        return self.with_timeout(timeout=deadline)
-
-    def with_timeout(self, timeout):
-        """Return a copy of this retry with the given timeout.
-
-        Args:
-            timeout (float): How long to keep retrying, in seconds.
-
-        Returns:
-            Retry: A new retry instance with the given timeout.
-        """
-        return Retry(
-            predicate=self._predicate,
-            initial=self._initial,
-            maximum=self._maximum,
-            multiplier=self._multiplier,
-            timeout=timeout,
-            on_error=self._on_error,
-        )
-
-    def with_predicate(self, predicate):
-        """Return a copy of this retry with the given predicate.
-
-        Args:
-            predicate (Callable[Exception]): A callable that should return
-                ``True`` if the given exception is retryable.
-
-        Returns:
-            Retry: A new retry instance with the given predicate.
-        """
-        return Retry(
-            predicate=predicate,
-            initial=self._initial,
-            maximum=self._maximum,
-            multiplier=self._multiplier,
-            timeout=self._timeout,
-            on_error=self._on_error,
-        )
-
-    def with_delay(self, initial=None, maximum=None, multiplier=None):
-        """Return a copy of this retry with the given delay options.
-
-        Args:
-            initial (float): The minimum amount of time to delay. This must
-                be greater than 0.
-            maximum (float): The maximum amount of time to delay.
-            multiplier (float): The multiplier applied to the delay.
-
-        Returns:
-            Retry: A new retry instance with the given predicate.
-        """
-        return Retry(
-            predicate=self._predicate,
-            initial=initial if initial is not None else self._initial,
-            maximum=maximum if maximum is not None else self._maximum,
-            multiplier=multiplier if multiplier is not None else self._multiplier,
-            timeout=self._timeout,
-            on_error=self._on_error,
-        )
-
-    def __str__(self):
-        return (
-            "<Retry predicate={}, initial={:.1f}, maximum={:.1f}, "
-            "multiplier={:.1f}, timeout={}, on_error={}>".format(
-                self._predicate,
-                self._initial,
-                self._maximum,
-                self._multiplier,
-                self._timeout,  # timeout can be None, thus no {:.1f}
-                self._on_error,
-            )
-        )
diff --git a/google/api_core/retry/__init__.py b/google/api_core/retry/__init__.py
new file mode 100644
index 0000000..7942841
--- /dev/null
+++ b/google/api_core/retry/__init__.py
@@ -0,0 +1,46 @@
+# Copyright 2017 Google LLC
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Retry implementation for Google API client libraries."""
+
+from .retry_base import exponential_sleep_generator
+from .retry_base import if_exception_type
+from .retry_base import if_transient_error
+from .retry_base import build_retry_error
+from .retry_base import RetryFailureReason
+from .retry_unary import Retry
+from .retry_unary import retry_target
+from .retry_unary_async import AsyncRetry
+from .retry_unary_async import retry_target as retry_target_async
+from .retry_streaming import StreamingRetry
+from .retry_streaming import retry_target_stream
+from .retry_streaming_async import AsyncStreamingRetry
+from .retry_streaming_async import retry_target_stream as retry_target_stream_async
+
+__all__ = (
+    "exponential_sleep_generator",
+    "if_exception_type",
+    "if_transient_error",
+    "build_retry_error",
+    "RetryFailureReason",
+    "Retry",
+    "AsyncRetry",
+    "StreamingRetry",
+    "AsyncStreamingRetry",
+    "retry_target",
+    "retry_target_async",
+    "retry_target_stream",
+    "retry_target_stream_async",
+)
diff --git a/google/api_core/retry/retry_base.py b/google/api_core/retry/retry_base.py
new file mode 100644
index 0000000..efd6d8f
--- /dev/null
+++ b/google/api_core/retry/retry_base.py
@@ -0,0 +1,354 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Shared classes and functions for retrying requests.
+
+:class:`_BaseRetry` is the base class for :class:`Retry`,
+:class:`AsyncRetry`, :class:`StreamingRetry`, and :class:`AsyncStreamingRetry`.
+"""
+
+from __future__ import annotations
+
+import logging
+import random
+import time
+
+from enum import Enum
+from typing import Any, Callable, TYPE_CHECKING
+
+import requests.exceptions
+
+from google.api_core import exceptions
+from google.auth import exceptions as auth_exceptions
+
+if TYPE_CHECKING:
+    import sys
+
+    if sys.version_info >= (3, 11):
+        from typing import Self
+    else:
+        from typing_extensions import Self
+
+_DEFAULT_INITIAL_DELAY = 1.0  # seconds
+_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
+_DEFAULT_DELAY_MULTIPLIER = 2.0
+_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds
+
+_LOGGER = logging.getLogger("google.api_core.retry")
+
+
+def if_exception_type(
+    *exception_types: type[Exception],
+) -> Callable[[Exception], bool]:
+    """Creates a predicate to check if the exception is of a given type.
+
+    Args:
+        exception_types (Sequence[:func:`type`]): The exception types to check
+            for. They are handed to :func:`isinstance`, so subclasses match too.
+
+    Returns:
+        Callable[Exception]: A predicate that returns True if the provided
+            exception is an instance of any of the given type(s).
+    """
+
+    def if_exception_type_predicate(exception: Exception) -> bool:
+        """Bound predicate for checking an exception type."""
+        return isinstance(exception, exception_types)
+
+    return if_exception_type_predicate
+
+
+# pylint: disable=invalid-name
+# Pylint sees this module-level name as a constant, but it is the predicate
+# function returned by ``if_exception_type``, so it keeps a function-style name.
+if_transient_error = if_exception_type(
+    exceptions.InternalServerError,
+    exceptions.TooManyRequests,
+    exceptions.ServiceUnavailable,
+    requests.exceptions.ConnectionError,
+    requests.exceptions.ChunkedEncodingError,
+    auth_exceptions.TransportError,
+)
+"""A predicate that checks if an exception is a transient API error.
+
+The following server errors are considered transient:
+
+- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC
+    ``INTERNAL(13)`` and its subclasses.
+- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429
+- :class:`google.api_core.exceptions.ServiceUnavailable` - HTTP 503
+- :class:`requests.exceptions.ConnectionError`
+- :class:`requests.exceptions.ChunkedEncodingError` - The server declared
+    chunked encoding but sent an invalid chunk.
+- :class:`google.auth.exceptions.TransportError` - Used to indicate an
+    error occurred during an HTTP request.
+"""
+# pylint: enable=invalid-name
+
+
+def exponential_sleep_generator(
+    initial: float, maximum: float, multiplier: float = _DEFAULT_DELAY_MULTIPLIER
+):
+    """Generates sleep intervals based on the exponential back-off algorithm.
+
+    This implements the `Truncated Exponential Back-off`_ algorithm.
+
+    .. _Truncated Exponential Back-off:
+        https://cloud.google.com/storage/docs/exponential-backoff
+
+    Args:
+        initial (float): Upper bound of the first randomized delay (capped at
+            ``maximum``). This must be greater than 0.
+        maximum (float): The maximum amount of time to delay.
+        multiplier (float): Factor applied to the upper bound after each yield.
+
+    Yields:
+        float: successive sleep intervals, drawn uniformly from ``[0, bound]``.
+    """
+    max_delay = min(initial, maximum)
+    while True:
+        yield random.uniform(0.0, max_delay)
+        max_delay = min(max_delay * multiplier, maximum)
+
+
+class RetryFailureReason(Enum):
+    """
+    The cause of a failed retry, passed to exception factories such as ``build_retry_error``.
+    """
+
+    TIMEOUT = 0  # the next attempt would start after the retry deadline
+    NON_RETRYABLE_ERROR = 1  # the predicate rejected the raised exception
+
+
+def build_retry_error(
+    exc_list: list[Exception],
+    reason: RetryFailureReason,
+    timeout_val: float | None,
+    **kwargs: Any,
+) -> tuple[Exception, Exception | None]:
+    """
+    Default exception_factory implementation.
+
+    Returns a RetryError if the failure is due to a timeout, otherwise returns the
+    last exception encountered (or a generic RetryError if ``exc_list`` is empty).
+
+    Args:
+      - exc_list: list of exceptions that occurred during the retry
+      - reason: reason for the retry failure (TIMEOUT or NON_RETRYABLE_ERROR)
+      - timeout_val: the original timeout value for the retry (in seconds), for use in the exception message
+      - kwargs: extra keyword arguments; not used by this implementation
+
+    Returns:
+      - tuple: a tuple of the exception to be raised, and the cause exception if any
+    """
+    if reason == RetryFailureReason.TIMEOUT:
+        # return RetryError with the most recent exception as the cause
+        src_exc = exc_list[-1] if exc_list else None
+        timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
+        return (
+            exceptions.RetryError(
+                f"Timeout {timeout_val_str}exceeded",
+                src_exc,
+            ),
+            src_exc,
+        )
+    elif exc_list:
+        # return most recent exception encountered
+        return exc_list[-1], None
+    else:
+        # no exceptions were given in exc_list. Return a generic RetryError
+        return exceptions.RetryError("Unknown error", None), None
+
+
+def _retry_error_helper(
+    exc: Exception,
+    deadline: float | None,
+    next_sleep: float,
+    error_list: list[Exception],
+    predicate_fn: Callable[[Exception], bool],
+    on_error_fn: Callable[[Exception], None] | None,
+    exc_factory_fn: Callable[
+        [list[Exception], RetryFailureReason, float | None],
+        tuple[Exception, Exception | None],
+    ],
+    original_timeout: float | None,
+):
+    """
+    Shared logic for handling an error for all retry implementations
+
+    - Raises an error on timeout or non-retryable error
+    - Calls on_error_fn if provided
+    - Logs the error
+
+    Args:
+       - exc: the exception that was raised
+       - deadline: the time.monotonic() value after which no new attempt may start, or None
+       - next_sleep: the next sleep interval
+       - error_list: the list of exceptions that have been raised so far; ``exc`` is appended to it
+       - predicate_fn: takes `exc` and returns true if the operation should be retried
+       - on_error_fn: callback to execute when a retryable error occurs
+       - exc_factory_fn: callback used to build the exception to be raised on terminal failure
+       - original_timeout: the original timeout value for the retry (in seconds),
+           to be passed to the exception factory for building an error message
+    """
+    error_list.append(exc)
+    if not predicate_fn(exc):
+        final_exc, source_exc = exc_factory_fn(
+            error_list,
+            RetryFailureReason.NON_RETRYABLE_ERROR,
+            original_timeout,
+        )
+        raise final_exc from source_exc
+    if on_error_fn is not None:
+        on_error_fn(exc)
+    if deadline is not None and time.monotonic() + next_sleep > deadline:
+        final_exc, source_exc = exc_factory_fn(
+            error_list,
+            RetryFailureReason.TIMEOUT,
+            original_timeout,
+        )
+        raise final_exc from source_exc
+    _LOGGER.debug(
+        "Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], next_sleep)
+    )
+
+
+class _BaseRetry(object):
+    """
+    Base class for retry configuration objects. This class is intended to capture retry
+    and backoff configuration that is common to both synchronous and asynchronous retries,
+    for both unary and streaming RPCs. It is not intended to be instantiated directly,
+    but rather to be subclassed by the various retry configuration classes.
+    """
+
+    def __init__(
+        self,
+        predicate: Callable[[Exception], bool] = if_transient_error,
+        initial: float = _DEFAULT_INITIAL_DELAY,
+        maximum: float = _DEFAULT_MAXIMUM_DELAY,
+        multiplier: float = _DEFAULT_DELAY_MULTIPLIER,
+        timeout: float | None = _DEFAULT_DEADLINE,
+        on_error: Callable[[Exception], Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        self._predicate = predicate
+        self._initial = initial
+        self._multiplier = multiplier
+        self._maximum = maximum
+        self._timeout = kwargs.get("deadline", timeout)  # deprecated "deadline" overrides
+        self._deadline = self._timeout
+        self._on_error = on_error
+
+    def __call__(self, *args, **kwargs) -> Any:
+        raise NotImplementedError("Not implemented in base class")
+
+    @property
+    def deadline(self) -> float | None:
+        """DEPRECATED alias of ``timeout``; see the ``Retry`` class documentation."""
+        return self._timeout
+
+    @property
+    def timeout(self) -> float | None:
+        """How long to keep retrying, in seconds, as configured on this object."""
+        return self._timeout
+
+    def _replace(
+        self,
+        predicate: Callable[[Exception], bool] | None = None,
+        initial: float | None = None,
+        maximum: float | None = None,
+        multiplier: float | None = None,
+        timeout: float | None = None,
+        on_error: Callable[[Exception], Any] | None = None,
+    ) -> Self:
+        """Return a copy, overriding only the arguments that are not ``None``."""
+        return type(self)(
+            predicate=predicate if predicate is not None else self._predicate,
+            initial=initial if initial is not None else self._initial,
+            maximum=maximum if maximum is not None else self._maximum,
+            multiplier=multiplier if multiplier is not None else self._multiplier,
+            timeout=timeout if timeout is not None else self._timeout,
+            on_error=on_error if on_error is not None else self._on_error,
+        )
+
+    def with_deadline(self, deadline: float | None) -> Self:
+        """Return a copy of this retry with the given timeout.
+
+        DEPRECATED: use :meth:`with_timeout` instead (see the ``Retry`` class docs).
+
+        Args:
+            deadline (float | None): How long to keep retrying, in seconds.
+
+        Returns:
+            Retry: A new retry instance with the given timeout.
+        """
+        return self.with_timeout(deadline)
+
+    def with_timeout(self, timeout: float | None) -> Self:
+        """Return a copy of this retry with the given timeout.
+
+        Args:
+            timeout (float | None): How long to keep retrying, in seconds. The
+                value is applied as given, so ``0`` and ``None`` are honored.
+
+        Returns:
+            Retry: A new retry instance with the given timeout.
+        """
+        return type(self)(  # not _replace(): it reads ``None`` as "keep current"
+            predicate=self._predicate,
+            initial=self._initial,
+            maximum=self._maximum,
+            multiplier=self._multiplier,
+            timeout=timeout,
+            on_error=self._on_error,
+        )
+
+    def with_predicate(self, predicate: Callable[[Exception], bool]) -> Self:
+        """Return a copy of this retry with the given predicate.
+
+        Args:
+            predicate (Callable[Exception]): A callable that should return
+                ``True`` if the given exception is retryable.
+
+        Returns:
+            Retry: A new retry instance with the given predicate.
+        """
+        return self._replace(predicate=predicate)
+
+    def with_delay(
+        self,
+        initial: float | None = None,
+        maximum: float | None = None,
+        multiplier: float | None = None,
+    ) -> Self:
+        """Return a copy of this retry with the given delay options.
+
+        Args:
+            initial (float): The minimum amount of time to delay (in seconds). This must
+                be greater than 0.
+            maximum (float): The maximum amount of time to delay (in seconds).
+            multiplier (float): The multiplier applied to the delay.
+
+        Returns:
+            Retry: A new retry instance with the given delay options.
+        """
+        return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)
+
+    def __str__(self) -> str:
+        return (
+            f"<{type(self).__name__} predicate={self._predicate}, "
+            f"initial={self._initial:.1f}, maximum={self._maximum:.1f}, "
+            f"multiplier={self._multiplier:.1f}, timeout={self._timeout}, "  # may be None
+            f"on_error={self._on_error}>"
+        )
diff --git a/google/api_core/retry/retry_streaming.py b/google/api_core/retry/retry_streaming.py
new file mode 100644
index 0000000..e113323
--- /dev/null
+++ b/google/api_core/retry/retry_streaming.py
@@ -0,0 +1,263 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Generator wrapper for retryable streaming RPCs.
+"""
+from __future__ import annotations
+
+from typing import (
+    Callable,
+    Optional,
+    List,
+    Tuple,
+    Iterable,
+    Generator,
+    TypeVar,
+    Any,
+    TYPE_CHECKING,
+)
+
+import sys
+import time
+import functools
+
+from google.api_core.retry.retry_base import _BaseRetry
+from google.api_core.retry.retry_base import _retry_error_helper
+from google.api_core.retry import exponential_sleep_generator
+from google.api_core.retry import build_retry_error
+from google.api_core.retry import RetryFailureReason
+
+if TYPE_CHECKING:
+    if sys.version_info >= (3, 10):
+        from typing import ParamSpec
+    else:
+        from typing_extensions import ParamSpec
+
+    _P = ParamSpec("_P")  # target function call parameters
+    _Y = TypeVar("_Y")  # yielded values
+
+
+def retry_target_stream(
+    target: Callable[_P, Iterable[_Y]],
+    predicate: Callable[[Exception], bool],
+    sleep_generator: Iterable[float],
+    timeout: Optional[float] = None,
+    on_error: Optional[Callable[[Exception], None]] = None,
+    exception_factory: Callable[
+        [List[Exception], RetryFailureReason, Optional[float]],
+        Tuple[Exception, Optional[Exception]],
+    ] = build_retry_error,
+    init_args: _P.args = (),
+    init_kwargs: _P.kwargs = {},
+    **kwargs,
+) -> Generator[_Y, Any, None]:
+    """Create a generator wrapper that retries the wrapped stream if it fails.
+
+    This is the lowest-level retry helper. Generally, you'll use the
+    higher-level retry helper :class:`StreamingRetry`.
+
+    Args:
+        target: The generator function to call and retry.
+        predicate: A callable used to determine if an
+            exception raised by the target should be considered retryable.
+            It should return True to retry or False otherwise.
+        sleep_generator: An infinite iterator that determines
+            how long to sleep (in seconds) between retries.
+        timeout: How long to keep retrying the target, in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error: If given, the on_error callback will be called with each
+            retryable exception raised by the target. Any error raised by this
+            function will *not* be caught.
+        exception_factory: A function that is called when the retryable reaches
+            a terminal failure state, used to construct an exception to be raised.
+            It takes a list of all exceptions encountered, a retry.RetryFailureReason
+            enum indicating the failure cause, and the original timeout value
+            as arguments. It should return a tuple of the exception to be raised,
+            along with the cause exception if any. The default implementation will raise
+            a RetryError on timeout, or the last exception encountered otherwise.
+        init_args: Positional arguments to pass to the target function.
+        init_kwargs: Keyword arguments to pass to the target function.
+
+    Returns:
+        Generator: A retryable generator that wraps the target generator function.
+
+    Raises:
+        ValueError: If the sleep generator stops yielding values.
+        Exception: a custom exception specified by the exception_factory if provided.
+            If no exception_factory is provided:
+                google.api_core.RetryError: If the timeout is exceeded while retrying.
+                Exception: If the target raises an error that isn't retryable.
+    """
+
+    timeout = kwargs.get("deadline", timeout)  # a "deadline" kwarg, if passed, overrides timeout
+    deadline: Optional[float] = (
+        time.monotonic() + timeout if timeout is not None else None
+    )
+    error_list: list[Exception] = []
+
+    for sleep in sleep_generator:
+        # Each sleep value drives one attempt: start the target stream afresh
+        try:
+            # Note: in the future, we can add a ResumptionStrategy object
+            # to generate new args between calls. For now, use the same args
+            # for each attempt.
+            subgenerator = target(*init_args, **init_kwargs)
+            return (yield from subgenerator)  # stream ended without raising: done
+        # handle exceptions raised by the subgenerator
+        # pylint: disable=broad-except
+        # This function explicitly must deal with broad exceptions.
+        except Exception as exc:
+            # defer to shared logic for handling errors
+            _retry_error_helper(
+                exc,
+                deadline,
+                sleep,
+                error_list,
+                predicate,
+                on_error,
+                exception_factory,
+                timeout,
+            )
+            # if the helper did not raise, sleep before the next attempt
+            time.sleep(sleep)
+
+    raise ValueError("Sleep generator stopped yielding sleep values.")
+
+
+class StreamingRetry(_BaseRetry):
+    """Exponential retry decorator for streaming synchronous RPCs.
+
+    This class returns a Generator when called, which wraps the target
+    stream in retry logic. If any exception is raised by the target, the
+    entire stream will be retried within the wrapper.
+
+    Although the default behavior is to retry transient API errors, a
+    different predicate can be provided to retry other exceptions.
+
+    Important Note: when a stream encounters a retryable error, it will
+    silently construct a fresh iterator instance in the background
+    and continue yielding (likely duplicate) values as if no error occurred.
+    This is the most general way to retry a stream, but it often is not the
+    desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
+
+    There are two ways to build more advanced retry logic for streams:
+
+    1. Wrap the target
+        Use a ``target`` that maintains state between retries, and creates a
+        different generator on each retry call. For example, you can wrap a
+        network call in a function that modifies the request based on what has
+        already been returned:
+
+        .. code-block:: python
+
+            def attempt_with_modified_request(target, request, seen_items=[]):
+                # remove seen items from request on each attempt
+                new_request = modify_request(request, seen_items)
+                new_generator = target(new_request)
+                for item in new_generator:
+                    yield item
+                    seen_items.append(item)
+
+            retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request)
+            retryable_generator = retry_wrapped_fn(target, request)
+
+    2. Wrap the retry generator
+        Alternatively, you can wrap the retryable generator itself before
+        passing it to the end-user to add a filter on the stream. For
+        example, you can keep track of the items that were successfully yielded
+        in previous retry attempts, and only yield new items when the
+        new attempt surpasses the previous ones:
+
+        .. code-block:: python
+
+            def retryable_with_filter(target):
+                stream_idx = 0
+                # reset stream_idx when the stream is retried
+                def on_error(e):
+                    nonlocal stream_idx
+                    stream_idx = 0
+                # build retryable, registering on_error so retries reset the index
+                retryable_gen = StreamingRetry(...)(target, on_error)
+                # keep track of what has been yielded out of filter
+                seen_items = []
+                for item in retryable_gen():
+                    if stream_idx >= len(seen_items):
+                        seen_items.append(item)
+                        yield item
+                    elif item != seen_items[stream_idx]:
+                        raise ValueError("Stream differs from last attempt")
+                    stream_idx += 1
+
+            filter_retry_wrapped = retryable_with_filter(target)
+
+    Args:
+        predicate (Callable[Exception]): A callable that should return ``True``
+            if the given exception is retryable.
+        initial (float): The minimum amount of time to delay in seconds. This
+            must be greater than 0.
+        maximum (float): The maximum amount of time to delay in seconds.
+        multiplier (float): The multiplier applied to the delay.
+        timeout (float): How long to keep retrying, in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error (Callable[Exception]): A function to call while processing
+            a retryable exception. Any error raised by this function will
+            *not* be caught.
+        deadline (float): DEPRECATED: use `timeout` instead. For backward
+            compatibility, if specified it will override the ``timeout`` parameter.
+    """
+
+    def __call__(
+        self,
+        func: Callable[_P, Iterable[_Y]],
+        on_error: Callable[[Exception], Any] | None = None,
+    ) -> Callable[_P, Generator[_Y, Any, None]]:
+        """Wrap a callable with retry behavior.
+
+        Args:
+            func (Callable): The callable to add retry behavior to.
+            on_error (Optional[Callable[Exception]]): If given, the
+                on_error callback will be called with each retryable exception
+                raised by the wrapped function. Any error raised by this
+                function will *not* be caught. If on_error was specified in the
+                constructor, this value will be ignored.
+
+        Returns:
+            Callable: A callable that will invoke ``func`` with retry
+                behavior.
+        """
+        if self._on_error is not None:  # the constructor's callback takes precedence
+            on_error = self._on_error
+
+        @functools.wraps(func)
+        def retry_wrapped_func(
+            *args: _P.args, **kwargs: _P.kwargs
+        ) -> Generator[_Y, Any, None]:
+            """A wrapper that calls target function with retry."""
+            sleep_generator = exponential_sleep_generator(
+                self._initial, self._maximum, multiplier=self._multiplier
+            )
+            return retry_target_stream(
+                func,
+                predicate=self._predicate,
+                sleep_generator=sleep_generator,
+                timeout=self._timeout,
+                on_error=on_error,
+                init_args=args,
+                init_kwargs=kwargs,
+            )
+
+        return retry_wrapped_func
diff --git a/google/api_core/retry/retry_streaming_async.py b/google/api_core/retry/retry_streaming_async.py
new file mode 100644
index 0000000..ed4edab
--- /dev/null
+++ b/google/api_core/retry/retry_streaming_async.py
@@ -0,0 +1,325 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Generator wrapper for retryable async streaming RPCs.
+"""
+from __future__ import annotations
+
+from typing import (
+    cast,
+    Any,
+    Callable,
+    Iterable,
+    AsyncIterator,
+    AsyncIterable,
+    Awaitable,
+    TypeVar,
+    AsyncGenerator,
+    TYPE_CHECKING,
+)
+
+import asyncio
+import time
+import sys
+import functools
+
+from google.api_core.retry.retry_base import _BaseRetry
+from google.api_core.retry.retry_base import _retry_error_helper
+from google.api_core.retry import exponential_sleep_generator
+from google.api_core.retry import build_retry_error
+from google.api_core.retry import RetryFailureReason
+
+
+if TYPE_CHECKING:
+    if sys.version_info >= (3, 10):
+        from typing import ParamSpec
+    else:
+        from typing_extensions import ParamSpec
+
+    _P = ParamSpec("_P")  # target function call parameters
+    _Y = TypeVar("_Y")  # yielded values
+
+
+async def retry_target_stream(
+    target: Callable[_P, AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]]],
+    predicate: Callable[[Exception], bool],
+    sleep_generator: Iterable[float],
+    timeout: float | None = None,
+    on_error: Callable[[Exception], None] | None = None,
+    exception_factory: Callable[
+        [list[Exception], RetryFailureReason, float | None],
+        tuple[Exception, Exception | None],
+    ] = build_retry_error,
+    init_args: _P.args = (),
+    init_kwargs: _P.kwargs = {},
+    **kwargs,
+) -> AsyncGenerator[_Y, None]:
+    """Create a generator wrapper that retries the wrapped stream if it fails.
+
+    This is the lowest-level retry helper. Generally, you'll use the
+    higher-level retry helper :class:`AsyncStreamingRetry`.
+
+    Args:
+        target: The generator function to call and retry.
+        predicate: A callable used to determine if an
+            exception raised by the target should be considered retryable.
+            It should return True to retry or False otherwise.
+        sleep_generator: An infinite iterator that determines
+            how long to sleep (in seconds) between retries.
+        timeout: How long to keep retrying the target, in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error: If given, the on_error callback will be called with each
+            retryable exception raised by the target. Any error raised by this
+            function will *not* be caught.
+        exception_factory: A function that is called when the retryable reaches
+            a terminal failure state, used to construct an exception to be raised.
+            It takes a list of all exceptions encountered, a retry.RetryFailureReason
+            enum indicating the failure cause, and the original timeout value
+            as arguments. It should return a tuple of the exception to be raised,
+            along with the cause exception if any. The default implementation will raise
+            a RetryError on timeout, or the last exception encountered otherwise.
+        init_args: Positional arguments to pass to the target function.
+        init_kwargs: Keyword arguments to pass to the target function.
+
+    Returns:
+        AsyncGenerator: A retryable generator that wraps the target generator function.
+
+    Raises:
+        ValueError: If the sleep generator stops yielding values.
+        Exception: a custom exception specified by the exception_factory if provided.
+            If no exception_factory is provided:
+                google.api_core.RetryError: If the timeout is exceeded while retrying.
+                Exception: If the target raises an error that isn't retryable.
+    """
+    target_iterator: AsyncIterator[_Y] | None = None
+    timeout = kwargs.get("deadline", timeout)  # a "deadline" kwarg, if passed, overrides timeout
+    deadline = time.monotonic() + timeout if timeout is not None else None  # timeout=0 still sets a deadline
+    # keep track of retryable exceptions we encounter to pass in to exception_factory
+    error_list: list[Exception] = []
+    target_is_generator: bool | None = None
+
+    for sleep in sleep_generator:
+        # Each sleep value drives one attempt: start the target stream afresh
+        try:
+            # Note: in the future, we can add a ResumptionStrategy object
+            # to generate new args between calls. For now, use the same args
+            # for each attempt.
+            target_output: AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]] = target(
+                *init_args, **init_kwargs
+            )
+            try:
+                # gapic functions return the generator behind an awaitable
+                # unwrap the awaitable so we can work with the generator directly
+                target_output = await target_output  # type: ignore
+            except TypeError:
+                # was not awaitable, continue (a TypeError raised inside an awaited target lands here too)
+                pass
+            target_iterator = cast(AsyncIterable["_Y"], target_output).__aiter__()
+
+            if target_is_generator is None:
+                # Check if target supports generator features (asend, athrow, aclose)
+                target_is_generator = bool(getattr(target_iterator, "asend", None))
+
+            sent_in = None
+            while True:
+                ## Read from target_iterator
+                # If the target is a generator, we will advance it with `asend`
+                # otherwise, we will use `anext`
+                if target_is_generator:
+                    next_value = await target_iterator.asend(sent_in)  # type: ignore
+                else:
+                    next_value = await target_iterator.__anext__()
+                ## Yield from Wrapper to caller
+                try:
+                    # yield latest value from target
+                    # exceptions from `athrow` and `aclose` are injected here
+                    sent_in = yield next_value
+                except GeneratorExit:
+                    # if wrapper received `aclose` while waiting on yield,
+                    # it will raise GeneratorExit here
+                    if target_is_generator:
+                        # pass to inner target_iterator for handling
+                        await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
+                    else:
+                        raise
+                    return
+                except:  # noqa: E722
+                    # bare except catches any exception passed to `athrow`
+                    if target_is_generator:
+                        # delegate error handling to target_iterator
+                        await cast(AsyncGenerator["_Y", None], target_iterator).athrow(
+                            cast(BaseException, sys.exc_info()[1])
+                        )
+                    else:
+                        raise
+            return  # not reached: the loop above only exits via return or an exception
+        except StopAsyncIteration:
+            # if iterator exhausted, return
+            return
+        # handle exceptions raised by the target_iterator
+        # pylint: disable=broad-except
+        # This function explicitly must deal with broad exceptions.
+        except Exception as exc:
+            # defer to shared logic for handling errors
+            _retry_error_helper(
+                exc,
+                deadline,
+                sleep,
+                error_list,
+                predicate,
+                on_error,
+                exception_factory,
+                timeout,
+            )
+            # if the helper did not raise, sleep before the next attempt
+            await asyncio.sleep(sleep)
+        finally:
+            # close this attempt's generator however the attempt ended
+            if target_is_generator and target_iterator is not None:
+                await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
+    raise ValueError("Sleep generator stopped yielding sleep values.")
+
+
+class AsyncStreamingRetry(_BaseRetry):
+    """Exponential retry decorator for async streaming rpcs.
+
+    This class returns an AsyncGenerator when called, which wraps the target
+    stream in retry logic. If any exception is raised by the target, the
+    entire stream will be retried within the wrapper.
+
+    Although the default behavior is to retry transient API errors, a
+    different predicate can be provided to retry other exceptions.
+
+    Important Note: when a stream encounters a retryable error, it will
+    silently construct a fresh iterator instance in the background
+    and continue yielding (likely duplicate) values as if no error occurred.
+    This is the most general way to retry a stream, but it often is not the
+    desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]
+
+    There are two ways to build more advanced retry logic for streams:
+
+    1. Wrap the target
+        Use a ``target`` that maintains state between retries, and creates a
+        different generator on each retry call. For example, you can wrap a
+        grpc call in a function that modifies the request based on what has
+        already been returned:
+
+        .. code-block:: python
+
+            async def attempt_with_modified_request(target, request, seen_items=[]):
+                # remove seen items from request on each attempt
+                new_request = modify_request(request, seen_items)
+                new_generator = await target(new_request)
+                async for item in new_generator:
+                    yield item
+                    seen_items.append(item)
+
+            retryable_gen = await AsyncStreamingRetry(...)(attempt_with_modified_request)(target, request, [])
+
+    2. Wrap the retry generator
+        Alternatively, you can wrap the retryable generator itself before
+        passing it to the end-user to add a filter on the stream. For
+        example, you can keep track of the items that were successfully yielded
+        in previous retry attempts, and only yield new items when the
+        new attempt surpasses the previous ones:
+
+        .. code-block:: python
+
+            async def retryable_with_filter(target):
+                stream_idx = 0
+                # reset stream_idx when the stream is retried
+                def on_error(e):
+                    nonlocal stream_idx
+                    stream_idx = 0
+                # build retryable, registering on_error so retries reset the index
+                retryable_gen = AsyncStreamingRetry(...)(target, on_error)
+                # keep track of what has been yielded out of filter
+                seen_items = []
+                async for item in await retryable_gen():
+                    if stream_idx >= len(seen_items):
+                        yield item
+                        seen_items.append(item)
+                    elif item != seen_items[stream_idx]:
+                        raise ValueError("Stream differs from last attempt")
+                    stream_idx += 1
+
+            filter_retry_wrapped = retryable_with_filter(target)
+
+    Args:
+        predicate (Callable[Exception]): A callable that should return ``True``
+            if the given exception is retryable.
+        initial (float): The minimum amount of time to delay in seconds. This
+            must be greater than 0.
+        maximum (float): The maximum amount of time to delay in seconds.
+        multiplier (float): The multiplier applied to the delay.
+        timeout (Optional[float]): How long to keep retrying in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error (Optional[Callable[Exception]]): A function to call while processing
+            a retryable exception. Any error raised by this function will
+            *not* be caught.
+        is_stream (bool): Indicates whether the input function
+            should be treated as a stream function (i.e. an AsyncGenerator,
+            or function or coroutine that returns an AsyncIterable).
+            If True, the iterable will be wrapped with retry logic, and any
+            failed outputs will restart the stream. If False, only the input
+            function call itself will be retried. Defaults to False.
+            To avoid duplicate values, retryable streams should typically be
+            wrapped in additional filter logic before use.
+        deadline (float): DEPRECATED: use ``timeout`` instead. If set, it will
+            override the ``timeout`` parameter.
+    """
+
+    def __call__(
+        self,
+        func: Callable[..., AsyncIterable[_Y] | Awaitable[AsyncIterable[_Y]]],
+        on_error: Callable[[Exception], Any] | None = None,
+    ) -> Callable[_P, Awaitable[AsyncGenerator[_Y, None]]]:
+        """Wrap a callable with retry behavior.
+
+        Args:
+            func (Callable): The callable or stream to add retry behavior to.
+            on_error (Optional[Callable[Exception]]): If given, the
+                on_error callback will be called with each retryable exception
+                raised by the wrapped function. Any error raised by this
+                function will *not* be caught. If on_error was specified in the
+                constructor, this value will be ignored.
+
+        Returns:
+            Callable: A coroutine function that, when awaited, returns an
+                async generator invoking ``func`` with retry behavior.
+        """
+        if self._on_error is not None:  # the constructor's callback takes precedence
+            on_error = self._on_error
+
+        @functools.wraps(func)
+        async def retry_wrapped_func(
+            *args: _P.args, **kwargs: _P.kwargs
+        ) -> AsyncGenerator[_Y, None]:
+            """A wrapper that calls target function with retry."""
+            sleep_generator = exponential_sleep_generator(
+                self._initial, self._maximum, multiplier=self._multiplier
+            )
+            return retry_target_stream(
+                func,
+                self._predicate,
+                sleep_generator,
+                self._timeout,
+                on_error,
+                init_args=args,
+                init_kwargs=kwargs,
+            )
+
+        return retry_wrapped_func
diff --git a/google/api_core/retry/retry_unary.py b/google/api_core/retry/retry_unary.py
new file mode 100644
index 0000000..ae59d51
--- /dev/null
+++ b/google/api_core/retry/retry_unary.py
@@ -0,0 +1,301 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Helpers for retrying functions with exponential back-off.
+
+The :class:`Retry` decorator can be used to retry functions that raise
+exceptions using exponential backoff. Because an exponential sleep algorithm is
+used, the retry is limited by a `timeout`. The timeout determines the window
+in which retries will be attempted. This is used instead of total number of retries
+because it is difficult to ascertain the amount of time a function can block
+when using total number of retries and exponential backoff.
+
+By default, this decorator will retry transient
+API errors (see :func:`if_transient_error`). For example:
+
+.. code-block:: python
+
+    @retry.Retry()
+    def call_flaky_rpc():
+        return client.flaky_rpc()
+
+    # Will retry flaky_rpc() if it raises transient API errors.
+    result = call_flaky_rpc()
+
+You can pass a custom predicate to retry on different exceptions, such as
+waiting for an eventually consistent item to be available:
+
+.. code-block:: python
+
+    @retry.Retry(predicate=if_exception_type(exceptions.NotFound))
+    def check_if_exists():
+        return client.does_thing_exist()
+
+    is_available = check_if_exists()
+
+Some client library methods apply retry automatically. These methods can accept
+a ``retry`` parameter that allows you to configure the behavior:
+
+.. code-block:: python
+
+    my_retry = retry.Retry(timeout=60)
+    result = client.some_method(retry=my_retry)
+
+"""
+
+from __future__ import annotations
+
+import functools
+import sys
+import time
+import inspect
+import warnings
+from typing import Any, Callable, Iterable, TypeVar, TYPE_CHECKING
+
+from google.api_core.retry.retry_base import _BaseRetry
+from google.api_core.retry.retry_base import _retry_error_helper
+from google.api_core.retry.retry_base import exponential_sleep_generator
+from google.api_core.retry.retry_base import build_retry_error
+from google.api_core.retry.retry_base import RetryFailureReason
+
+
+if TYPE_CHECKING:
+    if sys.version_info >= (3, 10):
+        from typing import ParamSpec
+    else:
+        from typing_extensions import ParamSpec
+
+    _P = ParamSpec("_P")  # target function call parameters
+    _R = TypeVar("_R")  # target function returned value
+
+_ASYNC_RETRY_WARNING = "Using the synchronous google.api_core.retry.Retry with asynchronous calls may lead to unexpected results. Please use google.api_core.retry_async.AsyncRetry instead."
+
+
+def retry_target(
+    target: Callable[_P, _R],
+    predicate: Callable[[Exception], bool],
+    sleep_generator: Iterable[float],
+    timeout: float | None = None,
+    on_error: Callable[[Exception], None] | None = None,
+    exception_factory: Callable[
+        [list[Exception], RetryFailureReason, float | None],
+        tuple[Exception, Exception | None],
+    ] = build_retry_error,
+    **kwargs,
+):
+    """Call a function and retry if it fails.
+
+    This is the lowest-level retry helper. Generally, you'll use the
+    higher-level retry helper :class:`Retry`.
+
+    Args:
+        target(Callable): The function to call and retry. This must be a
+            nullary function - apply arguments with `functools.partial`.
+        predicate (Callable[Exception]): A callable used to determine if an
+            exception raised by the target should be considered retryable.
+            It should return True to retry or False otherwise.
+        sleep_generator (Iterable[float]): An infinite iterator that determines
+            how long to sleep between retries.
+        timeout (Optional[float]): How long to keep retrying the target.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error (Optional[Callable[Exception]]): If given, the on_error
+            callback will be called with each retryable exception raised by the
+            target. Any error raised by this function will *not* be caught.
+        exception_factory: A function that is called when the retryable reaches
+            a terminal failure state, used to construct an exception to be raised.
+            It takes a list of all exceptions encountered, a retry.RetryFailureReason
+            enum indicating the failure cause, and the original timeout value
+            as arguments. It should return a tuple of the exception to be raised,
+            along with the cause exception if any. The default implementation will raise
+            a RetryError on timeout, or the last exception encountered otherwise.
+        deadline (float): DEPRECATED: use ``timeout`` instead. For backward
+            compatibility, if specified it will override ``timeout`` parameter.
+
+    Returns:
+        Any: the return value of the target function.
+
+    Raises:
+        ValueError: If the sleep generator stops yielding values.
+        Exception: a custom exception specified by the exception_factory if provided.
+            If no exception_factory is provided:
+                google.api_core.exceptions.RetryError: If the timeout is exceeded while retrying.
+                Exception: If the target raises an error that isn't retryable.
+    """
+
+    timeout = kwargs.get("deadline", timeout)
+
+    deadline = time.monotonic() + timeout if timeout is not None else None
+    error_list: list[Exception] = []
+
+    for sleep in sleep_generator:
+        try:
+            result = target()
+            if inspect.isawaitable(result):
+                warnings.warn(_ASYNC_RETRY_WARNING)
+            return result
+
+        # pylint: disable=broad-except
+        # This function explicitly must deal with broad exceptions.
+        except Exception as exc:
+            # defer to shared logic for handling errors
+            _retry_error_helper(
+                exc,
+                deadline,
+                sleep,
+                error_list,
+                predicate,
+                on_error,
+                exception_factory,
+                timeout,
+            )
+            # if exception not raised, sleep before next attempt
+            time.sleep(sleep)
+
+    raise ValueError("Sleep generator stopped yielding sleep values.")
+
+
+class Retry(_BaseRetry):
+    """Exponential retry decorator for unary synchronous RPCs.
+
+    This class is a decorator used to add retry or polling behavior to an RPC
+    call.
+
+    Although the default behavior is to retry transient API errors, a
+    different predicate can be provided to retry other exceptions.
+
+    There are two important concepts that retry/polling behavior may operate on,
+    Deadline and Timeout, which need to be properly defined for the correct
+    usage of this class and the rest of the library.
+
+    Deadline: a fixed point in time by which a certain operation must
+    terminate. For example, if a certain operation has a deadline
+    "2022-10-18T23:30:52.123Z" it must terminate (successfully or with an
+    error) by that time, regardless of when it was started or whether it
+    was started at all.
+
+    Timeout: the maximum duration of time after which a certain operation
+    must terminate (successfully or with an error). The countdown begins right
+    after an operation was started. For example, if an operation was started at
+    09:24:00 with timeout of 75 seconds, it must terminate no later than
+    09:25:15.
+
+    Unfortunately, in the past this class (and the api-core library as a whole) has not
+    been properly distinguishing the concepts of "timeout" and "deadline", and the
+    ``deadline`` parameter has meant ``timeout``. That is why
+    ``deadline`` has been deprecated and ``timeout`` should be used instead. If the
+    ``deadline`` parameter is set, it will override the ``timeout`` parameter.
+    In other words, ``retry.deadline`` should be treated as just a deprecated alias for
+    ``retry.timeout``.
+
+    Said another way, it is safe to assume that this class and the rest of this
+    library operate in terms of timeouts (not deadlines) unless explicitly
+    noted the usage of deadline semantics.
+
+    It is also important to
+    understand the three most common applications of the Timeout concept in the
+    context of this library.
+
+    Usually the generic Timeout term may stand for one of the following actual
+    timeouts: RPC Timeout, Retry Timeout, or Polling Timeout.
+
+    RPC Timeout: a value supplied by the client to the server so
+    that the server side knows the maximum amount of time it is expected to
+    spend handling that specific RPC. For example, in the case of gRPC transport,
+    RPC Timeout is represented by setting "grpc-timeout" header in the HTTP2
+    request. The `timeout` property of this class normally never represents the
+    RPC Timeout as it is handled separately by the ``google.api_core.timeout``
+    module of this library.
+
+    Retry Timeout: this is the most common meaning of the ``timeout`` property
+    of this class, and defines how long a certain RPC may be retried in case
+    the server returns an error.
+
+    Polling Timeout: defines how long the
+    client side is allowed to call the polling RPC repeatedly to check a status of a
+    long-running operation. Each polling RPC is
+    expected to succeed (its errors are supposed to be handled by the retry
+    logic). The decision as to whether a new polling attempt needs to be made is based
+    not on the RPC status code but on the returned
+    status of an operation. In other words: we will poll a long-running operation until
+    the operation is done or the polling timeout expires. Each poll will inform us of
+    the status of the operation. The poll consists of an RPC to the server that may
+    itself be retried as per the poll-specific retry settings in case of errors. The
+    operation-level retry settings do NOT apply to polling-RPC retries.
+
+    With the actual timeout types being defined above, the client libraries
+    often refer to just Timeout without clarifying which type specifically
+    that is. In that case the actual timeout type (sometimes also referred to as
+    Logical Timeout) can be determined from the context. If it is a unary rpc
+    call (i.e. a regular one) Timeout usually stands for the RPC Timeout (if
+    provided directly as a standalone value) or Retry Timeout (if provided as
+    ``retry.timeout`` property of the unary RPC's retry config). For
+    ``Operation`` or ``PollingFuture`` in general Timeout stands for
+    Polling Timeout.
+
+    Args:
+        predicate (Callable[Exception]): A callable that should return ``True``
+            if the given exception is retryable.
+        initial (float): The minimum amount of time to delay in seconds. This
+            must be greater than 0.
+        maximum (float): The maximum amount of time to delay in seconds.
+        multiplier (float): The multiplier applied to the delay.
+        timeout (float): How long to keep retrying, in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error (Callable[Exception]): A function to call while processing
+            a retryable exception. Any error raised by this function will
+            *not* be caught.
+        deadline (float): DEPRECATED: use `timeout` instead. For backward
+            compatibility, if specified it will override the ``timeout`` parameter.
+    """
+
+    def __call__(
+        self,
+        func: Callable[_P, _R],
+        on_error: Callable[[Exception], Any] | None = None,
+    ) -> Callable[_P, _R]:
+        """Wrap a callable with retry behavior.
+
+        Args:
+            func (Callable): The callable to add retry behavior to.
+            on_error (Optional[Callable[Exception]]): If given, the
+                on_error callback will be called with each retryable exception
+                raised by the wrapped function. Any error raised by this
+                function will *not* be caught. If on_error was specified in the
+                constructor, this value will be ignored.
+
+        Returns:
+            Callable: A callable that will invoke ``func`` with retry
+                behavior.
+        """
+        if self._on_error is not None:
+            on_error = self._on_error
+
+        @functools.wraps(func)
+        def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+            """A wrapper that calls target function with retry."""
+            target = functools.partial(func, *args, **kwargs)
+            sleep_generator = exponential_sleep_generator(
+                self._initial, self._maximum, multiplier=self._multiplier
+            )
+            return retry_target(
+                target,
+                self._predicate,
+                sleep_generator,
+                timeout=self._timeout,
+                on_error=on_error,
+            )
+
+        return retry_wrapped_func
diff --git a/google/api_core/retry/retry_unary_async.py b/google/api_core/retry/retry_unary_async.py
new file mode 100644
index 0000000..f97ea93
--- /dev/null
+++ b/google/api_core/retry/retry_unary_async.py
@@ -0,0 +1,238 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Helpers for retrying coroutine functions with exponential back-off.
+
+The :class:`AsyncRetry` decorator shares most functionality and behavior with
+:class:`Retry`, but supports coroutine functions. Please refer to description
+of :class:`Retry` for more details.
+
+By default, this decorator will retry transient
+API errors (see :func:`if_transient_error`). For example:
+
+.. code-block:: python
+
+    @retry_async.AsyncRetry()
+    async def call_flaky_rpc():
+        return await client.flaky_rpc()
+
+    # Will retry flaky_rpc() if it raises transient API errors.
+    result = await call_flaky_rpc()
+
+You can pass a custom predicate to retry on different exceptions, such as
+waiting for an eventually consistent item to be available:
+
+.. code-block:: python
+
+    @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
+    async def check_if_exists():
+        return await client.does_thing_exist()
+
+    is_available = await check_if_exists()
+
+Some client library methods apply retry automatically. These methods can accept
+a ``retry`` parameter that allows you to configure the behavior:
+
+.. code-block:: python
+
+    my_retry = retry_async.AsyncRetry(timeout=60)
+    result = await client.some_method(retry=my_retry)
+
+"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+import functools
+from typing import (
+    Awaitable,
+    Any,
+    Callable,
+    Iterable,
+    TypeVar,
+    TYPE_CHECKING,
+)
+
+from google.api_core.retry.retry_base import _BaseRetry
+from google.api_core.retry.retry_base import _retry_error_helper
+from google.api_core.retry.retry_base import exponential_sleep_generator
+from google.api_core.retry.retry_base import build_retry_error
+from google.api_core.retry.retry_base import RetryFailureReason
+
+# for backwards compatibility, expose helpers in this module
+from google.api_core.retry.retry_base import if_exception_type  # noqa
+from google.api_core.retry.retry_base import if_transient_error  # noqa
+
+if TYPE_CHECKING:
+    import sys
+
+    if sys.version_info >= (3, 10):
+        from typing import ParamSpec
+    else:
+        from typing_extensions import ParamSpec
+
+    _P = ParamSpec("_P")  # target function call parameters
+    _R = TypeVar("_R")  # target function returned value
+
+_DEFAULT_INITIAL_DELAY = 1.0  # seconds
+_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
+_DEFAULT_DELAY_MULTIPLIER = 2.0
+_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds
+_DEFAULT_TIMEOUT = 60.0 * 2.0  # seconds
+
+
+async def retry_target(
+    target: Callable[_P, Awaitable[_R]],
+    predicate: Callable[[Exception], bool],
+    sleep_generator: Iterable[float],
+    timeout: float | None = None,
+    on_error: Callable[[Exception], None] | None = None,
+    exception_factory: Callable[
+        [list[Exception], RetryFailureReason, float | None],
+        tuple[Exception, Exception | None],
+    ] = build_retry_error,
+    **kwargs,
+):
+    """Await a coroutine and retry if it fails.
+
+    This is the lowest-level retry helper. Generally, you'll use the
+    higher-level retry helper :class:`AsyncRetry`.
+
+    Args:
+        target(Callable[[], Any]): The function to call and retry. This must be a
+            nullary function - apply arguments with `functools.partial`.
+        predicate (Callable[Exception]): A callable used to determine if an
+            exception raised by the target should be considered retryable.
+            It should return True to retry or False otherwise.
+        sleep_generator (Iterable[float]): An infinite iterator that determines
+            how long to sleep between retries.
+        timeout (Optional[float]): How long to keep retrying the target, in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error (Optional[Callable[Exception]]): If given, the on_error
+            callback will be called with each retryable exception raised by the
+            target. Any error raised by this function will *not* be caught.
+        exception_factory: A function that is called when the retryable reaches
+            a terminal failure state, used to construct an exception to be raised.
+            It takes a list of all exceptions encountered, a retry.RetryFailureReason
+            enum indicating the failure cause, and the original timeout value
+            as arguments. It should return a tuple of the exception to be raised,
+            along with the cause exception if any. The default implementation will raise
+            a RetryError on timeout, or the last exception encountered otherwise.
+        deadline (float): DEPRECATED use ``timeout`` instead. For backward
+            compatibility, if set it will override the ``timeout`` parameter.
+
+    Returns:
+        Any: the return value of the target function.
+
+    Raises:
+        ValueError: If the sleep generator stops yielding values.
+        Exception: a custom exception specified by the exception_factory if provided.
+            If no exception_factory is provided:
+                google.api_core.exceptions.RetryError: If the timeout is exceeded while retrying.
+                Exception: If the target raises an error that isn't retryable.
+    """
+
+    timeout = kwargs.get("deadline", timeout)
+
+    deadline = time.monotonic() + timeout if timeout is not None else None
+    error_list: list[Exception] = []
+
+    for sleep in sleep_generator:
+        try:
+            return await target()
+        # pylint: disable=broad-except
+        # This function explicitly must deal with broad exceptions.
+        except Exception as exc:
+            # defer to shared logic for handling errors
+            _retry_error_helper(
+                exc,
+                deadline,
+                sleep,
+                error_list,
+                predicate,
+                on_error,
+                exception_factory,
+                timeout,
+            )
+            # if exception not raised, sleep before next attempt
+            await asyncio.sleep(sleep)
+
+    raise ValueError("Sleep generator stopped yielding sleep values.")
+
+
+class AsyncRetry(_BaseRetry):
+    """Exponential retry decorator for async coroutines.
+
+    This class is a decorator used to add exponential back-off retry behavior
+    to an RPC call.
+
+    Although the default behavior is to retry transient API errors, a
+    different predicate can be provided to retry other exceptions.
+
+    Args:
+        predicate (Callable[Exception]): A callable that should return ``True``
+            if the given exception is retryable.
+        initial (float): The minimum amount of time to delay in seconds. This
+            must be greater than 0.
+        maximum (float): The maximum amount of time to delay in seconds.
+        multiplier (float): The multiplier applied to the delay.
+        timeout (Optional[float]): How long to keep retrying in seconds.
+            Note: timeout is only checked before initiating a retry, so the target may
+            run past the timeout value as long as it is healthy.
+        on_error (Optional[Callable[Exception]]): A function to call while processing
+            a retryable exception. Any error raised by this function will
+            *not* be caught.
+        deadline (float): DEPRECATED use ``timeout`` instead. If set it will
+            override ``timeout`` parameter.
+    """
+
+    def __call__(
+        self,
+        func: Callable[..., Awaitable[_R]],
+        on_error: Callable[[Exception], Any] | None = None,
+    ) -> Callable[_P, Awaitable[_R]]:
+        """Wrap a callable with retry behavior.
+
+        Args:
+            func (Callable): The callable to add retry behavior to.
+            on_error (Optional[Callable[Exception]]): If given, the
+                on_error callback will be called with each retryable exception
+                raised by the wrapped function. Any error raised by this
+                function will *not* be caught. If on_error was specified in the
+                constructor, this value will be ignored.
+
+        Returns:
+            Callable: A callable that will invoke ``func`` with retry
+                behavior.
+        """
+        if self._on_error is not None:
+            on_error = self._on_error
+
+        @functools.wraps(func)
+        async def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
+            """A wrapper that calls target function with retry."""
+            sleep_generator = exponential_sleep_generator(
+                self._initial, self._maximum, multiplier=self._multiplier
+            )
+            return await retry_target(
+                functools.partial(func, *args, **kwargs),
+                predicate=self._predicate,
+                sleep_generator=sleep_generator,
+                timeout=self._timeout,
+                on_error=on_error,
+            )
+
+        return retry_wrapped_func
diff --git a/google/api_core/retry_async.py b/google/api_core/retry_async.py
deleted file mode 100644
index 739e88d..0000000
--- a/google/api_core/retry_async.py
+++ /dev/null
@@ -1,311 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Helpers for retrying coroutine functions with exponential back-off.
-
-The :class:`AsyncRetry` decorator shares most functionality and behavior with
-:class:`Retry`, but supports coroutine functions. Please refer to description
-of :class:`Retry` for more details.
-
-By default, this decorator will retry transient
-API errors (see :func:`if_transient_error`). For example:
-
-.. code-block:: python
-
-    @retry_async.AsyncRetry()
-    async def call_flaky_rpc():
-        return await client.flaky_rpc()
-
-    # Will retry flaky_rpc() if it raises transient API errors.
-    result = await call_flaky_rpc()
-
-You can pass a custom predicate to retry on different exceptions, such as
-waiting for an eventually consistent item to be available:
-
-.. code-block:: python
-
-    @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
-    async def check_if_exists():
-        return await client.does_thing_exist()
-
-    is_available = await check_if_exists()
-
-Some client library methods apply retry automatically. These methods can accept
-a ``retry`` parameter that allows you to configure the behavior:
-
-.. code-block:: python
-
-    my_retry = retry_async.AsyncRetry(deadline=60)
-    result = await client.some_method(retry=my_retry)
-
-"""
-
-import asyncio
-import datetime
-import functools
-import logging
-
-from google.api_core import datetime_helpers
-from google.api_core import exceptions
-from google.api_core.retry import exponential_sleep_generator
-from google.api_core.retry import if_exception_type  # noqa: F401
-from google.api_core.retry import if_transient_error
-
-
-_LOGGER = logging.getLogger(__name__)
-_DEFAULT_INITIAL_DELAY = 1.0  # seconds
-_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
-_DEFAULT_DELAY_MULTIPLIER = 2.0
-_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds
-_DEFAULT_TIMEOUT = 60.0 * 2.0  # seconds
-
-
-async def retry_target(
-    target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
-):
-    """Call a function and retry if it fails.
-
-    This is the lowest-level retry helper. Generally, you'll use the
-    higher-level retry helper :class:`Retry`.
-
-    Args:
-        target(Callable): The function to call and retry. This must be a
-            nullary function - apply arguments with `functools.partial`.
-        predicate (Callable[Exception]): A callable used to determine if an
-            exception raised by the target should be considered retryable.
-            It should return True to retry or False otherwise.
-        sleep_generator (Iterable[float]): An infinite iterator that determines
-            how long to sleep between retries.
-        timeout (float): How long to keep retrying the target, in seconds.
-        on_error (Callable[Exception]): A function to call while processing a
-            retryable exception.  Any error raised by this function will *not*
-            be caught.
-        deadline (float): DEPRECATED use ``timeout`` instead. For backward
-        compatibility, if set it will override the ``timeout`` parameter.
-
-    Returns:
-        Any: the return value of the target function.
-
-    Raises:
-        google.api_core.RetryError: If the deadline is exceeded while retrying.
-        ValueError: If the sleep generator stops yielding values.
-        Exception: If the target raises a method that isn't retryable.
-    """
-
-    timeout = kwargs.get("deadline", timeout)
-
-    deadline_dt = (
-        (datetime_helpers.utcnow() + datetime.timedelta(seconds=timeout))
-        if timeout
-        else None
-    )
-
-    last_exc = None
-
-    for sleep in sleep_generator:
-        try:
-            if not deadline_dt:
-                return await target()
-            else:
-                return await asyncio.wait_for(
-                    target(),
-                    timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds(),
-                )
-        # pylint: disable=broad-except
-        # This function explicitly must deal with broad exceptions.
-        except Exception as exc:
-            if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError):
-                raise
-            last_exc = exc
-            if on_error is not None:
-                on_error(exc)
-
-        now = datetime_helpers.utcnow()
-
-        if deadline_dt:
-            if deadline_dt <= now:
-                # Chains the raising RetryError with the root cause error,
-                # which helps observability and debugability.
-                raise exceptions.RetryError(
-                    "Timeout of {:.1f}s exceeded while calling target function".format(
-                        timeout
-                    ),
-                    last_exc,
-                ) from last_exc
-            else:
-                time_to_deadline = (deadline_dt - now).total_seconds()
-                sleep = min(time_to_deadline, sleep)
-
-        _LOGGER.debug(
-            "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
-        )
-        await asyncio.sleep(sleep)
-
-    raise ValueError("Sleep generator stopped yielding sleep values.")
-
-
-class AsyncRetry:
-    """Exponential retry decorator for async functions.
-
-    This class is a decorator used to add exponential back-off retry behavior
-    to an RPC call.
-
-    Although the default behavior is to retry transient API errors, a
-    different predicate can be provided to retry other exceptions.
-
-    Args:
-        predicate (Callable[Exception]): A callable that should return ``True``
-            if the given exception is retryable.
-        initial (float): The minimum a,out of time to delay in seconds. This
-            must be greater than 0.
-        maximum (float): The maximum amount of time to delay in seconds.
-        multiplier (float): The multiplier applied to the delay.
-        timeout (float): How long to keep retrying in seconds.
-        on_error (Callable[Exception]): A function to call while processing
-            a retryable exception. Any error raised by this function will
-            *not* be caught.
-        deadline (float): DEPRECATED use ``timeout`` instead. If set it will
-        override ``timeout`` parameter.
-    """
-
-    def __init__(
-        self,
-        predicate=if_transient_error,
-        initial=_DEFAULT_INITIAL_DELAY,
-        maximum=_DEFAULT_MAXIMUM_DELAY,
-        multiplier=_DEFAULT_DELAY_MULTIPLIER,
-        timeout=_DEFAULT_TIMEOUT,
-        on_error=None,
-        **kwargs
-    ):
-        self._predicate = predicate
-        self._initial = initial
-        self._multiplier = multiplier
-        self._maximum = maximum
-        self._timeout = kwargs.get("deadline", timeout)
-        self._deadline = self._timeout
-        self._on_error = on_error
-
-    def __call__(self, func, on_error=None):
-        """Wrap a callable with retry behavior.
-
-        Args:
-            func (Callable): The callable to add retry behavior to.
-            on_error (Callable[Exception]): A function to call while processing
-                a retryable exception. Any error raised by this function will
-                *not* be caught.
-
-        Returns:
-            Callable: A callable that will invoke ``func`` with retry
-                behavior.
-        """
-        if self._on_error is not None:
-            on_error = self._on_error
-
-        @functools.wraps(func)
-        async def retry_wrapped_func(*args, **kwargs):
-            """A wrapper that calls target function with retry."""
-            target = functools.partial(func, *args, **kwargs)
-            sleep_generator = exponential_sleep_generator(
-                self._initial, self._maximum, multiplier=self._multiplier
-            )
-            return await retry_target(
-                target,
-                self._predicate,
-                sleep_generator,
-                self._timeout,
-                on_error=on_error,
-            )
-
-        return retry_wrapped_func
-
-    def _replace(
-        self,
-        predicate=None,
-        initial=None,
-        maximum=None,
-        multiplier=None,
-        timeout=None,
-        on_error=None,
-    ):
-        return AsyncRetry(
-            predicate=predicate or self._predicate,
-            initial=initial or self._initial,
-            maximum=maximum or self._maximum,
-            multiplier=multiplier or self._multiplier,
-            timeout=timeout or self._timeout,
-            on_error=on_error or self._on_error,
-        )
-
-    def with_deadline(self, deadline):
-        """Return a copy of this retry with the given deadline.
-        DEPRECATED: use :meth:`with_timeout` instead.
-
-        Args:
-            deadline (float): How long to keep retrying.
-
-        Returns:
-            AsyncRetry: A new retry instance with the given deadline.
-        """
-        return self._replace(timeout=deadline)
-
-    def with_timeout(self, timeout):
-        """Return a copy of this retry with the given timeout.
-
-        Args:
-            timeout (float): How long to keep retrying, in seconds.
-
-        Returns:
-            AsyncRetry: A new retry instance with the given timeout.
-        """
-        return self._replace(timeout=timeout)
-
-    def with_predicate(self, predicate):
-        """Return a copy of this retry with the given predicate.
-
-        Args:
-            predicate (Callable[Exception]): A callable that should return
-                ``True`` if the given exception is retryable.
-
-        Returns:
-            AsyncRetry: A new retry instance with the given predicate.
-        """
-        return self._replace(predicate=predicate)
-
-    def with_delay(self, initial=None, maximum=None, multiplier=None):
-        """Return a copy of this retry with the given delay options.
-
-        Args:
-            initial (float): The minimum amount of time to delay. This must
-                be greater than 0.
-            maximum (float): The maximum amount of time to delay.
-            multiplier (float): The multiplier applied to the delay.
-
-        Returns:
-            AsyncRetry: A new retry instance with the given predicate.
-        """
-        return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)
-
-    def __str__(self):
-        return (
-            "<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, "
-            "multiplier={:.1f}, timeout={:.1f}, on_error={}>".format(
-                self._predicate,
-                self._initial,
-                self._maximum,
-                self._multiplier,
-                self._timeout,
-                self._on_error,
-            )
-        )
diff --git a/tests/asyncio/retry/__init__.py b/tests/asyncio/retry/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/asyncio/retry/__init__.py
diff --git a/tests/asyncio/retry/test_retry_streaming_async.py b/tests/asyncio/retry/test_retry_streaming_async.py
new file mode 100644
index 0000000..28ae6ff
--- /dev/null
+++ b/tests/asyncio/retry/test_retry_streaming_async.py
@@ -0,0 +1,562 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import re
+import asyncio
+
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core import retry_async
+from google.api_core.retry import retry_streaming_async
+
+from ...unit.retry.test_retry_base import Test_BaseRetry
+
+
+@pytest.mark.asyncio
+async def test_retry_streaming_target_bad_sleep_generator():
+    from google.api_core.retry.retry_streaming_async import retry_target_stream
+
+    with pytest.raises(ValueError, match="Sleep generator"):
+        await retry_target_stream(None, None, [], None).__anext__()
+
+
+class TestAsyncStreamingRetry(Test_BaseRetry):
+    def _make_one(self, *args, **kwargs):
+        return retry_streaming_async.AsyncStreamingRetry(*args, **kwargs)
+
+    def test___str__(self):
+        def if_exception_type(exc):
+            return bool(exc)  # pragma: NO COVER
+
+        # Explicitly set all attributes as changed Retry defaults should not
+        # cause this test to start failing.
+        retry_ = retry_streaming_async.AsyncStreamingRetry(
+            predicate=if_exception_type,
+            initial=1.0,
+            maximum=60.0,
+            multiplier=2.0,
+            timeout=120.0,
+            on_error=None,
+        )
+        assert re.match(
+            (
+                r"<AsyncStreamingRetry predicate=<function.*?if_exception_type.*?>, "
+                r"initial=1.0, maximum=60.0, multiplier=2.0, timeout=120.0, "
+                r"on_error=None>"
+            ),
+            str(retry_),
+        )
+
+    async def _generator_mock(
+        self,
+        num=5,
+        error_on=None,
+        exceptions_seen=None,
+        sleep_time=0,
+    ):
+        """
+        Helper async generator that yields the integers 0..num-1 in order.
+        It can optionally raise on one iteration and record what it was stopped by.
+
+        Args:
+          - num (int): the number of values to yield
+          - error_on (int): if given (0 included), raise a ValueError instead of yielding on that iteration
+          - exceptions_seen (list): if given, every exception raised in or thrown into the generator is appended here before re-raising
+          - sleep_time (float): if non-zero, the generator will asyncio.sleep for this many seconds before yielding each value
+        """
+        try:
+            for i in range(num):
+                if sleep_time:
+                    await asyncio.sleep(sleep_time)
+                if error_on is not None and i == error_on:
+                    raise ValueError("generator mock error")
+                yield i
+        except BaseException as e:
+            # BaseException also covers GeneratorExit and cancellation; record, then re-raise
+            if exceptions_seen is not None:
+                exceptions_seen.append(e)
+            raise
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___generator_success(self, sleep):
+        """
+        Test that a retry-decorated generator yields values as expected
+        This test checks a generator with no issues
+        """
+        from collections.abc import AsyncGenerator
+
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+        decorated = retry_(self._generator_mock)
+
+        num = 10
+        generator = await decorated(num)
+        # check types
+        assert isinstance(generator, AsyncGenerator)
+        assert isinstance(self._generator_mock(num), AsyncGenerator)
+        # check yield contents
+        unpacked = [i async for i in generator]
+        assert len(unpacked) == num
+        expected = [i async for i in self._generator_mock(num)]
+        for a, b in zip(unpacked, expected):
+            assert a == b
+        sleep.assert_not_called()
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___generator_retry(self, sleep):
+        """
+        Tests that a retry-decorated generator will retry on errors
+        """
+        on_error = mock.Mock(return_value=None)
+        retry_ = retry_streaming_async.AsyncStreamingRetry(
+            on_error=on_error,
+            predicate=retry_async.if_exception_type(ValueError),
+            timeout=None,
+        )
+        generator = await retry_(self._generator_mock)(error_on=3)
+        # error thrown on 3
+        # generator should contain 0, 1, 2 looping
+        unpacked = [await generator.__anext__() for i in range(10)]
+        assert unpacked == [0, 1, 2, 0, 1, 2, 0, 1, 2, 0]
+        assert on_error.call_count == 3
+
+    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.parametrize("use_deadline_arg", [True, False])
+    @pytest.mark.asyncio
+    async def test___call___generator_retry_hitting_timeout(
+        self, sleep, uniform, use_deadline_arg
+    ):
+        """
+        Tests that a retry-decorated generator will throw a RetryError
+        after using the time budget
+        """
+        import time
+
+        timeout_val = 9.9
+        # support "deadline" as an alias for "timeout"
+        timeout_kwarg = (
+            {"timeout": timeout_val}
+            if not use_deadline_arg
+            else {"deadline": timeout_val}
+        )
+
+        on_error = mock.Mock()
+        retry_ = retry_streaming_async.AsyncStreamingRetry(
+            predicate=retry_async.if_exception_type(ValueError),
+            initial=1.0,
+            maximum=1024.0,
+            multiplier=2.0,
+            **timeout_kwarg,
+        )
+
+        time_now = time.monotonic()
+        now_patcher = mock.patch(
+            "time.monotonic",
+            return_value=time_now,
+        )
+
+        decorated = retry_(self._generator_mock, on_error=on_error)
+        generator = await decorated(error_on=1)
+
+        with now_patcher as patched_now:
+            # Make sure that calls to fake asyncio.sleep() also advance the mocked
+            # time clock.
+            def increase_time(sleep_delay):
+                patched_now.return_value += sleep_delay
+
+            sleep.side_effect = increase_time
+
+            with pytest.raises(exceptions.RetryError):
+                [i async for i in generator]
+
+        assert on_error.call_count == 4
+        # check the delays
+        assert sleep.call_count == 3  # once between each successive target calls
+        last_wait = sleep.call_args.args[0]
+        total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list)
+        # next wait would have put us over, so ended early
+        assert last_wait == 4
+        assert total_wait == 7
+
+    @pytest.mark.asyncio
+    async def test___call___generator_cancellations(self):
+        """
+        Cancelling a pending __anext__ should raise CancelledError and end the generator
+        """
+        # test without cancel as retryable; NOTE(review): patcher below is never started (no-op)
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+        utcnow = datetime.datetime.now(datetime.timezone.utc)
+        mock.patch("google.api_core.datetime_helpers.utcnow", return_value=utcnow)
+        generator = await retry_(self._generator_mock)(sleep_time=0.2)
+        assert await generator.__anext__() == 0
+        task = asyncio.create_task(generator.__anext__())
+        task.cancel()
+        with pytest.raises(asyncio.CancelledError):
+            await task
+        with pytest.raises(StopAsyncIteration):
+            await generator.__anext__()
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_generator_send(self, sleep):
+        """
+        Send should be passed through retry into target generator
+        """
+
+        async def _mock_send_gen():
+            """
+            always yield whatever was sent in
+            """
+            in_ = yield
+            while True:
+                in_ = yield in_
+
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+
+        decorated = retry_(_mock_send_gen)
+
+        generator = await decorated()
+        result = await generator.__anext__()
+        # first yield should be None
+        assert result is None
+        in_messages = ["test_1", "hello", "world"]
+        out_messages = []
+        for msg in in_messages:
+            recv = await generator.asend(msg)
+            out_messages.append(recv)
+        assert in_messages == out_messages
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___generator_send_retry(self, sleep):
+        """
+        Send should be retried if target generator raises an error
+        """
+        on_error = mock.Mock(return_value=None)
+        retry_ = retry_streaming_async.AsyncStreamingRetry(
+            on_error=on_error,
+            predicate=retry_async.if_exception_type(ValueError),
+            timeout=None,
+        )
+        generator = await retry_(self._generator_mock)(error_on=3)
+        # a just-started async generator rejects any non-None asend() value
+        with pytest.raises(TypeError, match="can't send non-None value"):
+            await generator.asend("cannot send to fresh generator")
+
+        # error thrown on 3
+        # generator should contain 0, 1, 2 looping
+        generator = await retry_(self._generator_mock)(error_on=3)
+        assert await generator.__anext__() == 0
+        unpacked = [await generator.asend(i) for i in range(10)]
+        assert unpacked == [1, 2, 0, 1, 2, 0, 1, 2, 0, 1]
+        assert on_error.call_count == 3
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_generator_close(self, sleep):
+        """
+        Close should be passed through retry into target generator
+        """
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+        decorated = retry_(self._generator_mock)
+        exception_list = []
+        generator = await decorated(10, exceptions_seen=exception_list)
+        for i in range(2):
+            await generator.__anext__()
+        await generator.aclose()
+
+        assert isinstance(exception_list[0], GeneratorExit)
+        with pytest.raises(StopAsyncIteration):
+            # calling next on closed generator should raise error
+            await generator.__anext__()
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_new_generator_close(self, sleep):
+        """
+        Close should be passed through retry into target generator,
+        even when it hasn't been iterated yet
+        """
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+        decorated = retry_(self._generator_mock)
+        exception_list = []
+        generator = await decorated(10, exceptions_seen=exception_list)
+        await generator.aclose()
+
+        with pytest.raises(StopAsyncIteration):
+            # calling next on closed generator should raise error
+            await generator.__anext__()
+
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_generator_throw(self, sleep):
+        """
+        Throw should be passed through retry into target generator
+        """
+
+        # The generator should not retry when it encounters a non-retryable error
+        retry_ = retry_streaming_async.AsyncStreamingRetry(
+            predicate=retry_async.if_exception_type(ValueError),
+        )
+        decorated = retry_(self._generator_mock)
+        exception_list = []
+        generator = await decorated(10, exceptions_seen=exception_list)
+        for i in range(2):
+            await generator.__anext__()
+        with pytest.raises(BufferError):
+            await generator.athrow(BufferError("test"))
+        assert isinstance(exception_list[0], BufferError)
+        with pytest.raises(StopAsyncIteration):
+            # calling next on closed generator should raise error
+            await generator.__anext__()
+
+        # In contrast, the generator should retry if we throw a retryable exception
+        exception_list = []
+        generator = await decorated(10, exceptions_seen=exception_list)
+        for i in range(2):
+            await generator.__anext__()
+        throw_val = await generator.athrow(ValueError("test"))
+        assert throw_val == 0
+        assert isinstance(exception_list[0], ValueError)
+        # calling next on generator should not raise error, because it was retried
+        assert await generator.__anext__() == 1
+
+    @pytest.mark.parametrize("awaitable_wrapped", [True, False])
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_iterable_send(self, sleep, awaitable_wrapped):
+        """
+        Send should work like next if the wrapped iterable does not support it
+        """
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+
+        def iterable_fn():
+            class CustomIterable:
+                def __init__(self):
+                    self.i = -1
+
+                def __aiter__(self):
+                    return self
+
+                async def __anext__(self):
+                    self.i += 1
+                    return self.i
+
+            return CustomIterable()
+
+        if awaitable_wrapped:
+
+            async def wrapper():
+                return iterable_fn()
+
+            decorated = retry_(wrapper)
+        else:
+            decorated = retry_(iterable_fn)
+
+        retryable = await decorated()
+        # initiate the generator by calling next
+        result = await retryable.__anext__()
+        assert result == 0
+        # test sending values
+        assert await retryable.asend("test") == 1
+        assert await retryable.asend("test2") == 2
+        assert await retryable.asend("test3") == 3
+
+    @pytest.mark.parametrize("awaitable_wrapped", [True, False])
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_iterable_close(self, sleep, awaitable_wrapped):
+        """
+        close should be handled by wrapper if wrapped iterable does not support it
+        """
+        retry_ = retry_streaming_async.AsyncStreamingRetry()
+
+        def iterable_fn():
+            class CustomIterable:
+                def __init__(self):
+                    self.i = -1
+
+                def __aiter__(self):
+                    return self
+
+                async def __anext__(self):
+                    self.i += 1
+                    return self.i
+
+            return CustomIterable()
+
+        if awaitable_wrapped:
+
+            async def wrapper():
+                return iterable_fn()
+
+            decorated = retry_(wrapper)
+        else:
+            decorated = retry_(iterable_fn)
+
+        # try closing active generator
+        retryable = await decorated()
+        assert await retryable.__anext__() == 0
+        await retryable.aclose()
+        with pytest.raises(StopAsyncIteration):
+            await retryable.__anext__()
+        # try closing new generator
+        new_retryable = await decorated()
+        await new_retryable.aclose()
+        with pytest.raises(StopAsyncIteration):
+            await new_retryable.__anext__()
+
+    @pytest.mark.parametrize("awaitable_wrapped", [True, False])
+    @mock.patch("asyncio.sleep", autospec=True)
+    @pytest.mark.asyncio
+    async def test___call___with_iterable_throw(self, sleep, awaitable_wrapped):
+        """
+        Throw should work even if the wrapped iterable does not support it
+        """
+
+        predicate = retry_async.if_exception_type(ValueError)
+        retry_ = retry_streaming_async.AsyncStreamingRetry(predicate=predicate)
+
+        def iterable_fn():
+            class CustomIterable:
+                def __init__(self):
+                    self.i = -1
+
+                def __aiter__(self):
+                    return self
+
+                async def __anext__(self):
+                    self.i += 1
+                    return self.i
+
+            return CustomIterable()
+
+        if awaitable_wrapped:
+
+            async def wrapper():
+                return iterable_fn()
+
+            decorated = retry_(wrapper)
+        else:
+            decorated = retry_(iterable_fn)
+
+        # try throwing with active generator
+        retryable = await decorated()
+        assert await retryable.__anext__() == 0
+        # should swallow errors in predicate
+        await retryable.athrow(ValueError("test"))
+        # should raise errors not in predicate
+        with pytest.raises(BufferError):
+            await retryable.athrow(BufferError("test"))
+        with pytest.raises(StopAsyncIteration):
+            await retryable.__anext__()
+        # try throwing with new generator
+        new_retryable = await decorated()
+        with pytest.raises(BufferError):
+            await new_retryable.athrow(BufferError("test"))
+        with pytest.raises(StopAsyncIteration):
+            await new_retryable.__anext__()
+
+    @pytest.mark.asyncio
+    async def test_exc_factory_non_retryable_error(self):
+        """
+        generator should give the option to override exception creation logic
+        test when non-retryable error is thrown
+        """
+        from google.api_core.retry import RetryFailureReason
+        from google.api_core.retry.retry_streaming_async import retry_target_stream
+
+        timeout = 6
+        sent_errors = [ValueError("test"), ValueError("test2"), BufferError("test3")]
+        expected_final_err = RuntimeError("done")
+        expected_source_err = ZeroDivisionError("test4")
+
+        def factory(*args, **kwargs):
+            assert len(kwargs) == 0
+            assert args[0] == sent_errors
+            assert args[1] == RetryFailureReason.NON_RETRYABLE_ERROR
+            assert args[2] == timeout
+            return expected_final_err, expected_source_err
+
+        generator = retry_target_stream(
+            self._generator_mock,
+            retry_async.if_exception_type(ValueError),
+            [0] * 3,
+            timeout=timeout,
+            exception_factory=factory,
+        )
+        # initialize the generator
+        await generator.__anext__()
+        # trigger some retryable errors
+        await generator.athrow(sent_errors[0])
+        await generator.athrow(sent_errors[1])
+        # trigger a non-retryable error
+        with pytest.raises(expected_final_err.__class__) as exc_info:
+            await generator.athrow(sent_errors[2])
+        assert exc_info.value == expected_final_err
+        assert exc_info.value.__cause__ == expected_source_err
+
+    @pytest.mark.asyncio
+    async def test_exc_factory_timeout(self):
+        """
+        generator should give the option to override exception creation logic
+        test when timeout is exceeded
+        """
+        import time
+        from google.api_core.retry import RetryFailureReason
+        from google.api_core.retry.retry_streaming_async import retry_target_stream
+
+        timeout = 2
+        time_now = time.monotonic()
+        now_patcher = mock.patch(
+            "time.monotonic",
+            return_value=time_now,
+        )
+
+        with now_patcher as patched_now:
+            # every error matches the predicate; only the mocked clock ends the retries
+            sent_errors = [ValueError("test"), ValueError("test2"), ValueError("test3")]
+            expected_final_err = RuntimeError("done")
+            expected_source_err = ZeroDivisionError("test4")
+
+            def factory(*args, **kwargs):
+                assert len(kwargs) == 0
+                assert args[0] == sent_errors
+                assert args[1] == RetryFailureReason.TIMEOUT
+                assert args[2] == timeout
+                return expected_final_err, expected_source_err
+
+            generator = retry_target_stream(
+                self._generator_mock,
+                retry_async.if_exception_type(ValueError),
+                [0] * 3,
+                timeout=timeout,
+                exception_factory=factory,
+            )
+            # initialize the generator
+            await generator.__anext__()
+            # trigger some retryable errors
+            await generator.athrow(sent_errors[0])
+            await generator.athrow(sent_errors[1])
+            # trigger a timeout
+            patched_now.return_value += timeout + 1
+            with pytest.raises(expected_final_err.__class__) as exc_info:
+                await generator.athrow(sent_errors[2])
+            assert exc_info.value == expected_final_err
+            assert exc_info.value.__cause__ == expected_source_err
diff --git a/tests/asyncio/test_retry_async.py b/tests/asyncio/retry/test_retry_unary_async.py
similarity index 65%
rename from tests/asyncio/test_retry_async.py
rename to tests/asyncio/retry/test_retry_unary_async.py
index 16f5c3d..fc2f572 100644
--- a/tests/asyncio/test_retry_async.py
+++ b/tests/asyncio/retry/test_retry_unary_async.py
@@ -21,6 +21,8 @@
 from google.api_core import exceptions
 from google.api_core import retry_async
 
+from ...unit.retry.test_retry_base import Test_BaseRetry
+
 
 @mock.patch("asyncio.sleep", autospec=True)
 @mock.patch(
@@ -97,23 +99,25 @@
 
 
 @mock.patch("asyncio.sleep", autospec=True)
-@mock.patch("google.api_core.datetime_helpers.utcnow", autospec=True)
+@mock.patch("time.monotonic", autospec=True)
+@pytest.mark.parametrize("use_deadline_arg", [True, False])
 @pytest.mark.asyncio
-async def test_retry_target_deadline_exceeded(utcnow, sleep):
+async def test_retry_target_timeout_exceeded(monotonic, sleep, use_deadline_arg):
     predicate = retry_async.if_exception_type(ValueError)
     exception = ValueError("meep")
     target = mock.Mock(side_effect=exception)
     # Setup the timeline so that the first call takes 5 seconds but the second
-    # call takes 6, which puts the retry over the deadline.
-    utcnow.side_effect = [
-        # The first call to utcnow establishes the start of the timeline.
-        datetime.datetime.min,
-        datetime.datetime.min + datetime.timedelta(seconds=5),
-        datetime.datetime.min + datetime.timedelta(seconds=11),
-    ]
+    # call takes 6, which puts the retry over the timeout.
+    monotonic.side_effect = [0, 5, 11]
+
+    timeout_val = 10
+    # support "deadline" as an alias for "timeout"
+    timeout_kwarg = (
+        {"timeout": timeout_val} if not use_deadline_arg else {"deadline": timeout_val}
+    )
 
     with pytest.raises(exceptions.RetryError) as exc_info:
-        await retry_async.retry_target(target, predicate, range(10), deadline=10)
+        await retry_async.retry_target(target, predicate, range(10), **timeout_kwarg)
 
     assert exc_info.value.cause == exception
     assert exc_info.match("Timeout of 10.0s exceeded")
@@ -133,108 +137,9 @@
         )
 
 
-class TestAsyncRetry:
-    def test_constructor_defaults(self):
-        retry_ = retry_async.AsyncRetry()
-        assert retry_._predicate == retry_async.if_transient_error
-        assert retry_._initial == 1
-        assert retry_._maximum == 60
-        assert retry_._multiplier == 2
-        assert retry_._deadline == 120
-        assert retry_._on_error is None
-
-    def test_constructor_options(self):
-        _some_function = mock.Mock()
-
-        retry_ = retry_async.AsyncRetry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=_some_function,
-        )
-        assert retry_._predicate == mock.sentinel.predicate
-        assert retry_._initial == 1
-        assert retry_._maximum == 2
-        assert retry_._multiplier == 3
-        assert retry_._deadline == 4
-        assert retry_._on_error is _some_function
-
-    def test_with_deadline(self):
-        retry_ = retry_async.AsyncRetry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_deadline(42)
-        assert retry_ is not new_retry
-        assert new_retry._deadline == 42
-
-        # the rest of the attributes should remain the same
-        assert new_retry._predicate is retry_._predicate
-        assert new_retry._initial == retry_._initial
-        assert new_retry._maximum == retry_._maximum
-        assert new_retry._multiplier == retry_._multiplier
-        assert new_retry._on_error is retry_._on_error
-
-    def test_with_predicate(self):
-        retry_ = retry_async.AsyncRetry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_predicate(mock.sentinel.predicate)
-        assert retry_ is not new_retry
-        assert new_retry._predicate == mock.sentinel.predicate
-
-        # the rest of the attributes should remain the same
-        assert new_retry._deadline == retry_._deadline
-        assert new_retry._initial == retry_._initial
-        assert new_retry._maximum == retry_._maximum
-        assert new_retry._multiplier == retry_._multiplier
-        assert new_retry._on_error is retry_._on_error
-
-    def test_with_delay_noop(self):
-        retry_ = retry_async.AsyncRetry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_delay()
-        assert retry_ is not new_retry
-        assert new_retry._initial == retry_._initial
-        assert new_retry._maximum == retry_._maximum
-        assert new_retry._multiplier == retry_._multiplier
-
-    def test_with_delay(self):
-        retry_ = retry_async.AsyncRetry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_delay(initial=1, maximum=2, multiplier=3)
-        assert retry_ is not new_retry
-        assert new_retry._initial == 1
-        assert new_retry._maximum == 2
-        assert new_retry._multiplier == 3
-
-        # the rest of the attributes should remain the same
-        assert new_retry._deadline == retry_._deadline
-        assert new_retry._predicate is retry_._predicate
-        assert new_retry._on_error is retry_._on_error
+class TestAsyncRetry(Test_BaseRetry):
+    def _make_one(self, *args, **kwargs):
+        return retry_async.AsyncRetry(*args, **kwargs)
 
     def test___str__(self):
         def if_exception_type(exc):
@@ -247,7 +152,7 @@
             initial=1.0,
             maximum=60.0,
             multiplier=2.0,
-            deadline=120.0,
+            timeout=120.0,
             on_error=None,
         )
         assert re.match(
@@ -303,20 +208,17 @@
     @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
     @mock.patch("asyncio.sleep", autospec=True)
     @pytest.mark.asyncio
-    async def test___call___and_execute_retry_hitting_deadline(self, sleep, uniform):
+    async def test___call___and_execute_retry_hitting_timeout(self, sleep, uniform):
         on_error = mock.Mock(spec=["__call__"], side_effect=[None] * 10)
         retry_ = retry_async.AsyncRetry(
             predicate=retry_async.if_exception_type(ValueError),
             initial=1.0,
             maximum=1024.0,
             multiplier=2.0,
-            deadline=9.9,
+            timeout=30.9,
         )
 
-        utcnow = datetime.datetime.now(tz=datetime.timezone.utc)
-        utcnow_patcher = mock.patch(
-            "google.api_core.datetime_helpers.utcnow", return_value=utcnow
-        )
+        monotonic_patcher = mock.patch("time.monotonic", return_value=0)
 
         target = mock.AsyncMock(spec=["__call__"], side_effect=[ValueError()] * 10)
         # __name__ is needed by functools.partial.
@@ -325,11 +227,11 @@
         decorated = retry_(target, on_error=on_error)
         target.assert_not_called()
 
-        with utcnow_patcher as patched_utcnow:
+        with monotonic_patcher as patched_monotonic:
             # Make sure that calls to fake asyncio.sleep() also advance the mocked
             # time clock.
             def increase_time(sleep_delay):
-                patched_utcnow.return_value += datetime.timedelta(seconds=sleep_delay)
+                patched_monotonic.return_value += sleep_delay
 
             sleep.side_effect = increase_time
 
@@ -345,8 +247,17 @@
         last_wait = sleep.call_args.args[0]
         total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list)
 
-        assert last_wait == 2.9  # and not 8.0, because the last delay was shortened
-        assert total_wait == 9.9  # the same as the deadline
+        assert last_wait == 8.0
+        # The next attempt would be scheduled in 16 secs; 15 + 16 = 31 > 30.9, so
+        # we do not even wait for it (30.9 is the configured timeout).
+        # This replaces the previous logic of shortening the last delay to fit
+        # in the timeout. That logic was removed to make the Python retry
+        # behavior consistent with the other languages, and to avoid distorting
+        # the randomized retry delay distribution by artificially increasing the
+        # probability of scheduling two (instead of one) final attempts with a
+        # very short delay between them, where the second attempt has a very low
+        # chance of succeeding anyway.
+        assert total_wait == 15.0
 
     @mock.patch("asyncio.sleep", autospec=True)
     @pytest.mark.asyncio
diff --git a/tests/unit/retry/__init__.py b/tests/unit/retry/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/unit/retry/__init__.py
diff --git a/tests/unit/retry/test_retry_base.py b/tests/unit/retry/test_retry_base.py
new file mode 100644
index 0000000..fa55d93
--- /dev/null
+++ b/tests/unit/retry/test_retry_base.py
@@ -0,0 +1,272 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import re
+
+import mock
+import pytest
+import requests.exceptions
+
+from google.api_core import exceptions
+from google.api_core import retry
+from google.auth import exceptions as auth_exceptions
+
+
+def test_if_exception_type():
+    """A single-type predicate accepts that type and rejects others."""
+    is_value_error = retry.if_exception_type(ValueError)
+    assert is_value_error(ValueError())
+    assert not is_value_error(TypeError())
+
+
+def test_if_exception_type_multiple():
+    """A multi-type predicate accepts every listed type and nothing else."""
+    matches = retry.if_exception_type(ValueError, TypeError)
+    assert matches(ValueError())
+    assert matches(TypeError())
+    assert not matches(RuntimeError())
+
+
+def test_if_transient_error():
+    """Server, transport and auth failures are transient; bad arguments are not."""
+    transient = [exceptions.InternalServerError, exceptions.TooManyRequests]
+    transient += [exceptions.ServiceUnavailable, auth_exceptions.TransportError]
+    transient += [requests.exceptions.ConnectionError]
+    transient += [requests.exceptions.ChunkedEncodingError]
+    assert all(retry.if_transient_error(exc("")) for exc in transient)
+    assert not retry.if_transient_error(exceptions.InvalidArgument(""))
+
+
+# Make uniform return its upper bound (the patched lambda returns ``n``),
+# which will be the calculated sleep time.
+@mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
+def test_exponential_sleep_generator_base_2(uniform):
+    """Delays double from 1 until they are capped at the maximum of 60."""
+    delays = retry.exponential_sleep_generator(1, 60, multiplier=2)
+    first_eight = [*itertools.islice(delays, 8)]
+    assert first_eight == [1, 2, 4, 8, 16, 32, 60, 60]
+
+
+def test_build_retry_error_empty_list():
+    """
+    With an empty error list, build_retry_error falls back to a generic
+    RetryError ("Unknown error") and reports no cause.
+    """
+    from google.api_core.retry import RetryFailureReason, build_retry_error
+
+    error, source = build_retry_error(
+        [], RetryFailureReason.NON_RETRYABLE_ERROR, 10
+    )
+    assert isinstance(error, exceptions.RetryError)
+    assert error.message == "Unknown error"
+    assert source is None
+
+
+def test_build_retry_error_timeout_message():
+    """
+    On timeout the error message names the timeout that was exceeded
+    """
+    from google.api_core.retry import RetryFailureReason, build_retry_error
+
+    last_error = RuntimeError("timeout")
+    seen_errors = [ValueError(), last_error]
+
+    error, source = build_retry_error(seen_errors, RetryFailureReason.TIMEOUT, 10)
+    assert isinstance(error, exceptions.RetryError)
+    assert error.message == "Timeout of 10.0s exceeded"
+    # the last error in the list is reported as the cause
+    assert source is last_error
+
+
+def test_build_retry_error_empty_timeout():
+    """
+    With timeout=None the timeout error uses a generic message that
+    omits the duration.
+    """
+    from google.api_core.retry import RetryFailureReason, build_retry_error
+
+    no_timeout = None
+    error, _cause = build_retry_error([], RetryFailureReason.TIMEOUT, no_timeout)
+
+    assert isinstance(error, exceptions.RetryError)
+    assert error.message == "Timeout exceeded"
+
+
+class Test_BaseRetry(object):
+    def _make_one(self, *args, **kwargs):
+        return retry.retry_base._BaseRetry(*args, **kwargs)
+
+    def test_constructor_defaults(self):
+        retry_ = self._make_one()  # no arguments: every default applies
+        assert retry_._predicate == retry.if_transient_error
+        assert retry_._initial == 1
+        assert retry_._maximum == 60
+        assert retry_._multiplier == 2
+        assert retry_._timeout == 120
+        assert retry_._on_error is None
+        assert retry_.timeout == 120
+        assert retry_.deadline == 120  # ``deadline`` must mirror ``timeout``
+
+    def test_constructor_options(self):
+        _some_function = mock.Mock()
+
+        retry_ = self._make_one(
+            predicate=mock.sentinel.predicate,
+            initial=1,
+            maximum=2,
+            multiplier=3,
+            timeout=4,
+            on_error=_some_function,
+        )
+        assert retry_._predicate == mock.sentinel.predicate
+        assert retry_._initial == 1
+        assert retry_._maximum == 2
+        assert retry_._multiplier == 3
+        assert retry_._timeout == 4
+        assert retry_._on_error is _some_function
+
+    @pytest.mark.parametrize("use_deadline", [True, False])
+    def test_with_timeout(self, use_deadline):
+        retry_ = self._make_one(
+            predicate=mock.sentinel.predicate,
+            initial=1,
+            maximum=2,
+            multiplier=3,
+            timeout=4,
+            on_error=mock.sentinel.on_error,
+        )
+        new_retry = (
+            retry_.with_timeout(42) if not use_deadline else retry_.with_deadline(42)
+        )
+        assert retry_ is not new_retry
+        assert new_retry._timeout == 42
+        assert new_retry.timeout == 42 if not use_deadline else new_retry.deadline == 42
+
+        # the rest of the attributes should remain the same
+        assert new_retry._predicate is retry_._predicate
+        assert new_retry._initial == retry_._initial
+        assert new_retry._maximum == retry_._maximum
+        assert new_retry._multiplier == retry_._multiplier
+        assert new_retry._on_error is retry_._on_error
+
+    def test_with_predicate(self):
+        retry_ = self._make_one(
+            predicate=mock.sentinel.predicate,
+            initial=1,
+            maximum=2,
+            multiplier=3,
+            timeout=4,
+            on_error=mock.sentinel.on_error,
+        )
+        new_retry = retry_.with_predicate(mock.sentinel.predicate)
+        assert retry_ is not new_retry
+        assert new_retry._predicate == mock.sentinel.predicate
+
+        # the rest of the attributes should remain the same
+        assert new_retry._timeout == retry_._timeout
+        assert new_retry._initial == retry_._initial
+        assert new_retry._maximum == retry_._maximum
+        assert new_retry._multiplier == retry_._multiplier
+        assert new_retry._on_error is retry_._on_error
+
+    def test_with_delay_noop(self):
+        retry_ = self._make_one(
+            predicate=mock.sentinel.predicate,
+            initial=1,
+            maximum=2,
+            multiplier=3,
+            timeout=4,
+            on_error=mock.sentinel.on_error,
+        )
+        new_retry = retry_.with_delay()
+        assert retry_ is not new_retry
+        assert new_retry._initial == retry_._initial
+        assert new_retry._maximum == retry_._maximum
+        assert new_retry._multiplier == retry_._multiplier
+
+    def test_with_delay(self):
+        retry_ = self._make_one(
+            predicate=mock.sentinel.predicate,
+            initial=1,
+            maximum=2,
+            multiplier=3,
+            timeout=4,
+            on_error=mock.sentinel.on_error,
+        )
+        new_retry = retry_.with_delay(initial=5, maximum=6, multiplier=7)
+        assert retry_ is not new_retry
+        assert new_retry._initial == 5
+        assert new_retry._maximum == 6
+        assert new_retry._multiplier == 7
+
+        # the rest of the attributes should remain the same
+        assert new_retry._timeout == retry_._timeout
+        assert new_retry._predicate is retry_._predicate
+        assert new_retry._on_error is retry_._on_error
+
+    def test_with_delay_partial_options(self):
+        retry_ = self._make_one(
+            predicate=mock.sentinel.predicate,
+            initial=1,
+            maximum=2,
+            multiplier=3,
+            timeout=4,
+            on_error=mock.sentinel.on_error,
+        )
+        new_retry = retry_.with_delay(initial=4)
+        assert retry_ is not new_retry
+        assert new_retry._initial == 4
+        assert new_retry._maximum == 2
+        assert new_retry._multiplier == 3
+
+        new_retry = retry_.with_delay(maximum=4)
+        assert retry_ is not new_retry
+        assert new_retry._initial == 1
+        assert new_retry._maximum == 4
+        assert new_retry._multiplier == 3
+
+        new_retry = retry_.with_delay(multiplier=4)
+        assert retry_ is not new_retry
+        assert new_retry._initial == 1
+        assert new_retry._maximum == 2
+        assert new_retry._multiplier == 4
+
+        # the rest of the attributes should remain the same
+        assert new_retry._timeout == retry_._timeout
+        assert new_retry._predicate is retry_._predicate
+        assert new_retry._on_error is retry_._on_error
+
+    def test___str__(self):
+        def if_exception_type(exc):
+            return bool(exc)  # pragma: NO COVER
+
+        # Explicitly set all attributes as changed Retry defaults should not
+        # cause this test to start failing.
+        retry_ = self._make_one(
+            predicate=if_exception_type,
+            initial=1.0,
+            maximum=60.0,
+            multiplier=2.0,
+            timeout=120.0,
+            on_error=None,
+        )
+        assert re.match(
+            (
+                r"<_BaseRetry predicate=<function.*?if_exception_type.*?>, "
+                r"initial=1.0, maximum=60.0, multiplier=2.0, timeout=120.0, "
+                r"on_error=None>"
+            ),
+            str(retry_),
+        )
diff --git a/tests/unit/retry/test_retry_streaming.py b/tests/unit/retry/test_retry_streaming.py
new file mode 100644
index 0000000..01f3532
--- /dev/null
+++ b/tests/unit/retry/test_retry_streaming.py
@@ -0,0 +1,471 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core import retry
+from google.api_core.retry import retry_streaming
+
+from .test_retry_base import Test_BaseRetry
+
+
+def test_retry_streaming_target_bad_sleep_generator():
+    """An empty sleep generator makes the stream raise ValueError."""
+    expected = "Sleep generator stopped yielding sleep values"
+    with pytest.raises(ValueError, match=expected):
+        next(retry_streaming.retry_target_stream(None, None, [], None))
+
+
+class TestStreamingRetry(Test_BaseRetry):
+    def _make_one(self, *args, **kwargs):
+        return retry_streaming.StreamingRetry(*args, **kwargs)
+
+    def test___str__(self):
+        def if_exception_type(exc):
+            return bool(exc)  # pragma: NO COVER
+
+        # Explicitly set all attributes as changed Retry defaults should not
+        # cause this test to start failing.
+        retry_ = retry_streaming.StreamingRetry(
+            predicate=if_exception_type,
+            initial=1.0,
+            maximum=60.0,
+            multiplier=2.0,
+            timeout=120.0,
+            on_error=None,
+        )
+        assert re.match(
+            (
+                r"<StreamingRetry predicate=<function.*?if_exception_type.*?>, "
+                r"initial=1.0, maximum=60.0, multiplier=2.0, timeout=120.0, "
+                r"on_error=None>"
+            ),
+            str(retry_),
+        )
+
+    def _generator_mock(
+        self,
+        num=5,
+        error_on=None,
+        return_val=None,
+        exceptions_seen=None,
+    ):
+        """
+        Helper to create a mock generator that yields the integers 0..num-1
+        Generator can optionally raise an exception on a specific iteration
+
+        Args:
+          - num (int): the number of values to yield. After this, the generator will return `return_val`
+          - error_on (int): if not None, raise a ValueError instead of yielding this 0-based index (0 is honored)
+          - return_val (any): if given, the generator will return this value after yielding num values
+          - exceptions_seen (list): if given, the generator will append any exceptions to this list before raising
+        """
+        try:
+            for i in range(num):
+                if error_on is not None and i == error_on:
+                    raise ValueError("generator mock error")
+                yield i
+            return return_val
+        except BaseException as e:
+            # BaseException also covers GeneratorExit (close) and thrown errors
+            if exceptions_seen is not None:
+                exceptions_seen.append(e)
+            raise
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___success(self, sleep):
+        """
+        Test that a retry-decorated generator yields values as expected
+        This test checks a generator with no issues
+        """
+        import types
+        import collections
+
+        retry_ = retry_streaming.StreamingRetry()
+
+        decorated = retry_(self._generator_mock)
+
+        num = 10
+        result = decorated(num)
+        # check types
+        assert isinstance(decorated(num), collections.abc.Iterable)
+        assert isinstance(decorated(num), types.GeneratorType)
+        assert isinstance(self._generator_mock(num), collections.abc.Iterable)
+        assert isinstance(self._generator_mock(num), types.GeneratorType)
+        # check yield contents
+        unpacked = [i for i in result]
+        assert len(unpacked) == num
+        for a, b in zip(unpacked, self._generator_mock(num)):
+            assert a == b
+        sleep.assert_not_called()
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___retry(self, sleep):
+        """
+        Tests that a retry-decorated generator will retry on errors
+        """
+        on_error = mock.Mock(return_value=None)
+        retry_ = retry_streaming.StreamingRetry(
+            on_error=on_error,
+            predicate=retry.if_exception_type(ValueError),
+            timeout=None,
+        )
+        result = retry_(self._generator_mock)(error_on=3)
+        # error thrown on 3
+        # generator should contain 0, 1, 2 looping
+        unpacked = [next(result) for i in range(10)]
+        assert unpacked == [0, 1, 2, 0, 1, 2, 0, 1, 2, 0]
+        assert on_error.call_count == 3
+
+    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
+    @mock.patch("time.sleep", autospec=True)
+    @pytest.mark.parametrize("use_deadline_arg", [True, False])
+    def test___call___retry_hitting_timeout(self, sleep, uniform, use_deadline_arg):
+        """
+        Tests that a retry-decorated generator will throw a RetryError
+        after using the time budget
+        """
+        import time
+
+        timeout_val = 30.9
+        # support "deadline" as an alias for "timeout"
+        timeout_kwarg = (
+            {"timeout": timeout_val}
+            if not use_deadline_arg
+            else {"deadline": timeout_val}
+        )
+
+        on_error = mock.Mock(return_value=None)
+        retry_ = retry_streaming.StreamingRetry(
+            predicate=retry.if_exception_type(ValueError),
+            initial=1.0,
+            maximum=1024.0,
+            multiplier=2.0,
+            **timeout_kwarg,
+        )
+
+        timenow = time.monotonic()
+        now_patcher = mock.patch(
+            "time.monotonic",
+            return_value=timenow,
+        )
+
+        decorated = retry_(self._generator_mock, on_error=on_error)
+        generator = decorated(error_on=1)
+        with now_patcher as patched_now:
+            # Make sure that calls to fake time.sleep() also advance the mocked
+            # time clock.
+            def increase_time(sleep_delay):
+                patched_now.return_value += sleep_delay
+
+            sleep.side_effect = increase_time
+            with pytest.raises(exceptions.RetryError):
+                [i for i in generator]
+
+        assert on_error.call_count == 5
+        # check the delays
+        assert sleep.call_count == 4  # once between each successive target calls
+        last_wait = sleep.call_args.args[0]
+        total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list)
+        assert last_wait == 8.0
+        assert total_wait == 15.0
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_generator_send(self, sleep):
+        """
+        Send should be passed through retry into target generator
+        """
+
+        def _mock_send_gen():
+            """
+            always yield whatever was sent in
+            """
+            in_ = yield
+            while True:
+                in_ = yield in_
+
+        retry_ = retry_streaming.StreamingRetry()
+
+        decorated = retry_(_mock_send_gen)
+
+        generator = decorated()
+        result = next(generator)
+        # first yield should be None
+        assert result is None
+        in_messages = ["test_1", "hello", "world"]
+        out_messages = []
+        for msg in in_messages:
+            recv = generator.send(msg)
+            out_messages.append(recv)
+        assert in_messages == out_messages
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_generator_send_retry(self, sleep):
+        """
+        Send should support retries like next
+        """
+        on_error = mock.Mock(return_value=None)
+        retry_ = retry_streaming.StreamingRetry(
+            on_error=on_error,
+            predicate=retry.if_exception_type(ValueError),
+            timeout=None,
+        )
+        result = retry_(self._generator_mock)(error_on=3)
+        with pytest.raises(TypeError) as exc_info:
+            # calling first send with non-None input should raise a TypeError
+            result.send("can not send to fresh generator")
+        assert exc_info.match("can't send non-None value")  # checked after raise
+        # initiate iteration with None
+        result = retry_(self._generator_mock)(error_on=3)
+        assert result.send(None) == 0
+        # error thrown on 3
+        # generator should contain 0, 1, 2 looping
+        unpacked = [result.send(i) for i in range(10)]
+        assert unpacked == [1, 2, 0, 1, 2, 0, 1, 2, 0, 1]
+        assert on_error.call_count == 3
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_iterable_send(self, sleep):
+        """
+        send should raise attribute error if wrapped iterator does not support it
+        """
+        retry_ = retry_streaming.StreamingRetry()
+
+        def iterable_fn(n):
+            return iter(range(n))
+
+        decorated = retry_(iterable_fn)
+        generator = decorated(5)
+        # initialize
+        next(generator)
+        # call send
+        with pytest.raises(AttributeError):
+            generator.send("test")
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_iterable_close(self, sleep):
+        """
+        close should be handled by wrapper if wrapped iterable does not support it
+        """
+        retry_ = retry_streaming.StreamingRetry()
+
+        def iterable_fn(n):
+            return iter(range(n))
+
+        decorated = retry_(iterable_fn)
+
+        # try closing active generator
+        retryable = decorated(10)
+        assert next(retryable) == 0
+        retryable.close()
+        with pytest.raises(StopIteration):
+            next(retryable)
+
+        # try closing a new generator
+        retryable = decorated(10)
+        retryable.close()
+        with pytest.raises(StopIteration):
+            next(retryable)
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_iterable_throw(self, sleep):
+        """
+        Throw should work even if the wrapped iterable does not support it
+        """
+        predicate = retry.if_exception_type(ValueError)
+        retry_ = retry_streaming.StreamingRetry(predicate=predicate)
+
+        def iterable_fn(n):
+            return iter(range(n))
+
+        decorated = retry_(iterable_fn)
+
+        # try throwing with active generator
+        retryable = decorated(10)
+        assert next(retryable) == 0
+        # should swallow errors in predicate
+        retryable.throw(ValueError)
+        assert next(retryable) == 1
+        # should raise on other errors
+        with pytest.raises(TypeError):
+            retryable.throw(TypeError)
+        with pytest.raises(StopIteration):
+            next(retryable)
+
+        # try throwing with a new generator
+        retryable = decorated(10)
+        with pytest.raises(ValueError):
+            retryable.throw(ValueError)
+        with pytest.raises(StopIteration):
+            next(retryable)
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_generator_return(self, sleep):
+        """
+        Generator return value should be passed through retry decorator
+        """
+        retry_ = retry_streaming.StreamingRetry()
+
+        decorated = retry_(self._generator_mock)
+
+        expected_value = "done"
+        generator = decorated(5, return_val=expected_value)
+        found_value = None
+        try:
+            while True:
+                next(generator)
+        except StopIteration as e:
+            found_value = e.value
+        assert found_value == expected_value
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_generator_close(self, sleep):
+        """
+        Close should be passed through retry into target generator
+        """
+        retry_ = retry_streaming.StreamingRetry()
+
+        decorated = retry_(self._generator_mock)
+
+        exception_list = []
+        generator = decorated(10, exceptions_seen=exception_list)
+        for i in range(2):
+            next(generator)
+        generator.close()
+        assert isinstance(exception_list[0], GeneratorExit)
+        with pytest.raises(StopIteration):
+            # calling next on closed generator should raise error
+            next(generator)
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___with_generator_throw(self, sleep):
+        """
+        Throw should be passed through retry into target generator
+        """
+        retry_ = retry_streaming.StreamingRetry(
+            predicate=retry.if_exception_type(ValueError),
+        )
+        decorated = retry_(self._generator_mock)
+
+        exception_list = []
+        generator = decorated(10, exceptions_seen=exception_list)
+        for i in range(2):
+            next(generator)
+        with pytest.raises(BufferError):
+            generator.throw(BufferError("test"))
+        assert isinstance(exception_list[0], BufferError)
+        with pytest.raises(StopIteration):
+            # the non-retryable error ended the stream, so next() must raise
+            next(generator)
+        # a retryable exception restarts the target: throw() returns its first value
+        exception_list = []
+        generator = decorated(10, exceptions_seen=exception_list)
+        for i in range(2):
+            next(generator)
+        val = generator.throw(ValueError("test"))
+        assert val == 0
+        assert isinstance(exception_list[0], ValueError)
+        # the restarted stream is still active, so iteration continues from there
+        assert next(generator) == 1
+
+    def test_exc_factory_non_retryable_error(self):
+        """
+        generator should give the option to override exception creation logic
+        test when non-retryable error is thrown
+        """
+        from google.api_core.retry import RetryFailureReason
+        from google.api_core.retry.retry_streaming import retry_target_stream
+
+        timeout = None
+        sent_errors = [ValueError("test"), ValueError("test2"), BufferError("test3")]
+        expected_final_err = RuntimeError("done")
+        expected_source_err = ZeroDivisionError("test4")
+
+        def factory(*args, **kwargs):
+            assert len(kwargs) == 0
+            assert args[0] == sent_errors
+            assert args[1] == RetryFailureReason.NON_RETRYABLE_ERROR
+            assert args[2] == timeout
+            return expected_final_err, expected_source_err
+
+        generator = retry_target_stream(
+            self._generator_mock,
+            retry.if_exception_type(ValueError),
+            [0] * 3,
+            timeout=timeout,
+            exception_factory=factory,
+        )
+        # initialize generator
+        next(generator)
+        # trigger some retryable errors
+        generator.throw(sent_errors[0])
+        generator.throw(sent_errors[1])
+        # trigger a non-retryable error
+        with pytest.raises(expected_final_err.__class__) as exc_info:
+            generator.throw(sent_errors[2])
+        assert exc_info.value == expected_final_err
+        assert exc_info.value.__cause__ == expected_source_err
+
+    def test_exc_factory_timeout(self):
+        """
+        generator should give the option to override exception creation logic
+        test when timeout is exceeded
+        """
+        import time
+        from google.api_core.retry import RetryFailureReason
+        from google.api_core.retry.retry_streaming import retry_target_stream
+
+        timeout = 2
+        time_now = time.monotonic()
+        now_patcher = mock.patch(
+            "time.monotonic",
+            return_value=time_now,
+        )
+
+        with now_patcher as patched_now:
+            # the mocked clock stays frozen until the test advances it below
+            sent_errors = [ValueError("test"), ValueError("test2"), ValueError("test3")]
+            expected_final_err = RuntimeError("done")
+            expected_source_err = ZeroDivisionError("test4")
+
+            def factory(*args, **kwargs):
+                assert len(kwargs) == 0
+                assert args[0] == sent_errors
+                assert args[1] == RetryFailureReason.TIMEOUT
+                assert args[2] == timeout
+                return expected_final_err, expected_source_err
+
+            generator = retry_target_stream(
+                self._generator_mock,
+                retry.if_exception_type(ValueError),
+                [0] * 3,
+                timeout=timeout,
+                exception_factory=factory,
+                check_timeout_on_yield=True,
+            )
+            # initialize generator
+            next(generator)
+            # trigger some retryable errors
+            generator.throw(sent_errors[0])
+            generator.throw(sent_errors[1])
+            # trigger a timeout
+            patched_now.return_value += timeout + 1
+            with pytest.raises(expected_final_err.__class__) as exc_info:
+                generator.throw(sent_errors[2])
+            assert exc_info.value == expected_final_err
+            assert exc_info.value.__cause__ == expected_source_err
diff --git a/tests/unit/retry/test_retry_unary.py b/tests/unit/retry/test_retry_unary.py
new file mode 100644
index 0000000..7dcd8dd
--- /dev/null
+++ b/tests/unit/retry/test_retry_unary.py
@@ -0,0 +1,314 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import re
+
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core import retry
+
+from .test_retry_base import Test_BaseRetry
+
+
+@mock.patch("time.sleep", autospec=True)
+@mock.patch(
+    "google.api_core.datetime_helpers.utcnow",
+    return_value=datetime.datetime.min,
+    autospec=True,
+)
+def test_retry_target_success(utcnow, sleep):
+    """The target is retried after retryable failures until it succeeds."""
+    retry_on_value_error = retry.if_exception_type(ValueError)
+    attempts = []
+
+    def target():
+        attempts.append(None)
+        if len(attempts) >= 3:
+            return 42
+        raise ValueError()  # the first two attempts fail
+
+    outcome = retry.retry_target(target, retry_on_value_error, range(10), None)
+
+    assert (outcome, len(attempts)) == (42, 3)
+    sleep.assert_has_calls([mock.call(0), mock.call(1)])
+
+
+@mock.patch("time.sleep", autospec=True)
+@mock.patch(
+    "google.api_core.datetime_helpers.utcnow",
+    return_value=datetime.datetime.min,
+    autospec=True,
+)
+def test_retry_target_w_on_error(utcnow, sleep):
+    """The on_error callback receives every retried exception."""
+    predicate = retry.if_exception_type(ValueError)
+    failure = ValueError()
+    attempts = []
+
+    def target():
+        attempts.append(None)
+        if len(attempts) >= 3:
+            return 42
+        raise failure  # same instance on both failing attempts
+
+    callback = mock.Mock()
+
+    outcome = retry.retry_target(target, predicate, range(10), None, on_error=callback)
+
+    assert (outcome, len(attempts)) == (42, 3)
+
+    callback.assert_has_calls([mock.call(failure), mock.call(failure)])
+    sleep.assert_has_calls([mock.call(0), mock.call(1)])
+
+
+@mock.patch("time.sleep", autospec=True)
+@mock.patch(
+    "google.api_core.datetime_helpers.utcnow",
+    return_value=datetime.datetime.min,
+    autospec=True,
+)
+def test_retry_target_non_retryable_error(utcnow, sleep):
+    """An exception rejected by the predicate propagates without any sleep."""
+    unexpected = TypeError()
+    target = mock.Mock(side_effect=unexpected)
+
+    with pytest.raises(TypeError) as exc_info:
+        retry.retry_target(target, retry.if_exception_type(ValueError), range(10), None)
+
+    assert exc_info.value == unexpected
+    sleep.assert_not_called()
+
+
+@mock.patch("asyncio.sleep", autospec=True)
+@mock.patch(
+    "google.api_core.datetime_helpers.utcnow",
+    return_value=datetime.datetime.min,
+    autospec=True,
+)
+@pytest.mark.asyncio
+async def test_retry_target_warning_for_retry(utcnow, sleep):
+    """retry_target warns with _ASYNC_RETRY_WARNING when given an async target."""
+    async_target = mock.AsyncMock(spec=["__call__"])
+    filler_predicate = retry.if_exception_type(ValueError)  # does not affect the test
+    with pytest.warns(Warning) as recorded:
+        retry.retry_target(async_target, filler_predicate, range(10), None)
+
+    # NOTE(review): the second warning is presumably the never-awaited coroutine.
+    assert len(recorded) == 2
+    assert str(recorded[0].message) == retry.retry_unary._ASYNC_RETRY_WARNING
+    sleep.assert_not_called()
+
+
+@mock.patch("time.sleep", autospec=True)
+@mock.patch("time.monotonic", autospec=True)
+@pytest.mark.parametrize("use_deadline_arg", [True, False])
+def test_retry_target_timeout_exceeded(monotonic, sleep, use_deadline_arg):
+    """A RetryError wrapping the last exception is raised once the timeout passes."""
+    retry_on_value_error = retry.if_exception_type(ValueError)
+    last_error = ValueError("meep")
+    target = mock.Mock(side_effect=last_error)
+    # Set up the clock so that the first call takes 5 seconds and the second
+    # takes 6, which puts the retry over the timeout.
+    monotonic.side_effect = [0, 5, 11]
+
+    # "deadline" is supported as an alias for "timeout"
+    kwargs = {("deadline" if use_deadline_arg else "timeout"): 10}
+
+    with pytest.raises(exceptions.RetryError) as exc_info:
+        retry.retry_target(target, retry_on_value_error, range(10), **kwargs)
+
+    assert exc_info.value.cause == last_error
+    assert exc_info.match("Timeout of 10.0s exceeded")
+    assert exc_info.match("last exception: meep")
+    assert target.call_count == 2
+
+    # The message must not include the target fn (may be a partial with user data).
+    assert str(target) not in exc_info.exconly()
+
+
+def test_retry_target_bad_sleep_generator():
+    with pytest.raises(ValueError, match="Sleep generator"):  # [] yields no delays
+        retry.retry_target(mock.sentinel.target, mock.sentinel.predicate, [], None)
+
+
+class TestRetry(Test_BaseRetry):
+    def _make_one(self, *args, **kwargs):
+        """Return a ``retry.Retry`` built from the given arguments."""
+        return retry.Retry(*args, **kwargs)
+
+    def test___str__(self):
+        """str() of a Retry shows its predicate, delays, timeout and on_error."""
+
+        def if_exception_type(exc):
+            return bool(exc)  # pragma: NO COVER
+
+        # Set all attributes explicitly so changed Retry defaults cannot break this.
+        retry_ = retry.Retry(
+            predicate=if_exception_type,
+            initial=1.0,
+            maximum=60.0,
+            multiplier=2.0,
+            timeout=120.0,
+            on_error=None,
+        )
+        assert re.match(
+            r"<Retry predicate=<function.*?if_exception_type.*?>, "
+            r"initial=1.0, maximum=60.0, multiplier=2.0, timeout=120.0, "
+            r"on_error=None>",
+            str(retry_),
+        )
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___and_execute_success(self, sleep):
+        """A decorated target that succeeds is called once and never sleeps."""
+        retry_ = retry.Retry()
+        target = mock.Mock(spec=["__call__"], return_value=42)
+        target.__name__ = "target"  # needed by functools.partial
+
+        decorated = retry_(target)
+        target.assert_not_called()
+
+        result = decorated("meep")
+
+        assert result == 42
+        target.assert_called_once_with("meep")
+        sleep.assert_not_called()
+
+    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___and_execute_retry(self, sleep, uniform):
+        """A single retryable failure is retried after one initial-delay sleep."""
+        on_error = mock.Mock(spec=["__call__"], side_effect=[None])
+        retry_ = retry.Retry(predicate=retry.if_exception_type(ValueError))
+
+        target = mock.Mock(spec=["__call__"], side_effect=[ValueError(), 42])
+        target.__name__ = "target"  # needed by functools.partial
+
+        decorated = retry_(target, on_error=on_error)
+        target.assert_not_called()
+
+        result = decorated("meep")
+
+        assert result == 42
+        assert target.call_count == 2
+        target.assert_has_calls([mock.call("meep"), mock.call("meep")])
+        sleep.assert_called_once_with(retry_._initial)
+        assert on_error.call_count == 1
+
+    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
+    @mock.patch("time.sleep", autospec=True)
+    def test___call___and_execute_retry_hitting_timeout(self, sleep, uniform):
+        """Retries end in RetryError once the next delay would exceed the timeout."""
+        on_error = mock.Mock(spec=["__call__"], side_effect=[None] * 10)
+        retry_ = retry.Retry(
+            predicate=retry.if_exception_type(ValueError),
+            initial=1.0,
+            maximum=1024.0,
+            multiplier=2.0,
+            timeout=30.9,
+        )
+
+        monotonic_patcher = mock.patch("time.monotonic", return_value=0)
+
+        target = mock.Mock(spec=["__call__"], side_effect=[ValueError()] * 10)
+        target.__name__ = "target"  # needed by functools.partial
+
+        decorated = retry_(target, on_error=on_error)
+        target.assert_not_called()
+
+        with monotonic_patcher as patched_monotonic:
+            # Make sure that calls to fake time.sleep() also advance the mocked
+            # time clock.
+            def increase_time(sleep_delay):
+                patched_monotonic.return_value += sleep_delay
+
+            sleep.side_effect = increase_time
+
+            with pytest.raises(exceptions.RetryError):
+                decorated("meep")
+
+        assert target.call_count == 5
+        target.assert_has_calls([mock.call("meep")] * 5)
+        assert on_error.call_count == 5
+
+        # check the delays
+        assert sleep.call_count == 4  # once between successive target calls
+        last_wait = sleep.call_args.args[0]
+        total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list)
+
+        assert last_wait == 8.0
+        # Next attempt would be scheduled in 16 secs, 15 + 16 = 31 > 30.9, thus
+        # we do not even wait for it to be scheduled (30.9 is configured timeout).
+        # This changes the previous logic of shortening the last attempt to fit
+        # in the timeout. The previous logic was removed to make Python retry
+        # logic consistent with the other languages and to not disrupt the
+        # randomized retry delays distribution by artificially increasing a
+        # probability of scheduling two (instead of one) last attempts with very
+        # short delay between them, while the second retry having very low chance
+        # of succeeding anyways.
+        assert total_wait == 15.0
+
+    @mock.patch("time.sleep", autospec=True)
+    def test___init___without_retry_executed(self, sleep):
+        """The constructor's on_error is stored but never called without a retry."""
+        _some_function = mock.Mock()
+
+        retry_ = retry.Retry(
+            predicate=retry.if_exception_type(ValueError), on_error=_some_function
+        )
+        # check the proper creation of the class
+        assert retry_._on_error is _some_function
+
+        target = mock.Mock(spec=["__call__"], side_effect=[42])
+        target.__name__ = "target"  # needed by functools.partial
+
+        wrapped = retry_(target)
+
+        result = wrapped("meep")
+
+        assert result == 42
+        target.assert_called_once_with("meep")
+        sleep.assert_not_called()
+        _some_function.assert_not_called()
+
+    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
+    @mock.patch("time.sleep", autospec=True)
+    def test___init___when_retry_is_executed(self, sleep, uniform):
+        """The constructor's on_error is called once for every retried failure."""
+        _some_function = mock.Mock()
+
+        retry_ = retry.Retry(
+            predicate=retry.if_exception_type(ValueError), on_error=_some_function
+        )
+        # check the proper creation of the class
+        assert retry_._on_error is _some_function
+
+        target = mock.Mock(
+            spec=["__call__"], side_effect=[ValueError(), ValueError(), 42]
+        )
+        target.__name__ = "target"  # needed by functools.partial
+
+        wrapped = retry_(target)
+        target.assert_not_called()
+
+        result = wrapped("meep")
+
+        assert result == 42
+        assert target.call_count == 3
+        assert _some_function.call_count == 2
+        target.assert_has_calls([mock.call("meep")] * 3)  # all three attempts
+        sleep.assert_any_call(retry_._initial)
diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py
deleted file mode 100644
index 2faf77c..0000000
--- a/tests/unit/test_retry.py
+++ /dev/null
@@ -1,487 +0,0 @@
-# Copyright 2017 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import datetime
-import itertools
-import re
-
-import mock
-import pytest
-import requests.exceptions
-
-from google.api_core import exceptions
-from google.api_core import retry
-from google.auth import exceptions as auth_exceptions
-
-
-def test_if_exception_type():
-    predicate = retry.if_exception_type(ValueError)
-
-    assert predicate(ValueError())
-    assert not predicate(TypeError())
-
-
-def test_if_exception_type_multiple():
-    predicate = retry.if_exception_type(ValueError, TypeError)
-
-    assert predicate(ValueError())
-    assert predicate(TypeError())
-    assert not predicate(RuntimeError())
-
-
-def test_if_transient_error():
-    assert retry.if_transient_error(exceptions.InternalServerError(""))
-    assert retry.if_transient_error(exceptions.TooManyRequests(""))
-    assert retry.if_transient_error(exceptions.ServiceUnavailable(""))
-    assert retry.if_transient_error(requests.exceptions.ConnectionError(""))
-    assert retry.if_transient_error(requests.exceptions.ChunkedEncodingError(""))
-    assert retry.if_transient_error(auth_exceptions.TransportError(""))
-    assert not retry.if_transient_error(exceptions.InvalidArgument(""))
-
-
-# Make uniform return half of its maximum, which will be the calculated
-# sleep time.
-@mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
-def test_exponential_sleep_generator_base_2(uniform):
-    gen = retry.exponential_sleep_generator(1, 60, multiplier=2)
-
-    result = list(itertools.islice(gen, 8))
-    assert result == [1, 2, 4, 8, 16, 32, 60, 60]
-
-
-@mock.patch("time.sleep", autospec=True)
-@mock.patch(
-    "google.api_core.datetime_helpers.utcnow",
-    return_value=datetime.datetime.min,
-    autospec=True,
-)
-def test_retry_target_success(utcnow, sleep):
-    predicate = retry.if_exception_type(ValueError)
-    call_count = [0]
-
-    def target():
-        call_count[0] += 1
-        if call_count[0] < 3:
-            raise ValueError()
-        return 42
-
-    result = retry.retry_target(target, predicate, range(10), None)
-
-    assert result == 42
-    assert call_count[0] == 3
-    sleep.assert_has_calls([mock.call(0), mock.call(1)])
-
-
-@mock.patch("time.sleep", autospec=True)
-@mock.patch(
-    "google.api_core.datetime_helpers.utcnow",
-    return_value=datetime.datetime.min,
-    autospec=True,
-)
-def test_retry_target_w_on_error(utcnow, sleep):
-    predicate = retry.if_exception_type(ValueError)
-    call_count = {"target": 0}
-    to_raise = ValueError()
-
-    def target():
-        call_count["target"] += 1
-        if call_count["target"] < 3:
-            raise to_raise
-        return 42
-
-    on_error = mock.Mock()
-
-    result = retry.retry_target(target, predicate, range(10), None, on_error=on_error)
-
-    assert result == 42
-    assert call_count["target"] == 3
-
-    on_error.assert_has_calls([mock.call(to_raise), mock.call(to_raise)])
-    sleep.assert_has_calls([mock.call(0), mock.call(1)])
-
-
-@mock.patch("time.sleep", autospec=True)
-@mock.patch(
-    "google.api_core.datetime_helpers.utcnow",
-    return_value=datetime.datetime.min,
-    autospec=True,
-)
-def test_retry_target_non_retryable_error(utcnow, sleep):
-    predicate = retry.if_exception_type(ValueError)
-    exception = TypeError()
-    target = mock.Mock(side_effect=exception)
-
-    with pytest.raises(TypeError) as exc_info:
-        retry.retry_target(target, predicate, range(10), None)
-
-    assert exc_info.value == exception
-    sleep.assert_not_called()
-
-
-@mock.patch("asyncio.sleep", autospec=True)
-@mock.patch(
-    "google.api_core.datetime_helpers.utcnow",
-    return_value=datetime.datetime.min,
-    autospec=True,
-)
-@pytest.mark.asyncio
-async def test_retry_target_warning_for_retry(utcnow, sleep):
-    predicate = retry.if_exception_type(ValueError)
-    target = mock.AsyncMock(spec=["__call__"])
-
-    with pytest.warns(Warning) as exc_info:
-        # Note: predicate is just a filler and doesn't affect the test
-        retry.retry_target(target, predicate, range(10), None)
-
-    assert len(exc_info) == 2
-    assert str(exc_info[0].message) == retry._ASYNC_RETRY_WARNING
-    sleep.assert_not_called()
-
-
-@mock.patch("time.sleep", autospec=True)
-@mock.patch("google.api_core.datetime_helpers.utcnow", autospec=True)
-def test_retry_target_deadline_exceeded(utcnow, sleep):
-    predicate = retry.if_exception_type(ValueError)
-    exception = ValueError("meep")
-    target = mock.Mock(side_effect=exception)
-    # Setup the timeline so that the first call takes 5 seconds but the second
-    # call takes 6, which puts the retry over the deadline.
-    utcnow.side_effect = [
-        # The first call to utcnow establishes the start of the timeline.
-        datetime.datetime.min,
-        datetime.datetime.min + datetime.timedelta(seconds=5),
-        datetime.datetime.min + datetime.timedelta(seconds=11),
-    ]
-
-    with pytest.raises(exceptions.RetryError) as exc_info:
-        retry.retry_target(target, predicate, range(10), deadline=10)
-
-    assert exc_info.value.cause == exception
-    assert exc_info.match("Deadline of 10.0s exceeded")
-    assert exc_info.match("last exception: meep")
-    assert target.call_count == 2
-
-    # Ensure the exception message does not include the target fn:
-    # it may be a partial with user data embedded
-    assert str(target) not in exc_info.exconly()
-
-
-def test_retry_target_bad_sleep_generator():
-    with pytest.raises(ValueError, match="Sleep generator"):
-        retry.retry_target(mock.sentinel.target, mock.sentinel.predicate, [], None)
-
-
-class TestRetry(object):
-    def test_constructor_defaults(self):
-        retry_ = retry.Retry()
-        assert retry_._predicate == retry.if_transient_error
-        assert retry_._initial == 1
-        assert retry_._maximum == 60
-        assert retry_._multiplier == 2
-        assert retry_._deadline == 120
-        assert retry_._on_error is None
-        assert retry_.deadline == 120
-        assert retry_.timeout == 120
-
-    def test_constructor_options(self):
-        _some_function = mock.Mock()
-
-        retry_ = retry.Retry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=_some_function,
-        )
-        assert retry_._predicate == mock.sentinel.predicate
-        assert retry_._initial == 1
-        assert retry_._maximum == 2
-        assert retry_._multiplier == 3
-        assert retry_._deadline == 4
-        assert retry_._on_error is _some_function
-
-    def test_with_deadline(self):
-        retry_ = retry.Retry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_deadline(42)
-        assert retry_ is not new_retry
-        assert new_retry._deadline == 42
-
-        # the rest of the attributes should remain the same
-        assert new_retry._predicate is retry_._predicate
-        assert new_retry._initial == retry_._initial
-        assert new_retry._maximum == retry_._maximum
-        assert new_retry._multiplier == retry_._multiplier
-        assert new_retry._on_error is retry_._on_error
-
-    def test_with_predicate(self):
-        retry_ = retry.Retry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_predicate(mock.sentinel.predicate)
-        assert retry_ is not new_retry
-        assert new_retry._predicate == mock.sentinel.predicate
-
-        # the rest of the attributes should remain the same
-        assert new_retry._deadline == retry_._deadline
-        assert new_retry._initial == retry_._initial
-        assert new_retry._maximum == retry_._maximum
-        assert new_retry._multiplier == retry_._multiplier
-        assert new_retry._on_error is retry_._on_error
-
-    def test_with_delay_noop(self):
-        retry_ = retry.Retry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_delay()
-        assert retry_ is not new_retry
-        assert new_retry._initial == retry_._initial
-        assert new_retry._maximum == retry_._maximum
-        assert new_retry._multiplier == retry_._multiplier
-
-    def test_with_delay(self):
-        retry_ = retry.Retry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_delay(initial=5, maximum=6, multiplier=7)
-        assert retry_ is not new_retry
-        assert new_retry._initial == 5
-        assert new_retry._maximum == 6
-        assert new_retry._multiplier == 7
-
-        # the rest of the attributes should remain the same
-        assert new_retry._deadline == retry_._deadline
-        assert new_retry._predicate is retry_._predicate
-        assert new_retry._on_error is retry_._on_error
-
-    def test_with_delay_partial_options(self):
-        retry_ = retry.Retry(
-            predicate=mock.sentinel.predicate,
-            initial=1,
-            maximum=2,
-            multiplier=3,
-            deadline=4,
-            on_error=mock.sentinel.on_error,
-        )
-        new_retry = retry_.with_delay(initial=4)
-        assert retry_ is not new_retry
-        assert new_retry._initial == 4
-        assert new_retry._maximum == 2
-        assert new_retry._multiplier == 3
-
-        new_retry = retry_.with_delay(maximum=4)
-        assert retry_ is not new_retry
-        assert new_retry._initial == 1
-        assert new_retry._maximum == 4
-        assert new_retry._multiplier == 3
-
-        new_retry = retry_.with_delay(multiplier=4)
-        assert retry_ is not new_retry
-        assert new_retry._initial == 1
-        assert new_retry._maximum == 2
-        assert new_retry._multiplier == 4
-
-        # the rest of the attributes should remain the same
-        assert new_retry._deadline == retry_._deadline
-        assert new_retry._predicate is retry_._predicate
-        assert new_retry._on_error is retry_._on_error
-
-    def test___str__(self):
-        def if_exception_type(exc):
-            return bool(exc)  # pragma: NO COVER
-
-        # Explicitly set all attributes as changed Retry defaults should not
-        # cause this test to start failing.
-        retry_ = retry.Retry(
-            predicate=if_exception_type,
-            initial=1.0,
-            maximum=60.0,
-            multiplier=2.0,
-            deadline=120.0,
-            on_error=None,
-        )
-        assert re.match(
-            (
-                r"<Retry predicate=<function.*?if_exception_type.*?>, "
-                r"initial=1.0, maximum=60.0, multiplier=2.0, timeout=120.0, "
-                r"on_error=None>"
-            ),
-            str(retry_),
-        )
-
-    @mock.patch("time.sleep", autospec=True)
-    def test___call___and_execute_success(self, sleep):
-        retry_ = retry.Retry()
-        target = mock.Mock(spec=["__call__"], return_value=42)
-        # __name__ is needed by functools.partial.
-        target.__name__ = "target"
-
-        decorated = retry_(target)
-        target.assert_not_called()
-
-        result = decorated("meep")
-
-        assert result == 42
-        target.assert_called_once_with("meep")
-        sleep.assert_not_called()
-
-    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
-    @mock.patch("time.sleep", autospec=True)
-    def test___call___and_execute_retry(self, sleep, uniform):
-        on_error = mock.Mock(spec=["__call__"], side_effect=[None])
-        retry_ = retry.Retry(predicate=retry.if_exception_type(ValueError))
-
-        target = mock.Mock(spec=["__call__"], side_effect=[ValueError(), 42])
-        # __name__ is needed by functools.partial.
-        target.__name__ = "target"
-
-        decorated = retry_(target, on_error=on_error)
-        target.assert_not_called()
-
-        result = decorated("meep")
-
-        assert result == 42
-        assert target.call_count == 2
-        target.assert_has_calls([mock.call("meep"), mock.call("meep")])
-        sleep.assert_called_once_with(retry_._initial)
-        assert on_error.call_count == 1
-
-    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
-    @mock.patch("time.sleep", autospec=True)
-    def test___call___and_execute_retry_hitting_deadline(self, sleep, uniform):
-        on_error = mock.Mock(spec=["__call__"], side_effect=[None] * 10)
-        retry_ = retry.Retry(
-            predicate=retry.if_exception_type(ValueError),
-            initial=1.0,
-            maximum=1024.0,
-            multiplier=2.0,
-            deadline=30.9,
-        )
-
-        utcnow = datetime.datetime.now(tz=datetime.timezone.utc)
-        utcnow_patcher = mock.patch(
-            "google.api_core.datetime_helpers.utcnow", return_value=utcnow
-        )
-
-        target = mock.Mock(spec=["__call__"], side_effect=[ValueError()] * 10)
-        # __name__ is needed by functools.partial.
-        target.__name__ = "target"
-
-        decorated = retry_(target, on_error=on_error)
-        target.assert_not_called()
-
-        with utcnow_patcher as patched_utcnow:
-            # Make sure that calls to fake time.sleep() also advance the mocked
-            # time clock.
-            def increase_time(sleep_delay):
-                patched_utcnow.return_value += datetime.timedelta(seconds=sleep_delay)
-
-            sleep.side_effect = increase_time
-
-            with pytest.raises(exceptions.RetryError):
-                decorated("meep")
-
-        assert target.call_count == 5
-        target.assert_has_calls([mock.call("meep")] * 5)
-        assert on_error.call_count == 5
-
-        # check the delays
-        assert sleep.call_count == 4  # once between each successive target calls
-        last_wait = sleep.call_args.args[0]
-        total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list)
-
-        assert last_wait == 8.0
-        # Next attempt would be scheduled in 16 secs, 15 + 16 = 31 > 30.9, thus
-        # we do not even wait for it to be scheduled (30.9 is configured timeout).
-        # This changes the previous logic of shortening the last attempt to fit
-        # in the deadline. The previous logic was removed to make Python retry
-        # logic consistent with the other languages and to not disrupt the
-        # randomized retry delays distribution by artificially increasing a
-        # probability of scheduling two (instead of one) last attempts with very
-        # short delay between them, while the second retry having very low chance
-        # of succeeding anyways.
-        assert total_wait == 15.0
-
-    @mock.patch("time.sleep", autospec=True)
-    def test___init___without_retry_executed(self, sleep):
-        _some_function = mock.Mock()
-
-        retry_ = retry.Retry(
-            predicate=retry.if_exception_type(ValueError), on_error=_some_function
-        )
-        # check the proper creation of the class
-        assert retry_._on_error is _some_function
-
-        target = mock.Mock(spec=["__call__"], side_effect=[42])
-        # __name__ is needed by functools.partial.
-        target.__name__ = "target"
-
-        wrapped = retry_(target)
-
-        result = wrapped("meep")
-
-        assert result == 42
-        target.assert_called_once_with("meep")
-        sleep.assert_not_called()
-        _some_function.assert_not_called()
-
-    @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n)
-    @mock.patch("time.sleep", autospec=True)
-    def test___init___when_retry_is_executed(self, sleep, uniform):
-        _some_function = mock.Mock()
-
-        retry_ = retry.Retry(
-            predicate=retry.if_exception_type(ValueError), on_error=_some_function
-        )
-        # check the proper creation of the class
-        assert retry_._on_error is _some_function
-
-        target = mock.Mock(
-            spec=["__call__"], side_effect=[ValueError(), ValueError(), 42]
-        )
-        # __name__ is needed by functools.partial.
-        target.__name__ = "target"
-
-        wrapped = retry_(target)
-        target.assert_not_called()
-
-        result = wrapped("meep")
-
-        assert result == 42
-        assert target.call_count == 3
-        assert _some_function.call_count == 2
-        target.assert_has_calls([mock.call("meep"), mock.call("meep")])
-        sleep.assert_any_call(retry_._initial)
