Viewing file: wpos_user.py (33.52 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
# wpos_user.py - work code for clwpos-user utility
from __future__ import absolute_import
import argparse import os
from typing import Dict, List, Optional, Tuple from itertools import chain
from pkg_resources import parse_version
import cpanel from clcommon.clpwd import drop_user_privileges from clwpos.cl_wpos_exceptions import WposError from clwpos.parse import ArgumentParser, CustomFormatter from clwpos.optimization_features import ( OBJECT_CACHE_FEATURE, SITE_OPTIMIZATION_FEATURE, ALL_OPTIMIZATION_FEATURES, CompatibilityIssue, Feature, MisconfigurationIssue, UniqueId, convert_feature_list_to_interface ) from clwpos.feature_suites import get_allowed_modules, get_allowed_features_dict from clwpos.user.config import UserConfig from clwpos.user.redis_lib import RedisLibUser from clwpos import gettext as _
from clwpos.utils import ( USER_WPOS_DIR, check_license_decorator, print_data, catch_error, error_and_exit, daemon_communicate, home_dir, user_name, user_uid, check_domain, is_run_under_user, supported_php_handlers, _run_clwpos_as_user_in_cagefs, is_redis_configuration_running, get_status_from_daemon, redis_is_running, litespeed_is_running )
from clwpos.logsetup import setup_logging, USER_LOGFILE_PATH, init_wpos_sentry_safely from clwpos import constants from clwpos.user.progress_check import ( CommandProgress, track_command_progress, update_command_progress, ) from clwpos.user.website_check import post_site_check, RollbackException from clcommon.const import Feature as ModuleFeature from clcommon.cpapi import is_panel_feature_supported
logger = setup_logging(__name__)
parser = ArgumentParser( "/usr/bin/clwpos-user", "Utility for control CL AccelerateWP under user", formatter_class=CustomFormatter ) if not is_run_under_user(): parser.add_argument('--user', default=None, required=True)
class MaxCacheMemory(int): """ Class to validate format and values of cache memory setted by user. """
def __new__(cls, *args, **kwargs): try: instance = super(MaxCacheMemory, cls).__new__(cls, *args, **kwargs) except ValueError: raise argparse.ArgumentTypeError("invalid value type, must be integer")
min_memory = constants.MINIMUM_MAX_CACHE_MEMORY max_memory = constants.MAXIMUM_MAX_CACHE_MEMORY
if not min_memory <= instance <= max_memory: raise argparse.ArgumentTypeError( f"value must be in range: [{min_memory}, {max_memory}]")
return instance
class CloudlinuxWposUser(object): """ Class for run clwpos-user utility commands """ COMMAND_RELOAD_DICT = {"command": "reload"}
def __init__(self): self._is_json = False self._opts = None self._wp_config_suffix = "wp-config.php" self._is_redis_running = None self._is_litespeed_running = None self._supported_handlers = None self._supported_php_versions = None self._minimum_wordpress_versions = None self._php_versions_with_redis_not_loaded = None self.command_progress: Optional[CommandProgress] = None init_wpos_sentry_safely(logger)
@property def redis_socket_path(self): return os.path.join(home_dir(), USER_WPOS_DIR, 'redis.sock')
@property def supported_handlers(self): if self._supported_handlers is None: self._supported_handlers = supported_php_handlers() return self._supported_handlers
@property def is_redis_running(self): if self._is_redis_running is None: self._is_redis_running = redis_is_running() return self._is_redis_running
@property def is_litespeed_running(self): if self._is_litespeed_running is None: self._is_litespeed_running = litespeed_is_running() return self._is_litespeed_running
@property def supported_php_versions(self): """ Return dict where key is optimization feature and values is list of supported php version (for object-cache with redis enabled). """ if self._supported_php_versions is None: supported_php_versions = { OBJECT_CACHE_FEATURE: [ php_version for php_version in cpanel.get_cached_php_versions_with_redis_loaded() if OBJECT_CACHE_FEATURE.is_php_supported(php_version) ], SITE_OPTIMIZATION_FEATURE: [ php_version for php_version in cpanel.get_cached_php_installed_versions() if SITE_OPTIMIZATION_FEATURE.is_php_supported(php_version) ] } self._supported_php_versions = supported_php_versions return self._supported_php_versions
@catch_error def run(self, argv): """ Run command action :param argv: sys.argv[1:] :return: clwpos-user utility retcode """ self._parse_args(argv)
if not is_run_under_user(): _run_clwpos_as_user_in_cagefs(self._opts.user) else: _run_clwpos_as_user_in_cagefs()
if not is_run_under_user() and not is_panel_feature_supported(ModuleFeature.CAGEFS): drop_user_privileges(self._opts.user, effective_or_real=False, set_env=True)
result = getattr(self, self._opts.command.replace("-", "_"))() print_data(self._is_json, result)
def _parse_args(self, argv): """ Parse command line arguments :param argv: sys.argv[1:] """ self._opts = parser.parse_args(argv) self._is_json = True
def _check_monitoring_daemon_and_exit(self, feature): """ Ensures monitoring socket is present, otherwise - exit """ if feature == OBJECT_CACHE_FEATURE \ and not os.path.exists(constants.WPOS_DAEMON_SOCKET_FILE): error_and_exit( self._is_json, { "result": _("Unable to find monitoring daemon socket %(daemon_file)s. Please, contact your " "system administrator to ensure service %(service_name)s is currently running"), "context": {"daemon_file": constants.WPOS_DAEMON_SOCKET_FILE, "service_name": constants.MONIROTING_SERVICE}, } )
def _check_redis_configuration_process_and_exit(self, feature): """ Ensures redis configuration processes are not in progress. """ if feature == OBJECT_CACHE_FEATURE and is_redis_configuration_running(): error_and_exit( self._is_json, { "result": _("Configuration of PHP redis extension is in progress. " "Please wait until the end of this process to use AccelerateWP."), } )
def _is_redis_daemon_reload_needed(self, user_config: UserConfig, feature: Feature) -> bool: """ Determine if it is needed to send reload command to Wpos redis daemon. Such reload should be done after user has enabled object caching for the first of all his sites, or disable for the last. """ command = self._opts.command if feature != OBJECT_CACHE_FEATURE: return False
sites_count_with_enabled_module = user_config.get_enabled_sites_count_by_modules([feature]) if (command == 'enable' and sites_count_with_enabled_module == 1) \ or (command == 'disable' and sites_count_with_enabled_module == 0): return True
return False
@staticmethod def _get_php_versions_handlers_pair_for_docroot(domains_per_docroot: List, php_data: List) -> Tuple: """ Returns pair (all_php_versions, all_php_handlers) for domains in docroot """ versions, handlers = set(), set() for item in php_data: domain = item['vhost'] if domain not in domains_per_docroot: continue versions.add(item['version']) if item['php_fpm']: handlers.add('php-fpm') else: handlers.add(cpanel._get_php_handler(domain)) return versions, handlers
def collect_general_issues(self, module_name: str): """ Collects general (not depending on worpress/docroot setup) server misconfigurations/incompatibilities with WPOS """ issues = [] if module_name == OBJECT_CACHE_FEATURE and self.is_litespeed_running: issues.append( CompatibilityIssue( header=_('Litespeed is unsupported'), description=_('Litespeed webserver is running on your server, ' 'it is not currently supported by AccelerateWP.'), fix_tip=_('Please ask your system administrator to switch webserver to Apache ' 'using the following instruction: %(switch_lsws_link)s ' 'or keep watching our blog: %(blog_link)s ' 'for news about LiteSpeed support.'), context=dict( switch_lsws_link='https://www.interserver.net/tips/kb/switch-apache-litespeed/', blog_link='https://blog.cloudlinux.com/' ), unique_id=UniqueId.WEBSERVER_NOT_SUPPORTED, telemetry=dict( webserver='litespeed' ) )) return issues
@update_command_progress def collect_docroot_issues(self, module: Feature, doc_root_info: Dict, php_info: List, allowed_modules: List): """ Collects incompatibilities related to docroot (non-supported handler, etc) """ issues = [] domains_per_docroot = doc_root_info['domains'] versions, handlers = self._get_php_versions_handlers_pair_for_docroot(domains_per_docroot, php_info) if len(versions) > 1: issues.append( CompatibilityIssue( header=_('Different PHP versions for domains'), description=_('Those domains: %(domains)s are in same docroot, ' 'but using different PHP version'), fix_tip=_('Set or ask your system administrator to set same PHP version on those domains'), context={ 'domains': ', '.join(domains_per_docroot) }, unique_id=UniqueId.PHP_MISCONFIGURATION, telemetry=dict( reason='PHP_VERSION_BADLY_CONFIGURED', domains=domains_per_docroot ) ) ) if len(handlers) > 1: issues.append( CompatibilityIssue( header=_('Different PHP handlers for domains'), description=_('Those domains: %(domains)s are in same docroot, ' 'but using different PHP handlers'), fix_tip=_('Set or ask your system administrator to set same PHP handler on those domains'), context={ 'domains': ', '.join(domains_per_docroot) }, unique_id=UniqueId.PHP_MISCONFIGURATION, telemetry=dict( reason='PHP_HANDLER_BADLY_CONFIGURED', domains=domains_per_docroot ) ) )
additional_issues = module.collect_docroot_issues(self, doc_root_info, allowed_modules=allowed_modules) issues.extend(additional_issues)
return issues
@staticmethod def _is_our_roc_plugin(wp_path: str) -> bool: """ Checks that WP's WP_REDIS_PATH is defined and contains our path """ wp_redis_path = cpanel.wp_get_constant(wp_path, 'WP_REDIS_PATH', raise_exception=True) if wp_redis_path and "/.clwpos/redis.sock" in wp_redis_path: return True return False
def check_installed_roc_plugin(self, wp_path: str) -> bool: """ Checks that ROC plugin was installed by us, or not exists at all Returns False if the plugin was found but our config modifications was not """ if os.path.exists(os.path.join(wp_path, 'wp-content', 'plugins', 'redis-cache')): if not self._is_our_roc_plugin(wp_path): return False return True
@update_command_progress def collect_wordpress_issues( self, module_name: Feature, wordpress_info: Dict, docroot: str, module_is_enabled: bool): """ Collects incompatibilities related to wordpress setup (conflicting plugin enabled, etc) """ issues = [] minimum_supported_wp_version = module_name.minimum_supported_wp_version() if parse_version(wordpress_info["version"]) < parse_version(minimum_supported_wp_version): issues.append( CompatibilityIssue( header=_('Unsupported WordPress version'), description=_('Optimization feature is incompatible with ' 'WordPress version %(wp_version)s being used.'), fix_tip=_('The minimal supported WordPress version is %(minimum_supported_wp_version)s. ' 'Upgrade your WordPress installation or reinstall it from the scratch.'), context=dict( minimum_supported_wp_version=minimum_supported_wp_version, wp_version=wordpress_info["version"] ), unique_id=UniqueId.UNCOMPATIBLE_WORDPRESS_VERSION, telemetry=dict( minimum_supported_wp_version=minimum_supported_wp_version, wp_version=wordpress_info["version"] ) ))
issues.extend(module_name.collect_wordpress_issues(self, wordpress_info, docroot, module_is_enabled)) return issues
@parser.command(help="Output AccelerateWP user config (with Redis status and Redis current memory)") @track_command_progress def get(self): allowed_features_dict = get_allowed_features_dict(user_uid()) allowed_features = list(chain(*allowed_features_dict.values()))
converted_allowed_features = {feature_set: convert_feature_list_to_interface(features) for feature_set, features in allowed_features_dict.items()}
php_info = cpanel.php_info()
if not php_info: if allowed_features: # empty php_info with any module allowed means that something went wrong error_and_exit( self._is_json, {"result": _("Failed to retrieve data about php version which is currently used. " "Contact your administrator if the issue persists.")}, ) # no modules allowed and the fact that we cant retrieve php_info # probably means that Wpos was never allowed for user return {"docroots": [], "allowed_features": converted_allowed_features}
self.command_progress.update()
user_info = self._get_user_info()
self.command_progress.recalculate_number_of_total_stages(user_info) self.command_progress.update()
for optimization_feature in ALL_OPTIMIZATION_FEATURES: general_issues = self.collect_general_issues(optimization_feature)
for docroot, doc_root_info in user_info.items(): docroot_issues = self.collect_docroot_issues(optimization_feature, doc_root_info, php_info, allowed_features)
for wp in doc_root_info["wps"]: is_enabled = self._is_enabled(doc_root_info["domains"][0], wp["path"], optimization_feature) wordpress_issues = self.collect_wordpress_issues( optimization_feature, wp, docroot, module_is_enabled=is_enabled)
module_info = {"enabled": is_enabled} issues = [*general_issues, *docroot_issues, *wordpress_issues] if issues: module_info.update({"issues": [ issue.dict_repr for issue in issues ]})
wp.setdefault("features", {}).update({optimization_feature.to_interface_name(): module_info})
return { "docroots": list(user_info.values()), "allowed_features": converted_allowed_features, "used_memory": RedisLibUser(self.redis_socket_path).get_redis_used_memory(), "max_cache_memory": UserConfig(user_name()).get_config().get( "max_cache_memory", UserConfig.DEFAULT_MAX_CACHE_MEMORY ) }
@parser.argument('--website', type=str, help='Website to scan', required=True) @parser.command(help="Scan user for website incompatibilities [x-ray smart advice only]") def scan(self): php_info = cpanel.php_info() allowed_modules = get_allowed_modules(user_uid())
if not php_info: return {"issues": {}}
user_info = self._get_user_info()
wps = {} for module_name in ALL_OPTIMIZATION_FEATURES: general_issues = self.collect_general_issues(module_name)
for docroot, doc_root_info in user_info.items(): if self._opts.website not in doc_root_info["domains"]: continue docroot_issues = self.collect_docroot_issues(module_name, doc_root_info, php_info, allowed_modules)
for wp in doc_root_info["wps"]: is_enabled = self._is_enabled(doc_root_info["domains"][0], wp["path"], module_name) if is_enabled: wps.setdefault(wp['path'], []).append({ 'type': UniqueId.CLOUDLINUX_MODULE_ALREADY_ENABLED, 'context': dict(), 'advice_type': module_name.NAME }) else: wordpress_issues = self.collect_wordpress_issues( module_name, wp, docroot, module_is_enabled=is_enabled)
incompatibilities = [*general_issues, *docroot_issues, *wordpress_issues] wps.setdefault(wp['path'], []).extend([ { 'type': issue.unique_id, 'context': issue.telemetry, 'advice_type': module_name.NAME } for issue in incompatibilities if isinstance(issue, CompatibilityIssue) ])
return { "issues": wps }
def _get_user_info(self): user_info = cpanel.get_user_info() for doc_root_info in user_info.values(): if doc_root_info["php"]["fpm"]: doc_root_info["php"]["handler"] = "php-fpm" php = doc_root_info.pop("php") doc_root_info["php_version"] = php["version"] doc_root_info["php_handler"] = php["handler"] return user_info
def get_current_issues_by_docroot( self, module_name: Feature, wordpress_path: str, docroot: str, is_module_enabled: bool ): """ Obtains issues for special docroot and wordpress """ wordpress_issues = [] allowed_modules = get_allowed_modules(user_uid()) general_issues = self.collect_general_issues(module_name) php_info = cpanel.php_info() user_info_by_docroot = cpanel.get_user_info()[docroot] if user_info_by_docroot["php"]["fpm"]: user_info_by_docroot["php"]["handler"] = "php-fpm" php = user_info_by_docroot.pop("php") user_info_by_docroot["php_version"] = php["version"] user_info_by_docroot["php_handler"] = php["handler"] docroot_issues = self.collect_docroot_issues(module_name, user_info_by_docroot, php_info, allowed_modules) for wp in user_info_by_docroot["wps"]: if wp['path'] != wordpress_path: continue wordpress_issues = self.collect_wordpress_issues(module_name, wp, docroot, module_is_enabled=is_module_enabled) return [issue.dict_repr for issue in (*general_issues, *docroot_issues, *wordpress_issues)]
def _is_enabled(self, domain: str, wp_path: str, module: str) -> bool: uc = UserConfig(user_name()) return uc.is_module_enabled(domain, wp_path, module) and module in get_allowed_modules(user_uid())
@parser.argument('--max_cache_memory', help='Maximum cache memory to use in MB', type=MaxCacheMemory, default=constants.DEFAULT_MAX_CACHE_MEMORY) @parser.command(help='Set parameters of global modules in AccelerateWP user config') @check_license_decorator def set(self): uc = UserConfig(user_name()) # TODO: this method must probably UPDATE config and not REPLACE it completely params = {"max_cache_memory": f"{self._opts.max_cache_memory}mb"} uc.set_params(params) daemon_communicate(self.COMMAND_RELOAD_DICT) return params
@parser.argument("--feature", help="Optimization feature to disable", type=Feature, required=True, choices=[feature.to_interface_name() for feature in ALL_OPTIMIZATION_FEATURES]) @parser.argument('--wp-path', help='Path to user\'s wordpress', type=str, default='') @parser.argument('--domain', help='User\'s wordpress domain', type=str, required=True) @parser.command(help='Disables and uninstalls module on wordpress') @track_command_progress def disable(self): username, doc_root = check_domain(self._opts.domain) wp_path = self._opts.wp_path.strip("/") uc = UserConfig(user_name()) errors = []
feature_name = self._opts.feature.optimization_feature()
if not uc.is_module_enabled(self._opts.domain, wp_path, feature_name): return {"warning": _('Optimization feature %(feature)s is already disabled on the domain %(domain)s. ' 'Nothing to be done.'), "context": {"domain": self._opts.domain, "feature": self._opts.feature}}
self._check_monitoring_daemon_and_exit(feature=feature_name)
self.command_progress.update()
last_error = cpanel.disable_without_config_affecting( cpanel.DomainName(self._opts.domain), wp_path, module=feature_name)
if last_error: logger.error(last_error.message % last_error.context) errors.append(last_error) else: # do not change config values in case if disable reported errors try: uc.disable_module(self._opts.domain, wp_path, feature_name) except Exception as e: logger.exception("unable to disable module in config") errors.append(e)
self.command_progress.update()
if self._is_redis_daemon_reload_needed(uc, feature=feature_name): try: cpanel.reload_redis() except WposError as e: s_details = e.details % e.context logger.exception("CLWPOS daemon error: '%s'; details: '%s'", e.message, s_details) errors.append(e) except Exception as e: logger.exception("unable to reload cache backend") errors.append(e)
self.command_progress.update()
is_module_enabled = self._is_enabled(self._opts.domain, wp_path, feature_name)
try: last_error = errors.pop(-1) except IndexError: last_error = None
if is_module_enabled: if last_error: raise WposError( message=_("Optimization feature disabling failed because one or more steps reported error. " "Caching is still active, but may work unstable. Try disabling it again. " "Contact your system administrator if this issue persists. " "Detailed information you can find in log file '%(log_path)s'"), details=last_error.message, context={ 'log_path': USER_LOGFILE_PATH.format(homedir=home_dir()), **getattr(last_error, 'context', {}) } ) else: error_and_exit( self._is_json, {"result": _("WordPress caching module is still enabled, but no errors happened. " "Try again and contact your system administrator if this issue persists.")}, ) else: response = { "feature": { "enabled": is_module_enabled }, "used_memory": RedisLibUser(self.redis_socket_path).get_redis_used_memory() } if last_error: response.update( { "warning": _("Optimization feature disabled, but one or more steps reported error. " "Detailed information you can find in log file '%(log_path)s'"), "context": { "log_path": USER_LOGFILE_PATH.format(homedir=home_dir()) } } ) else: issues = self.get_current_issues_by_docroot(feature_name, wp_path, doc_root, is_module_enabled) if issues: response["feature"]["issues"] = issues
return response
@parser.argument('--feature', type=str, help='List of AccelerateWP optimization features separated by commas ' 'on which to perform an action', choices=ALL_OPTIMIZATION_FEATURES, default=OBJECT_CACHE_FEATURE) @parser.argument('--action', type=str, help='Action to perform on AccelerateWP optimization features', required=True) @parser.command(help='Perform action on AccelerateWP optimization features') def do_action(self): """ Perform action on selected AccelerateWP features """ action = self._opts.action feature = self._opts.feature # dictionary where key - action, value - function that provides that action action_to_func = {"purge": self.redis_purge} if action not in action_to_func: error_and_exit( self._is_json, { "result": f'Invalid action "{action}", currently only "purge" action is supported' }, )
return action_to_func[action](feature) # perform action on module
def redis_purge(self, *args): """ Clean entire redis cache for user. """ return RedisLibUser(self.redis_socket_path).purge_redis()
@parser.argument( "--ignore-errors", help="ignore site check results after plugin install and enable", action="store_true", ) @parser.argument("--wp-path", help="Path to user's wordpress", type=str, default="") @parser.argument("--domain", help="User's wordpress domain", type=str, required=True) @parser.argument("--feature", help="Optimization feature to enable", type=Feature, required=True, choices=[feature.to_interface_name() for feature in ALL_OPTIMIZATION_FEATURES]) @parser.command(help="Installs and enables module on wordpress") @check_license_decorator @track_command_progress def enable(self): """ Enable object_cache for user with compliance with end-user spec. :return: """ wp_path = self._opts.wp_path.strip("/")
username, doc_root = check_domain(self._opts.domain)
abs_wp_path = os.path.join(doc_root, wp_path) uc = UserConfig(username) feature_name = self._opts.feature.optimization_feature()
is_module_enabled = self._is_enabled(self._opts.domain, wp_path, feature_name) if is_module_enabled: return {"warning": _("The %(feature)s optimization feature is already enabled on " "the domain %(domain)s. Nothing to be done."), "context": {"domain": self._opts.domain, "feature": self._opts.feature}}
# check that enabling of module is allowed by admin if feature_name not in get_allowed_modules(user_uid()): error_and_exit( self._is_json, { "result": _("Usage of the optimization feature %(feature)s is prohibited by admin."), "context": {"feature": self._opts.feature}, } )
self._check_redis_configuration_process_and_exit(feature=feature_name) self._check_monitoring_daemon_and_exit(feature=feature_name)
self.command_progress.update()
# check that user's wordpress fits to requirements issues = self.get_current_issues_by_docroot( feature_name, wp_path, doc_root, is_module_enabled) if issues: error_and_exit( self._is_json, {'feature': {'enabled': is_module_enabled, 'issues': issues}, 'result': _('Website "%(domain)s/%(wp_path)s" has compatibility ' 'issues and optimization feature cannot be enabled.'), 'context': {'domain': self._opts.domain, 'wp_path': wp_path}}, )
# check user's site before installation if not self._opts.ignore_errors: self._website_check(abs_wp_path, self._opts.domain, uc, wp_path, feature_name)
self.command_progress.update()
# try to enable module with wp-cli without adding info into user's wpos config ok, enable_failed_info = cpanel.enable_without_config_affecting( cpanel.DomainName(self._opts.domain), wp_path, module=feature_name, ) if not ok: raise WposError(**enable_failed_info)
uc.enable_module(self._opts.domain, wp_path, feature_name)
self.command_progress.update()
if self._is_redis_daemon_reload_needed(uc, feature=feature_name): try: cpanel.reload_redis() except Exception: uc.disable_module(self._opts.domain, wp_path, feature_name) cpanel.disable_module(feature_name, abs_wp_path) raise
self.command_progress.update()
# check user's site after installation and enabling if not self._opts.ignore_errors: self._website_check(abs_wp_path, self._opts.domain, uc, wp_path, feature_name, rollback=True)
self.command_progress.update()
is_module_enabled = self._is_enabled(self._opts.domain, wp_path, feature_name) issues = self.get_current_issues_by_docroot(feature_name, wp_path, doc_root, is_module_enabled)
module_data = { 'enabled': is_module_enabled } if issues: module_data.update({ 'issues': issues })
return { 'feature': module_data, 'used_memory': RedisLibUser(self.redis_socket_path).get_redis_used_memory() }
def _website_check(self, abs_wp_path, domain, uc, wp_path, module, rollback=False): """ Performs website availability checks and raises errors in case of problems
:param abs_wp_path: absolute path to wordpress installation :param domain: domain that wordpress installation belongs to :param uc: user config instance :param wp_path: relative path to wordpress installation :param rollback: whether to roll redis and plugin changes back :return: """
try: post_site_check(domain, wp_path, abs_wp_path) except WposError as e: if rollback: uc.disable_module(domain, wp_path, module) cpanel.reload_redis() cpanel.disable_module(module, abs_wp_path)
if isinstance(e, RollbackException): raise
error_and_exit( is_json=True, message={ "context": {}, "result": _("Unexpected error occurred during plugin installation for WordPress. " "Try again and contact your system administrator if the issue persists."), "details": str(e) }, )
@parser.command(help="Detect progress of the currently performed command") def get_progress(self): """ Return current progress of the currently performed command. Provides amount of: - total stages - how many stages the command execution process is divided into, - completed stages - how many stages have been already completed. """ return CommandProgress.get_status()
|