Merge changes I1406c879,Ie661d701

* changes:
  [logparser] Remove unused schema files.
  [logparser] Add support for batch and synchronous bigquery queries.
diff --git a/build/masters/master.client.adt/slave_logs/parser/bigquery.py b/build/masters/master.client.adt/slave_logs/parser/bigquery.py
index e22dc2d..bf7f0c2 100644
--- a/build/masters/master.client.adt/slave_logs/parser/bigquery.py
+++ b/build/masters/master.client.adt/slave_logs/parser/bigquery.py
@@ -16,6 +16,7 @@
 import pprint
 import threading
 import time
+import uuid
 from apiclient import http
 from googleapiclient import discovery
 
@@ -92,19 +93,23 @@
     while True:
         with context.lock:
             result = status_request.execute(num_retries=2)
+
         status = result['status']
-        if status['state'] == 'DONE':
-            if 'errorResult' in status:
-                err = ('Error when updating table %s with rows from %s '
-                        'using schema %s' % (context.table_id, item.data_path,
-                                             context.schema_path))
-                logging.error(err)
-                logging.error('BigQuery error details:')
-                logging.error(pprint.pformat(status['errorResult']))
-                raise BigQueryException(err)
-            else:
-                return
-        time.sleep(1)
+        if status['state'] != 'DONE':
+            time.sleep(1)
+            continue
+
+        if 'errorResult' not in status:
+            break
+
+        msg = ('Error when updating table %s with rows from %s using '
+               'schema %s' % (context.table_id, item.data_path,
+                              context.schema_path))
+        error = status['errorResult']
+        if 'debugInfo' in error:
+            del error['debugInfo']
+        logging.error(pprint.pformat(error))
+        raise BigQueryException(msg)
 
 
 class BigQuery(object):
@@ -114,6 +119,7 @@
     """
 
     NUM_CONCURRENT_UPLOADS = 20
+    QUERY_TEMPLATE_PARAMETER = 'DATASET_ID'
     DEFAULT_PROJECT_ID = 'android-devtools-emulator'
     DEFAULT_DATASET_ID = 'emu_buildbot_test'
 
@@ -160,3 +166,140 @@
             threads = threads[concurrency:]
             done += concurrency
         logging.info('Upload complete.')
+
+    def batch_query(self, query_template, destination_table):
+        """Run a query on bigquery in batch mode.
+
+        Runs a query on bigquery in batch mode and writes the result to
+        |destination_table|.
+        The query should be given as a templated string; all occurrences of
+        DATASET_ID are replaced with the current dataset id.
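+
+        A hypothetical usage sketch (the table and query names are
+        illustrative, not part of this module):
+
+            job_id = bq.batch_query(
+                    'SELECT runUid FROM [DATASET_ID.cts_results]',
+                    'cts_run_uids')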
+        """
+        return self._run_async_query(
+                self._format_query(query_template),
+                destination_table)
+
+    def sync_query(self, query_template, return_table_schema, timeout_s=60):
+        """Runs a synchronous query and returns the result.
+
+        Args:
+            query_template: A templated string where all occurrences of
+                    DATASET_ID will be replaced with the current dataset id.
+            return_table_schema: Schema of the result rows, as a list of
+                    {'name': ..., 'type': ...} dicts.
+            timeout_s: Seconds to wait for the first page of results.
+        Returns: A list of dicts, one per result row, mapping each column
+                name from |return_table_schema| to its value, converted
+                according to the schema type.
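+
+        A hypothetical usage sketch (the schema and query are illustrative):
+
+            schema = [{'name': 'runUid', 'type': 'INTEGER'}]
+            rows = bq.sync_query(
+                    'SELECT runUid FROM [DATASET_ID.cts_results]', schema)
+            # rows is now e.g. [{'runUid': 1234}, ...]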
+        """
+        query = self._format_query(query_template)
+        logging.debug('Running (de-templated) query: |%s|', query)
+        job_id = self._run_async_query(
+                query, batch_mode=False, large_tables_mode=False)
+
+        rows = []
+        page_token = None
+        while True:
+            page = self._bigquery.jobs().getQueryResults(
+                    projectId=self._project_id,
+                    jobId=job_id,
+                    pageToken=page_token,
+                    timeoutMs=timeout_s*1000).execute(num_retries=2)
+            # After the first request, wait only 5 seconds for each
+            # successive page of results.
+            timeout_s = 5
+
+            if not page['jobComplete']:
+                msg = ('Timed out waiting for query to complete: |%s|' %
+                       (query,))
+                logging.error(msg)
+                raise BigQueryException(msg)
+
+            # 'rows' may be absent when the query returns no rows.
+            rows += page.get('rows', [])
+            page_token = page.get('pageToken')
+            if page_token is None:
+                break
+
+        return self._marshall_query_result(rows, return_table_schema)
+
+    def job_status(self, job_id):
+        """Check the status of the indicated job.
+
+        Returns: True if the job completed successfully, False if it is still
+                outstanding.
+        Raises: BigQueryException if an error occurred in running the job.
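+
+        A hypothetical usage sketch, polling a job started via batch_query():
+
+            while not bq.job_status(job_id):
+                time.sleep(30)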
+        """
+        result = self._bigquery.jobs().get(
+            projectId=self._project_id,
+            jobId=job_id).execute(num_retries=2)
+        status = result['status']
+        if status['state'] != 'DONE':
+            return False
+        if 'errorResult' not in status:
+            return True
+
+        error = status['errorResult']
+        if 'debugInfo' in error:
+            del error['debugInfo']
+        logging.error('Job %s failed.', job_id)
+        logging.error(pprint.pformat(error))
+        raise BigQueryException('Job %s failed.' % (job_id,))
+
+    def _run_async_query(self, query,
+                         destination=None, batch_mode=True,
+                         large_tables_mode=True):
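+        # Generate a client-side job id so that callers can later poll the
+        # job via job_status().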
+        job_id = str(uuid.uuid4())
+        body = {
+                'jobReference': {
+                    'projectId': self._project_id,
+                    'jobId': job_id
+                },
+                'configuration': {
+                        'query': {
+                                'query': query,
+                                'priority': ('BATCH' if batch_mode
+                                             else 'INTERACTIVE')
+                        }
+                }
+        }
+        if destination is not None:
+            body['configuration']['query']['destinationTable'] = {
+                    'projectId': self._project_id,
+                    'datasetId': self._dataset_id,
+                    'tableId': destination
+            }
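+        # Note: bigquery requires a destination table to be set whenever
+        # allowLargeResults is requested.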
+        if large_tables_mode:
+            body['configuration']['query']['allowLargeResults'] = True
+        logging.debug(pprint.pformat(body))
+
+        insert_request = self._bigquery.jobs().insert(
+                projectId=self._project_id,
+                body=body)
+        return insert_request.execute(num_retries=2)['jobReference']['jobId']
+
+    def _format_query(self, query_template):
+        return query_template.replace(self.QUERY_TEMPLATE_PARAMETER,
+                                      self._dataset_id)
+
+    def _marshall_query_result(self, result, schema):
+        num_columns = len(schema)
+        names = [x['name'] for x in schema]
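+        # Only flat scalar schemas are supported. TIMESTAMP values are left
+        # as the strings returned by the bigquery API.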
+        types = [x['type'].lower() for x in schema]
+        allowed_types = set(['string', 'integer', 'float', 'timestamp'])
+        contained_types = set(types)
+        if contained_types - allowed_types:
+            raise BigQueryException('Schema contains disallowed types: %s' %
+                                    (str(contained_types - allowed_types),))
+
+        marshalled = []
+        for row in result:
+            columns = row['f']
+            if len(columns) != num_columns:
+                raise ValueError('Row should have %d columns, has %d' %
+                                 (num_columns, len(columns)))
+
+            marshalled_row = {}
+            for c in range(num_columns):
+                value = columns[c]['v']
+                if types[c] == 'integer':
+                    value = int(value)
+                elif types[c] == 'float':
+                    value = float(value)
+                marshalled_row[names[c]] = value
+            marshalled.append(marshalled_row)
+        return marshalled
diff --git a/build/masters/master.client.adt/slave_logs/parser/ctsTestFlakinessBQSchema.json b/build/masters/master.client.adt/slave_logs/parser/ctsTestFlakinessBQSchema.json
deleted file mode 100644
index f6c0235..0000000
--- a/build/masters/master.client.adt/slave_logs/parser/ctsTestFlakinessBQSchema.json
+++ /dev/null
@@ -1,59 +0,0 @@
-[
-    {
-        "name": "systemImageBranch",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageBuildId",
-        "type": "INTEGER",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageApi",
-        "type": "INTEGER",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageTag",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageAbi",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "emulatorBranch",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "emulatorQemu",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "ctsBranch",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "mode": "NULLABLE",
-        "name": "ctsFlakinessRecords",
-        "type": "RECORD",
-        "fields": [
-            {
-                "mode": "NULLABLE",
-                "name": "fullName",
-                "type": "STRING"
-            },
-            {
-                "mode": "NULLABLE",
-                "name": "flakinessResult",
-                "type": "STRING"
-            }
-        ]
-    }
-]
diff --git a/build/masters/master.client.adt/slave_logs/parser/ctsTestResultsBQSchema.json b/build/masters/master.client.adt/slave_logs/parser/ctsTestResultsBQSchema.json
deleted file mode 100644
index dfeb790..0000000
--- a/build/masters/master.client.adt/slave_logs/parser/ctsTestResultsBQSchema.json
+++ /dev/null
@@ -1,109 +0,0 @@
-[
-    {
-        "name": "runUid",
-        "type": "INTEGER",
-        "mode": "REQUIRED"
-    },
-    {
-        "name": "runStartTime",
-        "type": "TIMESTAMP",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "runEndTime",
-        "type": "TIMESTAMP",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "buildbotSlaveName",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "buildbotRunId",
-        "type": "INTEGER",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageBranch",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageBuildId",
-        "type": "INTEGER",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageApi",
-        "type": "INTEGER",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageTag",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "systemImageAbi",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "avdDevice",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "avdRam",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "avdGpu",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "emulatorBranch",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "emulatorRevision",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "emulatorQemu",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "ctsBranch",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "name": "ctsBuild",
-        "type": "STRING",
-        "mode": "NULLABLE"
-    },
-    {
-        "mode": "NULLABLE",
-        "name": "ctsTestRecords",
-        "type": "RECORD",
-        "fields": [
-            {
-                "mode": "NULLABLE",
-                "name": "fullName",
-                "type": "STRING"
-            },
-            {
-                "mode": "NULLABLE",
-                "name": "result",
-                "type": "STRING"
-            }
-        ]
-    }
-]