blob: 5d06076a53824fe89a10693b5938ce7b608282f6 [file] [log] [blame]
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import re
import time
import urlparse
from telemetry.core import util
def _UrlPathJoin(*args):
"""Joins each path in |args| for insertion into a URL path.
This is distinct from os.path.join in that:
1. Forward slashes are always used.
2. Paths beginning with '/' are not treated as absolute.
For example:
_UrlPathJoin('a', 'b') => 'a/b'
_UrlPathJoin('a/', 'b') => 'a/b'
_UrlPathJoin('a', '/b') => 'a/b'
_UrlPathJoin('a/', '/b') => 'a/b'
"""
if not args:
return ''
if len(args) == 1:
return str(args[0])
else:
args = [str(arg).replace('\\', '/') for arg in args]
work = [args[0]]
for arg in args[1:]:
if not arg:
continue
if arg.startswith('/'):
work.append(arg[1:])
else:
work.append(arg)
joined = reduce(os.path.join, work)
return joined.replace('\\', '/')
class Page(object):
def __init__(self, url, page_set, attributes=None, base_dir=None):
parsed_url = urlparse.urlparse(url)
if not parsed_url.scheme:
abspath = os.path.abspath(os.path.join(base_dir, parsed_url.path))
if os.path.exists(abspath):
url = 'file://%s' % os.path.abspath(os.path.join(base_dir, url))
else:
raise Exception('URLs must be fully qualified: %s' % url)
self.url = url
self.page_set = page_set
self.base_dir = base_dir
# These attributes can be set dynamically by the page.
self.credentials = None
self.disabled = False
self.script_to_evaluate_on_commit = None
if attributes:
for k, v in attributes.iteritems():
setattr(self, k, v)
def __getattr__(self, name):
if self.page_set and hasattr(self.page_set, name):
return getattr(self.page_set, name)
raise AttributeError()
@property
def is_file(self):
parsed_url = urlparse.urlparse(self.url)
return parsed_url.scheme == 'file'
@property
def is_local(self):
parsed_url = urlparse.urlparse(self.url)
return parsed_url.scheme == 'file' or parsed_url.scheme == 'chrome'
@property
def serving_dirs_and_file(self):
parsed_url = urlparse.urlparse(self.url)
path = _UrlPathJoin(self.base_dir, parsed_url.netloc, parsed_url.path)
if hasattr(self.page_set, 'serving_dirs'):
url_base_dir = os.path.commonprefix(self.page_set.serving_dirs)
base_path = _UrlPathJoin(self.base_dir, url_base_dir)
return ([_UrlPathJoin(self.base_dir, d)
for d in self.page_set.serving_dirs],
path.replace(base_path, ''))
return os.path.split(path)
# A version of this page's URL that's safe to use as a filename.
@property
def url_as_file_safe_name(self):
# Just replace all special characters in the url with underscore.
return re.sub('[^a-zA-Z0-9]', '_', self.display_url)
@property
def display_url(self):
if not self.is_local:
return self.url
url_paths = ['/'.join(p.url.strip('/').split('/')[:-1])
for p in self.page_set if p.is_file]
common_prefix = os.path.commonprefix(url_paths)
return self.url[len(common_prefix):].strip('/')
@property
def archive_path(self):
return self.page_set.WprFilePathForPage(self)
def __str__(self):
return self.url
def WaitToLoad(self, tab, timeout, poll_interval=0.1):
Page.WaitForPageToLoad(self, tab, timeout, poll_interval)
# TODO(dtu): Remove this method when no page sets use a click interaction
# with a wait condition. crbug.com/168431
@staticmethod
def WaitForPageToLoad(obj, tab, timeout, poll_interval=0.1):
"""Waits for various wait conditions present in obj."""
if hasattr(obj, 'wait_seconds'):
time.sleep(obj.wait_seconds)
if hasattr(obj, 'wait_for_element_with_text'):
callback_code = 'function(element) { return element != null; }'
util.WaitFor(
lambda: util.FindElementAndPerformAction(
tab, obj.wait_for_element_with_text, callback_code),
timeout, poll_interval)
if hasattr(obj, 'wait_for_element_with_selector'):
util.WaitFor(lambda: tab.EvaluateJavaScript(
'document.querySelector(\'' + obj.wait_for_element_with_selector +
'\') != null'), timeout, poll_interval)
if hasattr(obj, 'post_navigate_javascript_to_execute'):
tab.EvaluateJavaScript(obj.post_navigate_javascript_to_execute)
if hasattr(obj, 'wait_for_javascript_expression'):
util.WaitFor(
lambda: tab.EvaluateJavaScript(obj.wait_for_javascript_expression),
timeout, poll_interval)