| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| |
| # nghttp2 - HTTP/2 C Library |
| |
| # Copyright (c) 2015 Tatsuhiro Tsujikawa |
| |
| # Permission is hereby granted, free of charge, to any person obtaining |
| # a copy of this software and associated documentation files (the |
| # "Software"), to deal in the Software without restriction, including |
| # without limitation the rights to use, copy, modify, merge, publish, |
| # distribute, sublicense, and/or sell copies of the Software, and to |
| # permit persons to whom the Software is furnished to do so, subject to |
| # the following conditions: |
| |
| # The above copyright notice and this permission notice shall be |
| # included in all copies or substantial portions of the Software. |
| |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE |
| # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
| # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
| # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| |
| # This program was translated from the program originally developed by |
| # h2o project (https://github.com/h2o/h2o), written in Perl. It had |
| # the following copyright notice: |
| |
| # Copyright (c) 2015 DeNA Co., Ltd. |
| # |
| # Permission is hereby granted, free of charge, to any person obtaining a copy |
| # of this software and associated documentation files (the "Software"), to |
| # deal in the Software without restriction, including without limitation the |
| # rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
| # sell copies of the Software, and to permit persons to whom the Software is |
| # furnished to do so, subject to the following conditions: |
| # |
| # The above copyright notice and this permission notice shall be included in |
| # all copies or substantial portions of the Software. |
| # |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
| # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| # IN THE SOFTWARE. |
| |
| from __future__ import unicode_literals |
| import argparse |
| import io |
| import os |
| import os.path |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| |
# Python 2/3 compatibility shim: bind ``urlparse`` and a callable that
# writes raw bytes to standard output under whichever interpreter is
# running this program.
try:
    from urllib.parse import urlparse
except ImportError:
    # Python 2: urlparse lives in its own module, and sys.stdout
    # accepts byte strings directly.
    from urlparse import urlparse
    stdout_bwrite = sys.stdout.write
else:
    # Python 3: bytes must go through the underlying binary buffer.
    stdout_bwrite = sys.stdout.buffer.write
| |
| |
def die(msg):
    """Report a hard error and terminate the program.

    Writes ``msg`` followed by a newline to standard error, then exits
    with status 255.
    """
    sys.stderr.writelines((msg, '\n'))
    sys.exit(255)
| |
| |
def tempfail(msg):
    """Report a temporary failure and terminate the program.

    Writes ``msg`` followed by a newline to standard error, then exits
    with the EX_TEMPFAIL status (75).
    """
    sys.stderr.write(msg)
    sys.stderr.write('\n')
    # os.EX_TEMPFAIL is only defined on Unix; fall back to its numeric
    # value (75) so the exit path itself cannot fail elsewhere.
    sys.exit(getattr(os, 'EX_TEMPFAIL', 75))
| |
| |
def run_openssl(args, allow_tempfail=False):
    """Run a command and return everything it wrote to stdout as bytes.

    ``args`` is the full argument vector (program name first).  The
    child's stderr is not captured and goes wherever this process's
    stderr goes.

    If the command cannot be started, the program exits through
    ``die()``.  If reading its output fails or it exits with a nonzero
    status, the program exits through ``tempfail()`` when
    ``allow_tempfail`` is true and through ``die()`` otherwise.
    """
    try:
        p = subprocess.Popen(args, stdout=subprocess.PIPE)
    except Exception as e:
        die('failed to invoke {}:{}'.format(args, e))
    try:
        # communicate() drains stdout to EOF, closes the pipe and waits
        # for the child, so no file descriptor is left open behind us.
        out, _ = p.communicate()
        if p.returncode != 0:
            raise Exception('nonzero return code {}'.format(p.returncode))
        return out
    except Exception as e:
        msg = 'OpenSSL exitted abnormally: {}:{}'.format(args, e)
        if allow_tempfail:
            tempfail(msg)
        else:
            die(msg)
| |
| |
def read_file(path):
    """Return the whole content of the file at ``path`` as bytes."""
    with open(path, mode='rb') as src:
        contents = src.read()
    return contents
