| # Copyright 2015 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import io |
| import json |
| import logging |
| import time |
| import uuid |
| |
| from google.appengine.api import app_identity |
| |
| from apiclient import http |
| from apiclient.discovery import build |
| from oauth2client import client |
| |
| from base import exceptions |
| |
| |
| # urlfetch max size is 10 MB. Assume 1000 bytes per row and split the |
| # insert into chunks of 10,000 rows. |
| INSERTION_MAX_ROWS = 10000 |
| |
| |
| class BigQuery(object): |
| """Methods for interfacing with BigQuery.""" |
| |
| def __init__(self, project_id=None): |
| self._service = _Service() |
| if project_id: |
| self._project_id = project_id |
| else: |
| self._project_id = app_identity.get_application_id() |
| |
| def InsertRowsAsync(self, dataset_id, table_id, rows, |
| truncate=False, num_retries=5): |
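    """Starts load jobs to insert rows into a BigQuery table.

    The rows are split into chunks of at most INSERTION_MAX_ROWS and
    uploaded as newline-delimited JSON, one load job per chunk.

    Args:
      dataset_id: The ID of the destination dataset.
      table_id: The ID of the destination table.
      rows: A list of dicts mapping field names to values.
      truncate: If True, replace the existing table contents; otherwise,
          append to them.
      num_retries: Number of retries for each request.

    Returns:
      A list of job resources, one per chunk.
    """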
| responses = [] |
| for i in xrange(0, len(rows), INSERTION_MAX_ROWS): |
| rows_chunk = rows[i:i+INSERTION_MAX_ROWS] |
| logging.info('Inserting %d rows into %s.%s.', |
| len(rows_chunk), dataset_id, table_id) |
| body = { |
| 'configuration': { |
| 'jobReference': { |
| 'projectId': self._project_id, |
| 'jobId': str(uuid.uuid4()), |
| }, |
| 'load': { |
| 'destinationTable': { |
| 'projectId': self._project_id, |
| 'datasetId': dataset_id, |
| 'tableId': table_id, |
| }, |
| 'sourceFormat': 'NEWLINE_DELIMITED_JSON', |
| 'writeDisposition': |
| 'WRITE_TRUNCATE' if truncate else 'WRITE_APPEND', |
| } |
| } |
| } |
| |
| # Format rows as newline-delimited JSON. |
| media_buffer = io.BytesIO() |
| for row in rows_chunk: |
| json.dump(row, media_buffer, separators=(',', ':')) |
        media_buffer.write('\n')
| media_body = http.MediaIoBaseUpload( |
| media_buffer, mimetype='application/octet-stream') |
| |
| responses.append(self._service.jobs().insert( |
| projectId=self._project_id, |
| body=body, media_body=media_body).execute(num_retries=num_retries)) |
| |
      # Only truncate on the first chunk; subsequent chunks must append.
| truncate = False |
| |
| # TODO(dtu): Return a Job object. |
| return responses |
| |
| def InsertRowsSync(self, dataset_id, table_id, rows, num_retries=5): |
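    """Streams rows into a BigQuery table with insertAll.

    The rows are split into chunks of at most INSERTION_MAX_ROWS, and each
    chunk is inserted synchronously.

    Args:
      dataset_id: The ID of the destination dataset.
      table_id: The ID of the destination table.
      rows: A list of dicts mapping field names to values.
      num_retries: Number of retries for each request.

    Raises:
      exceptions.QueryError: If a response contains insert errors.
    """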
| for i in xrange(0, len(rows), INSERTION_MAX_ROWS): |
| rows_chunk = rows[i:i+INSERTION_MAX_ROWS] |
| logging.info('Inserting %d rows into %s.%s.', |
| len(rows_chunk), dataset_id, table_id) |
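      # Tag each row with a unique insertId, which BigQuery uses to
      # deduplicate rows if an insert is retried.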
| rows_chunk = [{'insertId': str(uuid.uuid4()), 'json': row} |
| for row in rows_chunk] |
| insert_data = {'rows': rows_chunk} |
| response = self._service.tabledata().insertAll( |
| projectId=self._project_id, |
| datasetId=dataset_id, |
| tableId=table_id, |
| body=insert_data).execute(num_retries=num_retries) |
| |
| if 'insertErrors' in response: |
| raise exceptions.QueryError(response['insertErrors']) |
| |
| def QueryAsync(self, query, num_retries=5): |
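    """Starts an asynchronous query job.

    Args:
      query: Query string.
      num_retries: Number of retries for the request.

    Returns:
      The job resource for the query job. Pass it to IsJobDone() or
      PollJob() to check for completion.
    """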
| logging.debug(query) |
| body = { |
| 'jobReference': { |
| 'projectId': self._project_id, |
| 'jobId': str(uuid.uuid4()), |
| }, |
| 'configuration': { |
| 'query': { |
| 'query': query, |
| 'priority': 'INTERACTIVE', |
| } |
| } |
| } |
| return self._service.jobs().insert( |
| projectId=self._project_id, |
| body=body).execute(num_retries=num_retries) |
| |
| def QuerySync(self, query, timeout=60, num_retries=5): |
| """Query Bigtable and return the results as a dict. |
| |
| Args: |
| query: Query string. |
| timeout: Timeout in seconds. |
      num_retries: Number of retries for the request.
| |
| Returns: |
| Query results. The format is specified in the "rows" field here: |
| https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#getQueryResults |
| """ |
| logging.debug(query) |
| query_data = { |
| 'query': query, |
| 'timeoutMs': timeout * 1000, |
| } |
| start_time = time.time() |
| response = self._service.jobs().query( |
| projectId=self._project_id, |
| body=query_data).execute(num_retries=num_retries) |
| |
| if 'errors' in response: |
| raise exceptions.QueryError(response['errors']) |
| |
| # TODO(dtu): Fetch subsequent pages of rows for big queries. |
| # TODO(dtu): Reformat results as dicts. |
| result = response.get('rows', []) |
| logging.debug('Query fetched %d rows in %fs.', |
| len(result), time.time() - start_time) |
| return result |
| |
| def IsJobDone(self, job): |
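    """Checks whether a job has finished.

    Args:
      job: A job resource, as returned by QueryAsync() or InsertRowsAsync().

    Returns:
      The updated job resource if the job is done, or None otherwise.
    """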
| response = self._service.jobs().get(**job['jobReference']).execute() |
| if response['status']['state'] == 'DONE': |
| return response |
| else: |
| return None |
| |
| def PollJob(self, job, timeout): |
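    """Blocks until a job finishes, polling with exponential backoff.

    Args:
      job: A job resource, as returned by QueryAsync() or InsertRowsAsync().
      timeout: Maximum number of seconds to poll.

    Returns:
      The completed job resource.

    Raises:
      exceptions.QueryError: If the job finished with errors.
      exceptions.TimeoutError: If the job doesn't finish within the timeout.
    """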
| # TODO(dtu): Take multiple jobs as parameters. |
| start_time = time.time() |
| iteration = 0 |
| |
| while True: |
| elapsed_time = time.time() - start_time |
| |
| response = self.IsJobDone(job) |
| if response: |
| if 'errors' in response['status']: |
| raise exceptions.QueryError(response['status']['errors']) |
| logging.debug('Polled job for %d seconds.', int(elapsed_time)) |
| return response |
| |
| if elapsed_time >= timeout: |
| break |
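      # Back off exponentially, but never sleep past the deadline.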
| time.sleep(min(1.5 ** iteration, timeout - elapsed_time)) |
| iteration += 1 |
| |
| raise exceptions.TimeoutError() |
| |
| |
| def _Service(): |
| """Returns an initialized and authorized BigQuery client.""" |
| # pylint: disable=no-member |
| credentials = client.GoogleCredentials.get_application_default() |
| if credentials.create_scoped_required(): |
| credentials = credentials.create_scoped( |
| 'https://www.googleapis.com/auth/bigquery') |
| return build('bigquery', 'v2', credentials=credentials) |