Merge changes I1406c879,Ie661d701
* changes:
[logparser] Remove unused schema files.
[logparser] Add support for batch and synchronous bigquery queries.
diff --git a/build/masters/master.client.adt/slave_logs/parser/bigquery.py b/build/masters/master.client.adt/slave_logs/parser/bigquery.py
index e22dc2d..bf7f0c2 100644
--- a/build/masters/master.client.adt/slave_logs/parser/bigquery.py
+++ b/build/masters/master.client.adt/slave_logs/parser/bigquery.py
@@ -16,6 +16,7 @@
import pprint
import threading
import time
+import uuid
from apiclient import http
from googleapiclient import discovery
@@ -92,19 +93,23 @@
while True:
with context.lock:
result = status_request.execute(num_retries=2)
+
status = result['status']
- if status['state'] == 'DONE':
- if 'errorResult' in status:
- err = ('Error when updating table %s with rows from %s '
- 'using schema %s' % (context.table_id, item.data_path,
- context.schema_path))
- logging.error(err)
- logging.error('BigQuery error details:')
- logging.error(pprint.pformat(status['errorResult']))
- raise BigQueryException(err)
- else:
- return
- time.sleep(1)
+ if status['state'] != 'DONE':
+ time.sleep(1)
+ continue
+
+ if 'errorResult' not in status:
+ break
+
+ msg = ('Error when updating table %s with rows from %s using '
+ 'schema %s' % (context.table_id, item.data_path,
+ context.schema_path))
+ error = status['errorResult']
+ if 'debugInfo' in error:
+ del error['debugInfo']
+ logging.error(pprint.pformat(error))
+ raise BigQueryException(msg)
class BigQuery(object):
@@ -114,6 +119,7 @@
"""
NUM_CONCURRENT_UPLOADS = 20
+ QUERY_TEMPLATE_PARAMTER = 'DATASET_ID'
DEFAULT_PROJECT_ID = 'android-devtools-emulator'
DEFAULT_DATASET_ID = 'emu_buildbot_test'
@@ -160,3 +166,140 @@
threads = threads[concurrency:]
done += concurrency
logging.info('Upload complete.')
+
+ def batch_query(self, query_template, destination_table):
+ """Run a query on bigquery in batch mode.
+
+ Runs a query on bigquery in batch mode. The result of the query is
+ written to |destination_table|.
+    Query should be specified as a templated string where all occurrences of
+ DATASET_ID will be replaced with the current dataset id.
+ """
+ return self._run_async_query(
+ self._format_query(query_template),
+ destination_table)
+
+ def sync_query(self, query_template, return_table_schema, timeout_s=60):
+ """Runs a synchronous query and returns the result.
+
+ Args:
+        query_template: A templated string where all occurrences of
+ DATASET_ID will be replaced with the current dataset id.
+ Returns: TODO(pprabhu) describe the type of returned object.
+ """
+ query = self._format_query(query_template)
+ logging.debug('Running (de-templated) query: |%s|', query)
+ job_id = self._run_async_query(
+ query, batch_mode=False, large_tables_mode=False)
+
+ rows = []
+ page_token = None
+ while True:
+ page = self._bigquery.jobs().getQueryResults(
+ projectId=self._project_id,
+ jobId=job_id,
+ pageToken=page_token,
+ timeoutMs=timeout_s*1000).execute(num_retries=2)
+ # After the first query, we'll wait only 5 seconds for successive
+ # results.
+ timeout_s = 5
+
+ if not page['jobComplete']:
+ msg = ('Timed out waiting for query to complete: |%s|' %
+ (query,))
+ logging.error(msg)
+ raise BigQueryException(msg)
+
+ rows += page['rows']
+ page_token = page.get('pageToken')
+ if page_token is None:
+ break
+
+ return self._marshall_query_result(rows, return_table_schema)
+
+ def job_status(self, job_id):
+ """Check the status of the indicated job.
+
+ Returns: True if the job completed successfully, False if it is still
+ outstanding.
+    Raises: BigQueryException if an error occurred in running the job.
+ """
+ result = self._bigquery.jobs().get(
+ projectId=self._project_id,
+ jobId=job_id).execute(num_retries=2)
+ status = result['status']
+ if status['state'] != 'DONE':
+ return False
+ if 'errorResult' not in status:
+ return True
+
+ error = status['errorResult']
+ if 'debugInfo' in error:
+ del error['debugInfo']
+ logging.error('Job %s failed.', job_id)
+ logging.error(pprint.pformat(error))
+ raise BigQueryException('Job %s failed.' % (job_id,))
+
+ def _run_async_query(self, query,
+ destination=None, batch_mode=True,
+ large_tables_mode=True):
+ job_id = str(uuid.uuid4())
+ body = {
+ 'jobReference': {
+ 'projectId': self._project_id,
+            'jobId': job_id
+ },
+ 'configuration': {
+ 'query': {
+ 'query': query,
+ 'priority': ('BATCH' if batch_mode
+ else 'INTERACTIVE')
+ }
+ }
+ }
+ if destination is not None:
+ body['configuration']['query']['destinationTable'] = {
+ 'projectId': self._project_id,
+ 'datasetId': self._dataset_id,
+ 'tableId': destination
+ }
+ if large_tables_mode:
+ body['configuration']['query']['allowLargeResults'] = True
+ pprint.pprint(body)
+
+ insert_request = self._bigquery.jobs().insert(
+ projectId=self._project_id,
+ body=body)
+ return insert_request.execute(num_retries=2)['jobReference']['jobId']
+
+ def _format_query(self, query_template):
+ return query_template.replace(self.QUERY_TEMPLATE_PARAMTER,
+ self._dataset_id)
+
+ def _marshall_query_result(self, result, schema):
+ num_columns = len(schema)
+ names = [x['name'] for x in schema]
+ types = [x['type'] for x in schema]
+ allowed_types = set(['string', 'integer', 'float', 'timestamp'])
+ contained_types = set([x.lower() for x in types])
+ if contained_types - allowed_types:
+ raise BigQueryException('Schema contains disallowed types: %s' %
+ (str(contained_types - allowed_types),))
+
+ marshalled = []
+ for row in result:
+ columns = row['f']
+ if len(columns) != num_columns:
+ raise ValueError('Row should have %d columns, has %d' %
+ (num_columns, len(columns)))
+
+ marshalled_row = {}
+ for c in range(num_columns):
+ value = columns[c]['v']
+ if types[c] == 'integer':
+ value = int(value)
+ elif types[c] == 'float':
+ value = float(value)
+ marshalled_row[names[c]] = value
+ marshalled.append(marshalled_row)
+ return marshalled
diff --git a/build/masters/master.client.adt/slave_logs/parser/ctsTestFlakinessBQSchema.json b/build/masters/master.client.adt/slave_logs/parser/ctsTestFlakinessBQSchema.json
deleted file mode 100644
index f6c0235..0000000
--- a/build/masters/master.client.adt/slave_logs/parser/ctsTestFlakinessBQSchema.json
+++ /dev/null
@@ -1,59 +0,0 @@
-[
- {
- "name": "systemImageBranch",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageBuildId",
- "type": "INTEGER",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageApi",
- "type": "INTEGER",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageTag",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageAbi",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "emulatorBranch",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "emulatorQemu",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "ctsBranch",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "mode": "NULLABLE",
- "name": "ctsFlakinessRecords",
- "type": "RECORD",
- "fields": [
- {
- "mode": "NULLABLE",
- "name": "fullName",
- "type": "STRING"
- },
- {
- "mode": "NULLABLE",
- "name": "flakinessResult",
- "type": "STRING"
- }
- ]
- }
-]
diff --git a/build/masters/master.client.adt/slave_logs/parser/ctsTestResultsBQSchema.json b/build/masters/master.client.adt/slave_logs/parser/ctsTestResultsBQSchema.json
deleted file mode 100644
index dfeb790..0000000
--- a/build/masters/master.client.adt/slave_logs/parser/ctsTestResultsBQSchema.json
+++ /dev/null
@@ -1,109 +0,0 @@
-[
- {
- "name": "runUid",
- "type": "INTEGER",
- "mode": "REQUIRED"
- },
- {
- "name": "runStartTime",
- "type": "TIMESTAMP",
- "mode": "NULLABLE"
- },
- {
- "name": "runEndTime",
- "type": "TIMESTAMP",
- "mode": "NULLABLE"
- },
- {
- "name": "buildbotSlaveName",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "buildbotRunId",
- "type": "INTEGER",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageBranch",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageBuildId",
- "type": "INTEGER",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageApi",
- "type": "INTEGER",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageTag",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "systemImageAbi",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "avdDevice",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "avdRam",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "avdGpu",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "emulatorBranch",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "emulatorRevision",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "emulatorQemu",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "ctsBranch",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "name": "ctsBuild",
- "type": "STRING",
- "mode": "NULLABLE"
- },
- {
- "mode": "NULLABLE",
- "name": "ctsTestRecords",
- "type": "RECORD",
- "fields": [
- {
- "mode": "NULLABLE",
- "name": "fullName",
- "type": "STRING"
- },
- {
- "mode": "NULLABLE",
- "name": "result",
- "type": "STRING"
- }
- ]
- }
-]