| |
| |
def write_file(path, data):
    """Create (or truncate) the file at ``path`` and store bytes ``data``."""
    with open(path, mode='wb') as dst:
        dst.write(data)
| |
| |
def detect_openssl_version(cmd):
    """Return the output of ``<cmd> version`` as stripped text.

    ``cmd`` is the openssl executable to invoke.
    """
    raw = run_openssl([cmd, 'version'])
    banner = raw.decode('utf-8')
    return banner.strip()
| |
| |
def extract_ocsp_uri(cmd, cert_fn):
    """Return the OCSP responder URI recorded in a certificate.

    ``cmd`` is the openssl executable and ``cert_fn`` the path of the
    certificate file.  When the certificate lists several responder
    addresses, the first one is returned.  The program exits through
    ``die()`` if no http:// or https:// URI can be extracted.
    """
    output = run_openssl(
        [cmd, 'x509', '-in', cert_fn, '-noout',
         '-ocsp_uri']).decode('utf-8')

    # openssl prints one responder address per line; a certificate with
    # several of them would otherwise yield a multi-line "URI" that
    # cannot be used as a request URL.
    uris = output.split()
    ocsp_uri = uris[0] if uris else ''

    if not re.match(r'^https?://', ocsp_uri):
        die('failed to extract ocsp URI from {}'.format(cert_fn))

    return ocsp_uri
| |
| |
def save_issuer_certificate(issuer_fn, cert_fn):
    """Copy the second PEM certificate of a chain file into ``issuer_fn``.

    ``cert_fn`` is read as UTF-8 text.  Everything up to the first END
    CERTIFICATE marker is skipped and the next complete BEGIN/END
    certificate block is written, with a trailing newline, to
    ``issuer_fn``.  The program exits through ``die()`` when no such
    block exists.
    """
    second_cert = re.compile(
        r'.*?-----END CERTIFICATE-----.*?(-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----)',
        re.DOTALL)
    pem_text = read_file(cert_fn).decode('utf-8')
    found = second_cert.match(pem_text)
    if found is None:
        die('--issuer option was not used, and failed to extract issuer certificate from the certificate')
    issuer_pem = found.group(1) + '\n'
    write_file(issuer_fn, issuer_pem.encode('utf-8'))
| |
| |
def send_and_receive_ocsp(respder_fn, cmd, cert_fn, issuer_fn, ocsp_uri,
                          ocsp_host, openssl_version):
    """Send an OCSP request with ``openssl ocsp`` and return its text output.

    The response is requested without verification (``-noverify``) and
    stored by openssl in ``respder_fn``.  For version strings starting
    with "openssl 1.0." or "libressl " (case-insensitive) an explicit
    Host header carrying ``ocsp_host`` is added.  A failing openssl run
    is treated as a temporary failure.
    """
    sys.stderr.write('sending OCSP request to {}\n'.format(ocsp_uri))

    argv = [cmd, 'ocsp']
    argv += ['-issuer', issuer_fn]
    argv += ['-cert', cert_fn]
    argv += ['-url', ocsp_uri]
    argv += ['-noverify', '-respout', respder_fn]

    needs_host_header = openssl_version.lower().startswith(
        ('openssl 1.0.', 'libressl '))
    if needs_host_header:
        argv += ['-header', 'Host', ocsp_host]

    output = run_openssl(argv, allow_tempfail=True)
    return output.decode('utf-8')
