"""Nginx Configuration""" import logging import os import re import shutil import socket import subprocess import sys import OpenSSL import zope.interface from acme import challenges from acme import crypto_util as acme_crypto_util from letsencrypt import achallenges from letsencrypt import constants as core_constants from letsencrypt import crypto_util from letsencrypt import errors from letsencrypt import interfaces from letsencrypt import le_util from letsencrypt import reverter from letsencrypt.plugins import common from letsencrypt_nginx import constants from letsencrypt_nginx import dvsni from letsencrypt_nginx import obj from letsencrypt_nginx import parser logger = logging.getLogger(__name__) class NginxConfigurator(common.Plugin): # pylint: disable=too-many-instance-attributes,too-many-public-methods """Nginx configurator. .. todo:: Add proper support for comments in the config. Currently, config files modified by the configurator will lose all their comments. :ivar config: Configuration. :type config: :class:`~letsencrypt.interfaces.IConfig` :ivar parser: Handles low level parsing :type parser: :class:`~letsencrypt_nginx.parser` :ivar str save_notes: Human-readable config change notes :ivar reverter: saves and reverts checkpoints :type reverter: :class:`letsencrypt.reverter.Reverter` :ivar tup version: version of Nginx """ zope.interface.implements(interfaces.IAuthenticator, interfaces.IInstaller) zope.interface.classProvides(interfaces.IPluginFactory) description = "Nginx Web Server" @classmethod def add_parser_arguments(cls, add): add("server-root", default=constants.CLI_DEFAULTS["server_root"], help="Nginx server root directory.") add("ctl", default=constants.CLI_DEFAULTS["ctl"], help="Path to the " "'nginx' binary, used for 'configtest' and retrieving nginx " "version number.") @property def nginx_conf(self): """Nginx config file path.""" return os.path.join(self.conf("server_root"), "nginx.conf") def __init__(self, *args, **kwargs): """Initialize an Nginx Configurator. :param tup version: version of Nginx as a tuple (1, 4, 7) (used mostly for unittesting) """ version = kwargs.pop("version", None) super(NginxConfigurator, self).__init__(*args, **kwargs) # Verify that all directories and files exist with proper permissions self._verify_setup() # Files to save self.save_notes = "" # Add number of outstanding challenges self._chall_out = 0 # These will be set in the prepare function self.parser = None self.version = version self._enhance_func = {} # TODO: Support at least redirects # Set up reverter self.reverter = reverter.Reverter(self.config) self.reverter.recovery_routine() @property def mod_ssl_conf(self): """Full absolute path to SSL configuration file.""" return os.path.join(self.config.config_dir, constants.MOD_SSL_CONF_DEST) # This is called in determine_authenticator and determine_installer def prepare(self): """Prepare the authenticator/installer.""" self.parser = parser.NginxParser( self.conf('server-root'), self.mod_ssl_conf) # Set Version if self.version is None: self.version = self.get_version() temp_install(self.mod_ssl_conf) # Entry point in main.py for installing cert def deploy_cert(self, domain, cert_path, key_path, chain_path=None): # pylint: disable=unused-argument """Deploys certificate to specified virtual host. .. note:: Aborts if the vhost is missing ssl_certificate or ssl_certificate_key. .. note:: Nginx doesn't have a cert chain directive, so the last parameter is always ignored. It expects the cert file to have the concatenated chain. .. note:: This doesn't save the config files! """ vhost = self.choose_vhost(domain) directives = [['ssl_certificate', cert_path], ['ssl_certificate_key', key_path]] try: self.parser.add_server_directives(vhost.filep, vhost.names, directives, True) logger.info("Deployed Certificate to VirtualHost %s for %s", vhost.filep, vhost.names) except errors.MisconfigurationError: logger.warn( "Cannot find a cert or key directive in %s for %s. " "VirtualHost was not modified.", vhost.filep, vhost.names) # Presumably break here so that the virtualhost is not modified return False self.save_notes += ("Changed vhost at %s with addresses of %s\n" % (vhost.filep, ", ".join(str(addr) for addr in vhost.addrs))) self.save_notes += "\tssl_certificate %s\n" % cert_path self.save_notes += "\tssl_certificate_key %s\n" % key_path ####################### # Vhost parsing methods ####################### def choose_vhost(self, target_name): """Chooses a virtual host based on the given domain name. .. note:: This makes the vhost SSL-enabled if it isn't already. Follows Nginx's server block selection rules preferring blocks that are already SSL. .. todo:: This should maybe return list if no obvious answer is presented. .. todo:: The special name "$hostname" corresponds to the machine's hostname. Currently we just ignore this. :param str target_name: domain name :returns: ssl vhost associated with name :rtype: :class:`~letsencrypt_nginx.obj.VirtualHost` """ vhost = None matches = self._get_ranked_matches(target_name) if not matches: # No matches. Create a new vhost with this name in nginx.conf. filep = self.parser.loc["root"] new_block = [['server'], [['server_name', target_name]]] self.parser.add_http_directives(filep, new_block) vhost = obj.VirtualHost(filep, set([]), False, True, set([target_name]), list(new_block[1])) elif matches[0]['rank'] in xrange(2, 6): # Wildcard match - need to find the longest one rank = matches[0]['rank'] wildcards = [x for x in matches if x['rank'] == rank] vhost = max(wildcards, key=lambda x: len(x['name']))['vhost'] else: vhost = matches[0]['vhost'] if vhost is not None: if not vhost.ssl: self._make_server_ssl(vhost) return vhost def _get_ranked_matches(self, target_name): """Returns a ranked list of vhosts that match target_name. :param str target_name: The name to match :returns: list of dicts containing the vhost, the matching name, and the numerical rank :rtype: list """ # Nginx chooses a matching server name for a request with precedence: # 1. exact name match # 2. longest wildcard name starting with * # 3. longest wildcard name ending with * # 4. first matching regex in order of appearance in the file matches = [] for vhost in self.parser.get_vhosts(): name_type, name = parser.get_best_match(target_name, vhost.names) if name_type == 'exact': matches.append({'vhost': vhost, 'name': name, 'rank': 0 if vhost.ssl else 1}) elif name_type == 'wildcard_start': matches.append({'vhost': vhost, 'name': name, 'rank': 2 if vhost.ssl else 3}) elif name_type == 'wildcard_end': matches.append({'vhost': vhost, 'name': name, 'rank': 4 if vhost.ssl else 5}) elif name_type == 'regex': matches.append({'vhost': vhost, 'name': name, 'rank': 6 if vhost.ssl else 7}) return sorted(matches, key=lambda x: x['rank']) def get_all_names(self): """Returns all names found in the Nginx Configuration. :returns: All ServerNames, ServerAliases, and reverse DNS entries for virtual host addresses :rtype: set """ all_names = set() # Kept in same function to avoid multiple compilations of the regex priv_ip_regex = (r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|" r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)") private_ips = re.compile(priv_ip_regex) hostname_regex = r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$" hostnames = re.compile(hostname_regex, re.IGNORECASE) for vhost in self.parser.get_vhosts(): all_names.update(vhost.names) for addr in vhost.addrs: host = addr.get_addr() if hostnames.match(host): # If it's a hostname, add it to the names. all_names.add(host) elif not private_ips.match(host): # If it isn't a private IP, do a reverse DNS lookup # TODO: IPv6 support try: socket.inet_aton(host) all_names.add(socket.gethostbyaddr(host)[0]) except (socket.error, socket.herror, socket.timeout): continue return all_names def _get_snakeoil_paths(self): # TODO: generate only once tmp_dir = os.path.join(self.config.work_dir, "snakeoil") le_key = crypto_util.init_save_key( key_size=1024, key_dir=tmp_dir, keyname="key.pem") key = OpenSSL.crypto.load_privatekey( OpenSSL.crypto.FILETYPE_PEM, le_key.pem) cert = acme_crypto_util.gen_ss_cert(key, domains=[socket.gethostname()]) cert_path = os.path.join(tmp_dir, "cert.pem") cert_pem = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, cert) with open(cert_path, 'w') as cert_file: cert_file.write(cert_pem) return cert_path, le_key.file def _make_server_ssl(self, vhost): """Make a server SSL. Make a server SSL based on server_name and filename by adding a ``listen IConfig.dvsni_port ssl`` directive to the server block. .. todo:: Maybe this should create a new block instead of modifying the existing one? :param vhost: The vhost to add SSL to. :type vhost: :class:`~letsencrypt_nginx.obj.VirtualHost` """ snakeoil_cert, snakeoil_key = self._get_snakeoil_paths() ssl_block = [['listen', '{0} ssl'.format(self.config.dvsni_port)], # access and error logs necessary for integration # testing (non-root) ['access_log', os.path.join( self.config.work_dir, 'access.log')], ['error_log', os.path.join( self.config.work_dir, 'error.log')], ['ssl_certificate', snakeoil_cert], ['ssl_certificate_key', snakeoil_key], ['include', self.parser.loc["ssl_options"]]] self.parser.add_server_directives( vhost.filep, vhost.names, ssl_block) vhost.ssl = True vhost.raw.extend(ssl_block) vhost.addrs.add(obj.Addr('', str(self.config.dvsni_port), True, False)) def get_all_certs_keys(self): """Find all existing keys, certs from configuration. :returns: list of tuples with form [(cert, key, path)] cert - str path to certificate file key - str path to associated key file path - File path to configuration file. :rtype: set """ return self.parser.get_all_certs_keys() ################################## # enhancement methods (IInstaller) ################################## def supported_enhancements(self): # pylint: disable=no-self-use """Returns currently supported enhancements.""" return [] def enhance(self, domain, enhancement, options=None): """Enhance configuration. :param str domain: domain to enhance :param str enhancement: enhancement type defined in :const:`~letsencrypt.constants.ENHANCEMENTS` :param options: options for the enhancement See :const:`~letsencrypt.constants.ENHANCEMENTS` documentation for appropriate parameter. """ try: return self._enhance_func[enhancement]( self.choose_vhost(domain), options) except (KeyError, ValueError): raise errors.PluginError( "Unsupported enhancement: {0}".format(enhancement)) except errors.PluginError: logger.warn("Failed %s for %s", enhancement, domain) ###################################### # Nginx server management (IInstaller) ###################################### def restart(self): """Restarts nginx server. :returns: Success :rtype: bool """ return nginx_restart(self.conf('ctl'), self.nginx_conf) def config_test(self): # pylint: disable=no-self-use """Check the configuration of Nginx for errors. :returns: Success :rtype: bool """ try: proc = subprocess.Popen( [self.conf('ctl'), "-c", self.nginx_conf, "-t"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() except (OSError, ValueError): logger.fatal("Unable to run nginx config test") sys.exit(1) if proc.returncode != 0: # Enter recovery routine... logger.error("Config test failed\n%s\n%s", stdout, stderr) return False return True def _verify_setup(self): """Verify the setup to ensure safe operating environment. Make sure that files/directories are setup with appropriate permissions Aim for defensive coding... make sure all input files have permissions of root. """ uid = os.geteuid() le_util.make_or_verify_dir( self.config.work_dir, core_constants.CONFIG_DIRS_MODE, uid) le_util.make_or_verify_dir( self.config.backup_dir, core_constants.CONFIG_DIRS_MODE, uid) le_util.make_or_verify_dir( self.config.config_dir, core_constants.CONFIG_DIRS_MODE, uid) def get_version(self): """Return version of Nginx Server. Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7)) :returns: version :rtype: tuple :raises .PluginError: Unable to find Nginx version or version is unsupported """ try: proc = subprocess.Popen( [self.conf('ctl'), "-c", self.nginx_conf, "-V"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) text = proc.communicate()[1] # nginx prints output to stderr except (OSError, ValueError) as error: logging.debug(error, exc_info=True) raise errors.PluginError( "Unable to run %s -V" % self.conf('ctl')) version_regex = re.compile(r"nginx/([0-9\.]*)", re.IGNORECASE) version_matches = version_regex.findall(text) sni_regex = re.compile(r"TLS SNI support enabled", re.IGNORECASE) sni_matches = sni_regex.findall(text) ssl_regex = re.compile(r" --with-http_ssl_module") ssl_matches = ssl_regex.findall(text) if not version_matches: raise errors.PluginError("Unable to find Nginx version") if not ssl_matches: raise errors.PluginError( "Nginx build is missing SSL module (--with-http_ssl_module).") if not sni_matches: raise errors.PluginError("Nginx build doesn't support SNI") nginx_version = tuple([int(i) for i in version_matches[0].split(".")]) # nginx < 0.8.48 uses machine hostname as default server_name instead of # the empty string if nginx_version < (0, 8, 48): raise errors.PluginError("Nginx version must be 0.8.48+") return nginx_version def more_info(self): """Human-readable string to help understand the module""" return ( "Configures Nginx to authenticate and install HTTPS.{0}" "Server root: {root}{0}" "Version: {version}".format( os.linesep, root=self.parser.loc["root"], version=".".join(str(i) for i in self.version)) ) ################################################### # Wrapper functions for Reverter class (IInstaller) ################################################### def save(self, title=None, temporary=False): """Saves all changes to the configuration files. :param str title: The title of the save. If a title is given, the configuration will be saved as a new checkpoint and put in a timestamped directory. :param bool temporary: Indicates whether the changes made will be quickly reversed in the future (ie. challenges) """ save_files = set(self.parser.parsed.keys()) # Create Checkpoint if temporary: self.reverter.add_to_temp_checkpoint( save_files, self.save_notes) else: self.reverter.add_to_checkpoint(save_files, self.save_notes) # Change 'ext' to something else to not override existing conf files self.parser.filedump(ext='') if title and not temporary: self.reverter.finalize_checkpoint(title) return True def recovery_routine(self): """Revert all previously modified files. Reverts all modified files that have not been saved as a checkpoint """ self.reverter.recovery_routine() self.parser.load() def revert_challenge_config(self): """Used to cleanup challenge configurations.""" self.reverter.revert_temporary_config() self.parser.load() def rollback_checkpoints(self, rollback=1): """Rollback saved checkpoints. :param int rollback: Number of checkpoints to revert """ self.reverter.rollback_checkpoints(rollback) self.parser.load() def view_config_changes(self): """Show all of the configuration changes that have taken place.""" self.reverter.view_config_changes() ########################################################################### # Challenges Section for IAuthenticator ########################################################################### def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use """Return list of challenge preferences.""" return [challenges.DVSNI] # Entry point in main.py for performing challenges def perform(self, achalls): """Perform the configuration related challenge. This function currently assumes all challenges will be fulfilled. If this turns out not to be the case in the future. Cleanup and outstanding challenges will have to be designed better. """ self._chall_out += len(achalls) responses = [None] * len(achalls) nginx_dvsni = dvsni.NginxDvsni(self) for i, achall in enumerate(achalls): if isinstance(achall, achallenges.DVSNI): # Currently also have dvsni hold associated index # of the challenge. This helps to put all of the responses back # together when they are all complete. nginx_dvsni.add_chall(achall, i) sni_response = nginx_dvsni.perform() # Must restart in order to activate the challenges. # Handled here because we may be able to load up other challenge types self.restart() # Go through all of the challenges and assign them to the proper place # in the responses return value. All responses must be in the same order # as the original challenges. for i, resp in enumerate(sni_response): responses[nginx_dvsni.indices[i]] = resp return responses # called after challenges are performed def cleanup(self, achalls): """Revert all challenges.""" self._chall_out -= len(achalls) # If all of the challenges have been finished, clean up everything if self._chall_out <= 0: self.revert_challenge_config() self.restart() def nginx_restart(nginx_ctl, nginx_conf="/etc/nginx.conf"): """Restarts the Nginx Server. .. todo:: Nginx restart is fatal if the configuration references non-existent SSL cert/key files. Remove references to /etc/letsencrypt before restart. :param str nginx_ctl: Path to the Nginx binary. """ try: proc = subprocess.Popen([nginx_ctl, "-c", nginx_conf, "-s", "reload"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() if proc.returncode != 0: # Maybe Nginx isn't running nginx_proc = subprocess.Popen([nginx_ctl, "-c", nginx_conf], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = nginx_proc.communicate() if nginx_proc.returncode != 0: # Enter recovery routine... logger.error("Nginx Restart Failed!\n%s\n%s", stdout, stderr) return False except (OSError, ValueError): logger.fatal("Nginx Restart Failed - Please Check the Configuration") sys.exit(1) return True def temp_install(options_ssl): """Temporary install for convenience.""" # WARNING: THIS IS A POTENTIAL SECURITY VULNERABILITY # THIS SHOULD BE HANDLED BY THE PACKAGE MANAGER # AND TAKEN OUT BEFORE RELEASE, INSTEAD # SHOWING A NICE ERROR MESSAGE ABOUT THE PROBLEM. # Check to make sure options-ssl.conf is installed if not os.path.isfile(options_ssl): shutil.copyfile(constants.MOD_SSL_CONF_SRC, options_ssl)