| # -*- test-case-name: twisted.test.test_compat -*- |
| # |
| # Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
| # See LICENSE for details. |
| |
| |
| """ |
| Compatibility module to provide backwards compatibility for useful Python |
| features. |
| |
| This is mainly for use of internal Twisted code. We encourage you to use |
| the latest version of Python directly from your code, if possible. |
| """ |
| |
| import sys, string, socket, struct |
| |
| def inet_pton(af, addr): |
| if af == socket.AF_INET: |
| return socket.inet_aton(addr) |
| elif af == getattr(socket, 'AF_INET6', 'AF_INET6'): |
| if [x for x in addr if x not in string.hexdigits + ':.']: |
| raise ValueError("Illegal characters: %r" % (''.join(x),)) |
| |
| parts = addr.split(':') |
| elided = parts.count('') |
| ipv4Component = '.' in parts[-1] |
| |
| if len(parts) > (8 - ipv4Component) or elided > 3: |
| raise ValueError("Syntactically invalid address") |
| |
| if elided == 3: |
| return '\x00' * 16 |
| |
| if elided: |
| zeros = ['0'] * (8 - len(parts) - ipv4Component + elided) |
| |
| if addr.startswith('::'): |
| parts[:2] = zeros |
| elif addr.endswith('::'): |
| parts[-2:] = zeros |
| else: |
| idx = parts.index('') |
| parts[idx:idx+1] = zeros |
| |
| if len(parts) != 8 - ipv4Component: |
| raise ValueError("Syntactically invalid address") |
| else: |
| if len(parts) != (8 - ipv4Component): |
| raise ValueError("Syntactically invalid address") |
| |
| if ipv4Component: |
| if parts[-1].count('.') != 3: |
| raise ValueError("Syntactically invalid address") |
| rawipv4 = socket.inet_aton(parts[-1]) |
| unpackedipv4 = struct.unpack('!HH', rawipv4) |
| parts[-1:] = [hex(x)[2:] for x in unpackedipv4] |
| |
| parts = [int(x, 16) for x in parts] |
| return struct.pack('!8H', *parts) |
| else: |
| raise socket.error(97, 'Address family not supported by protocol') |
| |
| def inet_ntop(af, addr): |
| if af == socket.AF_INET: |
| return socket.inet_ntoa(addr) |
| elif af == socket.AF_INET6: |
| if len(addr) != 16: |
| raise ValueError("address length incorrect") |
| parts = struct.unpack('!8H', addr) |
| curBase = bestBase = None |
| for i in range(8): |
| if not parts[i]: |
| if curBase is None: |
| curBase = i |
| curLen = 0 |
| curLen += 1 |
| else: |
| if curBase is not None: |
| if bestBase is None or curLen > bestLen: |
| bestBase = curBase |
| bestLen = curLen |
| curBase = None |
| if curBase is not None and (bestBase is None or curLen > bestLen): |
| bestBase = curBase |
| bestLen = curLen |
| parts = [hex(x)[2:] for x in parts] |
| if bestBase is not None: |
| parts[bestBase:bestBase + bestLen] = [''] |
| if parts[0] == '': |
| parts.insert(0, '') |
| if parts[-1] == '': |
| parts.insert(len(parts) - 1, '') |
| return ':'.join(parts) |
| else: |
| raise socket.error(97, 'Address family not supported by protocol') |
| |
| try: |
| socket.inet_pton(socket.AF_INET6, "::") |
| except (AttributeError, NameError, socket.error): |
| socket.inet_pton = inet_pton |
| socket.inet_ntop = inet_ntop |
| socket.AF_INET6 = 'AF_INET6' |
| |
| adict = dict |
| |
| # OpenSSL/__init__.py imports OpenSSL.tsafe. OpenSSL/tsafe.py imports |
| # threading. threading imports thread. All to make this stupid threadsafe |
| # version of its Connection class. We don't even care about threadsafe |
| # Connections. In the interest of not screwing over some crazy person |
| # calling into OpenSSL from another thread and trying to use Twisted's SSL |
| # support, we don't totally destroy OpenSSL.tsafe, but we will replace it |
| # with our own version which imports threading as late as possible. |
| |
| class tsafe(object): |
| class Connection: |
| """ |
| OpenSSL.tsafe.Connection, defined in such a way as to not blow. |
| """ |
| __module__ = 'OpenSSL.tsafe' |
| |
| def __init__(self, *args): |
| from OpenSSL import SSL as _ssl |
| self._ssl_conn = apply(_ssl.Connection, args) |
| from threading import _RLock |
| self._lock = _RLock() |
| |
| for f in ('get_context', 'pending', 'send', 'write', 'recv', |
| 'read', 'renegotiate', 'bind', 'listen', 'connect', |
| 'accept', 'setblocking', 'fileno', 'shutdown', |
| 'close', 'get_cipher_list', 'getpeername', |
| 'getsockname', 'getsockopt', 'setsockopt', |
| 'makefile', 'get_app_data', 'set_app_data', |
| 'state_string', 'sock_shutdown', |
| 'get_peer_certificate', 'want_read', 'want_write', |
| 'set_connect_state', 'set_accept_state', |
| 'connect_ex', 'sendall'): |
| |
| exec """def %s(self, *args): |
| self._lock.acquire() |
| try: |
| return apply(self._ssl_conn.%s, args) |
| finally: |
| self._lock.release()\n""" % (f, f) |
| sys.modules['OpenSSL.tsafe'] = tsafe |
| |
| import operator |
| try: |
| operator.attrgetter |
| except AttributeError: |
| class attrgetter(object): |
| def __init__(self, name): |
| self.name = name |
| def __call__(self, obj): |
| return getattr(obj, self.name) |
| operator.attrgetter = attrgetter |
| |
| |
| try: |
| set = set |
| except NameError: |
| from sets import Set as set |
| |
| |
| try: |
| frozenset = frozenset |
| except NameError: |
| from sets import ImmutableSet as frozenset |
| |
| |
| try: |
| from functools import reduce |
| except ImportError: |
| reduce = reduce |