cve-2024-38477/httpd-2.4.59/test/modules/md/md_acme.py
2025-06-05 15:09:30 +02:00

126 lines
3.7 KiB
Python
Executable File

import logging
import os
import shutil
import subprocess
import time
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from threading import Thread
from typing import Dict
from .md_env import MDTestEnv
log = logging.getLogger(__name__)
def monitor_proc(env: MDTestEnv, proc):
_env = env
proc.wait()
class ACMEServer:
__metaclass__ = ABCMeta
@abstractmethod
def start(self):
raise NotImplementedError
@abstractmethod
def stop(self):
raise NotImplementedError
@abstractmethod
def install_ca_bundle(self, dest):
raise NotImplementedError
class MDPebbleRunner(ACMEServer):
def __init__(self, env: MDTestEnv, configs: Dict[str, str]):
self.env = env
self.configs = configs
self._current = 'default'
self._pebble = None
self._challtestsrv = None
self._log = None
def start(self, config: str = None):
if config is not None and config != self._current:
# change, tear down and start again
assert config in self.configs
self.stop()
self._current = config
elif self._pebble is not None:
# already running
return
args = ['pebble', '-config', self.configs[self._current], '-dnsserver', ':8053']
env = {}
env.update(os.environ)
env['PEBBLE_VA_NOSLEEP'] = '1'
self._log = open(f'{self.env.gen_dir}/pebble.log', 'w')
self._pebble = subprocess.Popen(args=args, env=env,
stdout=self._log, stderr=self._log)
t = Thread(target=monitor_proc, args=(self.env, self._pebble))
t.start()
args = ['pebble-challtestsrv', '-http01', '', '-https01', '', '-tlsalpn01', '']
self._challtestsrv = subprocess.Popen(args, stdout=self._log, stderr=self._log)
t = Thread(target=monitor_proc, args=(self.env, self._challtestsrv))
t.start()
self.install_ca_bundle(self.env.acme_ca_pemfile)
# disable ipv6 default address, this gives trouble inside docker
end = datetime.now() + timedelta(seconds=5)
while True:
r = self.env.run(['curl', 'localhost:8055/'])
if r.exit_code == 0:
break
if datetime.now() > end:
raise TimeoutError(f'unable to contact pebble-challtestsrv on localhost:8055')
time.sleep(.1)
r = self.env.run(['curl', '-d', f'{{"ip":""}}',
'localhost:8055/set-default-ipv6'])
assert r.exit_code == 0, f"{r}"
def stop(self):
if self._pebble:
self._pebble.terminate()
self._pebble = None
if self._challtestsrv:
self._challtestsrv.terminate()
self._challtestsrv = None
if self._log:
self._log.close()
self._log = None
def install_ca_bundle(self, dest):
shutil.copyfile(self.env.ca.cert_file, dest)
end = datetime.now() + timedelta(seconds=20)
while datetime.now() < end:
r = self.env.curl_get('https://localhost:15000/roots/0', insecure=True)
if r.exit_code == 0:
with open(dest, 'a') as fd:
fd.write(r.stdout)
break
class MDBoulderRunner(ACMEServer):
def __init__(self, env: MDTestEnv):
self.env = env
self.install_ca_bundle(self.env.acme_ca_pemfile)
def start(self, config=None):
pass
def stop(self):
pass
def install_ca_bundle(self, dest):
r = self.env.run([
'docker', 'exec', 'boulder_boulder_1', 'bash', '-c', "cat /tmp/root*.pem"
])
assert r.exit_code == 0
with open(dest, 'w') as fd:
fd.write(r.stdout)