Viewing file: configurations.py (9.81 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2022 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
# configurations.py - configuration helpers for AccelerateWP feature suites
import json import logging import pwd import os from typing import Optional, Tuple, Dict
from clcommon.cpapi import cpusers from clcommon.lib.cledition import is_cl_solo_edition from clwpos import gettext as _ from clwpos.cl_wpos_exceptions import WposError from clwpos.constants import ( CLWPOS_VAR_DIR, CLWPOS_UIDS_PATH, ALLOWED_MODULES_JSON ) from clwpos.utils import uid_by_name, acquire_lock
from .suites import ALL_SUITES, AWPSuite, PremiumSuite, OLD_NEW_SUITE_NAME_PAIRS
# Version number stored (as a string) in the "version" field of the default
# allowed-suites config.
ALLOWED_SUITES_CONFIG_VERSION = 2
# File name of the allowed-suites config inside the admin's config directory.
ALLOWED_SUITES_JSON = 'suites_allowed.json'
# Default "allowed" state given to every suite when building default config:
# truthy result of is_cl_solo_edition() evaluated once at import time.
IS_SUITE_ALLOWED_BY_DEFAULT = bool(is_cl_solo_edition())
def extract_features(suites: Dict[str, bool]) -> Dict[str, bool]:
    """
    Expand a suite -> state mapping into a feature -> state mapping.

    Every feature found in ``ALL_SUITES[suite].feature_set`` gets the state
    of its suite. If the same feature is provided by several suites, the
    suite iterated last decides the resulting state.

    :param suites: mapping of suite name to its state
    :return: mapping of feature name to state
    :raises KeyError: if a suite name is missing from ALL_SUITES
    """
    feature_states = {}
    for suite_name, suite_state in suites.items():
        for feature_name in ALL_SUITES[suite_name].feature_set:
            feature_states[feature_name] = suite_state
    return feature_states
def get_admin_config_directory(uid: Optional[int]) -> str:
    """
    Return the directory where admin's config files are kept.

    On CloudLinux OS Solo a single 'solo' directory under CLWPOS_VAR_DIR is
    used and uid is ignored. On other editions the directory is per-user
    (named after the uid, under CLWPOS_UIDS_PATH), so uid is mandatory.

    :param uid: uid of the user, may be None only on CloudLinux OS Solo
    :return: admin's config directory path
    :raises WposError: if uid is None on a non-Solo edition
    """
    if is_cl_solo_edition():
        return os.path.join(CLWPOS_VAR_DIR, 'solo')

    if uid is None:
        raise WposError(
            message=_('Internal error: obtaining config path without uid is only '
                      'available for CloudLinux OS Solo. '
                      'Please contact support for help: '
                      'https://cloudlinux.zendesk.com'))

    return os.path.join(CLWPOS_UIDS_PATH, str(uid))
def get_suites_allowed_path(uid: Optional[int], old=False) -> str:
    """
    Build the path of the allowed-suites config for the given user.

    :param uid: uid, passed through to get_admin_config_directory
    :param old: when truthy, return the path of the "old" allowed modules
                config (ALLOWED_MODULES_JSON) instead of ALLOWED_SUITES_JSON
    :return: path of the requested config file
    """
    config_dir = get_admin_config_directory(uid)
    config_name = ALLOWED_MODULES_JSON if old else ALLOWED_SUITES_JSON
    return os.path.join(config_dir, config_name)
def get_allowed_suites(uid: int) -> list:
    """
    Return names of the suites whose state in admin's config is truthy,
    i.e. the suites which endusers are allowed to enable.

    :param uid: uid (used only for CL Shared, not used on solo)
    :return: list of allowed suite names
    """
    suite_states = get_admin_suites_config(uid)['suites']
    allowed = []
    for suite_name, is_allowed in suite_states.items():
        if is_allowed:
            allowed.append(suite_name)
    return allowed
def is_suite_allowed_for_user(suite: str) -> bool:
    """
    Checks whether <suite> enabled for at least one user.

    On CloudLinux OS Solo the single server-wide admin config is consulted.
    On other editions the config of every control panel user is checked
    until the first one that allows the suite is found.

    :param suite: suite name to look up in the 'suites' section of the config
    :return: True if the suite is allowed, False otherwise (including the
             case when the suite is not listed in the config at all)
    """
    if is_cl_solo_edition(skip_jwt_check=True):
        solo_suites = get_admin_suites_config(uid=None)['suites']
        # .get() yields None for a suite missing from the config;
        # coerce so the declared bool return type always holds
        return bool(solo_suites.get(suite))

    for username in cpusers():
        uid = uid_by_name(username)
        # skip users for which no usable uid was obtained
        # NOTE(review): a falsy uid also covers uid 0 - confirm this is intended
        if not uid:
            continue
        if get_admin_suites_config(uid)['suites'].get(suite):
            return True
    return False
def any_suite_allowed_on_server() -> bool:
    """
    Tell whether at least one suite from ALL_SUITES is allowed on server.

    Suites are checked one by one with is_suite_allowed_for_user and the
    search stops at the first allowed suite.

    :return: True if some suite is allowed, False otherwise
    """
    for suite_name in ALL_SUITES:
        if is_suite_allowed_for_user(suite_name):
            return True
    return False
def get_admin_suites_config(uid=None):
    """
    Load suites statuses from admin's .json config.

    The current config (suites_allowed) is preferred; if it is absent the
    old modules config is read and converted. When neither file exists
    the default config is returned.

    :param uid: uid whose config should be read (passed to
                get_suites_allowed_path)
    :return: dict with "version" and "suites" keys
    """
    default_states = {suite_name: IS_SUITE_ALLOWED_BY_DEFAULT for suite_name in ALL_SUITES}

    # on solo the premium suite default depends on presence of the flag file
    if is_cl_solo_edition():
        default_states[PremiumSuite.name] = os.path.exists('/var/lve/enable-wpos.flag')

    fallback_config = {
        "version": str(ALLOWED_SUITES_CONFIG_VERSION),
        "suites": default_states,
    }

    current_config_path = get_suites_allowed_path(uid)
    legacy_config_path = get_suites_allowed_path(uid, old=True)

    if os.path.exists(current_config_path):
        return read_json_with_allowed_suites(fallback_config, current_config_path)
    if os.path.exists(legacy_config_path):
        return read_json_with_allowed_suites(fallback_config, legacy_config_path, old_config=True)
    return fallback_config
def read_json_with_allowed_suites(defaults, suites_json_path, old_config=False):
    """
    Reads json with suites statuses
    for new awp version:
    {
        "version": "2",
        "suites": {
            "accelerate_wp": true,
            "accelerate_wp_premium": true
        }
    }
    for old awp version:
    {
        "version": "1",
        "modules": {
            "object_cache": true,
            "site_optimization": true
        }
    }

    Any malformed config (invalid JSON, undecodable content, unexpected
    structure, unknown old module name) is logged and replaced by defaults.

    :param defaults: config returned when the file cannot be used; for the
                     new format its 'suites' values also fill in suites
                     missing from the file
    :param suites_json_path: path of the config file to read
    :param old_config: True if the file has the old ("modules") format,
                       which is converted to the new ("suites") format
    :return: config dict in the new format, or defaults
    """
    suites_key = 'modules' if old_config else 'suites'
    # TODO: locking and tempfiles
    #  https://cloudlinux.atlassian.net/browse/LU-2073
    try:
        with open(suites_json_path, "r") as f:
            suites_from_file = json.load(f)

        # valid JSON is not necessarily a valid config: reject anything
        # that is not an object holding an object under the expected key
        if not isinstance(suites_from_file, dict) \
                or not isinstance(suites_from_file.get(suites_key), dict):
            raise ValueError('expected a JSON object with "%s" object inside' % suites_key)

        if old_config:
            # keep all top-level fields except 'modules' and translate
            # old module names into the new suite names
            suites = {k: v for k, v in suites_from_file.items() if k != suites_key}
            suites['suites'] = {OLD_NEW_SUITE_NAME_PAIRS[k]: v
                                for k, v in suites_from_file[suites_key].items()}
        else:
            # update admin's config with modules that are not in it (values are taken from defaults)
            # case: new module was added in the lve-utils update and it is not in the config yet
            for suite, status in defaults[suites_key].items():
                suites_from_file[suites_key].setdefault(suite, status)
            suites = suites_from_file

    # ValueError also covers json.JSONDecodeError and UnicodeDecodeError
    except (ValueError, KeyError) as e:
        logging.warning('Config %s is malformed, using defaults instead, error: %s', suites_json_path, e)
        return defaults

    return suites
def write_suites_allowed(uid: int, gid: int, data_dict_to_write: dict, custom_allowed_path: str = None):
    """
    Dump the allowed config as JSON and apply admin config ownership/mode.

    :param uid: User uid, used to locate the config when no custom path given
    :param gid: User gid, used to compute the file group
    :param data_dict_to_write: Data to serialize (indented JSON)
    :param custom_allowed_path: write to this path instead of the default
                                suites_allowed path of the user
    """
    target_path = custom_allowed_path or get_suites_allowed_path(uid)

    # serialize before opening, so a serialization error does not truncate the file
    serialized = json.dumps(data_dict_to_write, indent=4)
    with open(target_path, "w") as config_file:
        config_file.write(serialized)

    permissions = get_admin_config_permissions(gid)
    file_owner, file_group, file_mode = permissions
    os.chown(target_path, file_owner, file_group)
    os.chmod(target_path, file_mode)
def get_admin_config_permissions(gid: int) -> Tuple[int, int, int]:
    """
    Return owner, group and mode for files inside admin's config directory.

    The owner is always root (0), so the user can read but not write
    the config:
      - CL Solo: root:root, mode 644
      - otherwise (CL Shared Pro): root:<gid>, mode 640

    :param gid: gid to use as the file group on non-Solo editions
    :return: (owner, group, mode) tuple
    """
    if is_cl_solo_edition(skip_jwt_check=True):
        return 0, 0, 0o644
    return 0, gid, 0o640
def get_allowed_modules(uid: int) -> list:
    """
    Return features (modules) which endusers are allowed to enable.

    Suites statuses from admin's config are expanded into per-feature
    statuses and only the features with a truthy status are returned.

    :param uid: uid (used only for CL Shared, not used on solo)
    :return: list of allowed feature names
    """
    suite_states = get_admin_suites_config(uid)['suites']
    feature_states = extract_features(suite_states)

    allowed = []
    for feature_name, is_allowed in feature_states.items():
        if is_allowed:
            allowed.append(feature_name)
    return allowed
def get_allowed_features_dict(uid: int):
    """
    Group allowed features of the user by feature-set.

    A feature present in AWPSuite.feature_set is reported as free; any other
    feature present in PremiumSuite.feature_set is reported as premium.
    Features belonging to neither set are omitted.

    :param uid: uid passed to get_allowed_modules
    :return: dict with 'accelerate_wp' and 'accelerate_wp_premium' lists
    """
    allowed = get_allowed_modules(uid)
    free_features = [name for name in allowed if name in AWPSuite.feature_set]
    premium_features = [
        name for name in allowed
        if name not in AWPSuite.feature_set and name in PremiumSuite.feature_set
    ]
    return {
        'accelerate_wp': free_features,
        'accelerate_wp_premium': premium_features
    }
def is_module_allowed_for_user(module: str) -> bool:
    """
    Checks whether <module> enabled for at least one user.

    Suites statuses from admin's config are expanded into per-feature
    statuses before the lookup. On CloudLinux OS Solo the single server-wide
    config is consulted; on other editions the config of every control panel
    user is checked until the first one that allows the module is found.

    :param module: feature (module) name
    :return: True if the module is allowed, False otherwise (including the
             case when the module is not provided by any configured suite)
    """
    if is_cl_solo_edition(skip_jwt_check=True):
        solo_features = extract_features(get_admin_suites_config(uid=None)['suites'])
        # .get() yields None for an unknown module;
        # coerce so the declared bool return type always holds
        return bool(solo_features.get(module))

    for username in cpusers():
        uid = uid_by_name(username)
        # skip users for which no usable uid was obtained
        # NOTE(review): a falsy uid also covers uid 0 - confirm this is intended
        if not uid:
            continue
        if extract_features(get_admin_suites_config(uid)['suites']).get(module):
            return True
    return False
def _sync_allowed_configs(username):
    """
    Syncing allowed configs (needed for downgrade).

    Writes the old-format ("modules") allowed config of the user based on
    the current suites config. Nothing is written when the user has no
    suites config file.

    :param username: system user name
    :raises KeyError: if the user is not found in the password database
    """
    # single passwd lookup gives a consistent uid/gid pair
    pw_entry = pwd.getpwnam(username)
    uid, gid = pw_entry.pw_uid, pw_entry.pw_gid
    suites_json_path = get_suites_allowed_path(uid)

    # means there is no custom settings in config
    if not os.path.exists(suites_json_path):
        return

    suites_admin_config = get_admin_suites_config(uid)['suites']
    config_to_sync = get_suites_allowed_path(uid, old=True)

    # old config format: module names instead of suite names
    modules_states = {
        'version': 1,
        'modules': {
            'object_cache': suites_admin_config['accelerate_wp_premium'],
            'site_optimization': suites_admin_config['accelerate_wp'],
        },
    }
    with acquire_lock(config_to_sync):
        write_suites_allowed(uid, gid, modules_states, custom_allowed_path=config_to_sync)
def sync_allowed_configs():
    """
    Sync allowed configs for every user returned by cpusers().

    A failure for one user is logged (with traceback) and does not stop
    processing of the remaining users.
    """
    for user in cpusers():
        try:
            _sync_allowed_configs(user)
        except Exception:
            logging.exception('Error while syncing the allowed configs for user %s', user)
|