관리-도구
편집 파일: server_id.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from uuid import uuid4

from clcommon.clconfpars import change_settings, load
from lvestats.lib import config, lveinfolib
from lvestats.orm.history import history, history_x60
from lvestats.orm.history_gov import history_gov
from lvestats.orm.incident import incident
from lvestats.orm.servers import servers
from lvestats.orm.user import user


class UpdateOrCreateServerID(object):
    """Change old-style server ID 'localhost' to an auto-generated uuid.

    The user can also set their own server ID (see :meth:`prompt`).
    The new ID is written both to the database tables that carry a
    ``server_id`` column and to the ``server_id`` option of the config file.
    """

    # IDs that must never be used as a server ID.
    restricted_server_id_list = ('localhost',)
    # Number of leading characters of the uuid4 string used as generated ID.
    server_id_length = 10

    def __init__(self, engine, config_path=None):
        """
        :param engine: database engine used for all DB updates
        :param config_path: path of the config file; falls back to
            ``config.GLOBAL_CONFIG_LOCATION`` when empty or None
        :type engine: sqlalchemy.engine.base.Engine
        :type config_path: Union[str, None]
        """
        self.engine = engine
        self.config_path = config_path or config.GLOBAL_CONFIG_LOCATION

    def _get_current_server_id_cfg(self):
        """Read option 'server_id' from the config file.

        Any error while loading or reading the config is printed and
        reported as None, the same value as for a missing option.

        :rtype: Union[None, str]
        """
        try:
            cfg = load(path=self.config_path)
            return cfg.get('server_id')
        except Exception as e:
            print(f'Error: {e}')
            return None

    def _set_server_id_cfg(self, server_id):
        """Set option 'server_id' in the config file.

        :param server_id: value to store
        :return: ``server_id`` on success; None if writing failed (the error
            is printed)
        :type server_id: str
        :rtype: Union[None, str]
        """
        try:
            change_settings({'server_id': server_id}, self.config_path)
            return server_id
        except Exception as e:
            print(f'Error: {e}')
            return None

    def _update_server_id_table(self, conn, table, current_server_id, new_server_id):
        """Update server_id in one of the tables.

        Every row of ``table`` whose ``server_id`` equals
        ``current_server_id`` gets ``new_server_id`` instead. The statement
        is executed on ``conn``; transaction handling is left to the caller.

        :type conn: sqlalchemy.engine.base.Connection
        :type table: Union[history, history_gov, incident, user, history_x60]
        :type current_server_id: str
        :type new_server_id: str
        """
        sql_update = table.__table__.update().where(
            table.server_id == current_server_id
        )
        conn.execute(sql_update.values(
            {'server_id': new_server_id}
        ))

    def _update_server_id_db(self, current_server_id, new_server_id):
        """Update server_id for all tables in the database.

        All statements run in a single transaction: on any error the
        transaction is rolled back, the error is printed and None is
        returned. The LVE version of the old server row is looked up before
        the transaction starts; an exception raised by that lookup is not
        caught here.

        :param current_server_id: ID currently stored in the database
        :param new_server_id: ID to replace it with
        :return: ``new_server_id`` on success, None on failure
        :type current_server_id: str
        :type new_server_id: str
        :rtype: Union[None, str]
        """
        sql_tables = [history, history_gov, incident, user, history_x60]
        lve_version = lveinfolib.get_lve_version(self.engine, current_server_id)
        conn = self.engine.connect()
        # The connection is released in ``finally`` so that it is not leaked
        # when begin(), rollback() or commit() themselves raise.
        try:
            tx = conn.begin()
            try:
                for table in sql_tables:
                    self._update_server_id_table(
                        conn, table, current_server_id, new_server_id)
                # update servers table, where server_id is a primary key
                conn.execute(servers.__table__.insert().values(
                    {'server_id': new_server_id, "lve_version": lve_version}
                ))
                conn.execute(servers.__table__.delete().where(
                    servers.server_id == current_server_id))
            except Exception as e:
                tx.rollback()
                print(f'Error: {e}')
                return None
            tx.commit()
            return new_server_id
        finally:
            conn.close()

    def _resolve_localhost(self, prompt=True, current_server_id=None):
        """Choose a server ID and store it in the database and config file.

        A candidate ID is generated from a uuid4. With ``prompt=True`` the
        user is asked for an ID (the generated one is offered as default)
        and the answer is validated. With ``prompt=False`` an existing,
        non-restricted ID from the config file is kept unchanged.

        NOTE(review): when the database update fails, the candidate ID is
        still returned although neither the database nor the config file
        was changed. Callers cannot tell this from success by the return
        value alone - confirm whether that is intended.

        :param prompt: ask the user interactively when True
        :param current_server_id: ID considered as currently set; used for
            the "already set" check and as the old ID in the database
        :return: the resulting server ID, or None if the entered ID was
            rejected or writing the config file failed
        :type current_server_id: Union[None, str]
        :type prompt: bool
        :rtype: Union[None, str]
        """
        server_id = str(uuid4())[:self.server_id_length]
        if prompt:
            res = input(f"Enter new server ID ['{server_id}']: ")
            print(res)
            # Strip *before* testing for emptiness: whitespace-only input is
            # truthy and would otherwise become an empty server ID. In that
            # case the generated default is kept, as for plain <Enter>.
            entered = res.strip()
            if entered:
                server_id = entered
            if server_id in self.restricted_server_id_list:
                print(f"Server ID cannot be '{server_id}'")
                return None
            if server_id == current_server_id:
                print(f"Server ID is already set to '{server_id}'")
                return server_id
            # An ID of exactly 255 characters passes this check.
            if len(server_id) > 255:
                print("Server ID length should be less than 255")
                return None
        cur_server_id = self._get_current_server_id_cfg()
        # Non-interactive mode never overwrites a valid, already configured ID.
        if (not prompt and cur_server_id is not None
                and cur_server_id not in self.restricted_server_id_list):
            return cur_server_id
        # Old ID to replace in the database: explicit argument first, then
        # the config value, then the legacy default.
        old_server_id = current_server_id or cur_server_id or 'localhost'
        if self._update_server_id_db(old_server_id, server_id):
            return self._set_server_id_cfg(server_id)
        return server_id

    def prompt(self):
        """Interactively ask the user for a new server ID and apply it.

        If the configured ID is a restricted one (e.g. 'localhost'), a
        notice is printed first; the user is prompted in either case.

        :rtype: Union[None, str]
        """
        cur_cfg_server_id = self._get_current_server_id_cfg()
        # if server ID = 'localhost'
        if isinstance(cur_cfg_server_id, str) and \
                cur_cfg_server_id.strip() in self.restricted_server_id_list:
            print(f"Server ID cannot be '{cur_cfg_server_id}'")
        return self._resolve_localhost(current_server_id=cur_cfg_server_id)

    def auto(self):
        """Resolve the server ID without user interaction.

        Keeps a valid configured ID; otherwise generates and stores a new
        one.

        :rtype: Union[None, str]
        """
        return self._resolve_localhost(prompt=False)