| from __future__ import absolute_import |
| import errno |
| import warnings |
| import hmac |
| import sys |
| |
| from binascii import hexlify, unhexlify |
| from hashlib import md5, sha1, sha256 |
| |
| from .url import IPV4_RE, BRACELESS_IPV6_ADDRZ_RE |
| from ..exceptions import SSLError, InsecurePlatformWarning, SNIMissingWarning |
| from ..packages import six |
| |
| |
| SSLContext = None |
| HAS_SNI = False |
| IS_PYOPENSSL = False |
| IS_SECURETRANSPORT = False |
| |
| # Maps the length of a digest to a possible hash function producing this digest |
| HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256} |
| |
| |
| def _const_compare_digest_backport(a, b): |
| """ |
| Compare two digests of equal length in constant time. |
| |
| The digests must be of type str/bytes. |
| Returns True if the digests match, and False otherwise. |
| """ |
| result = abs(len(a) - len(b)) |
| for l, r in zip(bytearray(a), bytearray(b)): |
| result |= l ^ r |
| return result == 0 |
| |
| |
| _const_compare_digest = getattr(hmac, "compare_digest", _const_compare_digest_backport) |
| |
| try: # Test for SSL features |
| import ssl |
| from ssl import wrap_socket, CERT_REQUIRED |
| from ssl import HAS_SNI # Has SNI? |
| except ImportError: |
| pass |
| |
| try: # Platform-specific: Python 3.6 |
| from ssl import PROTOCOL_TLS |
| |
| PROTOCOL_SSLv23 = PROTOCOL_TLS |
| except ImportError: |
| try: |
| from ssl import PROTOCOL_SSLv23 as PROTOCOL_TLS |
| |
| PROTOCOL_SSLv23 = PROTOCOL_TLS |
| except ImportError: |
| PROTOCOL_SSLv23 = PROTOCOL_TLS = 2 |
| |
| |
| try: |
| from ssl import OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION |
| except ImportError: |
| OP_NO_SSLv2, OP_NO_SSLv3 = 0x1000000, 0x2000000 |
| OP_NO_COMPRESSION = 0x20000 |
| |
| |
| # A secure default. |
| # Sources for more information on TLS ciphers: |
| # |
| # - https://wiki.mozilla.org/Security/Server_Side_TLS |
| # - https://www.ssllabs.com/projects/best-practices/index.html |
| # - https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/ |
| # |
| # The general intent is: |
| # - prefer cipher suites that offer perfect forward secrecy (DHE/ECDHE), |
| # - prefer ECDHE over DHE for better performance, |
| # - prefer any AES-GCM and ChaCha20 over any AES-CBC for better performance and |
| # security, |
| # - prefer AES-GCM over ChaCha20 because hardware-accelerated AES is common, |
| # - disable NULL authentication, MD5 MACs, DSS, and other |
| # insecure ciphers for security reasons. |
| # - NOTE: TLS 1.3 cipher suites are managed through a different interface |
| # not exposed by CPython (yet!) and are enabled by default if they're available. |
| DEFAULT_CIPHERS = ":".join( |
| [ |
| "ECDHE+AESGCM", |
| "ECDHE+CHACHA20", |
| "DHE+AESGCM", |
| "DHE+CHACHA20", |
| "ECDH+AESGCM", |
| "DH+AESGCM", |
| "ECDH+AES", |
| "DH+AES", |
| "RSA+AESGCM", |
| "RSA+AES", |
| "!aNULL", |
| "!eNULL", |
| "!MD5", |
| "!DSS", |
| ] |
| ) |
| |
| try: |
| from ssl import SSLContext # Modern SSL? |
| except ImportError: |
| |
| class SSLContext(object): # Platform-specific: Python 2 |
| def __init__(self, protocol_version): |
| self.protocol = protocol_version |
| # Use default values from a real SSLContext |
| self.check_hostname = False |
| self.verify_mode = ssl.CERT_NONE |
| self.ca_certs = None |
| self.options = 0 |
| self.certfile = None |
| self.keyfile = None |
| self.ciphers = None |
| |
| def load_cert_chain(self, certfile, keyfile): |
| self.certfile = certfile |
| self.keyfile = keyfile |
| |
| def load_verify_locations(self, cafile=None, capath=None, cadata=None): |
| self.ca_certs = cafile |
| |
| if capath is not None: |
| raise SSLError("CA directories not supported in older Pythons") |
| |
| if cadata is not None: |
| raise SSLError("CA data not supported in older Pythons") |
| |
| def set_ciphers(self, cipher_suite): |
| self.ciphers = cipher_suite |
| |
| def wrap_socket(self, socket, server_hostname=None, server_side=False): |
| warnings.warn( |
| "A true SSLContext object is not available. This prevents " |
| "urllib3 from configuring SSL appropriately and may cause " |
| "certain SSL connections to fail. You can upgrade to a newer " |
| "version of Python to solve this. For more information, see " |
| "https://urllib3.readthedocs.io/en/latest/advanced-usage.html" |
| "#ssl-warnings", |
| InsecurePlatformWarning, |
| ) |
| kwargs = { |
| "keyfile": self.keyfile, |
| "certfile": self.certfile, |
| "ca_certs": self.ca_certs, |
| "cert_reqs": self.verify_mode, |
| "ssl_version": self.protocol, |
| "server_side": server_side, |
| } |
| return wrap_socket(socket, ciphers=self.ciphers, **kwargs) |
| |
| |
| def assert_fingerprint(cert, fingerprint): |
| """ |
| Checks if given fingerprint matches the supplied certificate. |
| |
| :param cert: |
| Certificate as bytes object. |
| :param fingerprint: |
| Fingerprint as string of hexdigits, can be interspersed by colons. |
| """ |
| |
| fingerprint = fingerprint.replace(":", "").lower() |
| digest_length = len(fingerprint) |
| hashfunc = HASHFUNC_MAP.get(digest_length) |
| if not hashfunc: |
| raise SSLError("Fingerprint of invalid length: {0}".format(fingerprint)) |
| |
| # We need encode() here for py32; works on py2 and p33. |
| fingerprint_bytes = unhexlify(fingerprint.encode()) |
| |
| cert_digest = hashfunc(cert).digest() |
| |
| if not _const_compare_digest(cert_digest, fingerprint_bytes): |
| raise SSLError( |
| 'Fingerprints did not match. Expected "{0}", got "{1}".'.format( |
| fingerprint, hexlify(cert_digest) |
| ) |
| ) |
| |
| |
| def resolve_cert_reqs(candidate): |
| """ |
| Resolves the argument to a numeric constant, which can be passed to |
| the wrap_socket function/method from the ssl module. |
| Defaults to :data:`ssl.CERT_REQUIRED`. |
| If given a string it is assumed to be the name of the constant in the |
| :mod:`ssl` module or its abbreviation. |
| (So you can specify `REQUIRED` instead of `CERT_REQUIRED`. |
| If it's neither `None` nor a string we assume it is already the numeric |
| constant which can directly be passed to wrap_socket. |
| """ |
| if candidate is None: |
| return CERT_REQUIRED |
| |
| if isinstance(candidate, str): |
| res = getattr(ssl, candidate, None) |
| if res is None: |
| res = getattr(ssl, "CERT_" + candidate) |
| return res |
| |
| return candidate |
| |
| |
| def resolve_ssl_version(candidate): |
| """ |
| like resolve_cert_reqs |
| """ |
| if candidate is None: |
| return PROTOCOL_TLS |
| |
| if isinstance(candidate, str): |
| res = getattr(ssl, candidate, None) |
| if res is None: |
| res = getattr(ssl, "PROTOCOL_" + candidate) |
| return res |
| |
| return candidate |
| |
| |
| def create_urllib3_context( |
| ssl_version=None, cert_reqs=None, options=None, ciphers=None |
| ): |
| """All arguments have the same meaning as ``ssl_wrap_socket``. |
| |
| By default, this function does a lot of the same work that |
| ``ssl.create_default_context`` does on Python 3.4+. It: |
| |
| - Disables SSLv2, SSLv3, and compression |
| - Sets a restricted set of server ciphers |
| |
| If you wish to enable SSLv3, you can do:: |
| |
| from pip._vendor.urllib3.util import ssl_ |
| context = ssl_.create_urllib3_context() |
| context.options &= ~ssl_.OP_NO_SSLv3 |
| |
| You can do the same to enable compression (substituting ``COMPRESSION`` |
| for ``SSLv3`` in the last line above). |
| |
| :param ssl_version: |
| The desired protocol version to use. This will default to |
| PROTOCOL_SSLv23 which will negotiate the highest protocol that both |
| the server and your installation of OpenSSL support. |
| :param cert_reqs: |
| Whether to require the certificate verification. This defaults to |
| ``ssl.CERT_REQUIRED``. |
| :param options: |
| Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``, |
| ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``. |
| :param ciphers: |
| Which cipher suites to allow the server to select. |
| :returns: |
| Constructed SSLContext object with specified options |
| :rtype: SSLContext |
| """ |
| context = SSLContext(ssl_version or PROTOCOL_TLS) |
| |
| context.set_ciphers(ciphers or DEFAULT_CIPHERS) |
| |
| # Setting the default here, as we may have no ssl module on import |
| cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs |
| |
| if options is None: |
| options = 0 |
| # SSLv2 is easily broken and is considered harmful and dangerous |
| options |= OP_NO_SSLv2 |
| # SSLv3 has several problems and is now dangerous |
| options |= OP_NO_SSLv3 |
| # Disable compression to prevent CRIME attacks for OpenSSL 1.0+ |
| # (issue #309) |
| options |= OP_NO_COMPRESSION |
| |
| context.options |= options |
| |
| # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is |
| # necessary for conditional client cert authentication with TLS 1.3. |
| # The attribute is None for OpenSSL <= 1.1.0 or does not exist in older |
| # versions of Python. We only enable on Python 3.7.4+ or if certificate |
| # verification is enabled to work around Python issue #37428 |
| # See: https://bugs.python.org/issue37428 |
| if (cert_reqs == ssl.CERT_REQUIRED or sys.version_info >= (3, 7, 4)) and getattr( |
| context, "post_handshake_auth", None |
| ) is not None: |
| context.post_handshake_auth = True |
| |
| context.verify_mode = cert_reqs |
| if ( |
| getattr(context, "check_hostname", None) is not None |
| ): # Platform-specific: Python 3.2 |
| # We do our own verification, including fingerprints and alternative |
| # hostnames. So disable it here |
| context.check_hostname = False |
| return context |
| |
| |
| def ssl_wrap_socket( |
| sock, |
| keyfile=None, |
| certfile=None, |
| cert_reqs=None, |
| ca_certs=None, |
| server_hostname=None, |
| ssl_version=None, |
| ciphers=None, |
| ssl_context=None, |
| ca_cert_dir=None, |
| key_password=None, |
| ca_cert_data=None, |
| ): |
| """ |
| All arguments except for server_hostname, ssl_context, and ca_cert_dir have |
| the same meaning as they do when using :func:`ssl.wrap_socket`. |
| |
| :param server_hostname: |
| When SNI is supported, the expected hostname of the certificate |
| :param ssl_context: |
| A pre-made :class:`SSLContext` object. If none is provided, one will |
| be created using :func:`create_urllib3_context`. |
| :param ciphers: |
| A string of ciphers we wish the client to support. |
| :param ca_cert_dir: |
| A directory containing CA certificates in multiple separate files, as |
| supported by OpenSSL's -CApath flag or the capath argument to |
| SSLContext.load_verify_locations(). |
| :param key_password: |
| Optional password if the keyfile is encrypted. |
| :param ca_cert_data: |
| Optional string containing CA certificates in PEM format suitable for |
| passing as the cadata parameter to SSLContext.load_verify_locations() |
| """ |
| context = ssl_context |
| if context is None: |
| # Note: This branch of code and all the variables in it are no longer |
| # used by urllib3 itself. We should consider deprecating and removing |
| # this code. |
| context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers) |
| |
| if ca_certs or ca_cert_dir or ca_cert_data: |
| try: |
| context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data) |
| except IOError as e: # Platform-specific: Python 2.7 |
| raise SSLError(e) |
| # Py33 raises FileNotFoundError which subclasses OSError |
| # These are not equivalent unless we check the errno attribute |
| except OSError as e: # Platform-specific: Python 3.3 and beyond |
| if e.errno == errno.ENOENT: |
| raise SSLError(e) |
| raise |
| |
| elif ssl_context is None and hasattr(context, "load_default_certs"): |
| # try to load OS default certs; works well on Windows (require Python3.4+) |
| context.load_default_certs() |
| |
| # Attempt to detect if we get the goofy behavior of the |
| # keyfile being encrypted and OpenSSL asking for the |
| # passphrase via the terminal and instead error out. |
| if keyfile and key_password is None and _is_key_file_encrypted(keyfile): |
| raise SSLError("Client private key is encrypted, password is required") |
| |
| if certfile: |
| if key_password is None: |
| context.load_cert_chain(certfile, keyfile) |
| else: |
| context.load_cert_chain(certfile, keyfile, key_password) |
| |
| # If we detect server_hostname is an IP address then the SNI |
| # extension should not be used according to RFC3546 Section 3.1 |
| # We shouldn't warn the user if SNI isn't available but we would |
| # not be using SNI anyways due to IP address for server_hostname. |
| if ( |
| server_hostname is not None and not is_ipaddress(server_hostname) |
| ) or IS_SECURETRANSPORT: |
| if HAS_SNI and server_hostname is not None: |
| return context.wrap_socket(sock, server_hostname=server_hostname) |
| |
| warnings.warn( |
| "An HTTPS request has been made, but the SNI (Server Name " |
| "Indication) extension to TLS is not available on this platform. " |
| "This may cause the server to present an incorrect TLS " |
| "certificate, which can cause validation failures. You can upgrade to " |
| "a newer version of Python to solve this. For more information, see " |
| "https://urllib3.readthedocs.io/en/latest/advanced-usage.html" |
| "#ssl-warnings", |
| SNIMissingWarning, |
| ) |
| |
| return context.wrap_socket(sock) |
| |
| |
| def is_ipaddress(hostname): |
| """Detects whether the hostname given is an IPv4 or IPv6 address. |
| Also detects IPv6 addresses with Zone IDs. |
| |
| :param str hostname: Hostname to examine. |
| :return: True if the hostname is an IP address, False otherwise. |
| """ |
| if not six.PY2 and isinstance(hostname, bytes): |
| # IDN A-label bytes are ASCII compatible. |
| hostname = hostname.decode("ascii") |
| return bool(IPV4_RE.match(hostname) or BRACELESS_IPV6_ADDRZ_RE.match(hostname)) |
| |
| |
| def _is_key_file_encrypted(key_file): |
| """Detects if a key file is encrypted or not.""" |
| with open(key_file, "r") as f: |
| for line in f: |
| # Look for Proc-Type: 4,ENCRYPTED |
| if "ENCRYPTED" in line: |
| return True |
| |
| return False |