# blob: 0a7345669397b9d3d298812d8e3febc454506cac [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2015 Ian Stapleton Cordasco
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing the urlparse compatibility logic."""
from collections import namedtuple
from . import compat
from . import exceptions
from . import misc
from . import normalizers
from . import uri
# Names exported by a star-import of this module.
__all__ = ('ParseResult', 'ParseResultBytes')
# Field names, in positional order, shared by the ParseResult and
# ParseResultBytes namedtuples defined in this module.
PARSED_COMPONENTS = ('scheme', 'userinfo', 'host', 'port', 'path', 'query',
                     'fragment')
class ParseResultMixin(object):
    """Behaviour shared by the text and bytes parse-result classes.

    The methods here read ``userinfo``, ``host``, ``port``, ``query``,
    ``authority``, ``encoding`` and ``unsplit`` from the concrete class
    this mixin is combined with.
    """

    def _generate_authority(self, attributes):
        """Return the authority to use for a copy of this instance.

        :param dict attributes: Mapping that holds at least the keys
            ``'userinfo'``, ``'host'`` and ``'port'`` for the copy.
        :returns: ``self.authority`` when none of the three values differ
            from this instance's, otherwise the result of
            ``normalizers.normalize_authority`` for the new values.
        """
        userinfo = attributes['userinfo']
        host = attributes['host']
        port = attributes['port']

        # Nothing changed: reuse the authority we already have.
        if (self.userinfo == userinfo and
                self.host == host and
                self.port == port):
            return self.authority

        # The port is handed to the normalizer as text when it is truthy.
        if port:
            port = '{0}'.format(port)

        text_userinfo = compat.to_str(userinfo, self.encoding)
        text_host = compat.to_str(host, self.encoding)
        return normalizers.normalize_authority(
            (text_userinfo, text_host, port)
        )

    def geturl(self):
        """Return ``self.unsplit()``; mirrors the standard library name."""
        return self.unsplit()

    @property
    def hostname(self):
        """Alias of ``host``; mirrors the standard library name."""
        return self.host

    @property
    def netloc(self):
        """Alias of ``authority``; mirrors the standard library name."""
        return self.authority

    @property
    def params(self):
        """Alias of ``query``; mirrors the standard library name."""
        return self.query
class ParseResult(namedtuple('ParseResult', PARSED_COMPONENTS),
                  ParseResultMixin):
    """Implementation of urlparse compatibility class.

    This uses the URIReference logic to handle compatibility with the
    urlparse.ParseResult class.

    Besides the tuple fields named in ``PARSED_COMPONENTS``, every instance
    carries two extra attributes set in ``__new__``: ``encoding`` and
    ``reference`` (the URI reference object the result was built from).
    """

    # NOTE(review): this is an ordinary class attribute named ``slots``,
    # not ``__slots__`` -- presumably ``__slots__`` was intended; confirm
    # before renaming since the name is publicly visible.
    slots = ()

    def __new__(cls, scheme, userinfo, host, port, path, query, fragment,
                uri_ref, encoding='utf-8'):
        """Create a new ParseResult.

        Falsy ``scheme``, ``userinfo``, ``port`` and ``path`` values are
        stored as ``None``; ``host``, ``query`` and ``fragment`` are stored
        exactly as given.

        :param uri_ref: Reference object stored as ``reference``.
        :param str encoding: Encoding stored as ``encoding``.
        """
        parse_result = super(ParseResult, cls).__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query,
            fragment)
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        return parse_result

    @classmethod
    def from_parts(cls, scheme=None, userinfo=None, host=None, port=None,
                   path=None, query=None, fragment=None, encoding='utf-8'):
        """Create a ParseResult instance from its parts.

        The authority is assembled from ``userinfo``, ``host`` and ``port``,
        a URI reference is built from all parts and normalized, and the
        resulting (normalized) components populate the new instance.

        :param str encoding: Encoding passed to the reference and stored on
            the result.
        :returns: A new instance of ``cls``.
        """
        authority = ''
        if userinfo is not None:
            authority += userinfo + '@'
        if host is not None:
            authority += host
        if port is not None:
            authority += ':{0}'.format(port)
        uri_ref = uri.URIReference(scheme=scheme,
                                   authority=authority,
                                   path=path,
                                   query=query,
                                   fragment=fragment,
                                   encoding=encoding).normalize()
        # Always strict here: the authority was built by this method.
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(scheme=uri_ref.scheme,
                   userinfo=userinfo,
                   host=host,
                   port=port,
                   path=uri_ref.path,
                   query=uri_ref.query,
                   fragment=uri_ref.fragment,
                   uri_ref=uri_ref,
                   encoding=encoding)

    @classmethod
    def from_string(cls, uri_string, encoding='utf-8', strict=True,
                    lazy_normalize=True):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResult` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)
        return cls(scheme=reference.scheme,
                   userinfo=userinfo,
                   host=host,
                   port=port,
                   path=reference.path,
                   query=reference.query,
                   fragment=reference.fragment,
                   uri_ref=reference,
                   encoding=encoding)

    @property
    def authority(self):
        """Return the authority of the underlying reference."""
        return self.reference.authority

    def copy_with(self, scheme=misc.UseExisting, userinfo=misc.UseExisting,
                  host=misc.UseExisting, port=misc.UseExisting,
                  path=misc.UseExisting, query=misc.UseExisting,
                  fragment=misc.UseExisting):
        """Create a copy of this instance replacing with specified parts.

        Any argument left as ``misc.UseExisting`` keeps this instance's
        current value for that component.

        :returns: A new :class:`ParseResult` (always this exact class).
        """
        attributes = zip(PARSED_COMPONENTS,
                         (scheme, userinfo, host, port, path, query, fragment))
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            attrs_dict[name] = value
        # Rebuild the authority only if userinfo/host/port actually changed.
        authority = self._generate_authority(attrs_dict)
        ref = self.reference.copy_with(scheme=attrs_dict['scheme'],
                                       authority=authority,
                                       path=attrs_dict['path'],
                                       query=attrs_dict['query'],
                                       fragment=attrs_dict['fragment'])
        return ParseResult(uri_ref=ref, encoding=self.encoding, **attrs_dict)

    def encode(self, encoding=None):
        """Convert to an instance of ParseResultBytes.

        :param str encoding: Encoding to use; defaults to ``self.encoding``.
        :returns: :class:`ParseResultBytes` sharing this instance's
            reference.
        """
        encoding = encoding or self.encoding
        # Components without an ``encode`` method (None, an integer port)
        # are passed through untouched.
        attrs = dict(
            zip(PARSED_COMPONENTS,
                (attr.encode(encoding) if hasattr(attr, 'encode') else attr
                 for attr in self)))
        return ParseResultBytes(
            uri_ref=self.reference,
            encoding=encoding,
            **attrs
        )

    def unsplit(self, use_idna=False):
        """Create a URI string from the components.

        :param bool use_idna: When True and a host is present, the host is
            IDNA-encoded before the URI is reassembled.
        :returns: The parsed URI reconstituted as a string.
        :rtype: str
        """
        parse_result = self
        if use_idna and self.host:
            hostbytes = self.host.encode('idna')
            # IDNA output is pure ASCII, so decode it as ASCII rather than
            # with self.encoding, which may not be ASCII-compatible
            # (e.g. utf-16) and would garble the host or raise.
            host = hostbytes.decode('ascii')
            parse_result = self.copy_with(host=host)
        return parse_result.reference.unsplit()
class ParseResultBytes(namedtuple('ParseResultBytes', PARSED_COMPONENTS),
                       ParseResultMixin):
    """Compatibility shim for the urlparse.ParseResultBytes object.

    Besides the tuple fields named in ``PARSED_COMPONENTS``, every instance
    carries three extra attributes set in ``__new__``: ``encoding``,
    ``reference`` (the URI reference object the result was built from) and
    ``lazy_normalize``.
    """

    def __new__(cls, scheme, userinfo, host, port, path, query, fragment,
                uri_ref, encoding='utf-8', lazy_normalize=True):
        """Create a new ParseResultBytes instance.

        Every falsy component except ``host`` is stored as ``None``;
        ``host`` is stored exactly as given.

        :param uri_ref: Reference object stored as ``reference``.
        :param str encoding: Encoding stored as ``encoding``.
        :param bool lazy_normalize: Flag stored as ``lazy_normalize``.
        """
        parse_result = super(ParseResultBytes, cls).__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query or None,
            fragment or None)
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        parse_result.lazy_normalize = lazy_normalize
        return parse_result

    @classmethod
    def from_parts(cls, scheme=None, userinfo=None, host=None, port=None,
                   path=None, query=None, fragment=None, encoding='utf-8',
                   lazy_normalize=True):
        """Create a ParseResultBytes instance from its parts.

        The authority is assembled from ``userinfo``, ``host`` and ``port``
        (the port is passed through ``int``), and a URI reference is built
        from all parts. ``userinfo``, ``host`` and ``port`` of the result
        come from that reference's authority; ``scheme``, ``path``,
        ``query`` and ``fragment`` are the arguments as passed, converted
        to bytes.

        :param str encoding: Encoding used for the bytes conversion.
        :param bool lazy_normalize: When False, the reference is normalized
            right away.
        :returns: A new instance of ``cls``.
        """
        authority = ''
        if userinfo is not None:
            authority += userinfo + '@'
        if host is not None:
            authority += host
        if port is not None:
            authority += ':{0}'.format(int(port))
        uri_ref = uri.URIReference(scheme=scheme,
                                   authority=authority,
                                   path=path,
                                   query=query,
                                   fragment=fragment,
                                   encoding=encoding)
        if not lazy_normalize:
            uri_ref = uri_ref.normalize()
        to_bytes = compat.to_bytes
        # Always strict here: the authority was built by this method.
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(scheme=to_bytes(scheme, encoding),
                   userinfo=to_bytes(userinfo, encoding),
                   host=to_bytes(host, encoding),
                   port=port,
                   path=to_bytes(path, encoding),
                   query=to_bytes(query, encoding),
                   fragment=to_bytes(fragment, encoding),
                   uri_ref=uri_ref,
                   encoding=encoding,
                   lazy_normalize=lazy_normalize)

    @classmethod
    def from_string(cls, uri_string, encoding='utf-8', strict=True,
                    lazy_normalize=True):
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)
        to_bytes = compat.to_bytes
        return cls(scheme=to_bytes(reference.scheme, encoding),
                   userinfo=to_bytes(userinfo, encoding),
                   host=to_bytes(host, encoding),
                   port=port,
                   path=to_bytes(reference.path, encoding),
                   query=to_bytes(reference.query, encoding),
                   fragment=to_bytes(reference.fragment, encoding),
                   uri_ref=reference,
                   encoding=encoding,
                   lazy_normalize=lazy_normalize)

    @property
    def authority(self):
        """Return the reference's authority as bytes.

        :returns: The authority encoded with ``self.encoding``, or ``None``
            when the underlying reference has no authority.
        """
        authority = self.reference.authority
        # A reference without an authority reports None; calling .encode on
        # it would raise AttributeError (and break netloc / copy_with).
        if authority is None:
            return None
        return authority.encode(self.encoding)

    def copy_with(self, scheme=misc.UseExisting, userinfo=misc.UseExisting,
                  host=misc.UseExisting, port=misc.UseExisting,
                  path=misc.UseExisting, query=misc.UseExisting,
                  fragment=misc.UseExisting, lazy_normalize=True):
        """Create a copy of this instance replacing with specified parts.

        Any argument left as ``misc.UseExisting`` keeps this instance's
        current value. Non-bytes values that have an ``encode`` method are
        encoded with ``self.encoding`` before being stored.

        :param bool lazy_normalize: When False, the copied reference is
            normalized before being attached to the result.
        :returns: A new :class:`ParseResultBytes` (always this exact class).
        """
        attributes = zip(PARSED_COMPONENTS,
                         (scheme, userinfo, host, port, path, query, fragment))
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            if not isinstance(value, bytes) and hasattr(value, 'encode'):
                value = value.encode(self.encoding)
            attrs_dict[name] = value
        # Rebuild the authority only if userinfo/host/port actually changed.
        authority = self._generate_authority(attrs_dict)
        # The reference is given text, so convert the bytes back.
        to_str = compat.to_str
        ref = self.reference.copy_with(
            scheme=to_str(attrs_dict['scheme'], self.encoding),
            authority=to_str(authority, self.encoding),
            path=to_str(attrs_dict['path'], self.encoding),
            query=to_str(attrs_dict['query'], self.encoding),
            fragment=to_str(attrs_dict['fragment'], self.encoding)
        )
        if not lazy_normalize:
            ref = ref.normalize()
        return ParseResultBytes(
            uri_ref=ref,
            encoding=self.encoding,
            lazy_normalize=lazy_normalize,
            **attrs_dict
        )

    def unsplit(self, use_idna=False):
        """Create a URI bytes object from the components.

        :param bool use_idna: When True and a host is present, the host is
            IDNA-encoded before the URI is reassembled.
        :returns: The parsed URI reconstituted as a string.
        :rtype: bytes
        """
        parse_result = self
        if use_idna and self.host:
            # self.host is bytes, to encode to idna, we need to decode it
            # first
            host = self.host.decode(self.encoding)
            hostbytes = host.encode('idna')
            parse_result = self.copy_with(host=hostbytes)
        if self.lazy_normalize:
            # Normalization was deferred; apply it now via a copy.
            parse_result = parse_result.copy_with(lazy_normalize=False)
        # Named uri_string so the local does not shadow the ``uri`` module.
        uri_string = parse_result.reference.unsplit()
        return uri_string.encode(self.encoding)
def split_authority(authority):
    """Leniently split an authority string into its three parts.

    This is a plain string split with no validation of the pieces.

    :param str authority: Authority text, e.g. ``user@host:80`` or
        ``[::1]:8080``.
    :returns: ``(userinfo, host, port)``; each item is a string or
        ``None`` when that part is absent. The port is returned as text.
    :rtype: tuple
    :raises ValueError: If the host starts with ``[`` but has no closing
        ``]``.
    """
    # Initialize our expected return values
    userinfo = host = port = None
    # Initialize an extra var we may need to use
    extra_host = None
    # Set-up rest in case there is no userinfo portion
    rest = authority

    if '@' in authority:
        # Split on the last '@' so userinfo may itself contain '@'.
        userinfo, rest = authority.rsplit('@', 1)

    # Handle IPv6 host addresses
    if rest.startswith('['):
        host, bracket, rest = rest.partition(']')
        if not bracket:
            # Previously this surfaced as an obscure tuple-unpacking
            # ValueError; keep the type but say what is actually wrong.
            raise ValueError(
                "Invalid IPv6 host in authority {0!r}: "
                "missing closing ']'".format(authority)
            )
        host += bracket

    if ':' in rest:
        extra_host, port = rest.split(':', 1)
    elif not host and rest:
        host = rest

    # Text before the ':' is the host unless a bracketed host was found.
    if extra_host and not host:
        host = extra_host

    return userinfo, host, port
def authority_from(reference, strict):
    """Extract ``(userinfo, host, port)`` from a reference's authority.

    :param reference: Object providing ``authority_info()`` and an
        ``authority`` attribute.
    :param bool strict: When True, an ``InvalidAuthority`` raised by
        ``authority_info()`` propagates; when False, the authority is split
        with ``split_authority`` instead.
    :returns: ``(userinfo, host, port)``; a truthy port is converted to
        ``int``.
    :raises exceptions.InvalidPort: If a truthy port is not an integer.
    """
    try:
        info = reference.authority_info()
    except exceptions.InvalidAuthority:
        if strict:
            raise
        parts = split_authority(reference.authority)
    else:
        # Thanks to Richard Barrell for this idea:
        # https://twitter.com/0x2ba22e11/status/617338811975139328
        parts = tuple(info.get(key) for key in ('userinfo', 'host', 'port'))

    userinfo, host, port = parts

    # Absent (or otherwise falsy) ports are passed back untouched.
    if not port:
        return userinfo, host, port

    try:
        port = int(port)
    except ValueError:
        raise exceptions.InvalidPort(port)
    return userinfo, host, port