| #!/usr/bin/env python |
| |
| # Authors: |
| # Trevor Perrin |
| # Kees Bos - Added tests for XML-RPC |
| # Dimitris Moraitis - Anon ciphersuites |
| # Marcelo Fernandez - Added test for NPN |
| # Martin von Loewis - python 3 port |
| |
| # |
| # See the LICENSE file for legal information regarding use of this file. |
| from __future__ import print_function |
| import sys |
| import os |
| import os.path |
| import socket |
| import time |
| import getopt |
| try: |
| from BaseHTTPServer import HTTPServer |
| from SimpleHTTPServer import SimpleHTTPRequestHandler |
| except ImportError: |
| from http.server import HTTPServer, SimpleHTTPRequestHandler |
| |
| from tlslite import TLSConnection, Fault, HandshakeSettings, \ |
| X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \ |
| parsePEMKey, constants, \ |
| AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \ |
| POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \ |
| Checker, __version__ |
| |
| from tlslite.errors import * |
| from tlslite.utils.cryptomath import prngName |
| try: |
| import xmlrpclib |
| except ImportError: |
| # Python 3 |
| from xmlrpc import client as xmlrpclib |
| from tlslite import * |
| |
| try: |
| from tack.structures.Tack import Tack |
| |
| except ImportError: |
| pass |
| |
def printUsage(s=None):
    """Print the command-line help text and terminate the process.

    s -- optional error message; when truthy it is printed first, prefixed
         with "ERROR: ".

    The banner reports the tlslite version and which crypto backend is in
    use (chosen from the m2cryptoLoaded flag).  This function never
    returns: it always ends with sys.exit(-1).
    """
    crypto = "M2Crypto/OpenSSL" if m2cryptoLoaded else "Python crypto"
    if s:
        print("ERROR: %s" % s)
    banner = """\ntls.py version %s (using %s)

Commands:
server HOST:PORT DIRECTORY

client HOST:PORT DIRECTORY
"""
    print(banner % (__version__, crypto))
    sys.exit(-1)
