[autotest] add more metrics details in gs_offloader

This change adds more detail to the metrics collected in gs_offloader.

BUG=chromium:716218
TEST=unittest, run gs_offloader locally with test GS bucket.

Change-Id: I09be661f3c8df8d196a3305e529d05e41ceabd78
Reviewed-on: https://chromium-review.googlesource.com/506781
Commit-Ready: Dan Shi <dshi@google.com>
Tested-by: Dan Shi <dshi@google.com>
Reviewed-by: Aviv Keshet <akeshet@chromium.org>
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index c4c3265..b9194f1 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -21,6 +21,7 @@
 import re
 import shutil
 import signal
+import socket
 import stat
 import subprocess
 import sys
@@ -33,9 +34,11 @@
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import utils
+from autotest_lib.client.common_lib.cros.graphite import autotest_es
 from autotest_lib.site_utils import job_directories
 from autotest_lib.site_utils import pubsub_utils
 from autotest_lib.tko import models
+from autotest_lib.utils import labellib
 from chromite.lib import gs
 
 # Autotest requires the psutil module from site-packages, so it must be imported
@@ -138,13 +141,15 @@
 NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
 NOTIFICATION_VERSION = '1'
 
-
 # the message data for new test result notification.
 NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
 
 # Full path to the correct gsutil command to run.
 _GSUTIL_CMD = gs.GSContext.GetDefaultGSUtilBin()
 
+# ES metadata type strings for successful and failed offloads.
+GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
+GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'
 
 class TimeoutException(Exception):
     """Exception raised by the timeout_handler."""
@@ -165,6 +170,47 @@
     raise TimeoutException('Process Timed Out')
 
 
+def _get_metrics_fields(dir_entry):
+    """Get board and milestone metrics fields for a test result directory.
+
+    Both fields stay 'unknown' unless a host keyval has a parsable build.
+    @param dir_entry: Directory entry to offload.
+    @return A dictionary for the metrics data to be uploaded.
+    """
+    fields = {'board': 'unknown', 'milestone': 'unknown'}
+    if dir_entry:
+        # There could be multiple hosts in the job directory, use the first one
+        # with a parsable build.
+        for host in glob.glob(os.path.join(dir_entry, '*')):
+            build = models.test.parse_job_keyval(host).get('build')
+            if build:
+                try:
+                    cros_version = labellib.parse_cros_version(build)
+                except ValueError:  # unparsable build: try the next host
+                    continue
+                fields['board'] = cros_version.board
+                fields['milestone'] = cros_version.milestone
+                break
+
+    return fields
+
+
+def _get_es_metadata(dir_entry):
+    """Build the ES metadata for the given test result directory.
+
+    @param dir_entry: Directory entry to offload.
+    @return A dictionary for the metadata to be uploaded.
+    """
+    metadata = _get_metrics_fields(dir_entry)
+    metadata['hostname'] = socket.gethostname()
+    # Job details are only added when a directory entry was given.
+    if dir_entry:
+        metadata.update(
+                dir_entry=dir_entry,
+                job_id=job_directories.get_job_id_or_task_id(dir_entry))
+    return metadata
+
+
 def get_cmd_list(multiprocessing, dir_entry, gs_path):
     """Return the command to offload a specified directory.
 
@@ -507,6 +553,7 @@
 
     @return offload_dir function to perform the offload.
     """
+
     @metrics.SecondsTimerDecorator(
             'chromeos/autotest/gs_offloader/job_offload_duration')
     def offload_dir(dir_entry, dest_path, job_complete_time):
