boulder/test/helpers.py

121 lines
4.5 KiB
Python

#!/usr/bin/env python2.7
import base64
import os
import urllib2
import time
import re
import requests
import tempfile
import shutil
import atexit
import subprocess
tempdir = tempfile.mkdtemp()
@atexit.register
def stop():
shutil.rmtree(tempdir)
def run(cmd, **kwargs):
return subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, **kwargs)
def fetch_ocsp(request_bytes, url):
"""Fetch an OCSP response using POST, GET, and GET with URL encoding.
Returns a tuple of the responses.
"""
ocsp_req_b64 = base64.b64encode(request_bytes)
# Make the OCSP request three different ways: by POST, by GET, and by GET with
# URL-encoded parameters. All three should have an identical response.
get_response = urllib2.urlopen("%s/%s" % (url, ocsp_req_b64)).read()
get_encoded_response = urllib2.urlopen("%s/%s" % (url, urllib2.quote(ocsp_req_b64, safe = ""))).read()
post_response = urllib2.urlopen("%s/" % (url), request_bytes).read()
return (post_response, get_response, get_encoded_response)
def make_ocsp_req(cert_file, issuer_file):
"""Return the bytes of an OCSP request for the given certificate file."""
ocsp_req_file = os.path.join(tempdir, "ocsp.req")
# First generate the OCSP request in DER form
run("openssl ocsp -no_nonce -issuer %s -cert %s -reqout %s" % (
issuer_file, cert_file, ocsp_req_file))
with open(ocsp_req_file) as f:
ocsp_req = f.read()
return ocsp_req
def fetch_until(cert_file, issuer_file, url, initial, final):
"""Fetch OCSP for cert_file until OCSP status goes from initial to final.
Initial and final are treated as regular expressions. Any OCSP response
whose OpenSSL OCSP verify output doesn't match either initial or final is
a fatal error.
If OCSP responses by the three methods (POST, GET, URL-encoded GET) differ
from each other, that is a fatal error.
If we loop for more than five seconds, that is a fatal error.
Returns nothing on success.
"""
ocsp_request = make_ocsp_req(cert_file, issuer_file)
timeout = time.time() + 5
while True:
time.sleep(0.25)
if time.time() > timeout:
raise Exception("Timed out waiting for OCSP to go from '%s' to '%s'" % (
initial, final))
responses = fetch_ocsp(ocsp_request, url)
# This variable will be true at the end of the loop if all the responses
# matched the final state.
all_final = True
for resp in responses:
verify_output = ocsp_verify(cert_file, issuer_file, resp)
if re.search(initial, verify_output):
all_final = False
break
elif re.search(final, verify_output):
continue
else:
print verify_output
raise Exception("OCSP response didn't match '%s' or '%s'" %(
initial, final))
if all_final:
# Check that all responses were equal to each other.
for resp in responses:
if resp != responses[0]:
raise Exception("OCSP responses differed: %s vs %s" %(
base64.b64encode(responses[0]), base64.b64encode(resp)))
return
def ocsp_verify(cert_file, issuer_file, ocsp_response):
ocsp_resp_file = os.path.join(tempdir, "ocsp.resp")
with open(ocsp_resp_file, "w") as f:
f.write(ocsp_response)
output = run("openssl ocsp -no_nonce -issuer %s -cert %s \
-verify_other %s -CAfile test/test-root.pem \
-respin %s" % (issuer_file, cert_file, issuer_file, ocsp_resp_file))
# OpenSSL doesn't always return non-zero when response verify fails, so we
# also look for the string "Response Verify Failure"
verify_failure = "Response Verify Failure"
if re.search(verify_failure, output):
print output
raise Exception("OCSP verify failure")
return output
def wait_for_ocsp_good(cert_file, issuer_file, url):
fetch_until(cert_file, issuer_file, url, " unauthorized", ": good")
def wait_for_ocsp_revoked(cert_file, issuer_file, url):
fetch_until(cert_file, issuer_file, url, ": good", ": revoked")
def reset_akamai_purges():
requests.post("http://localhost:6789/debug/reset-purges")
def verify_akamai_purge():
response = requests.get("http://localhost:6789/debug/get-purges")
purgeData = response.json()
if len(purgeData["V3"]) is not 1:
raise Exception("Unexpected number of Akamai v3 purges")
reset_akamai_purges()