blob: b9a55de917f8abd2b037b125c227debbd4a212f7 [file] [log] [blame]
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
import cgi
import codecs
import coverage
import imp
import json
import os
import unittest
import re
import sys
import tempfile
import time
import platform as _plat
import subprocess
from fnmatch import fnmatch
from . import package_name, package_root, other_packages
if sys.version_info < (3,):
str_cls = unicode # noqa
from urllib2 import URLError
from urllib import urlencode
from io import open
else:
str_cls = str
from urllib.error import URLError
from urllib.parse import urlencode
if sys.version_info < (3, 7):
Pattern = re._pattern_type
else:
Pattern = re.Pattern
def run(ci=False):
"""
Runs the tests while measuring coverage
:param ci:
If coverage is being run in a CI environment - this triggers trying to
run the tests for the rest of modularcrypto and uploading coverage data
:return:
A bool - if the tests ran successfully
"""
xml_report_path = os.path.join(package_root, 'coverage.xml')
if os.path.exists(xml_report_path):
os.unlink(xml_report_path)
cov = coverage.Coverage(include='%s/*.py' % package_name)
cov.start()
from .tests import run as run_tests
result = run_tests()
print()
if ci:
suite = unittest.TestSuite()
loader = unittest.TestLoader()
for other_package in other_packages:
for test_class in _load_package_tests(other_package):
suite.addTest(loader.loadTestsFromTestCase(test_class))
if suite.countTestCases() > 0:
print('Running tests from other modularcrypto packages')
sys.stdout.flush()
runner_result = unittest.TextTestRunner(stream=sys.stdout, verbosity=1).run(suite)
result = runner_result.wasSuccessful() and result
print()
sys.stdout.flush()
cov.stop()
cov.save()
cov.report(show_missing=False)
print()
sys.stdout.flush()
if ci:
cov.xml_report()
if ci and result and os.path.exists(xml_report_path):
_codecov_submit()
print()
return result
def _load_package_tests(name):
"""
Load the test classes from another modularcrypto package
:param name:
A unicode string of the other package name
:return:
A list of unittest.TestCase classes of the tests for the package
"""
package_dir = os.path.join('..', name)
if not os.path.exists(package_dir):
return []
tests_module_info = imp.find_module('tests', [package_dir])
tests_module = imp.load_module('%s.tests' % name, *tests_module_info)
return tests_module.test_classes()
def _env_info():
"""
:return:
A two-element tuple of unicode strings. The first is the name of the
environment, the second the root of the repo. The environment name
will be one of: "ci-travis", "ci-circle", "ci-appveyor",
"ci-github-actions", "local"
"""
if os.getenv('CI') == 'true' and os.getenv('TRAVIS') == 'true':
return ('ci-travis', os.getenv('TRAVIS_BUILD_DIR'))
if os.getenv('CI') == 'True' and os.getenv('APPVEYOR') == 'True':
return ('ci-appveyor', os.getenv('APPVEYOR_BUILD_FOLDER'))
if os.getenv('CI') == 'true' and os.getenv('CIRCLECI') == 'true':
return ('ci-circle', os.getcwdu() if sys.version_info < (3,) else os.getcwd())
if os.getenv('GITHUB_ACTIONS') == 'true':
return ('ci-github-actions', os.getenv('GITHUB_WORKSPACE'))
return ('local', package_root)
def _codecov_submit():
env_name, root = _env_info()
try:
with open(os.path.join(root, 'codecov.json'), 'rb') as f:
json_data = json.loads(f.read().decode('utf-8'))
except (OSError, ValueError, UnicodeDecodeError, KeyError):
print('error reading codecov.json')
return
if json_data.get('disabled'):
return
if env_name == 'ci-travis':
# http://docs.travis-ci.com/user/environment-variables/#Default-Environment-Variables
build_url = 'https://travis-ci.org/%s/jobs/%s' % (os.getenv('TRAVIS_REPO_SLUG'), os.getenv('TRAVIS_JOB_ID'))
query = {
'service': 'travis',
'branch': os.getenv('TRAVIS_BRANCH'),
'build': os.getenv('TRAVIS_JOB_NUMBER'),
'pr': os.getenv('TRAVIS_PULL_REQUEST'),
'job': os.getenv('TRAVIS_JOB_ID'),
'tag': os.getenv('TRAVIS_TAG'),
'slug': os.getenv('TRAVIS_REPO_SLUG'),
'commit': os.getenv('TRAVIS_COMMIT'),
'build_url': build_url,
}
elif env_name == 'ci-appveyor':
# http://www.appveyor.com/docs/environment-variables
build_url = 'https://ci.appveyor.com/project/%s/build/%s' % (
os.getenv('APPVEYOR_REPO_NAME'),
os.getenv('APPVEYOR_BUILD_VERSION')
)
query = {
'service': "appveyor",
'branch': os.getenv('APPVEYOR_REPO_BRANCH'),
'build': os.getenv('APPVEYOR_JOB_ID'),
'pr': os.getenv('APPVEYOR_PULL_REQUEST_NUMBER'),
'job': '/'.join((
os.getenv('APPVEYOR_ACCOUNT_NAME'),
os.getenv('APPVEYOR_PROJECT_SLUG'),
os.getenv('APPVEYOR_BUILD_VERSION')
)),
'tag': os.getenv('APPVEYOR_REPO_TAG_NAME'),
'slug': os.getenv('APPVEYOR_REPO_NAME'),
'commit': os.getenv('APPVEYOR_REPO_COMMIT'),
'build_url': build_url,
}
elif env_name == 'ci-circle':
# https://circleci.com/docs/environment-variables
query = {
'service': 'circleci',
'branch': os.getenv('CIRCLE_BRANCH'),
'build': os.getenv('CIRCLE_BUILD_NUM'),
'pr': os.getenv('CIRCLE_PR_NUMBER'),
'job': os.getenv('CIRCLE_BUILD_NUM') + "." + os.getenv('CIRCLE_NODE_INDEX'),
'tag': os.getenv('CIRCLE_TAG'),
'slug': os.getenv('CIRCLE_PROJECT_USERNAME') + "/" + os.getenv('CIRCLE_PROJECT_REPONAME'),
'commit': os.getenv('CIRCLE_SHA1'),
'build_url': os.getenv('CIRCLE_BUILD_URL'),
}
elif env_name == 'ci-github-actions':
branch = ''
tag = ''
ref = os.getenv('GITHUB_REF', '')
if ref.startswith('refs/tags/'):
tag = ref[10:]
elif ref.startswith('refs/heads/'):
branch = ref[11:]
impl = _plat.python_implementation()
major, minor = _plat.python_version_tuple()[0:2]
build_name = '%s %s %s.%s' % (_platform_name(), impl, major, minor)
query = {
'service': 'custom',
'token': json_data['token'],
'branch': branch,
'tag': tag,
'slug': os.getenv('GITHUB_REPOSITORY'),
'commit': os.getenv('GITHUB_SHA'),
'build_url': 'https://github.com/wbond/oscrypto/commit/%s/checks' % os.getenv('GITHUB_SHA'),
'name': 'GitHub Actions %s on %s' % (build_name, os.getenv('RUNNER_OS'))
}
else:
if not os.path.exists(os.path.join(root, '.git')):
print('git repository not found, not submitting coverage data')
return
git_status = _git_command(['status', '--porcelain'], root)
if git_status != '':
print('git repository has uncommitted changes, not submitting coverage data')
return
branch = _git_command(['rev-parse', '--abbrev-ref', 'HEAD'], root)
commit = _git_command(['rev-parse', '--verify', 'HEAD'], root)
tag = _git_command(['name-rev', '--tags', '--name-only', commit], root)
impl = _plat.python_implementation()
major, minor = _plat.python_version_tuple()[0:2]
build_name = '%s %s %s.%s' % (_platform_name(), impl, major, minor)
query = {
'branch': branch,
'commit': commit,
'slug': json_data['slug'],
'token': json_data['token'],
'build': build_name,
}
if tag != 'undefined':
query['tag'] = tag
payload = 'PLATFORM=%s\n' % _platform_name()
payload += 'PYTHON_VERSION=%s %s\n' % (_plat.python_version(), _plat.python_implementation())
if 'oscrypto' in sys.modules:
payload += 'OSCRYPTO_BACKEND=%s\n' % sys.modules['oscrypto'].backend()
payload += '<<<<<< ENV\n'
for path in _list_files(root):
payload += path + '\n'
payload += '<<<<<< network\n'
payload += '# path=coverage.xml\n'
with open(os.path.join(root, 'coverage.xml'), 'r', encoding='utf-8') as f:
payload += f.read() + '\n'
payload += '<<<<<< EOF\n'
url = 'https://codecov.io/upload/v4'
headers = {
'Accept': 'text/plain'
}
filtered_query = {}
for key in query:
value = query[key]
if value == '' or value is None:
continue
filtered_query[key] = value
print('Submitting coverage info to codecov.io')
info = _do_request(
'POST',
url,
headers,
query_params=filtered_query
)
encoding = info[1] or 'utf-8'
text = info[2].decode(encoding).strip()
parts = text.split()
upload_url = parts[1]
headers = {
'Content-Type': 'text/plain',
'x-amz-acl': 'public-read',
'x-amz-storage-class': 'REDUCED_REDUNDANCY'
}
print('Uploading coverage data to codecov.io S3 bucket')
_do_request(
'PUT',
upload_url,
headers,
data=payload.encode('utf-8')
)
def _git_command(params, cwd):
"""
Executes a git command, returning the output
:param params:
A list of the parameters to pass to git
:param cwd:
The working directory to execute git in
:return:
A 2-element tuple of (stdout, stderr)
"""
proc = subprocess.Popen(
['git'] + params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=cwd
)
stdout, stderr = proc.communicate()
code = proc.wait()
if code != 0:
e = OSError('git exit code was non-zero')
e.stdout = stdout
raise e
return stdout.decode('utf-8').strip()
def _parse_env_var_file(data):
"""
Parses a basic VAR="value data" file contents into a dict
:param data:
A unicode string of the file data
:return:
A dict of parsed name/value data
"""
output = {}
for line in data.splitlines():
line = line.strip()
if not line or '=' not in line:
continue
parts = line.split('=')
if len(parts) != 2:
continue
name = parts[0]
value = parts[1]
if len(value) > 1:
if value[0] == '"' and value[-1] == '"':
value = value[1:-1]
output[name] = value
return output
def _platform_name():
"""
Returns information about the current operating system and version
:return:
A unicode string containing the OS name and version
"""
if sys.platform == 'darwin':
version = _plat.mac_ver()[0]
_plat_ver_info = tuple(map(int, version.split('.')))
if _plat_ver_info < (10, 12):
name = 'OS X'
else:
name = 'macOS'
return '%s %s' % (name, version)
elif sys.platform == 'win32':
_win_ver = sys.getwindowsversion()
_plat_ver_info = (_win_ver[0], _win_ver[1])
return 'Windows %s' % _plat.win32_ver()[0]
elif sys.platform in ['linux', 'linux2']:
if os.path.exists('/etc/os-release'):
with open('/etc/os-release', 'r', encoding='utf-8') as f:
pairs = _parse_env_var_file(f.read())
if 'NAME' in pairs and 'VERSION_ID' in pairs:
return '%s %s' % (pairs['NAME'], pairs['VERSION_ID'])
version = pairs['VERSION_ID']
elif 'PRETTY_NAME' in pairs:
return pairs['PRETTY_NAME']
elif 'NAME' in pairs:
return pairs['NAME']
else:
raise ValueError('No suitable version info found in /etc/os-release')
elif os.path.exists('/etc/lsb-release'):
with open('/etc/lsb-release', 'r', encoding='utf-8') as f:
pairs = _parse_env_var_file(f.read())
if 'DISTRIB_DESCRIPTION' in pairs:
return pairs['DISTRIB_DESCRIPTION']
else:
raise ValueError('No suitable version info found in /etc/lsb-release')
else:
return 'Linux'
else:
return '%s %s' % (_plat.system(), _plat.release())
def _list_files(root):
"""
Lists all of the files in a directory, taking into account any .gitignore
file that is present
:param root:
A unicode filesystem path
:return:
A list of unicode strings, containing paths of all files not ignored
by .gitignore with root, using relative paths
"""
dir_patterns, file_patterns = _gitignore(root)
paths = []
prefix = os.path.abspath(root) + os.sep
for base, dirs, files in os.walk(root):
for d in dirs:
for dir_pattern in dir_patterns:
if fnmatch(d, dir_pattern):
dirs.remove(d)
break
for f in files:
skip = False
for file_pattern in file_patterns:
if fnmatch(f, file_pattern):
skip = True
break
if skip:
continue
full_path = os.path.join(base, f)
if full_path[:len(prefix)] == prefix:
full_path = full_path[len(prefix):]
paths.append(full_path)
return sorted(paths)
def _gitignore(root):
"""
Parses a .gitignore file and returns patterns to match dirs and files.
Only basic gitignore patterns are supported. Pattern negation, ** wildcards
and anchored patterns are not currently implemented.
:param root:
A unicode string of the path to the git repository
:return:
A 2-element tuple:
- 0: a list of unicode strings to match against dirs
- 1: a list of unicode strings to match against dirs and files
"""
gitignore_path = os.path.join(root, '.gitignore')
dir_patterns = ['.git']
file_patterns = []
if not os.path.exists(gitignore_path):
return (dir_patterns, file_patterns)
with open(gitignore_path, 'r', encoding='utf-8') as f:
for line in f.readlines():
line = line.strip()
if not line:
continue
if line.startswith('#'):
continue
if '**' in line:
raise NotImplementedError('gitignore ** wildcards are not implemented')
if line.startswith('!'):
raise NotImplementedError('gitignore pattern negation is not implemented')
if line.startswith('/'):
raise NotImplementedError('gitignore anchored patterns are not implemented')
if line.startswith('\\#'):
line = '#' + line[2:]
if line.startswith('\\!'):
line = '!' + line[2:]
if line.endswith('/'):
dir_patterns.append(line[:-1])
else:
file_patterns.append(line)
return (dir_patterns, file_patterns)
def _do_request(method, url, headers, data=None, query_params=None, timeout=20):
"""
Performs an HTTP request
:param method:
A unicode string of 'POST' or 'PUT'
:param url;
A unicode string of the URL to request
:param headers:
A dict of unicode strings, where keys are header names and values are
the header values.
:param data:
A dict of unicode strings (to be encoded as
application/x-www-form-urlencoded), or a byte string of data.
:param query_params:
A dict of unicode keys and values to pass as query params
:param timeout:
An integer number of seconds to use as the timeout
:return:
A 3-element tuple:
- 0: A unicode string of the response content-type
- 1: A unicode string of the response encoding, or None
- 2: A byte string of the response body
"""
if query_params:
url += '?' + urlencode(query_params).replace('+', '%20')
if isinstance(data, dict):
data_bytes = {}
for key in data:
data_bytes[key.encode('utf-8')] = data[key].encode('utf-8')
data = urlencode(data_bytes)
headers['Content-Type'] = 'application/x-www-form-urlencoded'
if isinstance(data, str_cls):
raise TypeError('data must be a byte string')
try:
tempfd, tempf_path = tempfile.mkstemp('-coverage')
os.write(tempfd, data or b'')
os.close(tempfd)
if sys.platform == 'win32':
powershell_exe = os.path.join('system32\\WindowsPowerShell\\v1.0\\powershell.exe')
code = "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12;"
code += "$wc = New-Object Net.WebClient;"
for key in headers:
code += "$wc.Headers.add('%s','%s');" % (key, headers[key])
code += "$out = $wc.UploadFile('%s', '%s', '%s');" % (url, method, tempf_path)
code += "[System.Text.Encoding]::GetEncoding('ISO-8859-1').GetString($wc.ResponseHeaders.ToByteArray())"
# To properly obtain bytes, we use BitConverter to get hex dash
# encoding (e.g. AE-09-3F) and they decode in python
code += " + [System.BitConverter]::ToString($out);"
stdout, stderr = _execute(
[powershell_exe, '-Command', code],
os.getcwd(),
re.compile(r'Unable to connect to|TLS|Internal Server Error'),
6
)
if stdout[-2:] == b'\r\n' and b'\r\n\r\n' in stdout:
# An extra trailing crlf is added at the end by powershell
stdout = stdout[0:-2]
parts = stdout.split(b'\r\n\r\n', 1)
if len(parts) == 2:
stdout = parts[0] + b'\r\n\r\n' + codecs.decode(parts[1].replace(b'-', b''), 'hex_codec')
else:
args = [
'curl',
'--request',
method,
'--location',
'--silent',
'--show-error',
'--include',
# Prevent curl from asking for an HTTP "100 Continue" response
'--header', 'Expect:'
]
for key in headers:
args.append('--header')
args.append("%s: %s" % (key, headers[key]))
args.append('--data-binary')
args.append('@%s' % tempf_path)
args.append(url)
stdout, stderr = _execute(
args,
os.getcwd(),
re.compile(r'Failed to connect to|TLS|SSLRead|outstanding|cleanly'),
6
)
finally:
if tempf_path and os.path.exists(tempf_path):
os.remove(tempf_path)
if len(stderr) > 0:
raise URLError("Error %sing %s:\n%s" % (method, url, stderr))
parts = stdout.split(b'\r\n\r\n', 1)
if len(parts) != 2:
raise URLError("Error %sing %s, response data malformed:\n%s" % (method, url, stdout))
header_block, body = parts
content_type_header = None
content_len_header = None
for hline in header_block.decode('iso-8859-1').splitlines():
hline_parts = hline.split(':', 1)
if len(hline_parts) != 2:
continue
name, val = hline_parts
name = name.strip().lower()
val = val.strip()
if name == 'content-type':
content_type_header = val
if name == 'content-length':
content_len_header = val
if content_type_header is None and content_len_header != '0':
raise URLError("Error %sing %s, no content-type header:\n%s" % (method, url, stdout))
if content_type_header is None:
content_type = 'text/plain'
encoding = 'utf-8'
else:
content_type, params = cgi.parse_header(content_type_header)
encoding = params.get('charset')
return (content_type, encoding, body)
def _execute(params, cwd, retry=None, retries=0):
"""
Executes a subprocess
:param params:
A list of the executable and arguments to pass to it
:param cwd:
The working directory to execute the command in
:param retry:
If this string is present in stderr, or regex pattern matches stderr, retry the operation
:param retries:
An integer number of times to retry
:return:
A 2-element tuple of (stdout, stderr)
"""
proc = subprocess.Popen(
params,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd
)
stdout, stderr = proc.communicate()
code = proc.wait()
if code != 0:
if retry and retries > 0:
stderr_str = stderr.decode('utf-8')
if isinstance(retry, Pattern):
if retry.search(stderr_str) is not None:
time.sleep(5)
return _execute(params, cwd, retry, retries - 1)
elif retry in stderr_str:
time.sleep(5)
return _execute(params, cwd, retry, retries - 1)
e = OSError('subprocess exit code for "%s" was %d: %s' % (' '.join(params), code, stderr))
e.stdout = stdout
e.stderr = stderr
raise e
return (stdout, stderr)
if __name__ == '__main__':
_codecov_submit()