Merge changes I83162c52,I4b4a7775,I9b2cc8de,I0b63d2fd,I8fa2d271, ... into main am: 4ae87eaef3 am: b7d5a179dc

Original change: https://android-review.googlesource.com/c/platform/tools/asuite/+/3366497

Change-Id: I02f14b384067ef1305d48225e6c8180400bcc26e
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/Android.bp b/Android.bp
index 60c8327..67f7510 100644
--- a/Android.bp
+++ b/Android.bp
@@ -57,3 +57,11 @@
         canonical_path_from_root: false,
     },
 }
+
+filegroup {
+    name: "adte-owners-files",
+    srcs: [
+        "OWNERS_ADTE_TEAM",
+        "OWNERS",
+    ],
+}
diff --git a/OWNERS_ADTE_TEAM b/OWNERS_ADTE_TEAM
index 76e26bc..12ca660 100644
--- a/OWNERS_ADTE_TEAM
+++ b/OWNERS_ADTE_TEAM
@@ -2,6 +2,7 @@
 davidjames@google.com
 hwj@google.com
 hzalek@google.com
+ihcinihsdk@google.com
 kevindagostino@google.com
 liuyg@google.com
 lucafarsi@google.com
diff --git a/atest/Android.bp b/atest/Android.bp
index 7337b1d..c5fda1f 100644
--- a/atest/Android.bp
+++ b/atest/Android.bp
@@ -68,6 +68,7 @@
     defaults: ["atest_binary_defaults"],
     main: "atest_main.py",
     data: [
+        ":adte-owners-files",
         ":atest_flag_list_for_completion",
         ":atest_log_uploader",
     ],
diff --git a/atest/atest_utils.py b/atest/atest_utils.py
index a893d60..2bc306d 100644
--- a/atest/atest_utils.py
+++ b/atest/atest_utils.py
@@ -27,7 +27,9 @@
 import fnmatch
 import hashlib
 import html
-import importlib
+import importlib.resources
+import importlib.util
+import io
 import itertools
 import json
 import logging
@@ -40,6 +42,7 @@
 import shutil
 import subprocess
 import sys
+import threading
 from threading import Thread
 import traceback
 from typing import Any, Dict, IO, List, Set, Tuple
@@ -54,7 +57,7 @@
 from atest.metrics import metrics_utils
 from atest.tf_proto import test_record_pb2
 
