blob: bda49df22ca52a1a00c19aaf66253234db19f56f [file] [log] [blame]
## This file is part of Scapy
## Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
## 2015, 2016, 2017 Maxence Tury
## This program is published under a GPLv2 license
"""
TLS client automaton. This makes for a primitive TLS stack.
Obviously you need rights for network access.
We support versions SSLv2 to TLS 1.2, along with many features.
There is no session resumption mechanism for now.
In order to run a client to tcp/50000 with one cipher suite of your choice:
> from scapy.all import *
> ch = TLSClientHello(ciphers=<int code of the cipher suite>)
> t = TLSClientAutomaton(dport=50000, client_hello=ch)
> t.run()
"""
from __future__ import print_function
import socket
from scapy.pton_ntop import inet_pton
from scapy.utils import randstring
from scapy.automaton import ATMT
from scapy.layers.tls.automaton import _TLSAutomaton
from scapy.layers.tls.basefields import _tls_version, _tls_version_options
from scapy.layers.tls.session import tlsSession
from scapy.layers.tls.extensions import (TLS_Ext_SupportedGroups,
TLS_Ext_SupportedVersions,
TLS_Ext_SignatureAlgorithms,
TLS_Ext_ServerName, ServerName)
from scapy.layers.tls.handshake import *
from scapy.layers.tls.handshake_sslv2 import *
from scapy.layers.tls.keyexchange_tls13 import (TLS_Ext_KeyShare_CH,
KeyShareEntry)
from scapy.layers.tls.record import (TLS, TLSAlert, TLSChangeCipherSpec,
TLSApplicationData)
from scapy.modules import six
class TLSClientAutomaton(_TLSAutomaton):
"""
A simple TLS test client automaton. Try to overload some states or
conditions and see what happens on the other side.
Rather than with an interruption, the best way to stop this client is by
typing 'quit'. This won't be a message sent to the server.
_'mycert' and 'mykey' may be provided as filenames. They will be used in
the handshake, should the server ask for client authentication.
_'server_name' does not need to be set.
_'client_hello' may hold a TLSClientHello or SSLv2ClientHello to be sent
to the server. This is particularly useful for extensions tweaking.
_'version' is a quicker way to advertise a protocol version ("sslv2",
"tls1", "tls12", etc.) It may be overriden by the previous 'client_hello'.
_'data' is a list of raw data to be sent to the server once the handshake
has been completed. Both 'stop_server' and 'quit' will work this way.
"""
def parse_args(self, server="127.0.0.1", dport=4433, server_name=None,
mycert=None, mykey=None,
client_hello=None, version=None,
data=None,
**kargs):
super(TLSClientAutomaton, self).parse_args(mycert=mycert,
mykey=mykey,
**kargs)
tmp = socket.getaddrinfo(server, dport)
self.remote_name = None
try:
if ':' in server:
inet_pton(socket.AF_INET6, server)
else:
inet_pton(socket.AF_INET, server)
except:
self.remote_name = socket.getfqdn(server)
if self.remote_name != server:
tmp = socket.getaddrinfo(self.remote_name, dport)
if server_name:
self.remote_name = server_name
self.remote_family = tmp[0][0]
self.remote_ip = tmp[0][4][0]
self.remote_port = dport
self.local_ip = None
self.local_port = None
self.socket = None
self.client_hello = client_hello
self.advertised_tls_version = None
if version:
v = _tls_version_options.get(version, None)
if not v:
self.vprint("Unrecognized TLS version option.")
else:
self.advertised_tls_version = v
self.linebreak = False
if isinstance(data, bytes):
self.data_to_send = [data]
elif isinstance(data, six.string_types):
self.data_to_send = [raw(data)]
elif isinstance(data, list):
self.data_to_send = list(raw(d) for d in reversed(data))
else:
self.data_to_send = []
def vprint_sessioninfo(self):
if self.verbose:
s = self.cur_session
v = _tls_version[s.tls_version]
self.vprint("Version : %s" % v)
cs = s.wcs.ciphersuite.name
self.vprint("Cipher suite : %s" % cs)
if s.tls_version >= 0x0304:
ms = s.tls13_master_secret
else:
ms = s.master_secret
self.vprint("Master secret : %s" % repr_hex(ms))
if s.server_certs:
self.vprint("Server certificate chain: %r" % s.server_certs)
self.vprint()
@ATMT.state(initial=True)
def INITIAL(self):
self.vprint("Starting TLS client automaton.")
raise self.INIT_TLS_SESSION()
@ATMT.state()
def INIT_TLS_SESSION(self):
self.cur_session = tlsSession(connection_end="client")
self.cur_session.client_certs = self.mycert
self.cur_session.client_key = self.mykey
v = self.advertised_tls_version
if v:
self.cur_session.advertised_tls_version = v
else:
default_version = self.cur_session.advertised_tls_version
self.advertised_tls_version = default_version
raise self.CONNECT()
@ATMT.state()
def CONNECT(self):
s = socket.socket(self.remote_family, socket.SOCK_STREAM)
self.vprint()
self.vprint("Trying to connect on %s:%d" % (self.remote_ip,
self.remote_port))
s.connect((self.remote_ip, self.remote_port))
self.socket = s
self.local_ip, self.local_port = self.socket.getsockname()[:2]
self.vprint()
if self.cur_session.advertised_tls_version in [0x0200, 0x0002]:
raise self.SSLv2_PREPARE_CLIENTHELLO()
elif self.cur_session.advertised_tls_version >= 0x0304:
raise self.TLS13_START()
else:
raise self.PREPARE_CLIENTFLIGHT1()
########################### TLS handshake #################################
@ATMT.state()
def PREPARE_CLIENTFLIGHT1(self):
self.add_record()
@ATMT.condition(PREPARE_CLIENTFLIGHT1)
def should_add_ClientHello(self):
self.add_msg(self.client_hello or TLSClientHello())
raise self.ADDED_CLIENTHELLO()
@ATMT.state()
def ADDED_CLIENTHELLO(self):
pass
@ATMT.condition(ADDED_CLIENTHELLO)
def should_send_ClientFlight1(self):
self.flush_records()
raise self.SENT_CLIENTFLIGHT1()
@ATMT.state()
def SENT_CLIENTFLIGHT1(self):
raise self.WAITING_SERVERFLIGHT1()
@ATMT.state()
def WAITING_SERVERFLIGHT1(self):
self.get_next_msg()
raise self.RECEIVED_SERVERFLIGHT1()
@ATMT.state()
def RECEIVED_SERVERFLIGHT1(self):
pass
@ATMT.condition(RECEIVED_SERVERFLIGHT1, prio=1)
def should_handle_ServerHello(self):
"""
XXX We should check the ServerHello attributes for discrepancies with
our own ClientHello.
"""
self.raise_on_packet(TLSServerHello,
self.HANDLED_SERVERHELLO)
@ATMT.state()
def HANDLED_SERVERHELLO(self):
pass
@ATMT.condition(RECEIVED_SERVERFLIGHT1, prio=2)
def missing_ServerHello(self):
raise self.MISSING_SERVERHELLO()
@ATMT.state()
def MISSING_SERVERHELLO(self):
self.vprint("Missing TLS ServerHello message!")
raise self.CLOSE_NOTIFY()
@ATMT.condition(HANDLED_SERVERHELLO, prio=1)
def should_handle_ServerCertificate(self):
if not self.cur_session.prcs.key_exchange.anonymous:
self.raise_on_packet(TLSCertificate,
self.HANDLED_SERVERCERTIFICATE)
raise self.HANDLED_SERVERCERTIFICATE()
@ATMT.state()
def HANDLED_SERVERCERTIFICATE(self):
pass
@ATMT.condition(HANDLED_SERVERHELLO, prio=2)
def missing_ServerCertificate(self):
raise self.MISSING_SERVERCERTIFICATE()
@ATMT.state()
def MISSING_SERVERCERTIFICATE(self):
self.vprint("Missing TLS Certificate message!")
raise self.CLOSE_NOTIFY()
@ATMT.state()
def HANDLED_CERTIFICATEREQUEST(self):
self.vprint("Server asked for a certificate...")
if not self.mykey or not self.mycert:
self.vprint("No client certificate to send!")
self.vprint("Will try and send an empty Certificate message...")
@ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=1)
def should_handle_ServerKeyExchange_from_ServerCertificate(self):
"""
XXX We should check the ServerKeyExchange attributes for discrepancies
with our own ClientHello, along with the ServerHello and Certificate.
"""
self.raise_on_packet(TLSServerKeyExchange,
self.HANDLED_SERVERKEYEXCHANGE)
@ATMT.state(final=True)
def MISSING_SERVERKEYEXCHANGE(self):
pass
@ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=2)
def missing_ServerKeyExchange(self):
if not self.cur_session.prcs.key_exchange.no_ske:
raise self.MISSING_SERVERKEYEXCHANGE()
@ATMT.state()
def HANDLED_SERVERKEYEXCHANGE(self):
pass
def should_handle_CertificateRequest(self):
"""
XXX We should check the CertificateRequest attributes for discrepancies
with the cipher suite, etc.
"""
self.raise_on_packet(TLSCertificateRequest,
self.HANDLED_CERTIFICATEREQUEST)
@ATMT.condition(HANDLED_SERVERKEYEXCHANGE, prio=2)
def should_handle_CertificateRequest_from_ServerKeyExchange(self):
self.should_handle_CertificateRequest()
@ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=3)
def should_handle_CertificateRequest_from_ServerCertificate(self):
self.should_handle_CertificateRequest()
def should_handle_ServerHelloDone(self):
self.raise_on_packet(TLSServerHelloDone,
self.HANDLED_SERVERHELLODONE)
@ATMT.condition(HANDLED_SERVERKEYEXCHANGE, prio=1)
def should_handle_ServerHelloDone_from_ServerKeyExchange(self):
return self.should_handle_ServerHelloDone()
@ATMT.condition(HANDLED_CERTIFICATEREQUEST, prio=4)
def should_handle_ServerHelloDone_from_CertificateRequest(self):
return self.should_handle_ServerHelloDone()
@ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=4)
def should_handle_ServerHelloDone_from_ServerCertificate(self):
return self.should_handle_ServerHelloDone()
@ATMT.state()
def HANDLED_SERVERHELLODONE(self):
raise self.PREPARE_CLIENTFLIGHT2()
@ATMT.state()
def PREPARE_CLIENTFLIGHT2(self):
self.add_record()
@ATMT.condition(PREPARE_CLIENTFLIGHT2, prio=1)
def should_add_ClientCertificate(self):
"""
If the server sent a CertificateRequest, we send a Certificate message.
If no certificate is available, an empty Certificate message is sent:
- this is a SHOULD in RFC 4346 (Section 7.4.6)
- this is a MUST in RFC 5246 (Section 7.4.6)
XXX We may want to add a complete chain.
"""
hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
if not TLSCertificateRequest in hs_msg:
return
certs = []
if self.mycert:
certs = [self.mycert]
self.add_msg(TLSCertificate(certs=certs))
raise self.ADDED_CLIENTCERTIFICATE()
@ATMT.state()
def ADDED_CLIENTCERTIFICATE(self):
pass
def should_add_ClientKeyExchange(self):
self.add_msg(TLSClientKeyExchange())
raise self.ADDED_CLIENTKEYEXCHANGE()
@ATMT.condition(PREPARE_CLIENTFLIGHT2, prio=2)
def should_add_ClientKeyExchange_from_ClientFlight2(self):
return self.should_add_ClientKeyExchange()
@ATMT.condition(ADDED_CLIENTCERTIFICATE)
def should_add_ClientKeyExchange_from_ClientCertificate(self):
return self.should_add_ClientKeyExchange()
@ATMT.state()
def ADDED_CLIENTKEYEXCHANGE(self):
pass
@ATMT.condition(ADDED_CLIENTKEYEXCHANGE, prio=1)
def should_add_ClientVerify(self):
"""
XXX Section 7.4.7.1 of RFC 5246 states that the CertificateVerify
message is only sent following a client certificate that has signing
capability (i.e. not those containing fixed DH params).
We should verify that before adding the message. We should also handle
the case when the Certificate message was empty.
"""
hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
if (not TLSCertificateRequest in hs_msg or
self.mycert is None or
self.mykey is None):
return
self.add_msg(TLSCertificateVerify())
raise self.ADDED_CERTIFICATEVERIFY()
@ATMT.state()
def ADDED_CERTIFICATEVERIFY(self):
pass
@ATMT.condition(ADDED_CERTIFICATEVERIFY)
def should_add_ChangeCipherSpec_from_CertificateVerify(self):
self.add_record()
self.add_msg(TLSChangeCipherSpec())
raise self.ADDED_CHANGECIPHERSPEC()
@ATMT.condition(ADDED_CLIENTKEYEXCHANGE, prio=2)
def should_add_ChangeCipherSpec_from_ClientKeyExchange(self):
self.add_record()
self.add_msg(TLSChangeCipherSpec())
raise self.ADDED_CHANGECIPHERSPEC()
@ATMT.state()
def ADDED_CHANGECIPHERSPEC(self):
pass
@ATMT.condition(ADDED_CHANGECIPHERSPEC)
def should_add_ClientFinished(self):
self.add_record()
self.add_msg(TLSFinished())
raise self.ADDED_CLIENTFINISHED()
@ATMT.state()
def ADDED_CLIENTFINISHED(self):
pass
@ATMT.condition(ADDED_CLIENTFINISHED)
def should_send_ClientFlight2(self):
self.flush_records()
raise self.SENT_CLIENTFLIGHT2()
@ATMT.state()
def SENT_CLIENTFLIGHT2(self):
raise self.WAITING_SERVERFLIGHT2()
@ATMT.state()
def WAITING_SERVERFLIGHT2(self):
self.get_next_msg()
raise self.RECEIVED_SERVERFLIGHT2()
@ATMT.state()
def RECEIVED_SERVERFLIGHT2(self):
pass
@ATMT.condition(RECEIVED_SERVERFLIGHT2)
def should_handle_ChangeCipherSpec(self):
self.raise_on_packet(TLSChangeCipherSpec,
self.HANDLED_CHANGECIPHERSPEC)
@ATMT.state()
def HANDLED_CHANGECIPHERSPEC(self):
pass
@ATMT.condition(HANDLED_CHANGECIPHERSPEC)
def should_handle_Finished(self):
self.raise_on_packet(TLSFinished,
self.HANDLED_SERVERFINISHED)
@ATMT.state()
def HANDLED_SERVERFINISHED(self):
self.vprint("TLS handshake completed!")
self.vprint_sessioninfo()
self.vprint("You may send data or use 'quit'.")
####################### end of TLS handshake ##############################
@ATMT.condition(HANDLED_SERVERFINISHED)
def should_wait_ClientData(self):
raise self.WAIT_CLIENTDATA()
@ATMT.state()
def WAIT_CLIENTDATA(self):
pass
@ATMT.condition(WAIT_CLIENTDATA, prio=1)
def add_ClientData(self):
"""
The user may type in:
GET / HTTP/1.1\r\nHost: testserver.com\r\n\r\n
Special characters are handled so that it becomes a valid HTTP request.
"""
if not self.data_to_send:
data = six.moves.input().replace('\\r', '\r').replace('\\n', '\n').encode()
else:
data = self.data_to_send.pop()
if data == b"quit":
return
if self.linebreak:
data += b"\n"
self.add_record()
self.add_msg(TLSApplicationData(data=data))
raise self.ADDED_CLIENTDATA()
@ATMT.condition(WAIT_CLIENTDATA, prio=2)
def no_more_ClientData(self):
raise self.CLOSE_NOTIFY()
@ATMT.state()
def ADDED_CLIENTDATA(self):
pass
@ATMT.condition(ADDED_CLIENTDATA)
def should_send_ClientData(self):
self.flush_records()
raise self.SENT_CLIENTDATA()
@ATMT.state()
def SENT_CLIENTDATA(self):
raise self.WAITING_SERVERDATA()
@ATMT.state()
def WAITING_SERVERDATA(self):
self.get_next_msg(0.3, 1)
raise self.RECEIVED_SERVERDATA()
@ATMT.state()
def RECEIVED_SERVERDATA(self):
pass
@ATMT.condition(RECEIVED_SERVERDATA, prio=1)
def should_handle_ServerData(self):
if not self.buffer_in:
raise self.WAIT_CLIENTDATA()
p = self.buffer_in[0]
if isinstance(p, TLSApplicationData):
print("> Received: %r" % p.data)
elif isinstance(p, TLSAlert):
print("> Received: %r" % p)
raise self.CLOSE_NOTIFY()
else:
print("> Received: %r" % p)
self.buffer_in = self.buffer_in[1:]
raise self.HANDLED_SERVERDATA()
@ATMT.state()
def HANDLED_SERVERDATA(self):
raise self.WAIT_CLIENTDATA()
@ATMT.state()
def CLOSE_NOTIFY(self):
self.vprint()
self.vprint("Trying to send a TLSAlert to the server...")
@ATMT.condition(CLOSE_NOTIFY)
def close_session(self):
self.add_record()
self.add_msg(TLSAlert(level=1, descr=0))
try:
self.flush_records()
except:
self.vprint("Could not send termination Alert, maybe the server stopped?")
raise self.FINAL()
########################## SSLv2 handshake ################################
@ATMT.state()
def SSLv2_PREPARE_CLIENTHELLO(self):
pass
@ATMT.condition(SSLv2_PREPARE_CLIENTHELLO)
def sslv2_should_add_ClientHello(self):
self.add_record(is_sslv2=True)
p = self.client_hello or SSLv2ClientHello(challenge=randstring(16))
self.add_msg(p)
raise self.SSLv2_ADDED_CLIENTHELLO()
@ATMT.state()
def SSLv2_ADDED_CLIENTHELLO(self):
pass
@ATMT.condition(SSLv2_ADDED_CLIENTHELLO)
def sslv2_should_send_ClientHello(self):
self.flush_records()
raise self.SSLv2_SENT_CLIENTHELLO()
@ATMT.state()
def SSLv2_SENT_CLIENTHELLO(self):
raise self.SSLv2_WAITING_SERVERHELLO()
@ATMT.state()
def SSLv2_WAITING_SERVERHELLO(self):
self.get_next_msg()
raise self.SSLv2_RECEIVED_SERVERHELLO()
@ATMT.state()
def SSLv2_RECEIVED_SERVERHELLO(self):
pass
@ATMT.condition(SSLv2_RECEIVED_SERVERHELLO, prio=1)
def sslv2_should_handle_ServerHello(self):
self.raise_on_packet(SSLv2ServerHello,
self.SSLv2_HANDLED_SERVERHELLO)
@ATMT.state()
def SSLv2_HANDLED_SERVERHELLO(self):
pass
@ATMT.condition(SSLv2_RECEIVED_SERVERHELLO, prio=2)
def sslv2_missing_ServerHello(self):
raise self.SSLv2_MISSING_SERVERHELLO()
@ATMT.state()
def SSLv2_MISSING_SERVERHELLO(self):
self.vprint("Missing SSLv2 ServerHello message!")
raise self.SSLv2_CLOSE_NOTIFY()
@ATMT.condition(SSLv2_HANDLED_SERVERHELLO)
def sslv2_should_add_ClientMasterKey(self):
self.add_record(is_sslv2=True)
self.add_msg(SSLv2ClientMasterKey())
raise self.SSLv2_ADDED_CLIENTMASTERKEY()
@ATMT.state()
def SSLv2_ADDED_CLIENTMASTERKEY(self):
pass
@ATMT.condition(SSLv2_ADDED_CLIENTMASTERKEY)
def sslv2_should_send_ClientMasterKey(self):
self.flush_records()
raise self.SSLv2_SENT_CLIENTMASTERKEY()
@ATMT.state()
def SSLv2_SENT_CLIENTMASTERKEY(self):
raise self.SSLv2_WAITING_SERVERVERIFY()
@ATMT.state()
def SSLv2_WAITING_SERVERVERIFY(self):
# We give the server 0.5 second to send his ServerVerify.
# Else we assume that he's waiting for our ClientFinished.
self.get_next_msg(0.5, 0)
raise self.SSLv2_RECEIVED_SERVERVERIFY()
@ATMT.state()
def SSLv2_RECEIVED_SERVERVERIFY(self):
pass
@ATMT.condition(SSLv2_RECEIVED_SERVERVERIFY, prio=1)
def sslv2_should_handle_ServerVerify(self):
self.raise_on_packet(SSLv2ServerVerify,
self.SSLv2_HANDLED_SERVERVERIFY,
get_next_msg=False)
@ATMT.state()
def SSLv2_HANDLED_SERVERVERIFY(self):
pass
def sslv2_should_add_ClientFinished(self):
hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
if SSLv2ClientFinished in hs_msg:
return
self.add_record(is_sslv2=True)
self.add_msg(SSLv2ClientFinished())
raise self.SSLv2_ADDED_CLIENTFINISHED()
@ATMT.condition(SSLv2_HANDLED_SERVERVERIFY, prio=1)
def sslv2_should_add_ClientFinished_from_ServerVerify(self):
return self.sslv2_should_add_ClientFinished()
@ATMT.condition(SSLv2_HANDLED_SERVERVERIFY, prio=2)
def sslv2_should_wait_ServerFinished_from_ServerVerify(self):
raise self.SSLv2_WAITING_SERVERFINISHED()
@ATMT.condition(SSLv2_RECEIVED_SERVERVERIFY, prio=2)
def sslv2_should_add_ClientFinished_from_NoServerVerify(self):
return self.sslv2_should_add_ClientFinished()
@ATMT.condition(SSLv2_RECEIVED_SERVERVERIFY, prio=3)
def sslv2_missing_ServerVerify(self):
raise self.SSLv2_MISSING_SERVERVERIFY()
@ATMT.state(final=True)
def SSLv2_MISSING_SERVERVERIFY(self):
self.vprint("Missing SSLv2 ServerVerify message!")
raise self.SSLv2_CLOSE_NOTIFY()
@ATMT.state()
def SSLv2_ADDED_CLIENTFINISHED(self):
pass
@ATMT.condition(SSLv2_ADDED_CLIENTFINISHED)
def sslv2_should_send_ClientFinished(self):
self.flush_records()
raise self.SSLv2_SENT_CLIENTFINISHED()
@ATMT.state()
def SSLv2_SENT_CLIENTFINISHED(self):
hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
if SSLv2ServerVerify in hs_msg:
raise self.SSLv2_WAITING_SERVERFINISHED()
else:
self.get_next_msg()
raise self.SSLv2_RECEIVED_SERVERVERIFY()
@ATMT.state()
def SSLv2_WAITING_SERVERFINISHED(self):
self.get_next_msg()
raise self.SSLv2_RECEIVED_SERVERFINISHED()
@ATMT.state()
def SSLv2_RECEIVED_SERVERFINISHED(self):
pass
@ATMT.condition(SSLv2_RECEIVED_SERVERFINISHED, prio=1)
def sslv2_should_handle_ServerFinished(self):
self.raise_on_packet(SSLv2ServerFinished,
self.SSLv2_HANDLED_SERVERFINISHED)
####################### SSLv2 client authentication #######################
@ATMT.condition(SSLv2_RECEIVED_SERVERFINISHED, prio=2)
def sslv2_should_handle_RequestCertificate(self):
self.raise_on_packet(SSLv2RequestCertificate,
self.SSLv2_HANDLED_REQUESTCERTIFICATE)
@ATMT.state()
def SSLv2_HANDLED_REQUESTCERTIFICATE(self):
self.vprint("Server asked for a certificate...")
if not self.mykey or not self.mycert:
self.vprint("No client certificate to send!")
raise self.SSLv2_CLOSE_NOTIFY()
@ATMT.condition(SSLv2_HANDLED_REQUESTCERTIFICATE)
def sslv2_should_add_ClientCertificate(self):
self.add_record(is_sslv2=True)
self.add_msg(SSLv2ClientCertificate(certdata=self.mycert))
raise self.SSLv2_ADDED_CLIENTCERTIFICATE()
@ATMT.state()
def SSLv2_ADDED_CLIENTCERTIFICATE(self):
pass
@ATMT.condition(SSLv2_ADDED_CLIENTCERTIFICATE)
def sslv2_should_send_ClientCertificate(self):
self.flush_records()
raise self.SSLv2_SENT_CLIENTCERTIFICATE()
@ATMT.state()
def SSLv2_SENT_CLIENTCERTIFICATE(self):
raise self.SSLv2_WAITING_SERVERFINISHED()
################### end of SSLv2 client authentication ####################
@ATMT.state()
def SSLv2_HANDLED_SERVERFINISHED(self):
self.vprint("SSLv2 handshake completed!")
self.vprint_sessioninfo()
self.vprint("You may send data or use 'quit'.")
@ATMT.condition(SSLv2_RECEIVED_SERVERFINISHED, prio=3)
def sslv2_missing_ServerFinished(self):
raise self.SSLv2_MISSING_SERVERFINISHED()
@ATMT.state()
def SSLv2_MISSING_SERVERFINISHED(self):
self.vprint("Missing SSLv2 ServerFinished message!")
raise self.SSLv2_CLOSE_NOTIFY()
######################## end of SSLv2 handshake ###########################
@ATMT.condition(SSLv2_HANDLED_SERVERFINISHED)
def sslv2_should_wait_ClientData(self):
raise self.SSLv2_WAITING_CLIENTDATA()
@ATMT.state()
def SSLv2_WAITING_CLIENTDATA(self):
pass
@ATMT.condition(SSLv2_WAITING_CLIENTDATA, prio=1)
def sslv2_add_ClientData(self):
if not self.data_to_send:
data = six.moves.input().replace('\\r', '\r').replace('\\n', '\n').encode()
else:
data = self.data_to_send.pop()
self.vprint("> Read from list: %s" % data)
if data == "quit":
return
if self.linebreak:
data += "\n"
self.add_record(is_sslv2=True)
self.add_msg(Raw(data))
raise self.SSLv2_ADDED_CLIENTDATA()
@ATMT.condition(SSLv2_WAITING_CLIENTDATA, prio=2)
def sslv2_no_more_ClientData(self):
raise self.SSLv2_CLOSE_NOTIFY()
@ATMT.state()
def SSLv2_ADDED_CLIENTDATA(self):
pass
@ATMT.condition(SSLv2_ADDED_CLIENTDATA)
def sslv2_should_send_ClientData(self):
self.flush_records()
raise self.SSLv2_SENT_CLIENTDATA()
@ATMT.state()
def SSLv2_SENT_CLIENTDATA(self):
raise self.SSLv2_WAITING_SERVERDATA()
@ATMT.state()
def SSLv2_WAITING_SERVERDATA(self):
self.get_next_msg(0.3, 1)
raise self.SSLv2_RECEIVED_SERVERDATA()
@ATMT.state()
def SSLv2_RECEIVED_SERVERDATA(self):
pass
@ATMT.condition(SSLv2_RECEIVED_SERVERDATA)
def sslv2_should_handle_ServerData(self):
if not self.buffer_in:
raise self.SSLv2_WAITING_CLIENTDATA()
p = self.buffer_in[0]
print("> Received: %r" % p.load)
if p.load.startswith(b"goodbye"):
raise self.SSLv2_CLOSE_NOTIFY()
self.buffer_in = self.buffer_in[1:]
raise self.SSLv2_HANDLED_SERVERDATA()
@ATMT.state()
def SSLv2_HANDLED_SERVERDATA(self):
raise self.SSLv2_WAITING_CLIENTDATA()
@ATMT.state()
def SSLv2_CLOSE_NOTIFY(self):
"""
There is no proper way to end an SSLv2 session.
We try and send a 'goodbye' message as a substitute.
"""
self.vprint()
self.vprint("Trying to send a 'goodbye' to the server...")
@ATMT.condition(SSLv2_CLOSE_NOTIFY)
def sslv2_close_session(self):
self.add_record()
self.add_msg(Raw('goodbye'))
try:
self.flush_records()
except:
self.vprint("Could not send our goodbye. The server probably stopped.")
self.socket.close()
raise self.FINAL()
######################### TLS 1.3 handshake ###############################
@ATMT.state()
def TLS13_START(self):
pass
@ATMT.condition(TLS13_START)
def tls13_should_add_ClientHello(self):
# we have to use the legacy, plaintext TLS record here
self.add_record(is_tls13=False)
if self.client_hello:
p = self.client_hello
else:
# When trying to connect to a public TLS 1.3 server,
# you will most likely need to provide an SNI extension.
#sn = ServerName(servername="<put server name here>")
ext = [TLS_Ext_SupportedGroups(groups=["secp256r1"]),
#TLS_Ext_ServerName(servernames=[sn]),
TLS_Ext_KeyShare_CH(client_shares=[KeyShareEntry(group=23)]),
TLS_Ext_SupportedVersions(versions=["TLS 1.3-d18"]),
TLS_Ext_SignatureAlgorithms(sig_algs=["sha256+rsapss",
"sha256+rsa"]) ]
p = TLSClientHello(ciphers=0x1301, ext=ext)
self.add_msg(p)
raise self.TLS13_ADDED_CLIENTHELLO()
@ATMT.state()
def TLS13_ADDED_CLIENTHELLO(self):
pass
@ATMT.condition(TLS13_ADDED_CLIENTHELLO)
def tls13_should_send_ClientHello(self):
self.flush_records()
raise self.TLS13_SENT_CLIENTHELLO()
@ATMT.state()
def TLS13_SENT_CLIENTHELLO(self):
raise self.TLS13_WAITING_SERVERHELLO()
@ATMT.state()
def TLS13_WAITING_SERVERHELLO(self):
self.get_next_msg()
@ATMT.condition(TLS13_WAITING_SERVERHELLO)
def tls13_should_handle_ServerHello(self):
self.raise_on_packet(TLS13ServerHello,
self.TLS13_WAITING_ENCRYPTEDEXTENSIONS)
@ATMT.state()
def TLS13_WAITING_ENCRYPTEDEXTENSIONS(self):
self.get_next_msg()
@ATMT.condition(TLS13_WAITING_ENCRYPTEDEXTENSIONS)
def tls13_should_handle_EncryptedExtensions(self):
self.raise_on_packet(TLSEncryptedExtensions,
self.TLS13_WAITING_CERTIFICATE)
@ATMT.state()
def TLS13_WAITING_CERTIFICATE(self):
self.get_next_msg()
@ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=1)
def tls13_should_handle_Certificate(self):
self.raise_on_packet(TLS13Certificate,
self.TLS13_WAITING_CERTIFICATEVERIFY)
@ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=2)
def tls13_should_handle_CertificateRequest(self):
hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
if TLSCertificateRequest in hs_msg:
self.vprint("TLSCertificateRequest already received!")
self.raise_on_packet(TLSCertificateRequest,
self.TLS13_WAITING_CERTIFICATE)
@ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=3)
def tls13_should_handle_ServerFinished_from_EncryptedExtensions(self):
self.raise_on_packet(TLSFinished,
self.TLS13_CONNECTED)
@ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=4)
def tls13_missing_Certificate(self):
self.vprint("Missing TLS 1.3 message after EncryptedExtensions!")
raise self.FINAL()
@ATMT.state()
def TLS13_WAITING_CERTIFICATEVERIFY(self):
self.get_next_msg()
@ATMT.condition(TLS13_WAITING_CERTIFICATEVERIFY)
def tls13_should_handle_CertificateVerify(self):
self.raise_on_packet(TLSCertificateVerify,
self.TLS13_WAITING_SERVERFINISHED)
@ATMT.state()
def TLS13_WAITING_SERVERFINISHED(self):
self.get_next_msg()
@ATMT.condition(TLS13_WAITING_SERVERFINISHED)
def tls13_should_handle_ServerFinished_from_CertificateVerify(self):
self.raise_on_packet(TLSFinished,
self.TLS13_PREPARE_CLIENTFLIGHT2)
@ATMT.state()
def TLS13_PREPARE_CLIENTFLIGHT2(self):
self.add_record(is_tls13=True)
#raise self.FINAL()
@ATMT.condition(TLS13_PREPARE_CLIENTFLIGHT2)
def tls13_should_add_ClientFinished(self):
self.add_msg(TLSFinished())
raise self.TLS13_ADDED_CLIENTFINISHED()
@ATMT.state()
def TLS13_ADDED_CLIENTFINISHED(self):
pass
@ATMT.condition(TLS13_ADDED_CLIENTFINISHED)
def tls13_should_send_ClientFlight2(self):
self.flush_records()
raise self.TLS13_SENT_CLIENTFLIGHT2()
@ATMT.state()
def TLS13_SENT_CLIENTFLIGHT2(self):
raise self.HANDLED_SERVERFINISHED()
@ATMT.state(final=True)
def FINAL(self):
# We might call shutdown, but it may happen that the server
# did not wait for us to shutdown after answering our data query.
#self.socket.shutdown(1)
self.vprint("Closing client socket...")
self.socket.close()
self.vprint("Ending TLS client automaton.")