191 lines
7.4 KiB
Python
191 lines
7.4 KiB
Python
import inspect
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
|
|
from datetime import timedelta, datetime
|
|
from typing import List, Optional, Dict, Tuple, Union
|
|
|
|
from pyhttpd.certs import CertificateSpec
|
|
from pyhttpd.env import HttpdTestEnv, HttpdTestSetup
|
|
from pyhttpd.result import ExecResult
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class TlsTestSetup(HttpdTestSetup):
|
|
|
|
def __init__(self, env: 'HttpdTestEnv'):
|
|
super().__init__(env=env)
|
|
self.add_source_dir(os.path.dirname(inspect.getfile(TlsTestSetup)))
|
|
self.add_modules(["tls", "http2", "cgid", "watchdog", "proxy_http2"])
|
|
|
|
|
|
class TlsCipher:
|
|
|
|
def __init__(self, id: int, name: str, flavour: str,
|
|
min_version: float, max_version: float = None,
|
|
openssl: str = None):
|
|
self.id = id
|
|
self.name = name
|
|
self.flavour = flavour
|
|
self.min_version = min_version
|
|
self.max_version = max_version if max_version is not None else self.min_version
|
|
if openssl is None:
|
|
if name.startswith('TLS13_'):
|
|
openssl = re.sub(r'^TLS13_', 'TLS_', name)
|
|
else:
|
|
openssl = re.sub(r'^TLS_', '', name)
|
|
openssl = re.sub(r'_WITH_([^_]+)_', r'_\1_', openssl)
|
|
openssl = re.sub(r'_AES_(\d+)', r'_AES\1', openssl)
|
|
openssl = re.sub(r'(_POLY1305)_\S+$', r'\1', openssl)
|
|
openssl = re.sub(r'_', '-', openssl)
|
|
self.openssl_name = openssl
|
|
self.id_name = "TLS_CIPHER_0x{0:04x}".format(self.id)
|
|
|
|
def __repr__(self):
|
|
return self.name
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
|
|
class TlsTestEnv(HttpdTestEnv):
|
|
|
|
CURL_SUPPORTS_TLS_1_3 = None
|
|
|
|
@classmethod
|
|
def curl_supports_tls_1_3(cls) -> bool:
|
|
if cls.CURL_SUPPORTS_TLS_1_3 is None:
|
|
# Unfortunately, there is no reliable, platform-independant
|
|
# way to verify that TLSv1.3 is properly supported by curl.
|
|
#
|
|
# p = subprocess.run(['curl', '--tlsv1.3', 'https://shouldneverexistreally'],
|
|
# stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
|
# return code 6 means the site could not be resolved, but the
|
|
# tls parameter was recognized
|
|
cls.CURL_SUPPORTS_TLS_1_3 = False
|
|
return cls.CURL_SUPPORTS_TLS_1_3
|
|
|
|
|
|
# current rustls supported ciphers in their order of preference
|
|
# used to test cipher selection, see test_06_ciphers.py
|
|
RUSTLS_CIPHERS = [
|
|
TlsCipher(0x1303, "TLS13_CHACHA20_POLY1305_SHA256", "CHACHA", 1.3),
|
|
TlsCipher(0x1302, "TLS13_AES_256_GCM_SHA384", "AES", 1.3),
|
|
TlsCipher(0x1301, "TLS13_AES_128_GCM_SHA256", "AES", 1.3),
|
|
TlsCipher(0xcca9, "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256", "ECDSA", 1.2),
|
|
TlsCipher(0xcca8, "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256", "RSA", 1.2),
|
|
TlsCipher(0xc02c, "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "ECDSA", 1.2),
|
|
TlsCipher(0xc02b, "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "ECDSA", 1.2),
|
|
TlsCipher(0xc030, "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "RSA", 1.2),
|
|
TlsCipher(0xc02f, "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "RSA", 1.2),
|
|
]
|
|
|
|
def __init__(self, pytestconfig=None):
|
|
super().__init__(pytestconfig=pytestconfig)
|
|
self._domain_a = "a.mod-tls.test"
|
|
self._domain_b = "b.mod-tls.test"
|
|
self.add_httpd_conf([
|
|
f'<Directory "{self.server_dir}/htdocs/{self.domain_a}">',
|
|
' AllowOverride None',
|
|
' Require all granted',
|
|
' AddHandler cgi-script .py',
|
|
' Options +ExecCGI',
|
|
'</Directory>',
|
|
f'<Directory "{self.server_dir}/htdocs/{self.domain_b}">',
|
|
' AllowOverride None',
|
|
' Require all granted',
|
|
' AddHandler cgi-script .py',
|
|
' Options +ExecCGI',
|
|
'</Directory>',
|
|
f'<VirtualHost *:{self.http_port}>',
|
|
' ServerName localhost',
|
|
' DocumentRoot "htdocs"',
|
|
'</VirtualHost>',
|
|
f'<VirtualHost *:{self.http_port}>',
|
|
f' ServerName {self.domain_a}',
|
|
' DocumentRoot "htdocs/a.mod-tls.test"',
|
|
'</VirtualHost>',
|
|
f'<VirtualHost *:{self.http_port}>',
|
|
f' ServerName {self.domain_b}',
|
|
' DocumentRoot "htdocs/b.mod-tls.test"',
|
|
'</VirtualHost>',
|
|
])
|
|
self.add_cert_specs([
|
|
CertificateSpec(domains=[self.domain_a]),
|
|
CertificateSpec(domains=[self.domain_b], key_type='secp256r1', single_file=True),
|
|
CertificateSpec(domains=[self.domain_b], key_type='rsa4096'),
|
|
CertificateSpec(name="clientsX", sub_specs=[
|
|
CertificateSpec(name="user1", client=True, single_file=True),
|
|
CertificateSpec(name="user2", client=True, single_file=True),
|
|
CertificateSpec(name="user_expired", client=True,
|
|
single_file=True, valid_from=timedelta(days=-91),
|
|
valid_to=timedelta(days=-1)),
|
|
]),
|
|
CertificateSpec(name="clientsY", sub_specs=[
|
|
CertificateSpec(name="user1", client=True, single_file=True),
|
|
]),
|
|
CertificateSpec(name="user1", client=True, single_file=True),
|
|
])
|
|
self.add_httpd_log_modules(['tls'])
|
|
|
|
|
|
def setup_httpd(self, setup: TlsTestSetup = None):
|
|
if setup is None:
|
|
setup = TlsTestSetup(env=self)
|
|
super().setup_httpd(setup=setup)
|
|
|
|
@property
|
|
def domain_a(self) -> str:
|
|
return self._domain_a
|
|
|
|
@property
|
|
def domain_b(self) -> str:
|
|
return self._domain_b
|
|
|
|
def tls_get(self, domain, paths: Union[str, List[str]], options: List[str] = None, no_stdout_list = False) -> ExecResult:
|
|
if isinstance(paths, str):
|
|
paths = [paths]
|
|
urls = [f"https://{domain}:{self.https_port}{path}" for path in paths]
|
|
return self.curl_raw(urls=urls, options=options, no_stdout_list=no_stdout_list)
|
|
|
|
def tls_get_json(self, domain: str, path: str, options=None):
|
|
r = self.tls_get(domain=domain, paths=path, options=options)
|
|
return r.json
|
|
|
|
def run_diff(self, fleft: str, fright: str) -> ExecResult:
|
|
return self.run(['diff', '-u', fleft, fright])
|
|
|
|
def openssl(self, args: List[str]) -> ExecResult:
|
|
return self.run(['openssl'] + args)
|
|
|
|
def openssl_client(self, domain, extra_args: List[str] = None) -> ExecResult:
|
|
args = ["s_client", "-CAfile", self.ca.cert_file, "-servername", domain,
|
|
"-connect", "localhost:{port}".format(
|
|
port=self.https_port
|
|
)]
|
|
if extra_args:
|
|
args.extend(extra_args)
|
|
args.extend([])
|
|
return self.openssl(args)
|
|
|
|
OPENSSL_SUPPORTED_PROTOCOLS = None
|
|
|
|
@staticmethod
|
|
def openssl_supports_tls_1_3() -> bool:
|
|
if TlsTestEnv.OPENSSL_SUPPORTED_PROTOCOLS is None:
|
|
env = TlsTestEnv()
|
|
r = env.openssl(args=["ciphers", "-v"])
|
|
protos = set()
|
|
ciphers = set()
|
|
for line in r.stdout.splitlines():
|
|
m = re.match(r'^(\S+)\s+(\S+)\s+(.*)$', line)
|
|
if m:
|
|
ciphers.add(m.group(1))
|
|
protos.add(m.group(2))
|
|
TlsTestEnv.OPENSSL_SUPPORTED_PROTOCOLS = protos
|
|
TlsTestEnv.OPENSSL_SUPPORTED_CIPHERS = ciphers
|
|
return "TLSv1.3" in TlsTestEnv.OPENSSL_SUPPORTED_PROTOCOLS
|