blob: 701417215068b5c4709c7a4f6b2eb9d952c1da94 [file] [log] [blame]
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
import os
import subprocess
import sys
import shutil
import re
import json
import tarfile
import zipfile
from . import package_root, build_root, other_packages
from ._pep425 import _pep425tags, _pep425_implementation
if sys.version_info < (3,):
str_cls = unicode # noqa
else:
str_cls = str
def run():
"""
Installs required development dependencies. Uses git to checkout other
modularcrypto repos for more accurate coverage data.
"""
deps_dir = os.path.join(build_root, 'modularcrypto-deps')
if os.path.exists(deps_dir):
shutil.rmtree(deps_dir, ignore_errors=True)
os.mkdir(deps_dir)
try:
print("Staging ci dependencies")
_stage_requirements(deps_dir, os.path.join(package_root, 'requires', 'ci'))
print("Checking out modularcrypto packages for coverage")
for other_package in other_packages:
pkg_url = 'https://github.com/wbond/%s.git' % other_package
pkg_dir = os.path.join(build_root, other_package)
if os.path.exists(pkg_dir):
print("%s is already present" % other_package)
continue
print("Cloning %s" % pkg_url)
_execute(['git', 'clone', pkg_url], build_root)
print()
except (Exception):
if os.path.exists(deps_dir):
shutil.rmtree(deps_dir, ignore_errors=True)
raise
return True
def _download(url, dest):
"""
Downloads a URL to a directory
:param url:
The URL to download
:param dest:
The path to the directory to save the file in
:return:
The filesystem path to the saved file
"""
print('Downloading %s' % url)
filename = os.path.basename(url)
dest_path = os.path.join(dest, filename)
if sys.platform == 'win32':
powershell_exe = os.path.join('system32\\WindowsPowerShell\\v1.0\\powershell.exe')
code = "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12;"
code += "(New-Object Net.WebClient).DownloadFile('%s', '%s');" % (url, dest_path)
_execute([powershell_exe, '-Command', code], dest, 'Unable to connect to')
else:
_execute(
['curl', '-L', '--silent', '--show-error', '-O', url],
dest,
'Failed to connect to'
)
return dest_path
def _tuple_from_ver(version_string):
"""
:param version_string:
A unicode dotted version string
:return:
A tuple of integers
"""
return tuple(map(int, version_string.split('.')))
def _open_archive(path):
"""
:param path:
A unicode string of the filesystem path to the archive
:return:
An archive object
"""
if path.endswith('.zip'):
return zipfile.ZipFile(path, 'r')
return tarfile.open(path, 'r')
def _list_archive_members(archive):
"""
:param archive:
An archive from _open_archive()
:return:
A list of info objects to be used with _info_name() and _extract_info()
"""
if isinstance(archive, zipfile.ZipFile):
return archive.infolist()
return archive.getmembers()
def _archive_single_dir(archive):
"""
Check if all members of the archive are in a single top-level directory
:param archive:
An archive from _open_archive()
:return:
None if not a single top level directory in archive, otherwise a
unicode string of the top level directory name
"""
common_root = None
for info in _list_archive_members(archive):
fn = _info_name(info)
if fn in set(['.', '/']):
continue
sep = None
if '/' in fn:
sep = '/'
elif '\\' in fn:
sep = '\\'
if sep is None:
root_dir = fn
else:
root_dir, _ = fn.split(sep, 1)
if common_root is None:
common_root = root_dir
else:
if common_root != root_dir:
return None
return common_root
def _info_name(info):
"""
Returns a normalized file path for an archive info object
:param info:
An info object from _list_archive_members()
:return:
A unicode string with all directory separators normalized to "/"
"""
if isinstance(info, zipfile.ZipInfo):
return info.filename.replace('\\', '/')
return info.name.replace('\\', '/')
def _extract_info(archive, info):
"""
Extracts the contents of an archive info object
;param archive:
An archive from _open_archive()
:param info:
An info object from _list_archive_members()
:return:
None, or a byte string of the file contents
"""
if isinstance(archive, zipfile.ZipFile):
fn = info.filename
is_dir = fn.endswith('/') or fn.endswith('\\')
out = archive.read(info)
if is_dir and out == b'':
return None
return out
info_file = archive.extractfile(info)
if info_file:
return info_file.read()
return None
def _extract_package(deps_dir, pkg_path, pkg_dir):
"""
Extract a .whl, .zip, .tar.gz or .tar.bz2 into a package path to
use when running CI tasks
:param deps_dir:
A unicode string of the directory the package should be extracted to
:param pkg_path:
A unicode string of the path to the archive
:param pkg_dir:
If running setup.py, change to this dir first - a unicode string
"""
if pkg_path.endswith('.exe'):
try:
zf = None
zf = zipfile.ZipFile(pkg_path, 'r')
# Exes have a PLATLIB folder containing everything we want
for zi in zf.infolist():
if not zi.filename.startswith('PLATLIB'):
continue
data = _extract_info(zf, zi)
if data is not None:
dst_path = os.path.join(deps_dir, zi.filename[8:])
dst_dir = os.path.dirname(dst_path)
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
with open(dst_path, 'wb') as f:
f.write(data)
finally:
if zf:
zf.close()
return
if pkg_path.endswith('.whl'):
try:
zf = None
zf = zipfile.ZipFile(pkg_path, 'r')
# Wheels contain exactly what we need and nothing else
zf.extractall(deps_dir)
finally:
if zf:
zf.close()
return
# Source archives may contain a bunch of other things, including mutliple
# packages, so we must use setup.py/setuptool to install/extract it
ar = None
staging_dir = os.path.join(deps_dir, '_staging')
try:
ar = _open_archive(pkg_path)
common_root = _archive_single_dir(ar)
members = []
for info in _list_archive_members(ar):
dst_rel_path = _info_name(info)
if common_root is not None:
dst_rel_path = dst_rel_path[len(common_root) + 1:]
members.append((info, dst_rel_path))
if not os.path.exists(staging_dir):
os.makedirs(staging_dir)
for info, rel_path in members:
info_data = _extract_info(ar, info)
# Dirs won't return a file
if info_data is not None:
dst_path = os.path.join(staging_dir, rel_path)
dst_dir = os.path.dirname(dst_path)
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
with open(dst_path, 'wb') as f:
f.write(info_data)
setup_dir = staging_dir
if pkg_dir:
setup_dir = os.path.join(staging_dir, pkg_dir)
root = os.path.abspath(os.path.join(deps_dir, '..'))
install_lib = os.path.basename(deps_dir)
_execute(
[
sys.executable,
'setup.py',
'install',
'--root=%s' % root,
'--install-lib=%s' % install_lib,
'--no-compile'
],
setup_dir
)
finally:
if ar:
ar.close()
if staging_dir:
shutil.rmtree(staging_dir)
def _stage_requirements(deps_dir, path):
"""
Installs requirements without using Python to download, since
different services are limiting to TLS 1.2, and older version of
Python do not support that
:param deps_dir:
A unicode path to a temporary diretory to use for downloads
:param path:
A unicode filesystem path to a requirements file
"""
valid_tags = _pep425tags()
exe_suffix = None
if sys.platform == 'win32' and _pep425_implementation() == 'cp':
win_arch = 'win32' if sys.maxsize == 2147483647 else 'win-amd64'
version_info = sys.version_info
exe_suffix = '.%s-py%d.%d.exe' % (win_arch, version_info[0], version_info[1])
packages = _parse_requires(path)
for p in packages:
pkg = p['pkg']
pkg_sub_dir = None
if p['type'] == 'url':
anchor = None
if '#' in pkg:
pkg, anchor = pkg.split('#', 1)
if '&' in anchor:
parts = anchor.split('&')
else:
parts = [anchor]
for part in parts:
param, value = part.split('=')
if param == 'subdirectory':
pkg_sub_dir = value
if pkg.endswith('.zip') or pkg.endswith('.tar.gz') or pkg.endswith('.tar.bz2') or pkg.endswith('.whl'):
url = pkg
else:
raise Exception('Unable to install package from URL that is not an archive')
else:
pypi_json_url = 'https://pypi.org/pypi/%s/json' % pkg
json_dest = _download(pypi_json_url, deps_dir)
with open(json_dest, 'rb') as f:
pkg_info = json.loads(f.read().decode('utf-8'))
if os.path.exists(json_dest):
os.remove(json_dest)
latest = pkg_info['info']['version']
if p['type'] == '>=':
if _tuple_from_ver(p['ver']) > _tuple_from_ver(latest):
raise Exception('Unable to find version %s of %s, newest is %s' % (p['ver'], pkg, latest))
version = latest
elif p['type'] == '==':
if p['ver'] not in pkg_info['releases']:
raise Exception('Unable to find version %s of %s' % (p['ver'], pkg))
version = p['ver']
else:
version = latest
wheels = {}
whl = None
tar_bz2 = None
tar_gz = None
exe = None
for download in pkg_info['releases'][version]:
if exe_suffix and download['url'].endswith(exe_suffix):
exe = download['url']
if download['url'].endswith('.whl'):
parts = os.path.basename(download['url']).split('-')
tag_impl = parts[-3]
tag_abi = parts[-2]
tag_arch = parts[-1].split('.')[0]
wheels[(tag_impl, tag_abi, tag_arch)] = download['url']
if download['url'].endswith('.tar.bz2'):
tar_bz2 = download['url']
if download['url'].endswith('.tar.gz'):
tar_gz = download['url']
# Find the most-specific wheel possible
for tag in valid_tags:
if tag in wheels:
whl = wheels[tag]
break
if exe_suffix and exe:
url = exe
elif whl:
url = whl
elif tar_bz2:
url = tar_bz2
elif tar_gz:
url = tar_gz
else:
raise Exception('Unable to find suitable download for %s' % pkg)
local_path = _download(url, deps_dir)
_extract_package(deps_dir, local_path, pkg_sub_dir)
os.remove(local_path)
def _parse_requires(path):
"""
Does basic parsing of pip requirements files, to allow for
using something other than Python to do actual TLS requests
:param path:
A path to a requirements file
:return:
A list of dict objects containing the keys:
- 'type' ('any', 'url', '==', '>=')
- 'pkg'
- 'ver' (if 'type' == '==' or 'type' == '>=')
"""
python_version = '.'.join(map(str_cls, sys.version_info[0:2]))
sys_platform = sys.platform
packages = []
with open(path, 'rb') as f:
contents = f.read().decode('utf-8')
for line in re.split(r'\r?\n', contents):
line = line.strip()
if not len(line):
continue
if re.match(r'^\s*#', line):
continue
if ';' in line:
package, cond = line.split(';', 1)
package = package.strip()
cond = cond.strip()
cond = cond.replace('sys_platform', repr(sys_platform))
cond = cond.replace('python_version', repr(python_version))
if not eval(cond):
continue
else:
package = line.strip()
if re.match(r'^\s*-r\s*', package):
sub_req_file = re.sub(r'^\s*-r\s*', '', package)
sub_req_file = os.path.abspath(os.path.join(os.path.dirname(path), sub_req_file))
packages.extend(_parse_requires(sub_req_file))
continue
if re.match(r'https?://', package):
packages.append({'type': 'url', 'pkg': package})
continue
if '>=' in package:
parts = package.split('>=')
package = parts[0].strip()
ver = parts[1].strip()
packages.append({'type': '>=', 'pkg': package, 'ver': ver})
continue
if '==' in package:
parts = package.split('==')
package = parts[0].strip()
ver = parts[1].strip()
packages.append({'type': '==', 'pkg': package, 'ver': ver})
continue
if re.search(r'[^ a-zA-Z0-9\-]', package):
raise Exception('Unsupported requirements format version constraint: %s' % package)
packages.append({'type': 'any', 'pkg': package})
return packages
def _execute(params, cwd, retry=None):
"""
Executes a subprocess
:param params:
A list of the executable and arguments to pass to it
:param cwd:
The working directory to execute the command in
:param retry:
If this string is present in stderr, retry the operation
:return:
A 2-element tuple of (stdout, stderr)
"""
proc = subprocess.Popen(
params,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd
)
stdout, stderr = proc.communicate()
code = proc.wait()
if code != 0:
if retry and retry in stderr.decode('utf-8'):
return _execute(params, cwd)
e = OSError('subprocess exit code for "%s" was %d: %s' % (' '.join(params), code, stderr))
e.stdout = stdout
e.stderr = stderr
raise e
return (stdout, stderr)