-_BUILD_OUTPUT_ROLLING_LINES = 6
+DEFAULT_OUTPUT_ROLLING_LINES = 6
 _BASH_CLEAR_PREVIOUS_LINE_CODE = '\033[F\033[K'
 _BASH_RESET_CODE = '\033[0m'
 DIST_OUT_DIR = Path(
@@ -270,44 +273,111 @@
   return output
 
 
-def _stream_io_output(io_input: IO, io_output: IO, max_lines=None):
+def stream_io_output(
+    io_input: IO,
+    max_lines=None,
+    full_output_receiver: IO = None,
+    io_output: IO = None,
+    is_io_output_atty=None,
+):
   """Stream an IO output with max number of rolling lines to display if set.
 
   Args:
       input: The file-like object to read the output from.
-      output: The file-like object to write the output to.
       max_lines: The maximum number of rolling lines to display. If None, all
         lines will be displayed.
+      full_output_receiver: Optional io to receive the full output.
+      io_output: The file-like object to write the output to.
+      is_io_output_atty: Whether the io_output is a TTY.
   """
-  print('\n----------------------------------------------------')
-  term_width, _ = get_terminal_size()
-  full_output = []
-  last_lines = None if not max_lines else deque(maxlen=max_lines)
-  last_number_of_lines = 0
-  for line in iter(io_input.readline, ''):
-    full_output.append(line)
-    line = line.rstrip()
-    if last_lines is None:
+  if io_output is None:
+    io_output = _original_sys_stdout
+  if is_io_output_atty is None:
+    is_io_output_atty = _has_colors(io_output)
+  if not max_lines or not is_io_output_atty:
+    for line in iter(io_input.readline, ''):
+      if not line:
+        break
+      if full_output_receiver is not None:
+        full_output_receiver.write(
+            line if isinstance(line, str) else line.decode('utf-8')
+        )
       io_output.write(line)
-      io_output.write('\n')
       io_output.flush()
-      continue
+    return
+
+  term_width, _ = get_terminal_size()
+  last_lines = deque(maxlen=max_lines)
+  is_rolling = True
+
+  def reset_output():
+    if not is_rolling:
+      return
+    io_output.write(_BASH_CLEAR_PREVIOUS_LINE_CODE * (len(last_lines) + 2))
+
+  def write_output(new_lines: list[str]):
+    if not is_rolling:
+      return
+    last_lines.extend(new_lines)
+    lines = ['========== Rolling subprocess output ==========']
+    lines.extend(last_lines)
+    lines.append('-----------------------------------------------')
+    io_output.write('\n'.join(lines))
+    io_output.write('\n')
+    io_output.flush()
+
+  original_stdout = sys.stdout
+
+  lock = threading.Lock()
+
+  class SafeStdout:
+
+    def __init__(self):
+      self._buffers = []
+
+    def write(self, buf: str) -> None:
+      if len(buf) == 1 and buf[0] == '\n' and self._buffers:
+        with lock:
+          reset_output()
+          original_stdout.write(''.join(self._buffers))
+          original_stdout.write('\n')
+          original_stdout.flush()
+          write_output([])
+          self._buffers.clear()
+      else:
+        self._buffers.append(buf)
+
+    def flush(self) -> None:
+      original_stdout.flush()
+
+  sys.stdout = SafeStdout()
+
+  for line in iter(io_input.readline, ''):
+    if not line:
+      break
+    line = line.decode('utf-8') if isinstance(line, bytes) else line
+    if full_output_receiver is not None:
+      full_output_receiver.write(line)
+    line = line.rstrip()
     # Split the line if it's longer than the terminal width
     wrapped_lines = (
         [line]
         if len(line) <= term_width
         else [line[i : i + term_width] for i in range(0, len(line), term_width)]
     )
-    last_lines.extend(wrapped_lines)
-    io_output.write(_BASH_CLEAR_PREVIOUS_LINE_CODE * last_number_of_lines)
-    io_output.write('\n'.join(last_lines))
-    io_output.write('\n')
+    with lock:
+      reset_output()
+      write_output(wrapped_lines)
+
+  with lock:
+    reset_output()
+    is_rolling = False
+    io_output.write(_BASH_RESET_CODE)
     io_output.flush()
-    last_number_of_lines = len(last_lines)
+
+  sys.stdout = original_stdout
+
   io_input.close()
-  io_output.write(_BASH_RESET_CODE)
-  io_output.flush()
-  print('----------------------------------------------------')
 
 
 def run_limited_output(
@@ -336,12 +406,18 @@
       start_new_session=start_new_session,
       text=True,
   ) as proc:
-    _stream_io_output(
-        proc.stdout, _original_sys_stdout, _BUILD_OUTPUT_ROLLING_LINES
+    full_output_receiver = io.StringIO()
+    stream_io_output(
+        proc.stdout,
+        DEFAULT_OUTPUT_ROLLING_LINES,
+        full_output_receiver,
+        _original_sys_stdout,
     )
     returncode = proc.wait()
     if returncode:
-      raise subprocess.CalledProcessError(returncode, cmd, full_output)
+      raise subprocess.CalledProcessError(
+          returncode, cmd, full_output_receiver.getvalue()
+      )
 
 
 def get_build_out_dir(*joinpaths) -> Path:
@@ -533,6 +609,11 @@
   return all((len(args.tests) == 1, args.tests[0][0] == ':'))
 
 
+def is_atty_terminal() -> bool:
+  """Check if the current process is running in a TTY."""
+  return getattr(_original_sys_stdout, 'isatty', lambda: False)()
+
+
 def _has_colors(stream):
   """Check the output stream is colorful.
 
diff --git a/atest/atest_utils_unittest.py b/atest/atest_utils_unittest.py
index 0b8d6a3..243b6b5 100755
--- a/atest/atest_utils_unittest.py
+++ b/atest/atest_utils_unittest.py
@@ -66,6 +66,7 @@
 ----------------------------
 """
 
+
 class StreamIoOutputTest(unittest.TestCase):
   """Class that tests the _stream_io_output function."""
 
@@ -76,9 +77,13 @@
     io_input.seek(0)
     io_output = StringIO()
 
-    atest_utils._stream_io_output(io_input, io_output, max_lines=None)
+    atest_utils.stream_io_output(
+        io_input, max_lines=None, io_output=io_output, is_io_output_atty=True
+    )
 
-    self.assertNotIn(atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE, io_output.getvalue())
+    self.assertNotIn(
+        atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE, io_output.getvalue()
+    )
 
   @mock.patch.object(atest_utils, 'get_terminal_size', return_value=(5, -1))
   def test_stream_io_output_wrap_long_lines(self, _):
@@ -88,7 +93,9 @@
     io_input.seek(0)
     io_output = StringIO()
 
-    atest_utils._stream_io_output(io_input, io_output, max_lines=10)
+    atest_utils.stream_io_output(
+        io_input, max_lines=10, io_output=io_output, is_io_output_atty=True
+    )
 
     self.assertIn('11111\n11111', io_output.getvalue())
 
@@ -100,10 +107,16 @@
     io_input.seek(0)
     io_output = StringIO()
 
-    atest_utils._stream_io_output(io_input, io_output, max_lines=2)
+    atest_utils.stream_io_output(
+        io_input, max_lines=2, io_output=io_output, is_io_output_atty=True
+    )
 
     self.assertIn(
-        atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE * 2 + '2\n3\n',
+        '2\n3\n',
+        io_output.getvalue(),
+    )
+    self.assertNotIn(
+        '1\n2\n3\n',
         io_output.getvalue(),
     )
 
@@ -115,10 +128,12 @@
     io_input.seek(0)
     io_output = StringIO()
 
-    atest_utils._stream_io_output(io_input, io_output, max_lines=4)
+    atest_utils.stream_io_output(
+        io_input, max_lines=4, io_output=io_output, is_io_output_atty=True
+    )
 
     self.assertIn(
-        atest_utils._BASH_CLEAR_PREVIOUS_LINE_CODE * 2 + '1\n2\n3\n',
+        '1\n2\n3\n',
         io_output.getvalue(),
     )
 
diff --git a/atest/rollout_control.py b/atest/rollout_control.py
index df146f5..9dd9f01 100644
--- a/atest/rollout_control.py
+++ b/atest/rollout_control.py
@@ -18,12 +18,34 @@
 import functools
 import getpass
 import hashlib
+import importlib.resources
 import logging
 import os
 from atest import atest_enum
 from atest.metrics import metrics
 
 
+@functools.cache
+def _get_project_owners() -> list[str]:
+  """Returns the owners of the feature."""
+  owners = []
+  try:
+    with importlib.resources.as_file(
+        importlib.resources.files('atest').joinpath('OWNERS')
+    ) as version_file_path:
+      owners.extend(version_file_path.read_text(encoding='utf-8').splitlines())
+  except (ModuleNotFoundError, FileNotFoundError) as e:
+    logging.error(e)
+  try:
+    with importlib.resources.as_file(
+        importlib.resources.files('atest').joinpath('OWNERS_ADTE_TEAM')
+    ) as version_file_path:
+      owners.extend(version_file_path.read_text(encoding='utf-8').splitlines())
+  except (ModuleNotFoundError, FileNotFoundError) as e:
+    logging.error(e)
+  return [line.split('@')[0] for line in owners if '@google.com' in line]
+
+
 class RolloutControlledFeature:
   """Base class for Atest features under rollout control."""
 
@@ -33,6 +55,7 @@
       rollout_percentage: float,
       env_control_flag: str,
       feature_id: int = None,
+      owners: list[str] | None = None,
   ):
     """Initializes the object.
 
@@ -45,6 +68,8 @@
           disable.
         feature_id: The ID of the feature that is controlled by rollout control
           for metric collection purpose. Must be a positive integer.
+        owners: The owners of the feature. If not provided, the owners of the
+          feature will be read from OWNERS file.
     """
     if rollout_percentage < 0 or rollout_percentage > 100:
       raise ValueError(
@@ -55,10 +80,13 @@
       raise ValueError(
           'Feature ID must be a positive integer. Got %s instead.' % feature_id
       )
+    if owners is None:
+      owners = _get_project_owners()
     self._name = name
     self._rollout_percentage = rollout_percentage
     self._env_control_flag = env_control_flag
     self._feature_id = feature_id
+    self._owners = owners
 
   def _check_env_control_flag(self) -> bool | None:
     """Checks the environment variable to override the feature enablement.
@@ -98,22 +126,30 @@
         )
       return override_flag_value
 
+    if self._rollout_percentage == 100:
+      return True
+
     if username is None:
       username = getpass.getuser()
 
     if not username:
-      logging.error(
+      logging.debug(
           'Unable to determine the username. Disabling the feature %s.',
           self._name,
       )
       return False
 
-    hash_object = hashlib.sha256()
-    hash_object.update((username + ' ' + self._name).encode('utf-8'))
+    is_enabled = username in self._owners
 
-    is_enabled = (
-        int(hash_object.hexdigest(), 16) % 100 < self._rollout_percentage
-    )
+    if not is_enabled:
+      if self._rollout_percentage == 0:
+        return False
+
+      hash_object = hashlib.sha256()
+      hash_object.update((username + ' ' + self._name).encode('utf-8'))
+      is_enabled = (
+          int(hash_object.hexdigest(), 16) % 100 < self._rollout_percentage
+      )
 
     logging.debug(
         'Feature %s is %s for user %s.',
@@ -122,7 +158,7 @@
         username,
     )
 
-    if self._feature_id and 0 < self._rollout_percentage < 100:
+    if self._feature_id:
       metrics.LocalDetectEvent(
           detect_type=atest_enum.DetectType.ROLLOUT_CONTROLLED_FEATURE_ID,
           result=self._feature_id if is_enabled else -self._feature_id,
@@ -137,3 +173,10 @@
     env_control_flag='DISABLE_BAZEL_MODE_BY_DEFAULT',
     feature_id=1,
 )
+
+rolling_tf_subprocess_output = RolloutControlledFeature(
+    name='rolling_tf_subprocess_output',
+    rollout_percentage=0,
+    env_control_flag='ROLLING_TF_SUBPROCESS_OUTPUT',
+    feature_id=2,
+)
diff --git a/atest/rollout_control_unittest.py b/atest/rollout_control_unittest.py
index 05dc9b0..ca000d0 100644
--- a/atest/rollout_control_unittest.py
+++ b/atest/rollout_control_unittest.py
@@ -56,7 +56,7 @@
   def test_is_enabled_username_undetermined_returns_false(self):
     sut = rollout_control.RolloutControlledFeature(
         name='test_feature',
-        rollout_percentage=100,
+        rollout_percentage=99,
         env_control_flag='TEST_FEATURE',
     )
 
@@ -91,3 +91,14 @@
 
     with mock.patch.dict('os.environ', {'TEST_FEATURE': 'false'}):
       self.assertFalse(sut.is_enabled())
+
+  def test_is_enabled_is_owner_returns_true(self):
+    sut = rollout_control.RolloutControlledFeature(
+        name='test_feature',
+        rollout_percentage=0,
+        env_control_flag='TEST_FEATURE',
+        owners=['owner_name'],
+    )
+
+    self.assertFalse(sut.is_enabled('name'))
+    self.assertTrue(sut.is_enabled('owner_name'))
diff --git a/atest/test_runners/atest_tf_test_runner.py b/atest/test_runners/atest_tf_test_runner.py
index f227c4a..87e1046 100644
--- a/atest/test_runners/atest_tf_test_runner.py
+++ b/atest/test_runners/atest_tf_test_runner.py
@@ -31,6 +31,7 @@
 import select
 import shutil
 import socket
+import threading
 import time
 from typing import Any, Dict, List, Set, Tuple
 
@@ -40,6 +41,7 @@
 from atest import constants
 from atest import module_info
 from atest import result_reporter
+from atest import rollout_control
 from atest.atest_enum import DetectType, ExitCode
 from atest.coverage import coverage
 from atest.logstorage import logstorage_utils
@@ -399,12 +401,26 @@
     run_cmds = self.generate_run_commands(
         test_infos, extra_args, server.getsockname()[1]
     )
+    is_rolling_output = (
+        rollout_control.rolling_tf_subprocess_output.is_enabled()
+        and not extra_args.get(constants.VERBOSE, False)
+        and atest_utils.is_atty_terminal()
+    )
+
     logging.debug('Running test: %s', run_cmds[0])
     subproc = self.run(
         run_cmds[0],
         output_to_stdout=extra_args.get(constants.VERBOSE, False),
         env_vars=self.generate_env_vars(extra_args),
+        rolling_output_lines=is_rolling_output,
     )
+
+    if is_rolling_output:
+      threading.Thread(
+          target=atest_utils.stream_io_output,
+          args=(subproc.stdout, atest_utils.DEFAULT_OUTPUT_ROLLING_LINES),
+      ).start()
+
     self.handle_subprocess(
         subproc,
         partial(self._start_monitor, server, subproc, reporter, extra_args),
diff --git a/atest/test_runners/test_runner_base.py b/atest/test_runners/test_runner_base.py
index 499cb1f..927960a 100644
--- a/atest/test_runners/test_runner_base.py
+++ b/atest/test_runners/test_runner_base.py
@@ -79,6 +79,7 @@
     """Init stuff for base class."""
     self.results_dir = results_dir
     self.test_log_file = None
+    self._subprocess_stdout = None
     if not self.NAME:
       raise atest_error.NoTestRunnerName('Class var NAME is not defined.')
     if not self.EXECUTABLE:
@@ -116,7 +117,13 @@
     """Checks whether this runner requires device update."""
     return False
 
-  def run(self, cmd, output_to_stdout=False, env_vars=None):
+  def run(
+      self,
+      cmd,
+      output_to_stdout=False,
+      env_vars=None,
+      rolling_output_lines=False,
+  ):
     """Shell out and execute command.
 
     Args:
@@ -127,20 +134,34 @@
           reporter to print the test results.  Set to True to see the output of
           the cmd. This would be appropriate for verbose runs.
         env_vars: Environment variables passed to the subprocess.
+        rolling_output_lines: If True, the subprocess output will be streamed
+          with rolling lines when output_to_stdout is False.
     """
-    if not output_to_stdout:
-      self.test_log_file = tempfile.NamedTemporaryFile(
-          mode='w', dir=self.results_dir, delete=True
-      )
     logging.debug('Executing command: %s', cmd)
-    return subprocess.Popen(
-        cmd,
-        start_new_session=True,
-        shell=True,
-        stderr=subprocess.STDOUT,
-        stdout=self.test_log_file,
-        env=env_vars,
-    )
+    if rolling_output_lines:
+      proc = subprocess.Popen(
+          cmd,
+          start_new_session=True,
+          shell=True,
+          stderr=subprocess.STDOUT,
+          stdout=None if output_to_stdout else subprocess.PIPE,
+          env=env_vars,
+      )
+      self._subprocess_stdout = proc.stdout
+      return proc
+    else:
+      if not output_to_stdout:
+        self.test_log_file = tempfile.NamedTemporaryFile(
+            mode='w', dir=self.results_dir, delete=True
+        )
+      return subprocess.Popen(
+          cmd,
+          start_new_session=True,
+          shell=True,
+          stderr=subprocess.STDOUT,
+          stdout=self.test_log_file,
+          env=env_vars,
+      )
 
   # pylint: disable=broad-except
   def handle_subprocess(self, subproc, func):
@@ -165,11 +186,15 @@
         # we have to save it above.
         logging.debug('Subproc already terminated, skipping')
       finally:
-        if self.test_log_file:
+        full_output = ''
+        if self._subprocess_stdout:
+          full_output = self._subprocess_stdout.read()
+        elif self.test_log_file:
           with open(self.test_log_file.name, 'r') as f:
-            intro_msg = 'Unexpected Issue. Raw Output:'
-            print(atest_utils.mark_red(intro_msg))
-            print(f.read())
+            full_output = f.read()
+        if full_output:
+          print(atest_utils.mark_red('Unexpected Issue. Raw Output:'))
+          print(full_output)
         # Ignore socket.recv() raising due to ctrl-c
         if not error.args or error.args[0] != errno.EINTR:
           raise error