| # -*- coding: utf-8 -*- |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Contains the perfdiag gsutil command.""" |
| |
| from __future__ import absolute_import |
| |
| import calendar |
| from collections import defaultdict |
| from collections import namedtuple |
| import contextlib |
| import cStringIO |
| import datetime |
| import httplib |
| import json |
| import logging |
| import math |
| import multiprocessing |
| import os |
| import random |
| import re |
| import socket |
| import string |
| import subprocess |
| import tempfile |
| import time |
| |
| import boto |
| import boto.gs.connection |
| |
| import gslib |
| from gslib.cloud_api import NotFoundException |
| from gslib.cloud_api import ServiceException |
| from gslib.cloud_api_helper import GetDownloadSerializationData |
| from gslib.command import Command |
| from gslib.command import DummyArgChecker |
| from gslib.command_argument import CommandArgument |
| from gslib.commands import config |
| from gslib.cs_api_map import ApiSelector |
| from gslib.exception import CommandException |
| from gslib.file_part import FilePart |
| from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
| from gslib.storage_url import StorageUrlFromString |
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| from gslib.util import CheckFreeSpace |
| from gslib.util import DivideAndCeil |
| from gslib.util import GetCloudApiInstance |
| from gslib.util import GetFileSize |
| from gslib.util import GetMaxRetryDelay |
| from gslib.util import HumanReadableToBytes |
| from gslib.util import IS_LINUX |
| from gslib.util import MakeBitsHumanReadable |
| from gslib.util import MakeHumanReadable |
| from gslib.util import Percentile |
| from gslib.util import ResumableThreshold |
| |
| _SYNOPSIS = """ |
| gsutil perfdiag [-i in.json] |
| gsutil perfdiag [-o out.json] [-n objects] [-c processes] |
| [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory] |
| [-t tests] url... |
| """ |
| |
| _DETAILED_HELP_TEXT = (""" |
| <B>SYNOPSIS</B> |
| """ + _SYNOPSIS + """ |
| |
| |
| <B>DESCRIPTION</B> |
| The perfdiag command runs a suite of diagnostic tests for a given Google |
| Storage bucket. |
| |
| The 'url' parameter must name an existing bucket (e.g. gs://foo) to which |
| the user has write permission. Several test files will be uploaded to and |
| downloaded from this bucket. All test files will be deleted at the completion |
| of the diagnostic if it finishes successfully. |
| |
| gsutil performance can be impacted by many factors at the client, server, |
| and in-between, such as: CPU speed; available memory; the access path to the |
| local disk; network bandwidth; contention and error rates along the path |
| between gsutil and Google; operating system buffering configuration; and |
| firewalls and other network elements. The perfdiag command is provided so |
| that customers can run a known measurement suite when troubleshooting |
| performance problems. |
| |
| |
| <B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B> |
| If the Google Cloud Storage Team asks you to run a performance diagnostic |
| please use the following command, and email the output file (output.json) |
| to gs-team@google.com: |
| |
| gsutil perfdiag -o output.json gs://your-bucket |
| |
| |
| <B>OPTIONS</B> |
| -n Sets the number of objects to use when downloading and uploading |
| files during tests. Defaults to 5. |
| |
| -c Sets the number of processes to use while running throughput |
| experiments. The default value is 1. |
| |
| -k Sets the number of threads per process to use while running |
| throughput experiments. Each process will receive an equal number |
| of threads. The default value is 1. |
| |
| Note: All specified threads and processes will be created, but may |
| not be saturated with work if too few objects (specified with -n) |
| and too few components (specified with -y) are specified. |
| |
| -p Sets the type of parallelism to be used (only applicable when |
| threads or processes are specified and threads * processes > 1). |
| The default is to use fan. Must be one of the following: |
| |
| fan |
| Use one thread per object. This is akin to using gsutil -m cp, |
| with sliced object download / parallel composite upload |
| disabled. |
| |
| slice |
| Use Y (specified with -y) threads for each object, transferring |
| one object at a time. This is akin to using parallel object |
| download / parallel composite upload, without -m. Sliced |
| uploads not supported for s3. |
| |
| both |
| Use Y (specified with -y) threads for each object, transferring |
| multiple objects at a time. This is akin to simultaneously |
| using sliced object download / parallel composite upload and |
| gsutil -m cp. Sliced uploads not supported for s3. |
| |
| -y Sets the number of slices to divide each file/object into while |
| transferring data. Only applicable with the slice (or both) |
| parallelism type. The default is 4 slices. |
| |
| -s Sets the size (in bytes) for each of the N (set with -n) objects |
| used in the read and write throughput tests. The default is 1 MiB. |
| This can also be specified using byte suffixes such as 500K or 1M. |
| Note: these values are interpreted as multiples of 1024 (K=1024, |
| M=1024*1024, etc.) |
| Note: If rthru_file or wthru_file are performed, N (set with -n) |
| times as much disk space as specified will be required for the |
| operation. |
| |
| -d Sets the directory to store temporary local files in. If not |
| specified, a default temporary directory will be used. |
| |
| -t Sets the list of diagnostic tests to perform. The default is to |
| run the lat, rthru, and wthru diagnostic tests. Must be a |
| comma-separated list containing one or more of the following: |
| |
| lat |
| For N (set with -n) objects, write the object, retrieve its |
| metadata, read the object, and finally delete the object. |
| Record the latency of each operation. |
| |
| list |
| Write N (set with -n) objects to the bucket, record how long |
| it takes for the eventually consistent listing call to return |
| the N objects in its result, delete the N objects, then record |
| how long it takes listing to stop returning the N objects. |
| This test is off by default. |
| |
| rthru |
| Runs N (set with -n) read operations, with at most C |
| (set with -c) reads outstanding at any given time. |
| |
| rthru_file |
| The same as rthru, but simultaneously writes data to the disk, |
| to gauge the performance impact of the local disk on downloads. |
| |
| wthru |
| Runs N (set with -n) write operations, with at most C |
| (set with -c) writes outstanding at any given time. |
| |
| wthru_file |
| The same as wthru, but simultaneously reads data from the disk, |
| to gauge the performance impact of the local disk on uploads. |
| |
| -m Adds metadata to the result JSON file. Multiple -m values can be |
| specified. Example: |
| |
| gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname |
| |
| Each metadata key will be added to the top-level "metadata" |
| dictionary in the output JSON file. |
| |
| -o Writes the results of the diagnostic to an output file. The output |
| is a JSON file containing system information and performance |
| diagnostic results. The file can be read and reported later using |
| the -i option. |
| |
| -i Reads the JSON output file created using the -o command and prints |
| a formatted description of the results. |
| |
| |
| <B>MEASURING AVAILABILITY</B> |
| The perfdiag command ignores the boto num_retries configuration parameter. |
| Instead, it always retries on HTTP errors in the 500 range and keeps track of |
| how many 500 errors were encountered during the test. The availability |
| measurement is reported at the end of the test. |
| |
| Note that HTTP responses are only recorded when the request was made in a |
| single process. When using multiple processes or threads, read and write |
| throughput measurements are performed in an external process, so the |
| availability numbers reported won't include the throughput measurements. |
| |
| |
| <B>NOTE</B> |
| The perfdiag command collects system information. It collects your IP address, |
| executes DNS queries to Google servers and collects the results, and collects |
| network statistics information from the output of netstat -s. It will also |
| attempt to connect to your proxy server if you have one configured. None of |
| this information will be sent to Google unless you choose to send it. |
| """) |
| |
# In-memory record for one temporary file: its size, its md5 and its raw
# contents. _MakeTempFile stores None in any field it was not asked to fill.
FileDataTuple = namedtuple('FileDataTuple', ['size', 'md5', 'data'])

# One object of a fanned download. When need_to_slice is True the object is
# fetched with the slice strategy instead of a single request. The remaining
# fields mirror the parameters documented on PerfDiagCommand.Download.
FanDownloadTuple = namedtuple(
    'FanDownloadTuple',
    ['need_to_slice', 'object_name', 'file_name', 'serialization_data'])

# One slice of a sliced download. Fields mirror the parameters documented on
# PerfDiagCommand.Download.
SliceDownloadTuple = namedtuple(
    'SliceDownloadTuple',
    ['object_name', 'file_name', 'serialization_data', 'start_byte',
     'end_byte'])

# One file of a fanned upload. When need_to_slice is True the file is sent
# with the slice strategy. The remaining fields mirror the parameters
# documented on PerfDiagCommand.Upload.
FanUploadTuple = namedtuple(
    'FanUploadTuple',
    ['need_to_slice', 'file_name', 'object_name', 'use_file'])

# One slice of a sliced upload. Fields mirror the parameters documented on
# PerfDiagCommand.Upload.
SliceUploadTuple = namedtuple(
    'SliceUploadTuple',
    ['file_name', 'object_name', 'use_file', 'file_start', 'file_size'])

# Maps file_path -> FileDataTuple for every temporary file perfdiag creates.
# It lives at module level rather than on PerfDiagCommand because calls to
# Apply make copies of all member data.
temp_file_dict = {}
| |
| |
class Error(Exception):
  """Root of the exception hierarchy defined by this module."""
| |
| |
class InvalidArgument(Error):
  """Signals that a function was called with an invalid argument."""
