관리-도구
편집 파일: parser.py
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains SSA specific config parser: - has defaults - adds default values for ones missing in config file - allows duplicates, the latest value is loaded - performs value normalization during loading items: - to int - to float - to bool - 'correlation' is case-insensitive and allowed to be on, off, 1, 0, yes, no, true, false - loads default configuration in case of missing config file """ import logging import os import tempfile from configparser import ConfigParser, NoSectionError from typing import Callable, List from ..internal.constants import config_file default_section = 'ssa_conf' logger = logging.getLogger('ssa_configuration_parser') class SSAConfigParser(ConfigParser): """ SSA specific configparser implementation """ def __init__(self): defaults = { 'domains_number': 10, 'urls_number': 5, 'request_number': 10, 'requests_duration': 3.0, 'time': 1, 'correlation': True, 'correlation_coefficient': 0.8, 'ignore_list': '', 'summary_notification_enabled': True } self._comments = { 'domains_number': 'Size of TOP list for slow domains', 'urls_number': 'Size of TOP list for slow urls', 'request_number': 'The threshold value of slow requests number in the period of time to mark URL as a slow one', 'requests_duration': 'The threshold value of request duration in seconds', 'time': 'Period of time in hours required to analyze these requests', 'correlation': 'Flag to enable or disable correlation (On/Off)', 'correlation_coefficient': 'The threshold value of correlation coefficient', 'ignore_list': 'List of URLs or domains that should not be included in a daily report', 'summary_notification_enabled': 'Flag to enable or disable sending daily reports by e-mail (On/Off)' } self.ssa_defaults = defaults.copy() super().__init__(defaults, strict=False) @property def int_options(self) -> tuple: return 'domains_number', 'urls_number', 'request_number', 'time' @property def float_options(self) -> tuple: return 'requests_duration', 'correlation_coefficient' @property def bool_options(self) -> tuple: return 'correlation', 'summary_notification_enabled' def empty_value_fallback(self, meth: Callable, section: str, option: str) -> str: """ If the value is missing in config file, e.g. time= set the default one """ try: val = meth(section, option) except (ValueError, NoSectionError): val = meth(section, option, vars=self.defaults(), fallback=self.ssa_defaults[option]) return val def items(self, section: str = default_section, raw: bool = False, vars: dict = None) -> List[tuple]: """Return a list of (name, value) tuples for each option in a section. Adds special conversions. """ d = self._defaults.copy() try: d.update(self._sections[section]) except KeyError: if section != self.default_section: logger.info('Malformed ssa.conf, section "%s" is missed. ' 'Default will be applied ', section) self.restore_default_conf() orig_keys = list(d.keys()) def value_getter(option): if option in self.bool_options: return self.empty_value_fallback(self.getboolean, section, option) elif option in self.int_options: return self.empty_value_fallback(self.getint, section, option) elif option in self.float_options: return self.empty_value_fallback(self.getfloat, section, option) else: return d[option] return [(option, value_getter(option)) for option in orig_keys] def read_ssa_conf(self): """ Try to read configuration from file, load defaults if file not present """ try: with open(config_file): pass self.read(config_file) except FileNotFoundError: self.read_dict({default_section: self.defaults()}) def write_ssa_conf(self) -> None: """ Try to write current configuration to file, """ temp_file = None try: temp_file = tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(config_file), delete=False) # Write the configuration to the temporary file self.write(temp_file, space_around_delimiters=False) # If the writing process completes without any errors, rename the temporary file to the config file os.rename(temp_file.name, config_file) finally: if temp_file is not None: temp_file.close() if os.path.exists(temp_file.name): os.remove(temp_file.name) def override(self, args: dict) -> None: """ Load options from the dict into the main section of the config. """ section = default_section if default_section in self.sections() else self.sections()[0] for option, value in args.items(): self.set(section, option, str(value)) def write(self, fp, space_around_delimiters=True): """ Parent method is overrided in order to: - do not print default section - call its own write_section method instead of parent one. """ if space_around_delimiters: d = ' {} '.format(self._delimiters[0]) else: d = self._delimiters[0] for section in self._sections: self.write_section(fp, section, self._sections[section].items(), d) def write_section(self, fp, section_name, section_items, delimiter): """ The difference between this one and the parent _write_section method is in writing the comments to every option also """ comment_prefix = self._comment_prefixes[0] if self._comment_prefixes else '#' fp.write('[{}]\n'.format(section_name)) for key, value in section_items: value = self._interpolation.before_write(self, section_name, key, value) if value is not None or not self._allow_no_value: value = delimiter + str(value).replace('\n', '\n\t') else: value = '' comment = self._comments.get(key, None) if comment is None: fp.write(f'{key}{value}\n\n') else: fp.write(f'{comment_prefix} {comment}\n{key}{value}\n\n') fp.write('\n') def restore_default_conf(self): """ Clear all sections in config parser, try to copy settings values if they have it. Add new default section and pass all settings to it. """ previous_settings = self.ssa_defaults for section in self.sections(): for option in self.options(section): if option in previous_settings: previous_settings[option] = self.get(section, option) self.remove_section(section) self.add_section(default_section) for option, value in previous_settings.items(): self.set(default_section, option, str(value))