관리-도구
편집 파일: environments.py
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import from __future__ import print_function from __future__ import division import os import pwd from distutils.version import StrictVersion from clselect.clselectctl import get_directory from clselect.utils import check_call, check_output, list_dirs from .extensions import EXTENSION_PATTERN, ExtensionInfo from .interpreters import Interpreter DEFAULT_PREFIX = 'rubyvenv' RUBYVENV_BIN = os.path.join(os.path.dirname(__file__), 'rubyvenv.py') VERSION_DELIMITER = '#' PYTHON_PATH = '/opt/cloudlinux/venv/bin/python3' class Environment(object): def __init__(self, name, user=None, prefix=None): self.name = name if user: self.user = user else: self.user = pwd.getpwuid(os.getuid()).pw_name if prefix is None: self.prefix = DEFAULT_PREFIX else: self.prefix = prefix self.path = os.path.join(_abs_prefix(self.user, self.prefix), name) self._interpreter = None self._gem = None self.interpreter_name = 'ruby' + name def __repr__(self): return ("%s.%s(name='%s', user='%s', prefix='%s')" % ( self.__class__.__module__, self.__class__.__name__, self.name, self.user, self.prefix)) def _demote(self): user_pwd = pwd.getpwnam(self.user) def func(): os.setgid(user_pwd.pw_gid) os.setuid(user_pwd.pw_uid) return func def as_dict(self, key=None): e = { 'name': self.name, 'interpreter': self.interpreter(), 'extensions': self.extensions(), } if key: del e[key] return {getattr(self, key): e} return e def as_deepdict(self, key=None): e = { 'name': self.name, 'interpreter': self.interpreter().as_dict(), 'extensions': self.extensions(), } if key: del e[key] return {getattr(self, key): e} return e def create(self, interpreter=None): if not interpreter: interpreter = Interpreter(target_user=self.user) prompt = '(' + \ get_directory(os.path.basename(self.prefix)) + ':' + self.name + \ ')' check_call( PYTHON_PATH, RUBYVENV_BIN, '--prompt', prompt, '--ruby', interpreter.binary, self.path, preexec_fn=self._demote()) def destroy(self): check_call('/bin/rm', '-r', '--interactive=never', self.path, preexec_fn=self._demote()) def exists(self): return os.path.exists(self.path) def interpreter(self): if not self._interpreter: self._interpreter = Interpreter(prefix=self.path, target_user=self.user) return self._interpreter def gem(self): if not self._gem: self._gem = os.path.join(self.path, 'bin', 'gem') return self._gem def extension_install(self, extension): locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name) t = extension.split(VERSION_DELIMITER) extension, version = t[0], t[1:] or '' if StrictVersion(self.name) >= StrictVersion('2.6'): command = (self.gem(), 'install', '--no-document', extension) else: command = (self.gem(), 'install', '--no-rdoc', '--no-ri', extension) if version: version = version[0] command += ('-v', version) if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version): raise ValueError("Extension '%s' install is prohibited. System extension" % extension) check_call(args=command, preexec_fn=self._demote()) def extension_update(self, extension): check_call(self.gem(), 'update', extension, preexec_fn=self._demote()) def extension_uninstall(self, extension): locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name) t = extension.split(VERSION_DELIMITER) extension, version = t[0], t[1:] or '' command = (self.gem(), 'uninstall', extension, '-x', '-a') if version: version = version[0] command += ('-v', version) if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version): raise ValueError("Extension '%s' removal is prohibited" % extension) check_call(args=command, preexec_fn=self._demote()) def extensions(self): result = {} locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name) output = check_output(self.gem(), 'list', '--local', preexec_fn=self._demote()) extensions = EXTENSION_PATTERN.findall(output) docs = (ExtensionInfo.extension_doc(extension) for extension, _ in extensions) for (name, version), doc in zip(extensions, docs): if ExtensionInfo.is_extensions_locked(locked_extensions, name, version): version_diff = list(set([v.strip() for v in version.split(',')]) - set(locked_extensions.get(name))) if version_diff and len(locked_extensions.get(name)) != 0: result[name] = {'doc': doc, 'version': ', '.join(version_diff)} else: result[name] = {'doc': doc, 'version': version} return result def _abs_prefix(user=None, prefix=None): if not prefix: prefix = DEFAULT_PREFIX if user: return os.path.join(pwd.getpwnam(user).pw_dir, prefix) else: return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, prefix) def environments(user=None, prefix=None): venv_path = _abs_prefix(user, prefix) try: env_list = list_dirs(venv_path) except OSError: return [] envs = [] for env_name in env_list: envs.append(Environment(env_name, user, prefix)) return envs def environments_dict(key, user=None, prefix=None): return dict(list(e.as_dict(key=key).items()) for e in environments(user, prefix)) def environments_deepdict(key, user=None, prefix=None): return dict(list(e.as_deepdict(key=key).items()) for e in environments(user, prefix))