| |
| |
def _DownloadObject(cls, args, thread_state=None):
  """Apply callback that downloads one object of a fanned download.

  Rebinds cls.gsutil_api to the instance returned by GetCloudApiInstance
  before transferring, then dispatches to the sliced or the plain download
  depending on args.need_to_slice.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A FanDownloadTuple object describing this download.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  transfer = cls.PerformSlicedDownload if args.need_to_slice else cls.Download
  transfer(args.object_name, args.file_name, args.serialization_data)
| |
| |
def _DownloadSlice(cls, args, thread_state=None):
  """Apply callback that downloads a single slice of an object.

  Rebinds cls.gsutil_api to the instance returned by GetCloudApiInstance and
  then downloads the byte range described by args.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A SliceDownloadTuple object describing this download.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  byte_range = (args.start_byte, args.end_byte)
  cls.Download(args.object_name, args.file_name, args.serialization_data,
               *byte_range)
| |
| |
def _UploadObject(cls, args, thread_state=None):
  """Apply callback that uploads one file of a fanned upload.

  Rebinds cls.gsutil_api to the instance returned by GetCloudApiInstance
  before transferring, then dispatches to the sliced or the plain upload
  depending on args.need_to_slice.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A FanUploadTuple object describing this upload.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  transfer = cls.PerformSlicedUpload if args.need_to_slice else cls.Upload
  transfer(args.file_name, args.object_name, args.use_file)
| |
| |
def _UploadSlice(cls, args, thread_state=None):
  """Apply callback that uploads a single slice of a file.

  Rebinds cls.gsutil_api to the instance returned by GetCloudApiInstance and
  then uploads the region of the file described by args.

  Args:
    cls: The calling PerfDiagCommand class instance.
    args: A SliceUploadTuple object describing this upload.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
  file_region = (args.file_start, args.file_size)
  cls.Upload(args.file_name, args.object_name, args.use_file, *file_region)
| |
| |
def _DeleteWrapper(cls, object_name, thread_state=None):
  """Apply callback that deletes one object from the test bucket.

  Args:
    cls: The calling PerfDiagCommand class instance.
    object_name: The object name to delete from the test bucket.
    thread_state: gsutil Cloud API instance to use for the operation.
  """
  api = GetCloudApiInstance(cls, thread_state)
  cls.gsutil_api = api
  cls.Delete(object_name)
| |
| |
| def _PerfdiagExceptionHandler(cls, e): |
| """Simple exception handler to allow post-completion status.""" |
| cls.logger.error(str(e)) |
| |
| |
| def _DummyTrackerCallback(_): |
| pass |
| |
| |
class DummyFile(object):
  """Write-only sink exposing the file methods write() and close().

  Everything handed to write() is discarded.
  """

  def write(self, *args, **kwargs):  # pylint: disable=invalid-name
    """Accepts any arguments and drops them."""
    return None

  def close(self):  # pylint: disable=invalid-name
    """Does nothing; there is no underlying resource to release."""
    return None
| |
| |
| # Many functions in perfdiag re-define a temporary function based on a |
| # variable from a loop, resulting in a false positive from the linter. |
| # pylint: disable=cell-var-from-loop |
class PerfDiagCommand(Command):
  """Implementation of gsutil perfdiag command.

  The class-level attributes below hold the command/help specifications, the
  names of the available diagnostic tests and parallelism strategies, and the
  tuning constants shared by the test implementations.
  """

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'perfdiag',
      command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=1,
      supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
      file_url_ok=False,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudBucketURLsArgument(1)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='perfdiag',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run performance diagnostic',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  # Sizes, in bytes, of the files exercised by the latency test.
  # TODO: Consider letting the user specify these sizes with a configuration
  # parameter.
  test_lat_file_sizes = (
      0,            # empty file
      1024,         # 1 KiB
      100 * 1024,   # 100 KiB
      1024 * 1024,  # 1 MiB
  )

  # Names of the individual diagnostic tests (values accepted by -t).
  RTHRU = 'rthru'
  RTHRU_FILE = 'rthru_file'
  WTHRU = 'wthru'
  WTHRU_FILE = 'wthru_file'
  LAT = 'lat'
  LIST = 'list'

  # Names of the parallelism strategies (values accepted by -p).
  FAN = 'fan'
  SLICE = 'slice'
  BOTH = 'both'

  # Every diagnostic test that exists.
  ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)

  # The diagnostic tests that run when none are requested explicitly.
  DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)

  # Every parallelism strategy that exists.
  PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)

  # Google Cloud Storage XML API endpoint host.
  XML_API_HOST = boto.config.get(
      'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
  # Google Cloud Storage XML API endpoint port.
  # NOTE(review): the fallback is the int 80, whereas a configured gs_port is
  # presumably returned as a string -- confirm consumers accept both.
  XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)

  # Upper bound on retries of a request that failed with a 5xx error.
  MAX_SERVER_ERROR_RETRIES = 5
  # Upper bound on retries of a request for any reason, including more
  # serious errors like the socket breaking.
  MAX_TOTAL_RETRIES = 10

  # The default buffer size in boto's Key object is set to 8 KiB. This becomes
  # a bottleneck at high throughput rates, so we increase it (to 16 KiB).
  KEY_BUFFER_SIZE = 16 * 1024

  # The maximum number of bytes to generate pseudo-randomly before beginning
  # to repeat bytes. This number was chosen as the next prime larger than
  # 5 MiB.
  MAX_UNIQUE_RANDOM_BYTES = 5 * 1024 * 1024 + 3

  # Longest time, in seconds, the listing tests wait for object listings to
  # reflect what we expect.
  MAX_LISTING_WAIT_TIME = 60.0
| |
| def _Exec(self, cmd, raise_on_error=True, return_output=False, |
| mute_stderr=False): |
| """Executes a command in a subprocess. |
| |
| Args: |
| cmd: List containing the command to execute. |
| raise_on_error: Whether or not to raise an exception when a process exits |
| with a non-zero return code. |
| return_output: If set to True, the return value of the function is the |
| stdout of the process. |
| mute_stderr: If set to True, the stderr of the process is not printed to |
| the console. |
| |
| Returns: |
| The return code of the process or the stdout if return_output is set. |
| |
| Raises: |
| Exception: If raise_on_error is set to True and any process exits with a |
| non-zero return code. |
| """ |
| self.logger.debug('Running command: %s', cmd) |
| stderr = subprocess.PIPE if mute_stderr else None |
| p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) |
| (stdoutdata, _) = p.communicate() |
| if raise_on_error and p.returncode: |
| raise CommandException("Received non-zero return code (%d) from " |
| "subprocess '%s'." % (p.returncode, ' '.join(cmd))) |
| return stdoutdata if return_output else p.returncode |
| |
def _WarnIfLargeData(self):
  """Logs a notice when the total amount of test data exceeds 2GiB.

  The total is num_objects * thru_filesize.
  """
  total_bytes = self.num_objects * self.thru_filesize
  if total_bytes <= HumanReadableToBytes('2GiB'):
    return
  self.logger.info('This is a large operation, and could take a while.')
| |
def _MakeTempFile(self, file_size=0, mem_metadata=False,
                  mem_data=False, prefix='gsutil_test_file'):
  """Creates a temporary file of the given size and returns its path.

  The file is created in self.directory with a '.bin' suffix and filled with
  random bytes. At most MAX_UNIQUE_RANDOM_BYTES distinct bytes are generated;
  larger files repeat that same sequence. The path is registered in
  self.temporary_files.

  Args:
    file_size: The size of the temporary file to create.
    mem_metadata: If true, store md5 and file size in memory at
                  temp_file_dict[fpath].md5, temp_file_dict[fpath].size.
    mem_data: If true, store the file data in memory at
              temp_file_dict[fpath].data
    prefix: The prefix to use for the temporary file. Defaults to
            gsutil_test_file.

  Returns:
    The file path of the created temporary file.
  """
  fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
                               dir=self.directory, text=False)
  with os.fdopen(fd, 'wb') as out_fp:
    pattern = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES))
    remaining = file_size
    while remaining > 0:
      chunk_len = min(self.MAX_UNIQUE_RANDOM_BYTES, remaining)
      out_fp.write(pattern[:chunk_len])
      remaining -= chunk_len

  if mem_metadata or mem_data:
    size_in_mem = None
    md5_in_mem = None
    data_in_mem = None
    with open(fpath, 'rb') as in_fp:
      # The order of these reads is kept as-is: all three share one file
      # object, so each may depend on where the previous one left it.
      if mem_metadata:
        size_in_mem = GetFileSize(in_fp)
        md5_in_mem = CalculateB64EncodedMd5FromContents(in_fp)
      if mem_data:
        data_in_mem = in_fp.read()
      temp_file_dict[fpath] = FileDataTuple(size_in_mem, md5_in_mem,
                                            data_in_mem)

  self.temporary_files.add(fpath)
  return fpath
