diff --git a/README.md b/README.md index 91c6f19..8725479 100644 --- a/README.md +++ b/README.md @@ -1,78 +1,76 @@ # acme-chief acme-chief is a Python 3 application that is to be used to centrally request configured TLS certificates from ACME servers, then make them available to authorised API users. The API is intended to sit behind uwsgi and nginx running TLS client certificate checking based on a private CA. It can support http-01 and dns-01 challenges. acme-chief itself consists of two parts: * The backend in acme_chief.py, which is responsible for generating initial certificates and then replacing them with live ones from the specified ACME server. * The API in api.py/uwsgi.py, which is responsible for taking requests from users, and distributing the certificates saved by the backend. It is intended for use in multi-server environments where any one of several actual servers with no shared filesystem are required to terminate TLS connections, where it is not feasible to have each server requesting their own certificates from ACME servers. One thing to note is that there are two stages when acme-chief is outputting certificates: the initial, self-signed certificate, and the trusted one issued through ACME. The initial stage is done to help with cases where servers need a dummy certificate to start up, which may be required in order to *get* the publicly-trusted certificates at all (thus resolving a chicken-and-egg problem). A side-effect of this is zero-byte PEM files for the chain, which for self-signed certificates is empty. One variant of the API permits simple use by puppet. It is hoped that eventually this will be used to handle certificates for wikipedia.org and co. The license in use is GPL v3+ and the main developers are Alex Monk and Valentin Gutierrez . ## Requirements acme-chief requires python 3.7 or higher. ## Configuration file example acme-chief expects its configuration file in /etc/acme-chief/config.yaml by default ```yaml accounts: - id: account_id_here directory: "https://acme-v02.api.letsencrypt.org/directory" certificates: testing: CN: acmechieftest.beta.wmflabs.org SNI: - acmechieftest.beta.wmflabs.org staging_time: 3600 challenge: http-01 authorized_hosts: - cp1008.eqiad.wmnet authorized_regexes: - '^cp100[1-9]\.eqiad\.wmnet$' challenges: dns-01: validation_dns_servers: - 127.0.0.1 sync_dns_servers: - 127.0.0.1 resolver_port: 53 zone_update_cmd: /bin/echo zone_update_cmd_timeout: 60.0 issuing_ca: 'letsencrypt.org' ns_records: - ns0.wikimedia.org. - ns1.wikimedia.org. - ns2.wikimedia.org. http-01: issuing_ca: 'letsencrypt.org' api: clients_root_directory: /etc/acmecerts -watchdog: - systemd: true ``` It also supports per-certificate configuration in /etc/acme-chief/conf.d. conf.d file example: ```yaml certname: default_account_certificate hostname: deployment-acmechief-testclient02.deployment-prep.eqiad.wmflabs ``` diff --git a/acme_chief/config.py b/acme_chief/config.py index 59558d7..4822823 100644 --- a/acme_chief/config.py +++ b/acme_chief/config.py @@ -1,184 +1,184 @@ """ Module containing configuration handling classes Alex Monk 2018 Valentin Gutierrez 2018-2021 """ import collections import datetime import logging import os import re import yaml from acme_chief.acme_requests import ACMEChallengeType logger = logging.getLogger(__name__) # pylint: disable=invalid-name # default values that can be customized via the config file. Check the README for a valid example DEFAULT_DNS_ZONE_UPDATE_CMD = '/bin/echo' DEFAULT_DNS_ZONE_UPDATE_CMD_TIMEOUT = 60.0 DEFAULT_DNS_RESOLVER_PORT = 53 DEFAULT_CERTIFICATE_STAGING_TIME = 3600 DEFAULT_OCSP_RESPONSE_UPDATE_THRESHOLD = 172800 DEFAULT_API_CLIENTS_ROOT_DIRECTORY = '/etc/acmecerts' class ACMEChiefConfig: """Class representing ACMEChief configuration""" def __init__(self, *, accounts, certificates, default_account, authorized_hosts, authorized_regexes, challenges, api, watchdog): self.accounts = accounts self.certificates = certificates self.default_account = default_account self.authorized_hosts = authorized_hosts self.authorized_regexes = authorized_regexes self.challenges = {} self.api = api self.watchdog = watchdog for challenge_type, challenge_config in challenges.items(): if challenge_type == 'dns-01': if not ('zone_update_cmd' in challenge_config and os.access(challenge_config['zone_update_cmd'], os.X_OK)): logger.warning("Missing/invalid DNS zone updater CMD, using the default one: %s", DEFAULT_DNS_ZONE_UPDATE_CMD) challenge_config['zone_update_cmd'] = DEFAULT_DNS_ZONE_UPDATE_CMD try: challenge_config['zone_update_cmd_timeout'] = float(challenge_config['zone_update_cmd_timeout']) except (KeyError, ValueError): logger.warning("Missing/invalid DNS zone updater CMD timeout, using the default one: %.2f", DEFAULT_DNS_ZONE_UPDATE_CMD_TIMEOUT) challenge_config['zone_update_cmd_timeout'] = DEFAULT_DNS_ZONE_UPDATE_CMD_TIMEOUT try: challenge_config['resolver_port'] = int(challenge_config['resolver_port']) except (KeyError, ValueError): logger.warning("Missing/invalid DNS port, using the default one: %i", DEFAULT_DNS_RESOLVER_PORT) challenge_config['resolver_port'] = DEFAULT_DNS_RESOLVER_PORT self.challenges[ACMEChallengeType.DNS01] = challenge_config elif challenge_type == 'http-01': self.challenges[ACMEChallengeType.HTTP01] = challenge_config else: logger.warning("Unexpected challenge type found in configuration: %s", challenge_type) if ACMEChallengeType.DNS01 not in self.challenges: logger.warning('Missing dns-01 challenge configuration') @staticmethod def load(file_name, confd_path=None): # pylint: disable=too-many-locals,too-many-branches,too-many-statements """Load a config from the specified file_name and an optional conf.d path""" logger.debug("Loading config file: %s", file_name) if confd_path is None: confd_path = os.path.dirname(file_name) with open(file_name, encoding='ascii') as config_file: config = yaml.safe_load(config_file) default_account = ACMEChiefConfig._get_default_account(config['accounts']) authorized_hosts = collections.defaultdict(set) authorized_regexes = collections.defaultdict(set) # TODO: Consider getting rid of conf.d/ support in the future for fname in os.listdir(confd_path): file_path = os.path.join(confd_path, fname) logger.debug("Loading config file: %s", file_path) with open(file_path, encoding='ascii') as conf_f: conf_data = yaml.safe_load(conf_f) if conf_data['certname'] not in config['certificates']: logger.warning("Certificate %s referenced on %s not found in general config", conf_data['certname'], file_path) continue authorized_hosts[conf_data['certname']].add(conf_data['hostname']) for cert_name, cert_config in config['certificates'].items(): staging_time_seconds = cert_config.get('staging_time', DEFAULT_CERTIFICATE_STAGING_TIME) try: cert_config['staging_time'] = datetime.timedelta(seconds=int(staging_time_seconds)) except TypeError: logger.warning("Ignoring invalid staging time %s for certificate %s. Using the default one: %s", staging_time_seconds, cert_name, DEFAULT_CERTIFICATE_STAGING_TIME) cert_config['staging_time'] = datetime.timedelta(seconds=DEFAULT_CERTIFICATE_STAGING_TIME) ocsp_update_threshold_seconds = cert_config.get('ocsp_update_threshold', DEFAULT_OCSP_RESPONSE_UPDATE_THRESHOLD) try: cert_config['ocsp_update_threshold'] = datetime.timedelta(seconds=int(ocsp_update_threshold_seconds)) except TypeError: logger.warning("Ignoring bogus OCSP update threshold %s for certificate %s. Using the default one: %s", staging_time_seconds, cert_name, DEFAULT_CERTIFICATE_STAGING_TIME) cert_config['ocsp_update_threshold'] = datetime.timedelta(seconds=int(ocsp_update_threshold_seconds)) if cert_config['CN'] not in cert_config['SNI']: cert_config['SNI'].append(cert_config['CN']) logger.warning("Appending CN to SNI list for certificate %s", cert_name) if 'prevalidate' not in cert_config: cert_config['prevalidate'] = False if 'skip_invalid_snis' in cert_config: if not cert_config['prevalidate']: logging.warning("Ignoring skip_invalid_snis because prevalidate is False for certificate %s", cert_name) cert_config['skip_invalid_snis'] = False else: cert_config['skip_invalid_snis'] = False if 'authorized_hosts' in cert_config: authorized_hosts[cert_name].update(cert_config['authorized_hosts']) if 'authorized_regexes' in cert_config: for regex in cert_config['authorized_regexes']: try: authorized_regexes[cert_name].add(re.compile(regex)) except (re.error, TypeError): logger.warning("Ignoring invalid authorized regex %s for certificate %s", regex, cert_name) continue api = config.get('api', {'clients_root_directory': DEFAULT_API_CLIENTS_ROOT_DIRECTORY}) - watchdog = config.get('watchdog', {'systemd': False}) - - if 'systemd' in watchdog: - watchdog['systemd'] = bool(watchdog['systemd']) - else: - watchdog['systemd'] = False + watchdog = {'systemd': False} + try: + if int(os.environ['WATCHDOG_USEC']) > 0: + watchdog['systemd'] = True + except (KeyError, ValueError): + pass return ACMEChiefConfig(accounts=config['accounts'], certificates=config['certificates'], default_account=default_account, authorized_hosts=dict(authorized_hosts), authorized_regexes=dict(authorized_regexes), challenges=config['challenges'], api=api, watchdog=watchdog) @staticmethod def _get_default_account(accounts): for account in accounts: if 'default' in account and account['default'] is True: return account['id'] return accounts[0]['id'] def check_access(self, hostname, cert_name): """Returns True if hostname is allowed to fetch the specified certificate. False otherwise""" if hostname in self.authorized_hosts.get(cert_name, ()): return True try: for regex in self.authorized_regexes[cert_name]: if regex.fullmatch(hostname) is not None: return True except (KeyError, TypeError, re.error): return False return False diff --git a/tests/test_config.py b/tests/test_config.py index b24dc58..3e960cf 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,165 +1,191 @@ import os import tempfile import unittest from datetime import timedelta from unittest import mock from acme_chief.acme_chief import ACMEChief from acme_chief.acme_requests import ACMEChallengeType from acme_chief.config import ACMEChiefConfig VALID_CONFIG_EXAMPLE = ''' accounts: - id: ee566f9e436e120082f0770c0d58dd6d directory: https://acme-staging-v02.api.letsencrypt.org/directory default: true - id: 621b49f9c6ccbbfbff9acb6e18f71205 directory: https://127.0.0.1:14000/dir certificates: default_account_certificate: CN: acmechieftest.beta.wmflabs.org SNI: - acmechieftest.beta.wmflabs.org challenge: http-01 authorized_hosts: - deployment-acmechief-testclient03.deployment-prep.eqiad.wmflabs non_default_account_certificate: account: 621b49f9c6ccbbfbff9acb6e18f71205 CN: 'test.wmflabs.org' SNI: - '*.test.wmflabs.org' challenge: dns-01 staging_time: 7200 prevalidate: true skip_invalid_snis: true certificate_auth_by_regex: CN: regex.test.wmflabs.org SNI: - regex.test.wmflabs.org challenge: http-01 authorized_regexes: - '^deployment-acmechief-testclient0[1-3]\.deployment-prep\.eqiad\.wmflabs$' staging_time: 3600 challenges: dns-01: validation_dns_servers: - 127.0.0.1 sync_dns_servers: - 127.0.0.1 zone_update_cmd: /usr/bin/dns-update-zone zone_update_cmd_timeout: 30.5 issuing_ca: 'letsencrypt.org' ns_records: - ns0.wikimedia.org. - ns1.wikimedia.org. - ns2.wikimedia.org. api: clients_root_directory: /etc/custom-root-directory ''' VALID_CONFIG_EXAMPLE_WITHOUT_DEFAULT_ACCOUNT = ''' accounts: - id: 621b49f9c6ccbbfbff9acb6e18f71205 directory: https://127.0.0.1:14000/dir - id: ee566f9e436e120082f0770c0d58dd6d directory: https://acme-staging-v02.api.letsencrypt.org/directory certificates: default_account_certificate: CN: acmechieftest.beta.wmflabs.org SNI: - acmechieftest.beta.wmflabs.org challenge: http-01 staging_time: 3600 non_default_account_certificate: account: 621b49f9c6ccbbfbff9acb6e18f71205 CN: 'test.wmflabs.org' SNI: - '*.test.wmflabs.org' challenge: dns-01 staging_time: 3600 challenges: dns-01: validation_dns_servers: - 127.0.0.1 sync_dns_servers: - 127.0.0.1 ''' CONFD_VALID_FILE_EXAMPLE = ''' certname: default_account_certificate hostname: deployment-acmechief-testclient02.deployment-prep.eqiad.wmflabs ''' class ACMEChiefConfigTest(unittest.TestCase): def setUp(self): self.base_path = tempfile.TemporaryDirectory() self.config_path = os.path.join(self.base_path.name, ACMEChief.config_path) self.confd_path = os.path.join(self.base_path.name, ACMEChief.confd_path) os.mkdir(self.confd_path) with open(os.path.join(self.confd_path, 'confd_file_example.yaml'), 'w') as confd_file: confd_file.write(CONFD_VALID_FILE_EXAMPLE) def tearDown(self): self.base_path.cleanup() @mock.patch('os.access', return_value=True) def test_config_parsing(self, access_mock): with open(self.config_path, 'w') as config_file: config_file.write(VALID_CONFIG_EXAMPLE) config = ACMEChiefConfig.load(self.config_path, confd_path=self.confd_path) self.assertEqual(len(config.accounts), 2) self.assertEqual(len(config.certificates), 3) self.assertEqual(config.default_account, 'ee566f9e436e120082f0770c0d58dd6d') self.assertIn('default_account_certificate', config.authorized_hosts) self.assertIn('deployment-acmechief-testclient02.deployment-prep.eqiad.wmflabs', config.authorized_hosts['default_account_certificate']) self.assertIn('deployment-acmechief-testclient03.deployment-prep.eqiad.wmflabs', config.authorized_hosts['default_account_certificate']) self.assertIn(ACMEChallengeType.DNS01, config.challenges) self.assertEqual(config.certificates['default_account_certificate']['staging_time'], timedelta(seconds=3600)) self.assertEqual(config.certificates['non_default_account_certificate']['staging_time'], timedelta(seconds=7200)) self.assertFalse(config.certificates['default_account_certificate']['prevalidate']) self.assertTrue(config.certificates['non_default_account_certificate']['prevalidate']) self.assertFalse(config.certificates['default_account_certificate']['skip_invalid_snis']) self.assertTrue(config.certificates['non_default_account_certificate']['skip_invalid_snis']) self.assertIn(config.certificates['non_default_account_certificate']['CN'], config.certificates['non_default_account_certificate']['SNI']) self.assertEqual(config.challenges[ACMEChallengeType.DNS01]['zone_update_cmd'], '/usr/bin/dns-update-zone') self.assertEqual(config.challenges[ACMEChallengeType.DNS01]['zone_update_cmd_timeout'], 30.5) access_mock.assert_called_once_with('/usr/bin/dns-update-zone', os.X_OK) self.assertEqual(config.challenges[ACMEChallengeType.DNS01]['issuing_ca'], 'letsencrypt.org') self.assertEqual(config.challenges[ACMEChallengeType.DNS01]['ns_records'], ['ns0.wikimedia.org.', 'ns1.wikimedia.org.', 'ns2.wikimedia.org.']) self.assertEqual(config.challenges[ACMEChallengeType.DNS01]['resolver_port'], 53) self.assertEqual(config.api['clients_root_directory'], '/etc/custom-root-directory') def test_config_without_explicit_default(self): with open(self.config_path, 'w') as config_file: config_file.write(VALID_CONFIG_EXAMPLE_WITHOUT_DEFAULT_ACCOUNT) config = ACMEChiefConfig.load(self.config_path, confd_path=self.confd_path) self.assertEqual(config.default_account, '621b49f9c6ccbbfbff9acb6e18f71205') def test_access_check(self): with open(self.config_path, 'w') as config_file: config_file.write(VALID_CONFIG_EXAMPLE) config = ACMEChiefConfig.load(self.config_path, confd_path=self.confd_path) self.assertTrue(config.check_access('deployment-acmechief-testclient03.deployment-prep.eqiad.wmflabs', 'default_account_certificate')) self.assertTrue(config.check_access('deployment-acmechief-testclient02.deployment-prep.eqiad.wmflabs', 'default_account_certificate')) self.assertFalse(config.check_access('deployment-acmechief-testclient04.deployment-prep.eqiad.wmflabs', 'default_account_certificate')) self.assertTrue(config.check_access('deployment-acmechief-testclient03.deployment-prep.eqiad.wmflabs', 'certificate_auth_by_regex')) self.assertTrue(config.check_access('deployment-acmechief-testclient02.deployment-prep.eqiad.wmflabs', 'certificate_auth_by_regex')) self.assertFalse(config.check_access('deployment-acmechief-testclient04.deployment-prep.eqiad.wmflabs', 'certificate_auth_by_regex')) + + @mock.patch.dict('os.environ', {}, clear=True) + def test_watchdog_usec_not_present(self): + self.assertNotIn('WATCHDOG_USEC', os.environ) # sanity check + + with open(self.config_path, 'w') as config_file: + config_file.write(VALID_CONFIG_EXAMPLE) + + config = ACMEChiefConfig.load(self.config_path, confd_path=self.confd_path) + self.assertFalse(config.watchdog['systemd']) + + @mock.patch.dict('os.environ', {'WATCHDOG_USEC': '0'}, clear=True) + def test_watchdog_usec_is_zero(self): + with open(self.config_path, 'w') as config_file: + config_file.write(VALID_CONFIG_EXAMPLE) + + config = ACMEChiefConfig.load(self.config_path, confd_path=self.confd_path) + self.assertFalse(config.watchdog['systemd']) + + @mock.patch.dict('os.environ', {'WATCHDOG_USEC': '1000'}, clear=True) + def test_watchdog_usec_enabled(self): + with open(self.config_path, 'w') as config_file: + config_file.write(VALID_CONFIG_EXAMPLE) + + config = ACMEChiefConfig.load(self.config_path, confd_path=self.confd_path) + self.assertTrue(config.watchdog['systemd'])