# Source code for pyrocore.scripts.base

# -*- coding: utf-8 -*-
# pylint: disable=
""" Command Line Script Support.

    Copyright (c) 2009, 2010 The PyroScope Project <pyroscope.project@gmail.com>
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import with_statement

import re
import sys
import glob
import time
import errno
import random
import signal
import textwrap
import logging.config
from optparse import OptionParser

import pkg_resources

from pyrocore import error, config
from pyrocore.util import os, fmt, pymagic, load_config


class ScriptBase(object):
    """ Base class for command line interfaces. """

    # NOTE: The docstring of the concrete class is embedded verbatim into the
    # usage text built in `__init__`, i.e. it is user-visible runtime text.

    # logging configuration
    LOGGING_CFG = "~/.pyroscope/logging.%s.ini"

    # log level for user-visible standard logging
    STD_LOG_LEVEL = logging.INFO

    # argument description for the usage information
    ARGS_HELP = "<log-base>..."

    # additional stuff appended after the command handler's docstring
    ADDITIONAL_HELP = []

    # Can be empty or None in derived classes
    COPYRIGHT = "Copyright (c) 2009 - 2018 Pyroscope Project"

    # Can be made explicit in derived classes (for external tools)
    VERSION = None

    @classmethod
    def setup(cls, cron_cfg="cron"):
        """ Set up the runtime environment.

            Seeds the random number generator and configures logging, either
            from the INI file named by `LOGGING_CFG` (if it exists), or else
            with a basic INFO-level configuration.

            @param cron_cfg: Name inserted into `LOGGING_CFG` when "--cron" is
                on the command line; otherwise "scripts" is inserted.
        """
        random.seed()

        logging_cfg = cls.LOGGING_CFG
        if "%s" in logging_cfg:
            logging_cfg = logging_cfg % (cron_cfg if "--cron" in sys.argv[1:] else "scripts",)
        logging_cfg = os.path.expanduser(logging_cfg)

        if os.path.exists(logging_cfg):
            # presumably referenced from within the INI file -- confirm
            logging.HERE = os.path.dirname(logging_cfg)
            logging.config.fileConfig(logging_cfg)
        else:
            logging.basicConfig(level=logging.INFO)

        logging.getLogger().debug("Logging config read from '%s'", logging_cfg)

    @staticmethod
    def _derive_dest(option_names):
        """ Derive the destination attribute name from the first long option.

            @param option_names: Option strings, e.g. ("-q", "--quiet").
            @return: The first "--long-name" with every "--" removed and each
                remaining "-" replaced by "_".
            @raise IndexError: If no option string starts with "--".
        """
        long_name = [name for name in option_names if name.startswith("--")][0]
        return long_name.replace("--", "").replace("-", "_")

    @staticmethod
    def _shorten_home(path):
        """ Abbreviate a leading home directory in `path` to "~".

            A plain prefix comparison is used, so a home directory containing
            regex meta characters (like "+" or ".") is handled literally.

            @param path: Path without a trailing separator (may be empty).
            @return: The possibly abbreviated path, without trailing separators.
        """
        home_prefix = os.path.expanduser('~') + os.sep
        path += os.sep
        if path.startswith(home_prefix):
            path = '~' + os.sep + path[len(home_prefix):]
        return path.rstrip(os.sep)

    @staticmethod
    def _find_pkg_info_dir(pkg_path, info_ext, warnings):
        """ Locate the package metadata directory for `pkg_path`.

            @param pkg_path: Package path without any metadata suffix.
            @param info_ext: Directory extension (".dist-info" or ".egg-info").
            @param warnings: List that gets a message appended when there is
                no unique candidate.
            @return: Path of the metadata directory, or None.
        """
        if os.path.exists(pkg_path + info_ext):
            return pkg_path + info_ext

        # Prefer directories specific to the running Python release
        candidates = glob.glob(pkg_path + "-*-py%d.%d" % sys.version_info[:2] + info_ext)
        if len(candidates) == 1:
            return candidates[0]
        if candidates:
            warnings.append("Found {} release-specific candidate versions in *{}"
                            .format(len(candidates), info_ext))
            return None

        # Fall back to any versioned directory
        candidates = glob.glob(pkg_path + "-*" + info_ext)
        if len(candidates) == 1:
            return candidates[0]
        warnings.append("Found {} candidate versions in *{}"
                        .format(len(candidates), info_ext))
        return None

    def _get_pkg_meta(self):
        """ Try to find package metadata.

            Looks for a ".dist-info/METADATA" or ".egg-info/PKG-INFO" file
            next to the top-level package of this module.

            @return: The metadata file's content, or "Version: 0.0.0\\n" when
                nothing readable was found (a warning is logged then).
        """
        logger = logging.getLogger('pyrocore.scripts.base.version_info')
        pkg_info = None
        warnings = []

        # Development setup
        pkg_base = os.path.join(
            __file__.split(__name__.replace('.', os.sep))[0],  # containing path
            __name__.split(".")[0]  # package name
        )

        for info_ext, info_name in (('.dist-info', 'METADATA'), ('.egg-info', 'PKG-INFO')):
            pkg_path = self._find_pkg_info_dir(pkg_base, info_ext, warnings)
            if not pkg_path:
                continue
            try:
                with open(os.path.join(pkg_path, info_name)) as handle:
                    pkg_info = handle.read()
            except IOError:
                continue  # unreadable, try the next metadata flavour
            break

        if not pkg_info:
            logger.warning("Software version cannot be determined! ({})".format(', '.join(warnings)))

        return pkg_info or "Version: 0.0.0\n"

    def __init__(self):
        """ Initialize CLI.

            Records the start time, determines the version (from `VERSION`,
            or else from the package metadata), and creates the option parser
            with a usage text that embeds the class docstring.
        """
        self.startup = time.time()
        self.LOG = pymagic.get_class_logger(self)

        # Get version number
        self.version = self.VERSION
        if not self.version:
            # Take version from package
            provider = pkg_resources.get_provider(__name__)
            pkg_meta = (provider.get_metadata("PKG-INFO")
                        or provider.get_metadata("METADATA")
                        or self._get_pkg_meta())
            pkg_dict = dict(line.split(": ", 1)
                            for line in pkg_meta.splitlines()
                            if ": " in line)
            self.version = pkg_dict.get("Version", "DEV")

        where = self._shorten_home(
            os.path.commonprefix([__file__, os.path.realpath(sys.argv[0]), sys.prefix]))
        self.version_info = '{}{}{} on Python {}'.format(
            self.version, ' from ' if where else '', where, sys.version.split()[0])

        self.args = None
        self.options = None
        self.return_code = 0

        # `__doc__` is None for classes without a docstring, or under "-OO"
        description = textwrap.dedent((self.__doc__ or "").rstrip()).lstrip('\n')
        self.parser = OptionParser(
            "%prog [options] " + self.ARGS_HELP + "\n\n"
            "%prog " + self.version_info + ('\n' + self.COPYRIGHT if self.COPYRIGHT else "") + "\n\n"
            + description
            + '\n'.join(self.ADDITIONAL_HELP)
            + "\n\nFor more details, see the full documentation at"
            + "\n\n https://pyrocore.readthedocs.io/",
            version="%prog " + self.version_info)

    def add_bool_option(self, *args, **kwargs):
        """ Add a boolean option.

            The destination attribute is derived from the long option name,
            and the option defaults to False.

            @param args: Option strings; must include a "--long" one.
            @keyword help: Option description (required).
        """
        self.parser.add_option(dest=self._derive_dest(args), action="store_true", default=False,
                               help=kwargs['help'], *args)

    def add_value_option(self, *args, **kwargs):
        """ Add a value option.

            @param args: Option strings, followed by the metavar as the
                last positional argument.
            @keyword dest: Destination attribute, derived from long option name if not given.
            @keyword action: How to handle the option.
            @keyword help: Option description.
            @keyword default: If given, add this value to the help string.
        """
        kwargs['metavar'] = args[-1]
        if 'dest' not in kwargs:
            kwargs['dest'] = self._derive_dest(args)
        if kwargs.get('default'):
            kwargs['help'] += " [%s]" % kwargs['default']
        self.parser.add_option(*args[:-1], **kwargs)

    def get_options(self):
        """ Get program options.

            Registers the standard options, lets the derived class add its
            own, parses the command line into `self.options` / `self.args`,
            and adjusts the logging levels accordingly.
        """
        self.add_bool_option("-q", "--quiet",
            help="omit informational logging")
        self.add_bool_option("-v", "--verbose",
            help="increase informational logging")
        self.add_bool_option("--debug",
            help="always show stack-traces for errors")
        self.add_bool_option("--cron",
            help="run in cron mode (with different logging configuration)")

        # Template method to add options of derived class
        self.add_options()

        self.handle_completion()
        self.options, self.args = self.parser.parse_args()

        # Override logging options in debug mode
        if self.options.debug:
            self.options.verbose = True
            self.options.quiet = False

        # Set logging levels
        if self.options.cron:
            self.STD_LOG_LEVEL = logging.DEBUG  # pylint: disable=invalid-name
        if self.options.verbose and self.options.quiet:
            self.parser.error("Don't know how to be quietly verbose!")
        elif self.options.quiet:
            logging.getLogger().setLevel(logging.WARNING)
        elif self.options.verbose:
            logging.getLogger().setLevel(logging.DEBUG)

        self.LOG.debug("Options: %s" % ", ".join("%s=%r" % i for i in sorted(vars(self.options).items())))

    def handle_completion(self):
        """ Handle shell completion stuff.

            If the first command line argument is "--help-completion-<what>"
            and a matching `help_completion_<what>` method exists, its sorted
            results are printed one per line, and the program exits.
        """
        # We don't want these in the help, so handle them explicitly
        if len(sys.argv) > 1 and sys.argv[1].startswith("--help-completion-"):
            handler = getattr(self, sys.argv[1][2:].replace('-', '_'), None)
            if handler:
                # Single parenthesized argument: same output on Python 2 and 3
                print('\n'.join(sorted(handler())))
                self.STD_LOG_LEVEL = logging.DEBUG
                sys.exit(error.EX_OK)

    def help_completion_options(self):
        """ Return options of this command.

            @return: Generator over all long option strings of the parser.
        """
        for opt in self.parser.option_list:
            for lopt in opt._long_opts:
                yield lopt

    def _debug_enabled(self):
        """ Return whether "--debug" is active; False before options are parsed. """
        return bool(getattr(self.options, "debug", False))

    def fatal(self, msg, exc=None):
        """ Exit on a fatal error.

            @param msg: Error message to log.
            @param exc: Optional causing exception, appended to the message.
                When given and debugging is on, this method returns instead
                of exiting, so the caller can re-raise.
        """
        if exc is not None:
            self.LOG.fatal("%s (%s)" % (msg, exc))
            if self._debug_enabled():
                return  # let the caller re-raise it
        else:
            self.LOG.fatal(msg)
        sys.exit(error.EX_SOFTWARE)

    def run(self):
        """ The main program skeleton.

            Parses options, runs `mainloop`, and maps loggable errors,
            CTRL-C and broken pipes to log output and exit codes. With
            "--debug" active, loggable errors and CTRL-C are re-raised.
            Finally, the total running time is logged and logging is
            shut down.
        """
        log_total = True

        try:
            try:
                # Preparation steps
                self.get_options()

                # Template method with the tool's main loop
                self.mainloop()
            except error.LoggableError as exc:
                if self._debug_enabled():
                    raise

                # Log errors caused by invalid user input
                try:
                    msg = str(exc)
                except UnicodeError:
                    msg = unicode(exc, "UTF-8")
                self.LOG.error(msg)
                sys.exit(error.EX_SOFTWARE)
            except KeyboardInterrupt as exc:
                if self._debug_enabled():
                    raise

                sys.stderr.write("\n\nAborted by CTRL-C!\n")
                sys.stderr.flush()

                # See https://www.cons.org/cracauer/sigint.html
                signal.signal(signal.SIGINT, signal.SIG_DFL)
                os.kill(os.getpid(), signal.SIGINT)
                sys.exit(error.EX_TEMPFAIL)  # being paranoid
            except IOError as exc:
                # [Errno 32] Broken pipe?
                if exc.errno != errno.EPIPE:
                    raise

                sys.stderr.write("\n%s, exiting!\n" % exc)
                sys.stderr.flush()

                # Monkey patch to prevent an exception during logging shutdown
                # NOTE(review): newer `logging` versions seem to keep weak
                # references in this list, in which case the assignment
                # below always fails and is skipped -- confirm.
                try:
                    handlers = logging._handlerList
                except AttributeError:
                    pass
                else:
                    for handler in handlers:
                        try:
                            handler.flush = lambda *_: None
                        except AttributeError:
                            pass  # skip special handlers

                log_total = False
                sys.exit(error.EX_IOERR)
        finally:
            # Shut down
            if log_total and self.options:  ## No time logging on --version and such
                running_time = time.time() - self.startup
                self.LOG.log(self.STD_LOG_LEVEL, "Total time: %.3f seconds." % running_time)
            logging.shutdown()

        # Special exit code?
        if self.return_code:
            sys.exit(self.return_code)

    def add_options(self):
        """ Add program options.

            Template method, called by `get_options`; does nothing by default.
        """

    def mainloop(self):
        """ The main loop.

            Template method, must be implemented by derived classes.
        """
        raise NotImplementedError()
class ScriptBaseWithConfig(ScriptBase):  # pylint: disable=abstract-method
    """ CLI tool with configuration support. """

    # Configuration directory used when neither "--config-dir" nor the
    # PYRO_CONFIG_DIR environment variable is set
    CONFIG_DIR_DEFAULT = '~/.pyroscope'

    # Config files passed to the loader, before any "--config-file" ones
    OPTIONAL_CFG_FILES = []

    def add_options(self):
        """ Add configuration options.

            Registers "--config-dir", "--config-file" (repeatable) and
            "-D / --define" (repeatable, stored in `options.defines`).
        """
        super(ScriptBaseWithConfig, self).add_options()

        self.add_value_option("--config-dir", "DIR",
            help="configuration directory [{}]".format(
                os.environ.get('PYRO_CONFIG_DIR', self.CONFIG_DIR_DEFAULT)))
        self.add_value_option("--config-file", "PATH",
            action="append", default=[],
            help="additional config file(s) to read")
        self.add_value_option("-D", "--define", "KEY=VAL [-D ...]",
            default=[], action="append", dest="defines",
            help="override configuration attributes")

    def get_options(self):
        """ Get program options.

            After the base class parsed the command line, this resolves
            `self.config_dir` (option, then PYRO_CONFIG_DIR, then the
            default), loads the configuration, and applies "-D" overrides.

            @raise error.UserError: If a "-D" value has no "=" in it.
        """
        super(ScriptBaseWithConfig, self).get_options()

        chosen_dir = (self.options.config_dir
                      or os.environ.get('PYRO_CONFIG_DIR', None)
                      or self.CONFIG_DIR_DEFAULT)
        self.config_dir = os.path.abspath(os.path.expanduser(chosen_dir))
        load_config.ConfigLoader(self.config_dir).load(self.OPTIONAL_CFG_FILES + self.options.config_file)
        if self.options.debug:
            config.debug = True

        for key_val in self.options.defines:
            try:
                key, val = key_val.split('=', 1)
            except ValueError as exc:
                raise error.UserError("Bad config override %r (%s)" % (key_val, exc))
            else:
                setattr(config, key, load_config.validate(key, val))

    def check_for_connection(self, maxpos=0):
        """ Scan arguments for a `@name` one.

            The first such argument selects `config.connections[name]` as
            `config.scgi_url`, and is removed from `self.args`. An unknown
            name is reported via the parser's `error` method.

            @param maxpos: If non-zero, only the first `maxpos` arguments
                are considered.
        """
        candidates = self.args[:maxpos] if maxpos else self.args
        for idx, arg in enumerate(candidates):
            if not arg.startswith('@'):
                continue

            name = arg[1:]
            if name not in config.connections:
                self.parser.error("Undefined connection '{}'!".format(name))
            config.scgi_url = config.connections[name]
            self.LOG.debug("Switched to connection %s (%s)", name, config.scgi_url)
            del self.args[idx]
            break
class PromptDecorator(object):
    """ Decorator for interactive commands. """

    # Return code for Q)uit choice
    QUIT_RC = error.EX_TEMPFAIL

    def __init__(self, script_obj):
        """ Initialize with containing tool object.

            @param script_obj: The script this decorator adds options to, and
                whose `options` and `LOG` attributes it uses.
        """
        self.script = script_obj

    def add_options(self):
        """ Add program options, must be called in script's add_options().
        """
        # These options need to be conflict-free to the containing
        # script, i.e. avoid short options if possible.
        self.script.add_bool_option("-i", "--interactive",
            help="interactive mode (prompt before changing things)")
        self.script.add_bool_option("--yes",
            help="positively answer all prompts (e.g. --delete --yes)")

    def ask_bool(self, question, default=True):
        """ Ask the user for Y)es / N)o / A)ll yes / Q)uit.

            If "Q" is entered, this method will exit with `QUIT_RC`.
            If "A" is entered, `options.yes` is set, so all further
            questions are answered positively. Else, the user's choice
            is returned.

            Note that the options --yes and --interactive, and a `dry_run`
            option of the script (if it has one), also influence the outcome:
            with --yes the answer is always True, and in dry-run or
            non-interactive mode `default` is returned without prompting.

            @param question: Text of the question, without the "?".
            @param default: Answer used for empty input, and when not prompting.
            @return: True for "yes", False for "no".
        """
        options = self.script.options
        if options.yes:
            return True

        # Not every script defines a dry-run option
        if getattr(options, "dry_run", False) or not options.interactive:
            return default

        # Let the user decide
        choice = '*'
        while choice not in "YNAQ":
            choice = raw_input("%s? [%s)es, %s)o, a)ll yes, q)uit]: " % (
                fmt.to_console(question), "yY"[int(default)], "Nn"[int(default)],
            ))
            # Empty input selects the default answer
            choice = choice[:1].upper() or "NY"[int(default)]

        if choice == 'Q':
            self.quit()
        if choice == 'A':
            options.yes = True
            choice = 'Y'

        return choice == 'Y'

    def quit(self):
        """ Exit the program due to user's choices.

            Logs a warning, then exits with `QUIT_RC`.
        """
        self.script.LOG.warning("Abort due to user choice!")
        sys.exit(self.QUIT_RC)