| |
| |
def verify_response(cmd, tempdir, issuer_fn, respder_fn):
    """Check the signature of a saved OCSP response with ``openssl ocsp``.

    Several option sets are tried in turn; the combined stdout/stderr
    of each attempt is written to ``verify.out`` inside ``tempdir``.
    Returns True as soon as one attempt succeeds.  If none does, the
    output of the last attempt is copied to standard error and False
    is returned.
    """
    sys.stderr.write('verifying the response signature\n')

    log_path = os.path.join(tempdir, 'verify.out')

    # Option sets to try, most exotic first.
    candidates = [
        # for comodo
        ['-VAfile', issuer_fn],
        # these options are only available in OpenSSL >= 1.0.2
        ['-partial_chain', '-trusted_first', '-CAfile', issuer_fn],
        # for OpenSSL <= 1.0.1
        ['-CAfile', issuer_fn],
    ]

    for extra in candidates:
        argv = [cmd, 'ocsp', '-respin', respder_fn] + extra
        with open(log_path, 'w+b') as log:
            status = subprocess.call(argv, stdout=log, stderr=log)
            if status != 0:
                continue
            log.seek(0)
            transcript = log.read().decode('utf-8')
        # With OpenSSL <= 1.0.1, openssl ocsp exits 0 even when the
        # verification failed, so the captured output has to be
        # inspected as well.
        if 'Response Verify Failure' in transcript:
            continue
        sys.stderr.write('verify OK (used: {})\n'.format(extra))
        return True

    sys.stderr.write(read_file(log_path).decode('utf-8'))
    return False
| |
| |
def fetch_ocsp_response(cmd, cert_fn, tempdir, issuer_fn=None):
    """Fetch, verify and print the OCSP response for a certificate.

    ``cmd`` is the openssl executable, ``cert_fn`` the certificate file
    and ``tempdir`` an existing directory for intermediate files.  When
    ``issuer_fn`` is not given, the issuer certificate is extracted
    from ``cert_fn`` into ``tempdir``.

    On success the DER-encoded response is written to standard output.
    A responder error or a failed verification ends the program
    through ``tempfail()``.
    """
    openssl_version = detect_openssl_version(cmd)

    sys.stderr.write(
        'fetch-ocsp-response (using {})\n'.format(openssl_version))

    ocsp_uri = extract_ocsp_uri(cmd, cert_fn)
    ocsp_host = urlparse(ocsp_uri).netloc

    if not issuer_fn:
        issuer_fn = os.path.join(tempdir, 'issuer.crt')
        save_issuer_certificate(issuer_fn, cert_fn)

    respder_fn = os.path.join(tempdir, 'resp.der')
    resp = send_and_receive_ocsp(
        respder_fn, cmd, cert_fn, issuer_fn, ocsp_uri, ocsp_host,
        openssl_version)

    sys.stderr.write('{}\n'.format(resp))

    # OpenSSL 1.0.2 still returns exit code 0 even if ocsp responder
    # returned error status (e.g., trylater(3)).  Report it as a
    # temporary failure, like the other transient errors, instead of
    # raising a bare exception that ends in a traceback.
    if 'Responder Error:' in resp:
        tempfail('responder returned error')

    if not verify_response(cmd, tempdir, issuer_fn, respder_fn):
        tempfail('failed to verify the response')

    # success
    stdout_bwrite(read_file(respder_fn))
| |
| |
if __name__ == '__main__':
    description = '''The command issues an OCSP request for given server certificate, verifies the response and prints the resulting DER.'''
    epilog = '''The command exits 0 if successful, or 75 (EX_TEMPFAIL) on temporary error. Other exit codes may be returned in case of hard errors.'''

    parser = argparse.ArgumentParser(description=description, epilog=epilog)
    parser.add_argument(
        '--issuer',
        metavar='FILE',
        help='issuer certificate (if omitted, is extracted from the certificate chain)')
    parser.add_argument(
        '--openssl',
        metavar='CMD',
        default='openssl',
        help='openssl command to use (default: "openssl")')
    parser.add_argument(
        'certificate',
        help='path to certificate file to validate')
    args = parser.parse_args()

    # tempfile.TemporaryDirectory (Python 3.2+) would clean up after
    # itself, but Python 2.7 must be supported as well, so the scratch
    # directory is created and removed by hand.
    tempdir = tempfile.mkdtemp()
    try:
        fetch_ocsp_response(args.openssl, args.certificate, tempdir,
                            args.issuer)
    finally:
        shutil.rmtree(tempdir)