| |
| |
| def testConnClient(conn): |
| b1 = os.urandom(1) |
| b10 = os.urandom(10) |
| b100 = os.urandom(100) |
| b1000 = os.urandom(1000) |
| conn.write(b1) |
| conn.write(b10) |
| conn.write(b100) |
| conn.write(b1000) |
| assert(conn.read(min=1, max=1) == b1) |
| assert(conn.read(min=10, max=10) == b10) |
| assert(conn.read(min=100, max=100) == b100) |
| assert(conn.read(min=1000, max=1000) == b1000) |
| |
def clientTestCmd(argv):
    """Run the client half of the test suite against a running test server.

    argv -- list of two strings:
            argv[0]  "HOST:PORT" of the server started with the "server"
                     command,
            argv[1]  directory holding clientX509Cert.pem, clientX509Key.pem
                     and index.html.

    The tests run strictly in sequence and mirror, step for step, the
    sequence in serverTestCmd, so both sides must stay in lockstep.  Ports
    used: PORT for tests 0-20, PORT+1 for the HTTPS test (21), PORT+2 for
    tests 22-24 and PORT+3 for the XML-RPC tests (25-27).  Test 28 contacts
    public Internet servers; a socket error there is reported as
    non-critical.

    Prints progress to stdout and finishes with "Test succeeded" or, when a
    fault test raised TLSFaultError, "Test failed".  Other failures
    propagate as AssertionError or as the exceptions raised by the
    handshake calls.  Raises IndexError / ValueError for a malformed argv.
    """
    address = argv[0]
    dataDir = argv[1]

    #Split address into hostname/port tuple
    address = address.split(":")
    address = ( address[0], int(address[1]) )

    # time.clock() was removed in Python 3.8; use perf_counter() where it
    # exists and fall back to clock() only on interpreters that lack it.
    timer = getattr(time, "perf_counter", None) or time.clock

    def connect():
        # Reads `address` at call time, so later port bumps are picked up.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
            sock.settimeout(5)
        sock.connect(address)
        return TLSConnection(sock)

    def readFile(path):
        # Read a whole text file and close it deterministically.
        with open(path) as f:
            return f.read()

    def runFaultTests(faults, handshake):
        # Run handshake(connection) once per fault with that fault set on a
        # fresh connection; return True if any run raised TLSFaultError.
        sawBadFault = False
        for fault in faults:
            connection = connect()
            connection.fault = fault
            try:
                handshake(connection)
                print(" Good Fault %s" % (Fault.faultNames[fault]))
            except TLSFaultError as e:
                print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
                sawBadFault = True
        return sawBadFault

    badFault = False

    print("Test 0 - anonymous handshake")
    connection = connect()
    connection.handshakeClientAnonymous()
    testConnClient(connection)
    connection.close()

    print("Test 1 - good X509 (plus SNI)")
    connection = connect()
    connection.handshakeClientCert(serverName=address[0])
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    assert(connection.session.serverName == address[0])
    connection.close()

    print("Test 1.a - good X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 1.b - good X509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["md5"]
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
    connection.close()

    if tackpyLoaded:

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeClientCert(settings=settings)
        assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
        assert(connection.session.tackExt.activation_flags == 1)
        testConnClient(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeClientCert(settings=settings)
            assert(False)
        except TLSLocalAlert as alert:
            if alert.description != AlertDescription.illegal_parameter:
                raise
        connection.close()

    print("Test 3 - good SRP")
    connection = connect()
    connection.handshakeClientSRP("test", "password")
    testConnClient(connection)
    connection.close()

    print("Test 4 - SRP faults")
    if runFaultTests(Fault.clientSrpFaults + Fault.genericFaults,
                     lambda c: c.handshakeClientSRP("test", "password")):
        badFault = True

    print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
    settings = HandshakeSettings()
    settings.minVersion = (3,1)
    settings.maxVersion = (3,1)
    connection = connect()
    connection.handshakeClientSRP("test", "password", settings=settings)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    testConnClient(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    if runFaultTests(Fault.clientSrpFaults + Fault.genericFaults,
                     lambda c: c.handshakeClientSRP("test", "password")):
        badFault = True

    print("Test 11 - X.509 faults")
    if runFaultTests(Fault.clientNoAuthFaults + Fault.genericFaults,
                     lambda c: c.handshakeClientCert()):
        badFault = True

    print("Test 14 - good mutual X509")
    x509Cert = X509().parse(readFile(os.path.join(dataDir, "clientX509Cert.pem")))
    x509Chain = X509CertChain([x509Cert])
    s = readFile(os.path.join(dataDir, "clientX509Key.pem"))
    x509Key = parsePEMKey(s, private=True)

    connection = connect()
    connection.handshakeClientCert(x509Chain, x509Key)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 14.a - good mutual X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 15 - mutual X.509 faults")
    if runFaultTests(Fault.clientCertFaults + Fault.genericFaults,
                     lambda c: c.handshakeClientCert(x509Chain, x509Key)):
        badFault = True

    print("Test 18 - good SRP, prepare to resume... (plus SNI)")
    connection = connect()
    connection.handshakeClientSRP("test", "password", serverName=address[0])
    testConnClient(connection)
    connection.close()
    session = connection.session

    print("Test 19 - resumption (plus SNI)")
    connection = connect()
    # The password is deliberately wrong: the handshake is expected to
    # succeed through the saved session instead.
    connection.handshakeClientSRP("test", "garbage", serverName=address[0],
                                  session=session)
    testConnClient(connection)
    #Don't close! -- see below

    print("Test 20 - invalidated resumption (plus SNI)")
    connection.sock.close() #Close the socket without a close_notify!
    connection = connect()
    try:
        connection.handshakeClientSRP("test", "garbage",
                                      serverName=address[0], session=session)
        assert(False)
    except TLSRemoteAlert as alert:
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")
    address = address[0], address[1]+1
    if hasattr(socket, "timeout"):
        timeoutEx = socket.timeout
    else:
        timeoutEx = socket.error
    while 1:
        try:
            time.sleep(2)
            htmlBody = bytearray(readFile(os.path.join(dataDir, "index.html")), "utf-8")
            fingerprint = None
            # First pass learns the server fingerprint, second pass has the
            # Checker enforce it.
            for y in range(2):
                checker = Checker(x509Fingerprint=fingerprint)
                h = HTTPTLSConnection(
                        address[0], address[1], checker=checker)
                for x in range(3):
                    h.request("GET", "/index.html")
                    r = h.getresponse()
                    assert(r.status == 200)
                    b = bytearray(r.read())
                    assert(b == htmlBody)
                fingerprint = h.tlsSession.serverCertChain.getFingerprint()
                assert(fingerprint)
            time.sleep(2)
            break
        except timeoutEx:
            print("timeout, retrying...")

    address = address[0], address[1]+1

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers, TLSv1.0")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "rc4"]:

            print("Test 22:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            settings.minVersion = (3,1)
            settings.maxVersion = (3,1)
            connection.handshakeClientCert(settings=settings)
            testConnClient(connection)
            print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            connection.handshakeClientCert(settings=settings)
            print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')

            # 50000 bytes out plus 50000 bytes echoed back = 100K exchanged
            startTime = timer()
            connection.write(b"hello"*10000)
            h = connection.read(min=50000, max=50000)
            stopTime = timer()
            elapsed = stopTime - startTime
            if elapsed:
                print("100K exchanged at rate of %d bytes/sec" % int(100000/elapsed))
            else:
                print("100K exchanged very fast")

            assert(h == b"hello"*10000)
            connection.close()

    # (label, protocols offered by this client, protocol expected to be
    # negotiated given the server's list for the same label)
    nextProtoCases = [
        ("a", [b"http/1.1"], b'http/1.1'),
        ("b", [b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("c", [b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("d", [b"spdy/3", b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("e", [b"spdy/3", b"spdy/2", b"http/1.1"], b'spdy/3'),
        ("f", [b"http/1.1"], b'http/1.1'),
        ("g", [b"spdy/2", b"http/1.1"], b'spdy/2'),
    ]
    for label, offered, expected in nextProtoCases:
        print("Test 24.%s - Next-Protocol Client Negotiation" % label)
        connection = connect()
        connection.handshakeClientCert(nextProtos=offered)
        assert(connection.next_proto == expected)
        connection.close()

    print('Test 25 - good standard XMLRPC https client')
    time.sleep(2) # Hack for lack of ability to set timeout here
    address = address[0], address[1]+1
    server = xmlrpclib.Server('https://%s:%s' % address)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 26 - good tlslite XMLRPC client')
    transport = XMLRPCTransport(ignoreAbruptClose=True)
    server = xmlrpclib.Server('https://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 27 - good XMLRPC ignored protocol')
    server = xmlrpclib.Server('http://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print("Test 28 - Internet servers test")
    try:
        i = IMAP4_TLS("cyrus.andrew.cmu.edu")
        i.login("anonymous", "anonymous@anonymous.net")
        i.logout()
        print("Test 28: IMAP4 good")
        p = POP3_TLS("pop.gmail.com")
        p.quit()
        print("Test 29: POP3 good")
    except socket.error as e:
        print("Non-critical error: socket error trying to reach internet server: ", e)

    if not badFault:
        print("Test succeeded")
    else:
        print("Test failed")
| |
| |
| |
| def testConnServer(connection): |
| count = 0 |
| while 1: |
| s = connection.read() |
| count += len(s) |
| if len(s) == 0: |
| break |
| connection.write(s) |
| if count == 1111: |
| break |
| |
def serverTestCmd(argv):
    """Run the server half of the test suite, serving one client run.

    argv -- list of two strings:
            argv[0]  "HOST:PORT" to listen on,
            argv[1]  directory holding serverX509Cert.pem, serverX509Key.pem
                     and the files served by the HTTPS test.

    The tests run strictly in sequence and mirror, step for step, the
    sequence in clientTestCmd, so both sides must stay in lockstep.  Ports
    used: PORT for tests 0-20, PORT+1 for the HTTPS test (21), PORT+2 for
    tests 22-24 and PORT+3 for the XML-RPC server (25-27).  When tackpy is
    loaded, TACK1.pem and TACKunrelated.pem are read from the current
    working directory (not from argv[1]).

    Prints progress to stdout and "Test succeeded" at the end.  Failures
    propagate as AssertionError or as the exceptions raised by the
    handshake calls.  Raises IndexError / ValueError for a malformed argv.
    """
    address = argv[0]
    dataDir = argv[1]

    #Split address into hostname/port tuple
    address = address.split(":")
    address = ( address[0], int(address[1]) )

    #Listen for the client's connections
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind(address)
    lsock.listen(5)

    def connect():
        # Reads `lsock` at call time, so the re-created listening socket
        # used from test 22 onwards is picked up automatically.
        return TLSConnection(lsock.accept()[0])

    def readFile(path, mode="r"):
        # Read a whole text file and close it deterministically.
        with open(path, mode) as f:
            return f.read()

    def runFaultTests(faults, **handshakeArgs):
        # Accept one connection per fault and attempt a server handshake
        # with that fault set on the connection.  The outcome of the
        # handshake is deliberately ignored here: the original code's
        # `assert()` after the handshake was swallowed by its own bare
        # `except:`, so success and failure were already indistinguishable.
        # Catching Exception (not everything) lets Ctrl-C still abort.
        for fault in faults:
            connection = connect()
            connection.fault = fault
            try:
                connection.handshakeServer(**handshakeArgs)
            except Exception:
                pass
            connection.close()

    x509Cert = X509().parse(readFile(os.path.join(dataDir, "serverX509Cert.pem")))
    x509Chain = X509CertChain([x509Cert])
    s = readFile(os.path.join(dataDir, "serverX509Key.pem"))
    x509Key = parsePEMKey(s, private=True)

    print("Test 0 - Anonymous server handshake")
    connection = connect()
    connection.handshakeServer(anon=True)
    testConnServer(connection)
    connection.close()

    print("Test 1 - good X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    connection.close()

    print("Test 1.a - good X.509, SSL v3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
    testConnServer(connection)
    connection.close()

    print("Test 1.b - good X.509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["sha", "md5"]
    settings.cipherNames = ["rc4"]
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
    testConnServer(connection)
    connection.close()

    if tackpyLoaded:
        # The "U" open mode was removed in Python 3.11 (text mode already
        # uses universal newlines on Python 3); keep it for Python 2 only.
        pemMode = "r" if sys.version_info[0] >= 3 else "rU"
        tack = Tack.createFromPem(readFile("./TACK1.pem", pemMode))
        tackUnrelated = Tack.createFromPem(readFile("./TACKunrelated.pem", pemMode))

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
            tacks=[tack], activationFlags=1, settings=settings)
        testConnServer(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                tacks=[tackUnrelated], settings=settings)
            assert(False)
        except TLSRemoteAlert as alert:
            if alert.description != AlertDescription.illegal_parameter:
                raise

    print("Test 3 - good SRP")
    verifierDB = VerifierDB()
    verifierDB.create()
    entry = VerifierDB.makeVerifier("test", "password", 1536)
    verifierDB["test"] = entry

    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB)
    testConnServer(connection)
    connection.close()

    print("Test 4 - SRP faults")
    runFaultTests(Fault.clientSrpFaults + Fault.genericFaults,
                  verifierDB=verifierDB)

    print("Test 6 - good SRP: with X.509 cert")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB,
                               certChain=x509Chain, privateKey=x509Key)
    testConnServer(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    runFaultTests(Fault.clientSrpFaults + Fault.genericFaults,
                  verifierDB=verifierDB,
                  certChain=x509Chain, privateKey=x509Key)

    print("Test 11 - X.509 faults")
    runFaultTests(Fault.clientNoAuthFaults + Fault.genericFaults,
                  certChain=x509Chain, privateKey=x509Key)

    print("Test 14 - good mutual X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
    testConnServer(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 14a - good mutual X.509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
    testConnServer(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 15 - mutual X.509 faults")
    runFaultTests(Fault.clientCertFaults + Fault.genericFaults,
                  certChain=x509Chain, privateKey=x509Key, reqCert=True)

    print("Test 18 - good SRP, prepare to resume")
    sessionCache = SessionCache()
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    connection.close()

    print("Test 19 - resumption")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    #Don't close! -- see next test

    print("Test 20 - invalidated resumption")
    try:
        connection.read(min=1, max=1)
        assert(False) #Client is going to close the socket without a close_notify
    except TLSAbruptCloseError:
        pass
    connection = connect()
    try:
        connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    except TLSLocalAlert as alert:
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")

    #Close the current listening socket
    lsock.close()

    #Create and run an HTTP Server using TLSSocketServerMixIn
    class MyHTTPServer(TLSSocketServerMixIn,
                       HTTPServer):
        def handshake(self, tlsConnection):
            tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
            return True

    # SimpleHTTPRequestHandler serves files relative to the working
    # directory, so switch to the data directory for the duration of the
    # test and always switch back, even if the test fails.
    cd = os.getcwd()
    os.chdir(dataDir)
    try:
        address = address[0], address[1]+1
        httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
        try:
            # The client makes 2 connections x 3 requests
            for x in range(6):
                httpd.handle_request()
        finally:
            httpd.server_close()
    finally:
        os.chdir(cd)

    #Re-connect the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = address[0], address[1]+1
    lsock.bind(address)
    lsock.listen(5)

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers")
    # One pass per implementation the client will try, but this side always
    # asks for the pure-python cipher implementation.
    for implementation in ["python"] * len(implementations):
        for cipher in ["aes128", "aes256", "rc4"]:

            print("Test 22:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]

            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                       settings=settings)
            print(connection.getCipherName(), connection.getCipherImplementation())
            testConnServer(connection)
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]

            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                       settings=settings)
            print(connection.getCipherName(), connection.getCipherImplementation())
            h = connection.read(min=50000, max=50000)
            assert(h == b"hello"*10000)
            connection.write(h)
            connection.close()

    # (label, protocols advertised by this server for that sub-test)
    nextProtoCases = [
        ("a", [b"http/1.1"]),
        ("b", [b"spdy/2", b"http/1.1"]),
        ("c", [b"http/1.1", b"spdy/2"]),
        ("d", [b"spdy/2", b"http/1.1"]),
        ("e", [b"http/1.1", b"spdy/2", b"spdy/3"]),
        ("f", [b"spdy/3", b"spdy/2"]),
        ("g", []),
    ]
    for label, advertised in nextProtoCases:
        print("Test 24.%s - Next-Protocol Server Negotiation" % label)
        connection = connect()
        settings = HandshakeSettings()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                   settings=settings, nextProtos=advertised)
        testConnServer(connection)
        connection.close()

    # No more raw TLS connections are accepted after this point; release
    # the listening socket instead of leaking it.
    lsock.close()

    print("Tests 25-27 - XMLRPXC server")
    address = address[0], address[1]+1
    class Server(TLSXMLRPCServer):

        def handshake(self, tlsConnection):
            try:
                tlsConnection.handshakeServer(certChain=x509Chain,
                                              privateKey=x509Key,
                                              sessionCache=sessionCache)
                tlsConnection.ignoreAbruptClose = True
                return True
            except TLSError as error:
                print("Handshake failure:", str(error))
                return False

    class MyFuncs:
        def pow(self, x, y): return pow(x, y)
        def add(self, x, y): return x + y

    server = Server(address)
    server.register_instance(MyFuncs())
    # The client issues 3 tests x 2 calls
    for i in range(6):
        server.handle_request()

    print("Test succeeded")
| |
| |
# Script entry point: dispatch on the first command-line argument.  Any
# prefix of a command name selects that command ("c", "cl", ... "client");
# "client" is checked before "server".
if __name__ == '__main__':
    if len(sys.argv) >= 2:
        if "client".startswith(sys.argv[1]):
            clientTestCmd(sys.argv[2:])
        elif "server".startswith(sys.argv[1]):
            serverTestCmd(sys.argv[2:])
        else:
            printUsage("Unknown command: %s" % sys.argv[1])
    else:
        printUsage("Missing command")