@@ -519,9 +566,12 @@
                                   database.
 
         """
-        error = False
+        start_time = time.time()
+        error = True
         stdout_file = None
         stderr_file = None
+        metrics_fields = _get_metrics_fields(dir_entry)
+        es_metadata = _get_es_metadata(dir_entry)
         try:
             upload_signal_filename = '%s/%s/.GS_UPLOADED' % (
                     RESULTS_DIR, dir_entry)
@@ -532,6 +582,8 @@
 
                 if LIMIT_FILE_COUNT:
                     limit_file_count(dir_entry)
+                dir_size = get_directory_size_kibibytes(dir_entry)
+                es_metadata['size_kb'] = dir_size
 
                 stdout_file = tempfile.TemporaryFile('w+')
                 stderr_file = tempfile.TemporaryFile('w+')
@@ -545,16 +597,20 @@
                 signal.alarm(0)
 
                 _emit_gs_returncode_metric(process.returncode)
-
                 if process.returncode == 0:
-                    dir_size = get_directory_size_kibibytes(dir_entry)
-
                     m_offload_count = (
                             'chromeos/autotest/gs_offloader/jobs_offloaded')
-                    metrics.Counter(m_offload_count).increment()
+                    metrics.Counter(m_offload_count).increment(
+                            fields=metrics_fields)
                     m_offload_size = ('chromeos/autotest/gs_offloader/'
                                       'kilobytes_transferred')
-                    metrics.Counter(m_offload_size).increment_by(dir_size)
+                    metrics.Counter(m_offload_size).increment_by(
+                            dir_size, fields=metrics_fields)
+                    es_metadata['time_used_sec'] = time.time() - start_time
+                    autotest_es.post(use_http=True,
+                                     type_str=GS_OFFLOADER_SUCCESS_TYPE,
+                                     metadata=es_metadata)
+                    error = False
 
                     if pubsub_topic:
                         message = _create_test_result_notification(
@@ -566,15 +622,14 @@
 
                     if not error:
                         open(upload_signal_filename, 'a').close()
-                else:
-                    error = True
+
             if os.path.isfile(upload_signal_filename):
                 if job_directories.is_job_expired(delete_age, job_complete_time):
                     shutil.rmtree(dir_entry)
 
         except TimeoutException:
             m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
-            metrics.Counter(m_timeout).increment()
+            metrics.Counter(m_timeout).increment(fields=metrics_fields)
             # If we finished the call to Popen(), we may need to
             # terminate the child process.  We don't bother calling
             # process.poll(); that inherently races because the child
@@ -588,7 +643,6 @@
                     pass
             logging.error('Offloading %s timed out after waiting %d '
                           'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
-            error = True
         except OSError as e:
             # The wrong file permission can lead call
             # `shutil.rmtree(dir_entry)` to raise OSError with message
@@ -598,12 +652,18 @@
                 correct_results_folder_permission(dir_entry)
             m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                                   'wrong_permissions_count')
-            metrics.Counter(m_permission_error).increment()
+            metrics.Counter(m_permission_error).increment(fields=metrics_fields)
         finally:
             signal.alarm(0)
             if error:
                 m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
-                metrics.Counter(m_any_error).increment()
+                metrics.Counter(m_any_error).increment(fields=metrics_fields)
+
+                es_metadata['time_used_sec'] = time.time() - start_time
+                autotest_es.post(use_http=True,
+                                 type_str=GS_OFFLOADER_FAILURE_TYPE,
+                                 metadata=es_metadata)
+
                 # Rewind the log files for stdout and stderr and log
                 # their contents.
                 stdout_file.seek(0)
@@ -612,6 +672,7 @@
                 logging.warning('Error occurred when offloading %s:', dir_entry)
                 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                 stderr_content)
+
                 # Some result files may have wrong file permission. Try
                 # to correct such error so later try can success.
                 # TODO(dshi): The code is added to correct result files
@@ -621,6 +682,7 @@
                 # correct_results_folder_permission can be deleted.
                 if 'CommandException: Error opening file' in stderr_content:
                     correct_results_folder_permission(dir_entry)
+
             if stdout_file:
                 stdout_file.close()
             if stderr_file:
diff --git a/utils/labellib.py b/utils/labellib.py
index 4c29fbb..d9e057e 100644
--- a/utils/labellib.py
+++ b/utils/labellib.py
@@ -125,18 +125,26 @@
 
 
 CrosVersion = collections.namedtuple(
-        'CrosVersion', 'group, milestone, version, rc')
+        'CrosVersion', 'group, board, milestone, version, rc')
 
 
 _CROS_VERSION_REGEX = (
-    r'^'
-    r'(?P<group>[a-z0-9-]+)'
-    r'/'
-    r'(?P<milestone>R[0-9]+)'
-    r'-'
-    r'(?P<version>[0-9.]+)'
-    r'(-(?P<rc>rc[0-9]+))?'
-    r'$'
+        r'^'
+        r'(?P<group>[a-z0-9-]+)'
+        r'/'
+        r'(?P<milestone>R[0-9]+)'
+        r'-'
+        r'(?P<version>[0-9.]+)'
+        r'(-(?P<rc>rc[0-9]+))?'
+        r'$'
+)
+
+_CROS_BOARD_FROM_VERSION_REGEX = (
+        r'^'
+        r'(trybot-)?'
+        r'(?P<board>[a-z0-9_-]+)-(release|paladin|pre-cq|test-ap|toolchain)'
+        r'/R.*'
+        r'$'
 )
 
 
@@ -149,7 +157,13 @@
     match = re.search(_CROS_VERSION_REGEX, version_string)
     if match is None:
         raise ValueError('Invalid cros version string: %r' % version_string)
-    return CrosVersion(**match.groupdict())
+    parts = match.groupdict()
+    match = re.search(_CROS_BOARD_FROM_VERSION_REGEX, version_string)
+    if match is None:
+        raise ValueError('Invalid cros version string: %r. Failed to parse '
+                         'board.' % version_string)
+    parts['board'] = match.group('board')
+    return CrosVersion(**parts)
 
 
 def format_cros_version(cros_version):
@@ -158,4 +172,4 @@
         return '{group}/{milestone}-{version}-{rc}'.format(
                 **cros_version._asdict())
     else:
-        return '{group}/{milestone}-{version}'.format(**cros_version._asdict())
+        return '{group}/{milestone}-{version}'.format(**cros_version._asdict())
\ No newline at end of file
diff --git a/utils/labellib_unittest.py b/utils/labellib_unittest.py
index c7f27d3..aeaa387 100755
--- a/utils/labellib_unittest.py
+++ b/utils/labellib_unittest.py
@@ -7,7 +7,6 @@
 import copy
 import logging
 import unittest
-import warnings
 
 import common
 from autotest_lib.utils import labellib
@@ -147,13 +146,13 @@
 
     def test_parse_cros_version_without_rc(self):
         got = labellib.parse_cros_version('lumpy-release/R27-3773.0.0')
-        self.assertEqual(got, labellib.CrosVersion('lumpy-release', 'R27',
-                                                   '3773.0.0', None))
+        self.assertEqual(got, labellib.CrosVersion('lumpy-release', 'lumpy',
+                                                   'R27', '3773.0.0', None))
 
     def test_parse_cros_version_with_rc(self):
         got = labellib.parse_cros_version('lumpy-release/R27-3773.0.0-rc1')
-        self.assertEqual(got, labellib.CrosVersion('lumpy-release', 'R27',
-                                                   '3773.0.0', 'rc1'))
+        self.assertEqual(got, labellib.CrosVersion('lumpy-release', 'lumpy',
+                                                   'R27', '3773.0.0', 'rc1'))
 
     def test_parse_cros_version_raises(self):
         with self.assertRaises(ValueError):
@@ -161,16 +160,33 @@
 
     def test_format_cros_version_without_rc(self):
         got = labellib.format_cros_version(
-                labellib.CrosVersion('lumpy-release', 'R27',
+                labellib.CrosVersion('lumpy-release', 'lumpy', 'R27',
                                      '3773.0.0', None))
         self.assertEqual(got, 'lumpy-release/R27-3773.0.0')
 
     def test_format_cros_version_with_rc(self):
         got = labellib.format_cros_version(
-                labellib.CrosVersion('lumpy-release', 'R27',
+                labellib.CrosVersion('lumpy-release', 'lumpy', 'R27',
                                      '3773.0.0', 'rc1'))
         self.assertEqual(got, 'lumpy-release/R27-3773.0.0-rc1')
 
+    def test_parse_cros_version_for_board(self):
+        """Check the board and milestone parsed from several build names."""
+        lumpy_builds = (
+                'lumpy-release/R27-3773.0.0-rc1',
+                'trybot-lumpy-paladin/R27-3773.0.0',
+                'lumpy-pre-cq/R27-3773.0.0-rc1',
+                'lumpy-test-ap/R27-3773.0.0-rc1',
+                'lumpy-toolchain/R27-3773.0.0-rc1',
+        )
+        cases = [(build, 'lumpy') for build in lumpy_builds]
+        # A board name may itself contain a dash.
+        cases.append(('trybot-lumpy-a-pre-cq/R27-3773.0.0-rc1', 'lumpy-a'))
+        for build, board in cases:
+            parsed = labellib.parse_cros_version(build)
+            self.assertEqual(parsed.board, board)
+            self.assertEqual(parsed.milestone, 'R27')
+
 
 if __name__ == '__main__':
     unittest.main()