관리-도구
편집 파일: config.py
#!/opt/cloudlinux/venv/bin/python3
"""Configuration model and manager for the web monitoring tool (wmt)."""
import json
import logging
import os
from dataclasses import asdict, dataclass, field
from socket import gethostname
from typing import List, Optional

from clcommon.cpapi import get_admin_email
from wmt.common.const import CONFIG_PATH, PING_CONNECTIONS
from wmt.common.exceptions import WmtConfigException
from wmt.common.url_parser import parse


@dataclass
class Cfg:
    """
    Default values, in case config has not been specified yet
    """
    ping_interval: int = 30
    ping_timeout: int = 10
    ping_connections: int = PING_CONNECTIONS
    # None means "not set by the user"; ConfigManager then falls back to the
    # address returned by get_admin_email().
    report_email: Optional[str] = None
    report_top: int = 4
    ignore_list: List[str] = field(default_factory=list)
    summary_notification_enabled: bool = True
    alert_notifications_enabled: bool = False


class ConfigManager:
    """
    Loads, exposes and persists the wmt configuration stored at CONFIG_PATH.

    Attributes:
        allowed_params: names of the fields declared on Cfg; any other key
            is rejected (modify) or skipped with a warning (load).
        from_email: sender address built from the local host name.
        default_report_email: admin e-mail used when report_email is unset.
        cfg: the currently active Cfg instance.
        target_email: recipient address derived from cfg (see
            _get_target_email).
    """

    def __init__(self):
        self.allowed_params = Cfg.__dataclass_fields__.keys()
        self.from_email = f'web-monitoring-tool@{gethostname()}'
        self.default_report_email = get_admin_email()
        self.cfg = self._init_cfg()
        # Populates self._ignored_domains (lookup cache used by
        # is_domain_ignored) and self.target_email.
        self._refresh_derived_state()

    def _refresh_derived_state(self):
        """
        Recompute every attribute that is derived from self.cfg.

        Must be called each time self.cfg is replaced, otherwise the ignored
        domains cache and the target e-mail keep describing the old config.
        """
        self._ignored_domains = self.generate_ignored_domains()
        self.target_email = self._get_target_email()

    def _get_target_email(self):
        """
        Pick the address to use for the TO: field of outgoing mail.

        Returns:
            cfg.report_email when the user has set a non-empty value,
            otherwise default_report_email.
        """
        return self.cfg.report_email or self.default_report_email

    def to_dict(self):
        """Return the active configuration as a plain dict."""
        return asdict(self.cfg)

    def _init_cfg(self) -> Cfg:
        """
        Build a Cfg from the config file, falling back to defaults.

        Keys that are not Cfg fields are skipped with a warning so that a
        stray entry does not prevent loading the rest of the file.

        Raises:
            WmtConfigException: the file is not valid JSON or its top-level
                value is not a JSON object.
        """
        # if not present - use defaults
        if not self.is_present():
            return Cfg()

        data = self.read()
        if not isinstance(data, dict):
            # Without this check the loop below would fail with an
            # unrelated AttributeError on `.items()`.
            raise WmtConfigException(
                f'{CONFIG_PATH} must contain a JSON object, '
                f'got {type(data).__name__}'
            )

        cfg = Cfg()
        for key, value in data.items():
            if key in self.allowed_params:
                setattr(cfg, key, value)
            else:
                logging.warning(
                    'unsupported parameter "%s", please ensure config contains'
                    ' only allowed parameters: %s',
                    key, list(self.allowed_params),
                )
        return cfg

    @staticmethod
    def is_present():
        """Return True when a regular file exists at CONFIG_PATH."""
        return os.path.isfile(CONFIG_PATH)

    def modify(self, new_json: str):
        """
        Changes configuration of wmt

        The given values are merged over the current configuration, written
        to CONFIG_PATH and activated on this instance (including the ignored
        domains cache and the target e-mail).

        Args:
            new_json: JSON object whose keys are a subset of allowed_params.
                'ignore_list' may be given as a comma separated string.

        Returns:
            self.to_dict()

        Raises:
            WmtConfigException: new_json is not valid JSON, is not a JSON
                object, or contains unsupported keys.

        Example:
            wmt-api-solo --config-change {'key': 'val'}
        """
        try:
            new_config = json.loads(new_json)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e)) from e

        if not isinstance(new_config, dict):
            # e.g. '[1, 2]' or '"text"' is valid JSON but not a config
            raise WmtConfigException(
                'config change must be a JSON object, '
                f'got {type(new_config).__name__}'
            )

        if not set(new_config).issubset(self.allowed_params):
            raise WmtConfigException(f'some of passed params are unsupported, '
                                     f'only allowed parameters: {list(self.allowed_params)}')

        config = {**self.to_dict(), **new_config}

        ignore_list = config.get('ignore_list')
        if isinstance(ignore_list, str):
            # Accept "a.com, b.com": always store a list, without the
            # whitespace around entries and without empty entries.
            config['ignore_list'] = [
                entry.strip() for entry in ignore_list.split(',') if entry.strip()
            ]

        # Persist the merged config to CONFIG_PATH
        with open(CONFIG_PATH, 'w') as f:
            json.dump(config, f, indent=4)

        self.cfg = Cfg(**config)
        self._refresh_derived_state()
        return self.to_dict()

    @staticmethod
    def read():
        """
        Parse CONFIG_PATH as JSON and return the decoded value.

        Raises:
            WmtConfigException: the file content is not valid JSON.
        """
        try:
            with open(CONFIG_PATH) as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e)) from e

    def reload(self):
        """Re-read the config file and refresh everything derived from it."""
        self.cfg = self._init_cfg()
        self._refresh_derived_state()

    def is_domain_ignored(self, domain) -> bool:
        """
        Check if domain is in ignored list.
        """
        return domain in self._ignored_domains

    def generate_ignored_domains(self) -> set:
        """
        Build the set of ignored URLs from cfg.ignore_list.

        Every entry is passed through parse() and an 'https://' scheme is
        rewritten to 'http://', so the result can be used as a lookup cache
        (see is_domain_ignored) before pings or reports.

        Returns:
            set of normalized URLs; empty when ignore_list is empty or None.
        """
        https_prefix = 'https://'
        domains = set()
        for entry in self.cfg.ignore_list or ():
            url = parse(entry)
            # Rewrite the scheme only. A blanket replace('https', 'http')
            # would also corrupt hosts that contain "https" in their name.
            # NOTE(review): assumes callers of is_domain_ignored pass the
            # unmodified URL - confirm no caller applies the old blanket
            # replace to the value it checks.
            if url.startswith(https_prefix):
                url = 'http://' + url[len(https_prefix):]
            domains.add(url)
        return domains