| |
def _SetUp(self):
  """Performs setup operations needed before diagnostics can be run.

  Resets the result and request counters, creates the local temporary files
  required by the tests selected in self.diag_tests, and installs the
  perfdiag log filter.

  Raises:
    CommandException: If the file throughput tests are selected and
                      self.directory lacks the free space for their files.
  """
  # Test result data.
  self.results = {}
  # Paths of the local temporary files.
  self.temporary_files = set()
  # Names of the test objects that exist in the test bucket.
  self.temporary_objects = set()
  # Count of all HTTP requests made.
  self.total_requests = 0
  # Count of HTTP 5xx errors.
  self.request_errors = 0
  # Count of responses, keyed by response code.
  self.error_responses_by_code = defaultdict(int)
  # Count of socket errors.
  self.connection_breaks = 0
  # Guards against running the cleanup twice.
  self.teardown_completed = False

  # Latency test: one file per configured size, held fully in memory.
  if self.LAT in self.diag_tests:
    self.latency_files = []
    for lat_size in self.test_lat_file_sizes:
      self.latency_files.append(
          self._MakeTempFile(lat_size, mem_metadata=True, mem_data=True))

  # Throughput tests.
  if self.diag_tests.intersection(
      (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
    # A 5 MiB file used for warming up the TCP connection.
    self.tcp_warmup_file = self._MakeTempFile(
        5 * 1024 * 1024, mem_metadata=True, mem_data=True)

    # The in-memory tests transfer one and the same object N times rather
    # than N distinct objects, in order to avoid excessive memory usage.
    if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
      self.mem_thru_file_name = self._MakeTempFile(
          self.thru_filesize, mem_metadata=True, mem_data=True)
      self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)

    # The tests that use disk I/O need N distinct objects in order to
    # properly measure the performance impact of seeks.
    if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
      # File names and the corresponding object names for the file
      # throughput tests.
      self.thru_file_names = []
      self.thru_object_names = []

      required_space = self.thru_filesize * self.num_objects
      free_disk_space = CheckFreeSpace(self.directory)
      if free_disk_space < required_space:
        raise CommandException(
            'Not enough free disk space for throughput files: '
            '%s of disk space required, but only %s available.'
            % (MakeHumanReadable(required_space),
               MakeHumanReadable(free_disk_space)))

      self.logger.info('\nCreating %d local files each of size %s.'
                       % (self.num_objects,
                          MakeHumanReadable(self.thru_filesize)))
      self._WarnIfLargeData()
      for _ in range(self.num_objects):
        thru_path = self._MakeTempFile(self.thru_filesize, mem_metadata=True)
        self.thru_file_names.append(thru_path)
        self.thru_object_names.append(os.path.basename(thru_path))

  # Download target that throws the received data away.
  self.discard_sink = DummyFile()

  # Filter out misleading progress callback output and the incorrect
  # suggestion to use gsutil -m perfdiag.
  self.logger.addFilter(self._PerfdiagFilter())
| |
def _TearDown(self):
  """Performs operations to clean things up after performing diagnostics.

  Clears the in-memory file cache, removes the local temporary files (and
  the temporary directory when self.delete_directory is set), and deletes
  the test objects from the bucket. Does nothing once a previous call has
  completed.

  Local cleanup is best-effort: a file or directory that cannot be removed
  is logged at debug level and skipped, so one failure no longer prevents
  the remaining files from being removed.
  """
  if self.teardown_completed:
    return

  temp_file_dict.clear()

  for fpath in self.temporary_files:
    try:
      os.remove(fpath)
    except OSError as e:
      self.logger.debug('Could not remove temporary file %s: %s', fpath, e)
  if self.delete_directory:
    try:
      os.rmdir(self.directory)
    except OSError as e:
      self.logger.debug('Could not remove temporary directory %s: %s',
                        self.directory, e)

  if self.threads > 1 or self.processes > 1:
    # Delete the test objects in parallel when parallelism was requested.
    args = list(self.temporary_objects)
    self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
               arg_checker=DummyArgChecker,
               parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)
  else:
    for object_name in self.temporary_objects:
      self.Delete(object_name)
  self.teardown_completed = True
| |
| @contextlib.contextmanager |
| def _Time(self, key, bucket): |
| """A context manager that measures time. |
| |
| A context manager that prints a status message before and after executing |
| the inner command and times how long the inner command takes. Keeps track of |
| the timing, aggregated by the given key. |
| |
| Args: |
| key: The key to insert the timing value into a dictionary bucket. |
| bucket: A dictionary to place the timing value in. |
| |
| Yields: |
| For the context manager. |
| """ |
| self.logger.info('%s starting...', key) |
| t0 = time.time() |
| yield |
| t1 = time.time() |
| bucket[key].append(t1 - t0) |
| self.logger.info('%s done.', key) |
| |
def _RunOperation(self, func):
  """Runs an operation with retry logic.

  Any exception listed in self.exceptions triggers a retry, up to
  MAX_TOTAL_RETRIES retries overall. A ServiceException with a status of 500
  or above is additionally counted as a server error: it is recorded in the
  request statistics, retried at most MAX_SERVER_ERROR_RETRIES times, and
  followed by a randomized exponential backoff. Every other retried
  exception is counted as a connection break and retried immediately.

  Args:
    func: The function to run. It is called without arguments.

  Returns:
    The value returned by func if it eventually succeeds, or None if
    retrying was abandoned.

  Raises:
    ServiceException: Re-raised unchanged when its status is below 500.
  """
  server_error_retried = 0
  total_retried = 0
  return_val = None
  while True:
    try:
      return_val = func()
      self.total_requests += 1
      break
    except tuple(self.exceptions) as e:
      total_retried += 1
      if total_retried > self.MAX_TOTAL_RETRIES:
        self.logger.info('Reached maximum total retries. Not retrying.')
        break
      if isinstance(e, ServiceException):
        if e.status >= 500:
          self.error_responses_by_code[e.status] += 1
          self.total_requests += 1
          self.request_errors += 1
          server_error_retried += 1
        else:
          raise
        # Check the limit before sleeping so that no time is spent waiting
        # for a retry that will not happen.
        if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
          self.logger.info(
              'Reached maximum server error retries. Not retrying.')
          break
        # The exponent grows with every server error, so consecutive
        # failures wait progressively longer, capped at the maximum delay.
        backoff_exponent = server_error_retried - 1
        time.sleep(min(random.random() * (2 ** backoff_exponent) + 1,
                       GetMaxRetryDelay()))
      else:
        self.connection_breaks += 1
  return return_val
| |
def _RunLatencyTests(self):
  """Runs latency tests.

  For each of self.num_objects iterations and each latency file, uploads the
  file, fetches the object's metadata, downloads the object into the discard
  sink and deletes it. Every step goes through _RunOperation and is timed
  with _Time into self.results['latency'] under the keys 'UPLOAD_<size>',
  'METADATA_<size>', 'DOWNLOAD_<size>' and 'DELETE_<size>'.
  """
  # Timing information for each category of operation.
  timings = defaultdict(list)
  self.results['latency'] = timings

  for iteration in range(1, self.num_objects + 1):
    self.logger.info('\nRunning latency iteration %d...', iteration)
    for fpath in self.latency_files:
      file_data = temp_file_dict[fpath]
      url = self.bucket_url.Clone()
      url.object_name = os.path.basename(fpath)
      file_size = file_data.size

      self.logger.info(
          "\nFile of size %s located on disk at '%s' being diagnosed in the "
          "cloud at '%s'.", MakeHumanReadable(file_size), fpath, url)

      upload_target = StorageUrlToUploadObjectMetadata(url)

      def _TimedUpload():
        # The in-memory stream is built before the timer starts so that its
        # construction is not part of the measurement.
        stream = cStringIO.StringIO(file_data.data)
        with self._Time('UPLOAD_%d' % file_size, timings):
          self.gsutil_api.UploadObject(
              stream, upload_target, size=file_size, provider=self.provider,
              fields=['name'])
      self._RunOperation(_TimedUpload)

      def _TimedMetadata():
        with self._Time('METADATA_%d' % file_size, timings):
          return self.gsutil_api.GetObjectMetadata(
              url.bucket_name, url.object_name,
              provider=self.provider, fields=['name', 'contentType',
                                              'mediaLink', 'size'])
      # Download will get the metadata first if we don't pass it in.
      object_metadata = self._RunOperation(_TimedMetadata)
      serialization_data = GetDownloadSerializationData(object_metadata)

      def _TimedDownload():
        with self._Time('DOWNLOAD_%d' % file_size, timings):
          self.gsutil_api.GetObjectMedia(
              url.bucket_name, url.object_name, self.discard_sink,
              provider=self.provider, serialization_data=serialization_data)
      self._RunOperation(_TimedDownload)

      def _TimedDelete():
        with self._Time('DELETE_%d' % file_size, timings):
          self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
                                       provider=self.provider)
      self._RunOperation(_TimedDelete)
| |
| class _PerfdiagFilter(logging.Filter): |
| |
| def filter(self, record): |
| # Used to prevent unnecessary output when using multiprocessing. |
| msg = record.getMessage() |
| return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or |
| ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg)) |
| |
| def _PerfdiagExceptionHandler(self, e): |
| """Simple exception handler to allow post-completion status.""" |
| self.logger.error(str(e)) |
| |
def PerformFannedDownload(self, need_to_slice, object_names, file_names,
                          serialization_data):
  """Performs a parallel download of multiple objects using the fan strategy.

  One FanDownloadTuple is built per object and handed to Apply, which runs
  _DownloadObject with self.processes processes and self.threads threads.

  Args:
    need_to_slice: If True, additionally apply the slice strategy to each
                   object in object_names.
    object_names: A list of object names to be downloaded. Each object must
                  already exist in the test bucket.
    file_names: A list, corresponding by index to object_names, of file names
                for downloaded data. If None, discard downloaded data.
    serialization_data: A list, corresponding by index to object_names,
                        of serialization data for each object.
  """
  args = [
      FanDownloadTuple(need_to_slice, object_name,
                       file_names[i] if file_names else None,
                       serialization_data[i])
      for i, object_name in enumerate(object_names)]
  self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
             ('total_requests', 'request_errors'),
             arg_checker=DummyArgChecker, parallel_operations_override=True,
             process_count=self.processes, thread_count=self.threads)
| |
def PerformSlicedDownload(self, object_name, file_name, serialization_data):
  """Performs a download of an object using the slice strategy.

  The object is split into self.num_slices byte ranges of
  ceil(thru_filesize / num_slices) bytes each, the last one ending at the
  final byte of the object, and the ranges are downloaded through Apply with
  _DownloadSlice.

  Args:
    object_name: The name of the object to download.
    file_name: The name of the file to download data to, or None if data
               should be discarded.
    serialization_data: The serialization data for the object.
  """
  if file_name:
    # Size the destination file to the full object length up front.
    with open(file_name, 'ab') as fp:
      fp.truncate(self.thru_filesize)

  component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
  final_byte = self.thru_filesize - 1
  # NOTE(review): when num_slices is large relative to thru_filesize (e.g. a
  # 5 byte object in 4 slices), trailing slices get start_byte > end_byte.
  args = [
      SliceDownloadTuple(object_name, file_name, serialization_data,
                         i * component_size,
                         min((i + 1) * component_size - 1, final_byte))
      for i in range(self.num_slices)]
  self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
             ('total_requests', 'request_errors'),
             arg_checker=DummyArgChecker, parallel_operations_override=True,
             process_count=self.processes, thread_count=self.threads)
| |
def PerformFannedUpload(self, need_to_slice, file_names, object_names,
                        use_file):
  """Performs a parallel upload of multiple files using the fan strategy.

  The metadata for each file name should be present in temp_file_dict prior
  to calling. The data for each file name should also be present in
  temp_file_dict if use_file is specified as False.

  Args:
    need_to_slice: If True, additionally apply the slice strategy to each
                   file in file_names.
    file_names: A list of file names to be uploaded.
    object_names: A list, corresponding by index to file_names, of object
                  names to upload data to.
    use_file: If true, use disk I/O, otherwise read upload data from memory.
  """
  args = [
      FanUploadTuple(need_to_slice, file_name, object_names[i], use_file)
      for i, file_name in enumerate(file_names)]
  self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
             ('total_requests', 'request_errors'),
             arg_checker=DummyArgChecker, parallel_operations_override=True,
             process_count=self.processes, thread_count=self.threads)
| |
def PerformSlicedUpload(self, file_name, object_name, use_file):
  """Performs a parallel upload of a file using the slice strategy.

  The file is split into self.num_slices components which are uploaded in
  parallel as the objects object_name + '0', object_name + '1', ... and then
  composed into object_name. The component objects are deleted afterwards,
  whether or not the upload and compose succeeded.

  The metadata for file_name should be present in temp_file_dict prior
  to calling. Also, the data for file_name should be present in
  temp_file_dict if use_file is specified as False.

  Args:
    file_name: The name of the file to upload.
    object_name: The name of the object to upload to.
    use_file: If true, use disk I/O, otherwise read upload data from memory.
  """
  # Divide the file into components. The nominal slice size stays fixed for
  # the whole loop; only the length of each individual component is clamped
  # to the bytes that remain in the file. (Shrinking the shared size inside
  # the loop would corrupt the offsets of every later component.)
  total_size = temp_file_dict[file_name].size
  slice_size = DivideAndCeil(self.thru_filesize, self.num_slices)
  component_object_names = (
      [object_name + str(i) for i in range(self.num_slices)])

  args = []
  for i, component_name in enumerate(component_object_names):
    # Components lying entirely past the end of the file become empty
    # components positioned at the end of the file.
    # NOTE(review): assumes Upload accepts a file_size of 0 -- confirm.
    component_start = min(i * slice_size, total_size)
    component_length = min(slice_size, total_size - component_start)
    args.append(SliceUploadTuple(file_name, component_name,
                                 use_file, component_start, component_length))

  # Upload the components in parallel.
  try:
    self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)

    # Compose the components into an object.
    request_components = [
        apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
            name=component_name)
        for component_name in component_object_names]

    dst_obj_metadata = apitools_messages.Object()
    dst_obj_metadata.name = object_name
    dst_obj_metadata.bucket = self.bucket_url.bucket_name
    def _Compose():
      self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
                                    provider=self.provider)
    self._RunOperation(_Compose)
  finally:
    # Delete the temporary components.
    self.Apply(_DeleteWrapper, component_object_names,
               _PerfdiagExceptionHandler,
               ('total_requests', 'request_errors'),
               arg_checker=DummyArgChecker, parallel_operations_override=True,
               process_count=self.processes, thread_count=self.threads)
| |
| def _RunReadThruTests(self, use_file=False): |
| """Runs read throughput tests.""" |
| test_name = 'read_throughput_file' if use_file else 'read_throughput' |
| file_io_string = 'with file I/O' if use_file else '' |
| self.logger.info( |
| '\nRunning read throughput tests %s (%s objects of size %s)' % |
| (file_io_string, self.num_objects, |
| MakeHumanReadable(self.thru_filesize))) |
| self._WarnIfLargeData() |
| |
| self.results[test_name] = {'file_size': self.thru_filesize, |
| 'processes': self.processes, |
| 'threads': self.threads, |
| 'parallelism': self.parallel_strategy |
| } |
| |
| # Copy the file(s) to the test bucket, and also get the serialization data |
| # so that we can pass it to download. |
| if use_file: |
| # For test with file I/O use N files on disk to preserve seek performance. |
| file_names = self.thru_file_names |
| object_names = self.thru_object_names |
| serialization_data = [] |
| for i in range(self.num_objects): |
| self.temporary_objects.add(self.thru_object_names[i]) |
| if self.WTHRU_FILE in self.diag_tests: |
| # If we ran the WTHRU_FILE test, then the objects already exist. |
| obj_metadata = self.gsutil_api.GetObjectMetadata( |
| self.bucket_url.bucket_name, self.thru_object_names[i], |
| fields=['size', 'mediaLink'], provider=self.bucket_url.scheme) |
| else: |
| obj_metadata = self.Upload(self.thru_file_names[i], |
| self.thru_object_names[i], use_file) |
| |
| # File overwrite causes performance issues with sliced downloads. |
| # Delete the file and reopen it for download. This matches what a real |
| # download would look like. |
| os.unlink(self.thru_file_names[i]) |
| open(self.thru_file_names[i], 'ab').close() |
| serialization_data.append(GetDownloadSerializationData(obj_metadata)) |
| else: |
| # For in-memory test only use one file but copy it num_objects times, to |
| # allow scalability in num_objects. |
| self.temporary_objects.add(self.mem_thru_object_name) |
| obj_metadata = self.Upload(self.mem_thru_file_name, |
| self.mem_thru_object_name, use_file) |
| file_names = None |
| object_names = [self.mem_thru_object_name] * self.num_objects |
| serialization_data = ( |
| [GetDownloadSerializationData(obj_metadata)] * self.num_objects) |
| |
| # Warmup the TCP connection. |
| warmup_obj_name = os.path.basename(self.tcp_warmup_file) |
| self.temporary_objects.add(warmup_obj_name) |
| self.Upload(self.tcp_warmup_file, warmup_obj_name) |
| self.Download(warmup_obj_name) |
| |
| t0 = time.time() |
| if self.processes == 1 and self.threads == 1: |
| for i in range(self.num_objects): |
| file_name = file_names[i] if use_file else None |
| self.Download(object_names[i], file_name, serialization_data[i]) |
| else: |
| if self.parallel_strategy in (self.FAN, self.BOTH): |
| need_to_slice = (self.parallel_strategy == self.BOTH) |
| self.PerformFannedDownload(need_to_slice, object_names, file_names, |
| serialization_data) |
| elif self.parallel_strategy == self.SLICE: |
| for i in range(self.num_objects): |
| file_name = file_names[i] if use_file else None |
| self.PerformSlicedDownload( |
| object_names[i], file_name, serialization_data[i]) |
| t1 = time.time() |
| |
| time_took = t1 - t0 |
| total_bytes_copied = self.thru_filesize * self.num_objects |
| bytes_per_second = total_bytes_copied / time_took |
| |
| self.results[test_name]['time_took'] = time_took |
| self.results[test_name]['total_bytes_copied'] = total_bytes_copied |
| self.results[test_name]['bytes_per_second'] = bytes_per_second |
| |
| def _RunWriteThruTests(self, use_file=False): |
| """Runs write throughput tests.""" |
| test_name = 'write_throughput_file' if use_file else 'write_throughput' |
| file_io_string = 'with file I/O' if use_file else '' |
| self.logger.info( |
| '\nRunning write throughput tests %s (%s objects of size %s)' % |
| (file_io_string, self.num_objects, |
| MakeHumanReadable(self.thru_filesize))) |
| self._WarnIfLargeData() |
| |
| self.results[test_name] = {'file_size': self.thru_filesize, |
| 'processes': self.processes, |
| 'threads': self.threads, |
| 'parallelism': self.parallel_strategy} |
| |
| # Warmup the TCP connection. |
| warmup_obj_name = os.path.basename(self.tcp_warmup_file) |
| self.temporary_objects.add(warmup_obj_name) |
| self.Upload(self.tcp_warmup_file, warmup_obj_name) |
| |
| if use_file: |
| # For test with file I/O use N files on disk to preserve seek performance. |
| file_names = self.thru_file_names |
| object_names = self.thru_object_names |
| else: |
| # For in-memory test only use one file but copy it num_objects times, to |
| # allow for scalability in num_objects. |
| file_names = [self.mem_thru_file_name] * self.num_objects |
| object_names = ( |
| [self.mem_thru_object_name + str(i) for i in range(self.num_objects)]) |
| |
| for object_name in object_names: |
| self.temporary_objects.add(object_name) |
| |
| t0 = time.time() |
| if self.processes == 1 and self.threads == 1: |
| for i in range(self.num_objects): |
| self.Upload(file_names[i], object_names[i], use_file) |
| else: |
| if self.parallel_strategy in (self.FAN, self.BOTH): |
| need_to_slice = (self.parallel_strategy == self.BOTH) |
| self.PerformFannedUpload(need_to_slice, file_names, object_names, |
| use_file) |
| elif self.parallel_strategy == self.SLICE: |
| for i in range(self.num_objects): |
| self.PerformSlicedUpload(file_names[i], object_names[i], use_file) |
| t1 = time.time() |
| |
| time_took = t1 - t0 |
| total_bytes_copied = self.thru_filesize * self.num_objects |
| bytes_per_second = total_bytes_copied / time_took |
| |
| self.results[test_name]['time_took'] = time_took |
| self.results[test_name]['total_bytes_copied'] = total_bytes_copied |
| self.results[test_name]['bytes_per_second'] = bytes_per_second |
| |
| def _RunListTests(self): |
| """Runs eventual consistency listing latency tests.""" |
| self.results['listing'] = {'num_files': self.num_objects} |
| |
| # Generate N random objects to put into the bucket. |
| list_prefix = 'gsutil-perfdiag-list-' |
| list_fpaths = [] |
| list_objects = [] |
| args = [] |
| for _ in xrange(self.num_objects): |
| fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True, |
| prefix=list_prefix) |
| list_fpaths.append(fpath) |
| object_name = os.path.basename(fpath) |
| list_objects.append(object_name) |
| args.append(FanUploadTuple(False, fpath, object_name, False)) |
| self.temporary_objects.add(object_name) |
| |
| # Add the objects to the bucket. |
| self.logger.info( |
| '\nWriting %s objects for listing test...', self.num_objects) |
| |
| self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, |
| arg_checker=DummyArgChecker) |
| |
| list_latencies = [] |
| files_seen = [] |
| total_start_time = time.time() |
| expected_objects = set(list_objects) |
| found_objects = set() |
| |
| def _List(): |
| """Lists and returns objects in the bucket. Also records latency.""" |
| t0 = time.time() |
| objects = list(self.gsutil_api.ListObjects( |
| self.bucket_url.bucket_name, delimiter='/', |
| provider=self.provider, fields=['items/name'])) |
| t1 = time.time() |
| list_latencies.append(t1 - t0) |
| return set([obj.data.name for obj in objects]) |
| |
| self.logger.info( |
| 'Listing bucket %s waiting for %s objects to appear...', |
| self.bucket_url.bucket_name, self.num_objects) |
| while expected_objects - found_objects: |
| def _ListAfterUpload(): |
| names = _List() |
| found_objects.update(names & expected_objects) |
| files_seen.append(len(found_objects)) |
| self._RunOperation(_ListAfterUpload) |
| if expected_objects - found_objects: |
| if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: |
| self.logger.warning('Maximum time reached waiting for listing.') |
| break |
| total_end_time = time.time() |
| |
| self.results['listing']['insert'] = { |
| 'num_listing_calls': len(list_latencies), |
| 'list_latencies': list_latencies, |
| 'files_seen_after_listing': files_seen, |
| 'time_took': total_end_time - total_start_time, |
| } |
| |
| args = [object_name for object_name in list_objects] |
| self.logger.info( |
| 'Deleting %s objects for listing test...', self.num_objects) |
| self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, |
| arg_checker=DummyArgChecker) |
| |
| self.logger.info( |
| 'Listing bucket %s waiting for %s objects to disappear...', |
| self.bucket_url.bucket_name, self.num_objects) |
| list_latencies = [] |
| files_seen = [] |
| total_start_time = time.time() |
| found_objects = set(list_objects) |
| while found_objects: |
| def _ListAfterDelete(): |
| names = _List() |
| found_objects.intersection_update(names) |
| files_seen.append(len(found_objects)) |
| self._RunOperation(_ListAfterDelete) |
| if found_objects: |
| if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: |
| self.logger.warning('Maximum time reached waiting for listing.') |
| break |
| total_end_time = time.time() |
| |
| self.results['listing']['delete'] = { |
| 'num_listing_calls': len(list_latencies), |
| 'list_latencies': list_latencies, |
| 'files_seen_after_listing': files_seen, |
| 'time_took': total_end_time - total_start_time, |
| } |
| |
| def Upload(self, file_name, object_name, use_file=False, file_start=0, |
| file_size=None): |
| """Performs an upload to the test bucket. |
| |
| The file is uploaded to the bucket referred to by self.bucket_url, and has |
| name object_name. |
| |
| Args: |
| file_name: The path to the local file, and the key to its entry in |
| temp_file_dict. |
| object_name: The name of the remote object. |
| use_file: If true, use disk I/O, otherwise read everything from memory. |
| file_start: The first byte in the file to upload to the object. |
| (only should be specified for sliced uploads) |
| file_size: The size of the file to upload. |
| (only should be specified for sliced uploads) |
| |
| Returns: |
| Uploaded Object Metadata. |
| """ |
| fp = None |
| if file_size is None: |
| file_size = temp_file_dict[file_name].size |
| |
| upload_url = self.bucket_url.Clone() |
| upload_url.object_name = object_name |
| upload_target = StorageUrlToUploadObjectMetadata(upload_url) |
| |
| try: |
| if use_file: |
| fp = FilePart(file_name, file_start, file_size) |
| else: |
| data = temp_file_dict[file_name].data[file_start:file_start+file_size] |
| fp = cStringIO.StringIO(data) |
| |
| def _InnerUpload(): |
| if file_size < ResumableThreshold(): |
| return self.gsutil_api.UploadObject( |
| fp, upload_target, provider=self.provider, size=file_size, |
| fields=['name', 'mediaLink', 'size']) |
| else: |
| return self.gsutil_api.UploadObjectResumable( |
| fp, upload_target, provider=self.provider, size=file_size, |
| fields=['name', 'mediaLink', 'size'], |
| tracker_callback=_DummyTrackerCallback) |
| return self._RunOperation(_InnerUpload) |
| finally: |
| if fp: |
| fp.close() |
| |
| def Download(self, object_name, file_name=None, serialization_data=None, |
| start_byte=0, end_byte=None): |
| """Downloads an object from the test bucket. |
| |
| Args: |
| object_name: The name of the object (in the test bucket) to download. |
| file_name: Optional file name to write downloaded data to. If None, |
| downloaded data is discarded immediately. |
| serialization_data: Optional serialization data, used so that we don't |
| have to get the metadata before downloading. |
| start_byte: The first byte in the object to download. |
| (only should be specified for sliced downloads) |
| end_byte: The last byte in the object to download. |
| (only should be specified for sliced downloads) |
| """ |
| fp = None |
| try: |
| if file_name is not None: |
| fp = open(file_name, 'r+b') |
| fp.seek(start_byte) |
| else: |
| fp = self.discard_sink |
| |
| def _InnerDownload(): |
| self.gsutil_api.GetObjectMedia( |
| self.bucket_url.bucket_name, object_name, fp, |
| provider=self.provider, start_byte=start_byte, end_byte=end_byte, |
| serialization_data=serialization_data) |
| self._RunOperation(_InnerDownload) |
| finally: |
| if fp: |
| fp.close() |
| |
| def Delete(self, object_name): |
| """Deletes an object from the test bucket. |
| |
| Args: |
| object_name: The name of the object to delete. |
| """ |
| try: |
| def _InnerDelete(): |
| self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name, |
| provider=self.provider) |
| self._RunOperation(_InnerDelete) |
| except NotFoundException: |
| pass |
| |
| def _GetDiskCounters(self): |
| """Retrieves disk I/O statistics for all disks. |
| |
| Adapted from the psutil module's psutil._pslinux.disk_io_counters: |
| http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py |
| |
| Originally distributed under under a BSD license. |
| Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. |
| |
| Returns: |
| A dictionary containing disk names mapped to the disk counters from |
| /disk/diskstats. |
| """ |
| # iostat documentation states that sectors are equivalent with blocks and |
| # have a size of 512 bytes since 2.4 kernels. This value is needed to |
| # calculate the amount of disk I/O in bytes. |
| sector_size = 512 |
| |
| partitions = [] |
| with open('/proc/partitions', 'r') as f: |
| lines = f.readlines()[2:] |
| for line in lines: |
| _, _, _, name = line.split() |
| if name[-1].isdigit(): |
| partitions.append(name) |
| |
| retdict = {} |
| with open('/proc/diskstats', 'r') as f: |
| for line in f: |
| values = line.split()[:11] |
| _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values |
| if name in partitions: |
| rbytes = int(rbytes) * sector_size |
| wbytes = int(wbytes) * sector_size |
| reads = int(reads) |
| writes = int(writes) |
| rtime = int(rtime) |
| wtime = int(wtime) |
| retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime) |
| return retdict |
| |
| def _GetTcpStats(self): |
| """Tries to parse out TCP packet information from netstat output. |
| |
| Returns: |
| A dictionary containing TCP information, or None if netstat is not |
| available. |
| """ |
| # netstat return code is non-zero for -s on Linux, so don't raise on error. |
| try: |
| netstat_output = self._Exec(['netstat', '-s'], return_output=True, |
| raise_on_error=False) |
| except OSError: |
| self.logger.warning('netstat not found on your system; some measurement ' |
| 'data will be missing') |
| return None |
| netstat_output = netstat_output.strip().lower() |
| found_tcp = False |
| tcp_retransmit = None |
| tcp_received = None |
| tcp_sent = None |
| for line in netstat_output.split('\n'): |
| # Header for TCP section is "Tcp:" in Linux/Mac and |
| # "TCP Statistics for" in Windows. |
| if 'tcp:' in line or 'tcp statistics' in line: |
| found_tcp = True |
| |
| # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts" |
| # Windows == "segments retransmitted". |
| if (found_tcp and tcp_retransmit is None and |
| ('segments retransmited' in line or 'retransmit timeouts' in line or |
| 'segments retransmitted' in line)): |
| tcp_retransmit = ''.join(c for c in line if c in string.digits) |
| |
| # Linux+Windows == "segments received", Mac == "packets received". |
| if (found_tcp and tcp_received is None and |
| ('segments received' in line or 'packets received' in line)): |
| tcp_received = ''.join(c for c in line if c in string.digits) |
| |
| # Linux == "segments send out" (sic), Mac+Windows == "packets sent". |
| if (found_tcp and tcp_sent is None and |
| ('segments send out' in line or 'packets sent' in line or |
| 'segments sent' in line)): |
| tcp_sent = ''.join(c for c in line if c in string.digits) |
| |
| result = {} |
| try: |
| result['tcp_retransmit'] = int(tcp_retransmit) |
| result['tcp_received'] = int(tcp_received) |
| result['tcp_sent'] = int(tcp_sent) |
| except (ValueError, TypeError): |
| result['tcp_retransmit'] = None |
| result['tcp_received'] = None |
| result['tcp_sent'] = None |
| |
| return result |
| |
| def _CollectSysInfo(self): |
| """Collects system information.""" |
| sysinfo = {} |
| |
| # All exceptions that might be raised from socket module calls. |
| socket_errors = ( |
| socket.error, socket.herror, socket.gaierror, socket.timeout) |
| |
| # Find out whether HTTPS is enabled in Boto. |
| sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True) |
| |
| # Look up proxy info. |
| proxy_host = boto.config.get('Boto', 'proxy', None) |
| proxy_port = boto.config.getint('Boto', 'proxy_port', 0) |
| sysinfo['using_proxy'] = bool(proxy_host) |
| |
| if boto.config.get('Boto', 'proxy_rdns', False): |
| self.logger.info('DNS lookups are disallowed in this environment, so ' |
| 'some information is not included in this perfdiag run.') |
| |
| # Get the local IP address from socket lib. |
| try: |
| sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) |
| except socket_errors: |
| sysinfo['ip_address'] = '' |
| # Record the temporary directory used since it can affect performance, e.g. |
| # when on a networked filesystem. |
| sysinfo['tempdir'] = self.directory |
| |
| # Produces an RFC 2822 compliant GMT timestamp. |
| sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', |
| time.gmtime()) |
| |
| # Execute a CNAME lookup on Google DNS to find what Google server |
| # it's routing to. |
| cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] |
| try: |
| nslookup_cname_output = self._Exec(cmd, return_output=True) |
| m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output) |
| sysinfo['googserv_route'] = m.group('googserv') if m else None |
| except (CommandException, OSError): |
| sysinfo['googserv_route'] = '' |
| |
| # Try to determine the latency of a DNS lookup for the Google hostname |
| # endpoint. Note: we don't piggyback on gethostbyname_ex below because |
| # the _ex version requires an extra RTT. |
| try: |
| t0 = time.time() |
| socket.gethostbyname(self.XML_API_HOST) |
| t1 = time.time() |
| sysinfo['google_host_dns_latency'] = t1 - t0 |
| except socket_errors: |
| pass |
| |
| # Look up IP addresses for Google Server. |
| try: |
| (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST) |
| sysinfo['googserv_ips'] = ipaddrlist |
| except socket_errors: |
| ipaddrlist = [] |
| sysinfo['googserv_ips'] = [] |
| |
| # Reverse lookup the hostnames for the Google Server IPs. |
| sysinfo['googserv_hostnames'] = [] |
| for googserv_ip in ipaddrlist: |
| try: |
| (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip) |
| sysinfo['googserv_hostnames'].append(hostname) |
| except socket_errors: |
| pass |
| |
| # Query o-o to find out what the Google DNS thinks is the user's IP. |
| try: |
| cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.'] |
| nslookup_txt_output = self._Exec(cmd, return_output=True) |
| m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output) |
| sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None |
| except (CommandException, OSError): |
| sysinfo['dns_o-o_ip'] = '' |
| |
| # Try to determine the latency of connecting to the Google hostname |
| # endpoint. |
| sysinfo['google_host_connect_latencies'] = {} |
| for googserv_ip in ipaddrlist: |
| try: |
| sock = socket.socket() |
| t0 = time.time() |
| sock.connect((googserv_ip, self.XML_API_PORT)) |
| t1 = time.time() |
| sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0 |
| except socket_errors: |
| pass |
| |
| # If using a proxy, try to determine the latency of a DNS lookup to resolve |
| # the proxy hostname and the latency of connecting to the proxy. |
| if proxy_host: |
| proxy_ip = None |
| try: |
| t0 = time.time() |
| proxy_ip = socket.gethostbyname(proxy_host) |
| t1 = time.time() |
| sysinfo['proxy_dns_latency'] = t1 - t0 |
| except socket_errors: |
| pass |
| |
| try: |
| sock = socket.socket() |
| t0 = time.time() |
| sock.connect((proxy_ip or proxy_host, proxy_port)) |
| t1 = time.time() |
| sysinfo['proxy_host_connect_latency'] = t1 - t0 |
| except socket_errors: |
| pass |
| |
| # Try and find the number of CPUs in the system if available. |
| try: |
| sysinfo['cpu_count'] = multiprocessing.cpu_count() |
| except NotImplementedError: |
| sysinfo['cpu_count'] = None |
| |
| # For *nix platforms, obtain the CPU load. |
| try: |
| sysinfo['load_avg'] = list(os.getloadavg()) |
| except (AttributeError, OSError): |
| sysinfo['load_avg'] = None |
| |
| # Try and collect memory information from /proc/meminfo if possible. |
| mem_total = None |
| mem_free = None |
| mem_buffers = None |
| mem_cached = None |
| |
| try: |
| with open('/proc/meminfo', 'r') as f: |
| for line in f: |
| if line.startswith('MemTotal'): |
| mem_total = (int(''.join(c for c in line if c in string.digits)) |
| * 1000) |
| elif line.startswith('MemFree'): |
| mem_free = (int(''.join(c for c in line if c in string.digits)) |
| * 1000) |
| elif line.startswith('Buffers'): |
| mem_buffers = (int(''.join(c for c in line if c in string.digits)) |
| * 1000) |
| elif line.startswith('Cached'): |
| mem_cached = (int(''.join(c for c in line if c in string.digits)) |
| * 1000) |
| except (IOError, ValueError): |
| pass |
| |
| sysinfo['meminfo'] = {'mem_total': mem_total, |
| 'mem_free': mem_free, |
| 'mem_buffers': mem_buffers, |
| 'mem_cached': mem_cached} |
| |
| # Get configuration attributes from config module. |
| sysinfo['gsutil_config'] = {} |
| for attr in dir(config): |
| attr_value = getattr(config, attr) |
| # Filter out multiline strings that are not useful. |
| if attr.isupper() and not (isinstance(attr_value, basestring) and |
| '\n' in attr_value): |
| sysinfo['gsutil_config'][attr] = attr_value |
| |
| sysinfo['tcp_proc_values'] = {} |
| stats_to_check = [ |
| '/proc/sys/net/core/rmem_default', |
| '/proc/sys/net/core/rmem_max', |
| '/proc/sys/net/core/wmem_default', |
| '/proc/sys/net/core/wmem_max', |
| '/proc/sys/net/ipv4/tcp_timestamps', |
| '/proc/sys/net/ipv4/tcp_sack', |
| '/proc/sys/net/ipv4/tcp_window_scaling', |
| ] |
| for fname in stats_to_check: |
| try: |
| with open(fname, 'r') as f: |
| value = f.read() |
| sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip() |
| except IOError: |
| pass |
| |
| self.results['sysinfo'] = sysinfo |
| |
| def _DisplayStats(self, trials): |
| """Prints out mean, standard deviation, median, and 90th percentile.""" |
| n = len(trials) |
| mean = float(sum(trials)) / n |
| stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n) |
| |
| print str(n).rjust(6), '', |
| print ('%.1f' % (mean * 1000)).rjust(9), '', |
| print ('%.1f' % (stdev * 1000)).rjust(12), '', |
| print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '', |
| print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), '' |
| |
| def _DisplayResults(self): |
| """Displays results collected from diagnostic run.""" |
| print |
| print '=' * 78 |
| print 'DIAGNOSTIC RESULTS'.center(78) |
| print '=' * 78 |
| |
| if 'latency' in self.results: |
| print |
| print '-' * 78 |
| print 'Latency'.center(78) |
| print '-' * 78 |
| print ('Operation Size Trials Mean (ms) Std Dev (ms) ' |
| 'Median (ms) 90th % (ms)') |
| print ('========= ========= ====== ========= ============ ' |
| '=========== ===========') |
| for key in sorted(self.results['latency']): |
| trials = sorted(self.results['latency'][key]) |
| op, numbytes = key.split('_') |
| numbytes = int(numbytes) |
| if op == 'METADATA': |
| print 'Metadata'.rjust(9), '', |
| print MakeHumanReadable(numbytes).rjust(9), '', |
| self._DisplayStats(trials) |
| if op == 'DOWNLOAD': |
| print 'Download'.rjust(9), '', |
| print MakeHumanReadable(numbytes).rjust(9), '', |
| self._DisplayStats(trials) |
| if op == 'UPLOAD': |
| print 'Upload'.rjust(9), '', |
| print MakeHumanReadable(numbytes).rjust(9), '', |
| self._DisplayStats(trials) |
| if op == 'DELETE': |
| print 'Delete'.rjust(9), '', |
| print MakeHumanReadable(numbytes).rjust(9), '', |
| self._DisplayStats(trials) |
| |
| if 'write_throughput' in self.results: |
| print |
| print '-' * 78 |
| print 'Write Throughput'.center(78) |
| print '-' * 78 |
| write_thru = self.results['write_throughput'] |
| print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| self.num_objects, |
| MakeHumanReadable(write_thru['file_size']), |
| MakeHumanReadable(write_thru['total_bytes_copied'])) |
| print 'Write throughput: %s/s.' % ( |
| MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) |
| print 'Parallelism strategy: %s' % write_thru['parallelism'] |
| |
| if 'write_throughput_file' in self.results: |
| print |
| print '-' * 78 |
| print 'Write Throughput With File I/O'.center(78) |
| print '-' * 78 |
| write_thru_file = self.results['write_throughput_file'] |
| print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| self.num_objects, |
| MakeHumanReadable(write_thru_file['file_size']), |
| MakeHumanReadable(write_thru_file['total_bytes_copied'])) |
| print 'Write throughput: %s/s.' % ( |
| MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8)) |
| print 'Parallelism strategy: %s' % write_thru_file['parallelism'] |
| |
| if 'read_throughput' in self.results: |
| print |
| print '-' * 78 |
| print 'Read Throughput'.center(78) |
| print '-' * 78 |
| read_thru = self.results['read_throughput'] |
| print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| self.num_objects, |
| MakeHumanReadable(read_thru['file_size']), |
| MakeHumanReadable(read_thru['total_bytes_copied'])) |
| print 'Read throughput: %s/s.' % ( |
| MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) |
| print 'Parallelism strategy: %s' % read_thru['parallelism'] |
| |
| if 'read_throughput_file' in self.results: |
| print |
| print '-' * 78 |
| print 'Read Throughput With File I/O'.center(78) |
| print '-' * 78 |
| read_thru_file = self.results['read_throughput_file'] |
| print 'Copied %s %s file(s) for a total transfer size of %s.' % ( |
| self.num_objects, |
| MakeHumanReadable(read_thru_file['file_size']), |
| MakeHumanReadable(read_thru_file['total_bytes_copied'])) |
| print 'Read throughput: %s/s.' % ( |
| MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8)) |
| print 'Parallelism strategy: %s' % read_thru_file['parallelism'] |
| |
| if 'listing' in self.results: |
| print |
| print '-' * 78 |
| print 'Listing'.center(78) |
| print '-' * 78 |
| |
| listing = self.results['listing'] |
| insert = listing['insert'] |
| delete = listing['delete'] |
| print 'After inserting %s objects:' % listing['num_files'] |
| print (' Total time for objects to appear: %.2g seconds' % |
| insert['time_took']) |
| print ' Number of listing calls made: %s' % insert['num_listing_calls'] |
| print (' Individual listing call latencies: [%s]' % |
| ', '.join('%.2gs' % lat for lat in insert['list_latencies'])) |
| print (' Files reflected after each call: [%s]' % |
| ', '.join(map(str, insert['files_seen_after_listing']))) |
| |
| print 'After deleting %s objects:' % listing['num_files'] |
| print (' Total time for objects to appear: %.2g seconds' % |
| delete['time_took']) |
| print ' Number of listing calls made: %s' % delete['num_listing_calls'] |
| print (' Individual listing call latencies: [%s]' % |
| ', '.join('%.2gs' % lat for lat in delete['list_latencies'])) |
| print (' Files reflected after each call: [%s]' % |
| ', '.join(map(str, delete['files_seen_after_listing']))) |
| |
| if 'sysinfo' in self.results: |
| print |
| print '-' * 78 |
| print 'System Information'.center(78) |
| print '-' * 78 |
| info = self.results['sysinfo'] |
| print 'IP Address: \n %s' % info['ip_address'] |
| print 'Temporary Directory: \n %s' % info['tempdir'] |
| print 'Bucket URI: \n %s' % self.results['bucket_uri'] |
| print 'gsutil Version: \n %s' % self.results.get('gsutil_version', |
| 'Unknown') |
| print 'boto Version: \n %s' % self.results.get('boto_version', 'Unknown') |
| |
| if 'gmt_timestamp' in info: |
| ts_string = info['gmt_timestamp'] |
| timetuple = None |
| try: |
| # Convert RFC 2822 string to Linux timestamp. |
| timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000') |
| except ValueError: |
| pass |
| |
| if timetuple: |
| # Converts the GMT time tuple to local Linux timestamp. |
| localtime = calendar.timegm(timetuple) |
| localdt = datetime.datetime.fromtimestamp(localtime) |
| print 'Measurement time: \n %s' % localdt.strftime( |
| '%Y-%m-%d %I:%M:%S %p %Z') |
| |
| print 'Google Server: \n %s' % info['googserv_route'] |
| print ('Google Server IP Addresses: \n %s' % |
| ('\n '.join(info['googserv_ips']))) |
| print ('Google Server Hostnames: \n %s' % |
| ('\n '.join(info['googserv_hostnames']))) |
| print 'Google DNS thinks your IP is: \n %s' % info['dns_o-o_ip'] |
| print 'CPU Count: \n %s' % info['cpu_count'] |
| print 'CPU Load Average: \n %s' % info['load_avg'] |
| try: |
| print ('Total Memory: \n %s' % |
| MakeHumanReadable(info['meminfo']['mem_total'])) |
| # Free memory is really MemFree + Buffers + Cached. |
| print 'Free Memory: \n %s' % MakeHumanReadable( |
| info['meminfo']['mem_free'] + |
| info['meminfo']['mem_buffers'] + |
| info['meminfo']['mem_cached']) |
| except TypeError: |
| pass |
| |
| if 'netstat_end' in info and 'netstat_start' in info: |
| netstat_after = info['netstat_end'] |
| netstat_before = info['netstat_start'] |
| for tcp_type in ('sent', 'received', 'retransmit'): |
| try: |
| delta = (netstat_after['tcp_%s' % tcp_type] - |
| netstat_before['tcp_%s' % tcp_type]) |
| print 'TCP segments %s during test:\n %d' % (tcp_type, delta) |
| except TypeError: |
| pass |
| else: |
| print ('TCP segment counts not available because "netstat" was not ' |
| 'found during test runs') |
| |
| if 'disk_counters_end' in info and 'disk_counters_start' in info: |
| print 'Disk Counter Deltas:\n', |
| disk_after = info['disk_counters_end'] |
| disk_before = info['disk_counters_start'] |
| print '', 'disk'.rjust(6), |
| for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime', |
| 'wtime']: |
| print colname.rjust(8), |
| print |
| for diskname in sorted(disk_after): |
| before = disk_before[diskname] |
| after = disk_after[diskname] |
| (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before |
| (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after |
| print '', diskname.rjust(6), |
| deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1, |
| wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1] |
| for delta in deltas: |
| print str(delta).rjust(8), |
| print |
| |
| if 'tcp_proc_values' in info: |
| print 'TCP /proc values:\n', |
| for item in info['tcp_proc_values'].iteritems(): |
| print ' %s = %s' % item |
| |
| if 'boto_https_enabled' in info: |
| print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled'] |
| |
| if 'using_proxy' in info: |
| print 'Requests routed through proxy: \n %s' % info['using_proxy'] |
| |
| if 'google_host_dns_latency' in info: |
| print ('Latency of the DNS lookup for Google Storage server (ms): ' |
| '\n %.1f' % (info['google_host_dns_latency'] * 1000.0)) |
| |
| if 'google_host_connect_latencies' in info: |
| print 'Latencies connecting to Google Storage server IPs (ms):' |
| for ip, latency in info['google_host_connect_latencies'].iteritems(): |
| print ' %s = %.1f' % (ip, latency * 1000.0) |
| |
| if 'proxy_dns_latency' in info: |
| print ('Latency of the DNS lookup for the configured proxy (ms): ' |
| '\n %.1f' % (info['proxy_dns_latency'] * 1000.0)) |
| |
| if 'proxy_host_connect_latency' in info: |
| print ('Latency connecting to the configured proxy (ms): \n %.1f' % |
| (info['proxy_host_connect_latency'] * 1000.0)) |
| |
| if 'request_errors' in self.results and 'total_requests' in self.results: |
| print |
| print '-' * 78 |
| print 'In-Process HTTP Statistics'.center(78) |
| print '-' * 78 |
| total = int(self.results['total_requests']) |
| numerrors = int(self.results['request_errors']) |
| numbreaks = int(self.results['connection_breaks']) |
| availability = (((total - numerrors) / float(total)) * 100 |
| if total > 0 else 100) |
| print 'Total HTTP requests made: %d' % total |
| print 'HTTP 5xx errors: %d' % numerrors |
| print 'HTTP connections broken: %d' % numbreaks |
| print 'Availability: %.7g%%' % availability |
| if 'error_responses_by_code' in self.results: |
| sorted_codes = sorted( |
| self.results['error_responses_by_code'].iteritems()) |
| if sorted_codes: |
| print 'Error responses by code:' |
| print '\n'.join(' %s: %s' % c for c in sorted_codes) |
| |
| if self.output_file: |
| with open(self.output_file, 'w') as f: |
| json.dump(self.results, f, indent=2) |
| print |
| print "Output file written to '%s'." % self.output_file |
| |
| print |
| |
| def _ParsePositiveInteger(self, val, msg): |
| """Tries to convert val argument to a positive integer. |
| |
| Args: |
| val: The value (as a string) to convert to a positive integer. |
| msg: The error message to place in the CommandException on an error. |
| |
| Returns: |
| A valid positive integer. |
| |
| Raises: |
| CommandException: If the supplied value is not a valid positive integer. |
| """ |
| try: |
| val = int(val) |
| if val < 1: |
| raise CommandException(msg) |
| return val |
| except ValueError: |
| raise CommandException(msg) |
| |
| def _ParseArgs(self): |
| """Parses arguments for perfdiag command.""" |
| # From -n. |
| self.num_objects = 5 |
| # From -c. |
| self.processes = 1 |
| # From -k. |
| self.threads = 1 |
| # From -p |
| self.parallel_strategy = None |
| # From -y |
| self.num_slices = 4 |
| # From -s. |
| self.thru_filesize = 1048576 |
| # From -d. |
| self.directory = tempfile.gettempdir() |
| # Keep track of whether or not to delete the directory upon completion. |
| self.delete_directory = False |
| # From -t. |
| self.diag_tests = set(self.DEFAULT_DIAG_TESTS) |
| # From -o. |
| self.output_file = None |
| # From -i. |
| self.input_file = None |
| # From -m. |
| self.metadata_keys = {} |
| |
| if self.sub_opts: |
| for o, a in self.sub_opts: |
| if o == '-n': |
| self.num_objects = self._ParsePositiveInteger( |
| a, 'The -n parameter must be a positive integer.') |
| if o == '-c': |
| self.processes = self._ParsePositiveInteger( |
| a, 'The -c parameter must be a positive integer.') |
| if o == '-k': |
| self.threads = self._ParsePositiveInteger( |
| a, 'The -k parameter must be a positive integer.') |
| if o == '-p': |
| if a.lower() in self.PARALLEL_STRATEGIES: |
| self.parallel_strategy = a.lower() |
| else: |
| raise CommandException( |
| "'%s' is not a valid parallelism strategy." % a) |
| if o == '-y': |
| self.num_slices = self._ParsePositiveInteger( |
| a, 'The -y parameter must be a positive integer.') |
| if o == '-s': |
| try: |
| self.thru_filesize = HumanReadableToBytes(a) |
| except ValueError: |
| raise CommandException('Invalid -s parameter.') |
| if o == '-d': |
| self.directory = a |
| if not os.path.exists(self.directory): |
| self.delete_directory = True |
| os.makedirs(self.directory) |
| if o == '-t': |
| self.diag_tests = set() |
| for test_name in a.strip().split(','): |
| if test_name.lower() not in self.ALL_DIAG_TESTS: |
| raise CommandException("List of test names (-t) contains invalid " |
| "test name '%s'." % test_name) |
| self.diag_tests.add(test_name) |
| if o == '-m': |
| pieces = a.split(':') |
| if len(pieces) != 2: |
| raise CommandException( |
| "Invalid metadata key-value combination '%s'." % a) |
| key, value = pieces |
| self.metadata_keys[key] = value |
| if o == '-o': |
| self.output_file = os.path.abspath(a) |
| if o == '-i': |
| self.input_file = os.path.abspath(a) |
| if not os.path.isfile(self.input_file): |
| raise CommandException("Invalid input file (-i): '%s'." % a) |
| try: |
| with open(self.input_file, 'r') as f: |
| self.results = json.load(f) |
| self.logger.info("Read input file: '%s'.", self.input_file) |
| except ValueError: |
| raise CommandException("Could not decode input file (-i): '%s'." % |
| a) |
| return |
| |
| # If parallelism is specified, default parallelism strategy to fan. |
| if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy: |
| self.parallel_strategy = self.FAN |
| elif self.processes == 1 and self.threads == 1 and self.parallel_strategy: |
| raise CommandException( |
| 'Cannot specify parallelism strategy (-p) without also specifying ' |
| 'multiple threads and/or processes (-c and/or -k).') |
| |
| if not self.args: |
| self.RaiseWrongNumberOfArgumentsException() |
| |
| self.bucket_url = StorageUrlFromString(self.args[0]) |
| self.provider = self.bucket_url.scheme |
| if not self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket(): |
| raise CommandException('The perfdiag command requires a URL that ' |
| 'specifies a bucket.\n"%s" is not ' |
| 'valid.' % self.args[0]) |
| |
| if (self.thru_filesize > HumanReadableToBytes('2GiB') and |
| (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)): |
| raise CommandException( |
| 'For in-memory tests maximum file size is 2GiB. For larger file ' |
| 'sizes, specify rthru_file and/or wthru_file with the -t option.') |
| |
| perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH) |
| slice_not_available = ( |
| self.provider == 's3' and self.diag_tests.intersection(self.WTHRU, |
| self.WTHRU_FILE)) |
| if perform_slice and slice_not_available: |
| raise CommandException('Sliced uploads are not available for s3. ' |
| 'Use -p fan or sequential uploads for s3.') |
| |
| # Ensure the bucket exists. |
| self.gsutil_api.GetBucket(self.bucket_url.bucket_name, |
| provider=self.bucket_url.scheme, |
| fields=['id']) |
| self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror, |
| socket.timeout, httplib.BadStatusLine, |
| ServiceException] |
| |
| # Command entry point. |
def RunCommand(self):
  """Runs the perfdiag command; gsutil calls this when it is invoked.

  If an input file was supplied, the results loaded during argument parsing
  are displayed and nothing else happens. Otherwise the selected diagnostics
  are run, TCP and (on Linux) disk counters are sampled before and after
  them, request statistics are stored in self.results and the results are
  displayed.

  Returns:
    0 whenever the method returns normally.
  """
  self._ParseArgs()

  if self.input_file:
    # Replay mode: only show what was read from the input file.
    self._DisplayResults()
    return 0

  # We turn off retries in the underlying boto library because the
  # _RunOperation function handles errors manually so it can count them.
  boto.config.set('Boto', 'num_retries', '0')

  run_summary = (
      'Number of iterations to run: %d\n'
      'Base bucket URI: %s\n'
      'Number of processes: %d\n'
      'Number of threads: %d\n'
      'Parallelism strategy: %s\n'
      'Throughput file size: %s\n'
      'Diagnostics to run: %s')
  self.logger.info(
      run_summary,
      self.num_objects,
      self.bucket_url,
      self.processes,
      self.threads,
      self.parallel_strategy,
      MakeHumanReadable(self.thru_filesize),
      (', '.join(self.diag_tests)))

  def _SnapshotCounters(netstat_key, disk_key):
    """Stores current TCP stats and, on Linux, disk counters in sysinfo."""
    tcp_stats = self._GetTcpStats()
    if tcp_stats:
      self.results['sysinfo'][netstat_key] = tcp_stats
    if IS_LINUX:
      self.results['sysinfo'][disk_key] = self._GetDiskCounters()

  try:
    self._SetUp()

    # Collect generic system info.
    self._CollectSysInfo()
    # Collect netstat info and disk counters before tests (and again later).
    _SnapshotCounters('netstat_start', 'disk_counters_start')
    # Record bucket URL.
    self.results['bucket_uri'] = str(self.bucket_url)
    self.results['json_format'] = 'perfdiag'
    self.results['metadata'] = self.metadata_keys

    # The order of this schedule is significant. In particular WTHRU_FILE
    # runs before RTHRU_FILE: if data is created in WTHRU_FILE it will be
    # used in RTHRU_FILE to save time and bandwidth.
    scheduled_tests = (
        (self.LAT, self._RunLatencyTests, {}),
        (self.RTHRU, self._RunReadThruTests, {}),
        (self.WTHRU_FILE, self._RunWriteThruTests, {'use_file': True}),
        (self.RTHRU_FILE, self._RunReadThruTests, {'use_file': True}),
        (self.WTHRU, self._RunWriteThruTests, {}),
        (self.LIST, self._RunListTests, {}),
    )
    for test_name, run_test, test_kwargs in scheduled_tests:
      if test_name in self.diag_tests:
        run_test(**test_kwargs)

    # Collect netstat info and disk counters after tests.
    _SnapshotCounters('netstat_end', 'disk_counters_end')

    recorded_stats = (
        ('total_requests', self.total_requests),
        ('request_errors', self.request_errors),
        ('error_responses_by_code', self.error_responses_by_code),
        ('connection_breaks', self.connection_breaks),
        ('gsutil_version', gslib.VERSION),
        ('boto_version', boto.__version__),
    )
    for result_key, result_value in recorded_stats:
      self.results[result_key] = result_value

    self._TearDown()
    self._DisplayResults()
  finally:
    # TODO: Install signal handlers so this is performed in response to a
    # terminating signal; consider multi-threaded object deletes during
    # cleanup so it happens quickly.
    self._TearDown()

  return 0
| |
| |
def StorageUrlToUploadObjectMetadata(storage_url):
  """Builds the object metadata message describing an upload destination.

  Args:
    storage_url: URL of the upload target. It must be a cloud URL that names
        an object.

  Returns:
    An apitools_messages.Object whose name and bucket are copied from
    storage_url.

  Raises:
    CommandException: If storage_url is not a cloud URL naming an object.
  """
  is_cloud_object = storage_url.IsCloudUrl() and storage_url.IsObject()
  if not is_cloud_object:
    raise CommandException('Non-cloud URL upload target %s was created in '
                           'perfdiag implemenation.' % storage_url)

  object_metadata = apitools_messages.Object()
  object_metadata.name = storage_url.object_name
  object_metadata.bucket = storage_url.bucket_name
  return object_metadata