Viewing file: base.py (5.72 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import functools
import pwd
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Set

from imav.contracts.config import MalwareTune
# TCP ports shared by the inbound and outbound lists in
# AbstractPanel.OPEN_PORTS. Kept as a list of strings because it is
# concatenated with other string lists there.
# Service names are the conventional assignments, noted for orientation only.
TCP_PORTS_COMMON = [
    "20",  # FTP data
    "21",  # FTP control
    "22",  # SSH
    "25",  # SMTP
    "53",  # DNS
    "80",  # HTTP
    "110",  # POP3
    "443",  # HTTPS
    "587",  # mail submission
    "993",  # IMAP over TLS
    "995",  # POP3 over TLS
]
class PanelException(Exception):
    """
    Generic hosting panel error.

    Used as the default ``AbstractPanel.exception`` type.
    """
class InvalidTokenException(Exception):
    """
    Error signalling an invalid token.

    NOTE(review): nothing in this module raises it; presumably used by
    panel implementations or authentication code elsewhere -- confirm.
    """
class AbstractPanel(ABC):
    """
    Abstract class that provides only basic hosting panel integration
    functionality.

    Concrete panels must implement every method marked with
    ``@abstractmethod``; the remaining methods are overridable defaults.
    """

    NAME = "MINIMAL"
    # Port numbers (as strings) grouped by protocol ("tcp"/"udp") and then
    # by direction ("in"/"out").
    OPEN_PORTS = {
        "tcp": {
            "in": ["465"] + TCP_PORTS_COMMON,
            "out": ["113"] + TCP_PORTS_COMMON,
        },
        "udp": {
            "in": ["20", "21", "53", "443"],
            "out": ["20", "21", "53", "113", "123"],
        },
    }
    # Exception type raised for panel errors (see ``ensure_valid_panel``).
    exception = PanelException
    smtp_allow_users = []  # type: List[str]

    @classmethod
    @abstractmethod
    def is_installed(cls):
        """
        Checks if hosting panel installed on the known path

        :return: bool:
        """
        pass

    @classmethod
    async def version(cls):
        """
        Returns the panel version; this default implementation returns None.
        """
        return None

    @abstractmethod
    async def enable_imunify360_plugin(self, name=None):
        """
        Registers and enables Imunify360 UI plugin in hosting panel
        """
        pass

    @abstractmethod
    async def disable_imunify360_plugin(self, name=None):
        """
        Unregisters Imunify360 UI plugin in hosting panel
        """
        pass

    @abstractmethod
    async def get_user_domains(self):
        """
        Returns domains hosted via control panel

        :return: list
        """
        pass

    @abstractmethod
    async def get_users(self) -> List[str]:
        """
        Returns system users from hosting panel

        :return: list
        """
        pass

    @abstractmethod
    async def get_domain_to_owner(self) -> Dict[str, List[str]]:
        """
        Returns dict with domain to list of users pairs
        """
        pass

    @abstractmethod
    async def get_domains_per_user(self) -> Dict[str, List[str]]:
        """
        Returns dict with user to list of domains pairs
        """
        pass

    async def get_user_details(self) -> Dict[str, Dict[str, str]]:
        """
        Returns dict with user to details pairs

        Each details dict has the keys "email" and "locale". This default
        implementation fills both with empty strings for every user
        returned by get_users().
        """
        return {
            user: {"email": "", "locale": ""}
            for user in await self.get_users()
        }

    async def users_count(self) -> int:
        """
        Returns the number of users reported by get_users()
        """
        return len(list(await self.get_users()))

    def authenticate(self, protocol, data: dict):
        """
        Performs actions to distinguish endusers from admins

        :param protocol: _RpcServerProtocol
        :param data: parsed params (not used by this default implementation)
        :returns (user_type, user_name); user_name is None when the
            caller's uid is 0
        """
        name = None
        if protocol._uid != 0:
            # we can get here if a non-root web panel user visits i360 UI
            # To emulate it:
            # su -s /bin/bash -c
            #    $'echo \'{"command":["config", "show"],"params":{}}\'
            #    | nc -U -w1 \
            #    /var/run/defence360agent/non_root_simple_rpc.sock'
            #    fakeuser
            pw = pwd.getpwuid(protocol._uid)
            name = pw.pw_name
        return protocol.user, name

    @classmethod
    def get_modsec_config_path(cls):
        """
        Not supported by the base panel.

        :raise NotImplementedError: always, in this default implementation
        """
        raise NotImplementedError

    def get_SMTP_conflict_status(self) -> bool:
        """
        Return Conflict status
        """
        return False

    @abstractmethod
    def basedirs(self) -> Set[str]:
        """
        Returns a set of base directories; must be implemented by the panel.
        """
        pass

    @classmethod
    def base_home_dir(cls, home_dir: str) -> Path:
        """
        Returns the parent directory of the resolved *home_dir*
        """
        base_dir = Path(home_dir).resolve().parent
        return base_dir

    @classmethod
    def get_rapid_scan_db_dir(cls, home_dir: str) -> Optional[str]:
        """
        Returns the rapid scan db directory for *home_dir*

        The result is ``<base>/.rapid-scan-db/<tail>``, where ``<tail>`` is
        the resolved home directory relative to its parent and ``<base>``
        is that parent, unless MalwareTune.RAPID_SCAN_BASEDIR_OVERRIDE is
        set to a truthy value, in which case the override is used as base.

        :param home_dir: user's home directory
        :return: path as a string, or None if the home directory cannot
            be resolved
        """
        try:
            base_dir = cls.base_home_dir(home_dir)
            resolved_home = Path(home_dir).resolve()
            tail = resolved_home.relative_to(base_dir)
        # Symbolic link loop could cause runtime error
        except (ValueError, RuntimeError):
            return None

        if rapid_scan_basedir_override := getattr(
            MalwareTune, "RAPID_SCAN_BASEDIR_OVERRIDE", None
        ):
            # Coerce to Path: the override may be a plain string, and
            # ``str / str`` below would raise TypeError.
            base_dir = Path(rapid_scan_basedir_override)

        return str(base_dir / ".rapid-scan-db" / tail)

    @classmethod
    async def retrieve_key(cls) -> str:
        """
        Returns registration key from panel, if possible, raise
        PanelException if not successful (or wrong panel key provided),
        or NotImplementedError if method not supported by the panel.
        """
        raise NotImplementedError

    @classmethod
    async def notify(cls, *, message_type, params, user=None):
        """
        Notify a customer using the panel internal tooling

        This default implementation does nothing and returns None.
        """
        return None

    @abstractmethod
    async def list_docroots(self) -> Dict[str, str]:
        """
        :return dict with docroot to domain
        """
        pass
def ensure_valid_panel(**dec_kwargs):
    """
    Run function only if hosting panel is installed, otherwise raise
    the panel's exception (PanelException by default)

    This method is intended to be used as a decorator on AbstractPanel
    instance methods.

    :raise PanelException: (``self.exception``) when ``self.is_installed``
        returns a falsy value
    :param dec_kwargs: kwargs passed to is_installed function
    :return: decorator that wraps a coroutine method
    """

    def real_decorator(fn):
        """
        :param fn: coroutine
        """

        # Preserve the wrapped coroutine's __name__/__doc__ so that
        # introspection and logging still see the original method.
        @functools.wraps(fn)
        async def wrapper(self, *args, **kwargs):
            if not self.is_installed(**dec_kwargs):
                raise self.exception(
                    "%s is not valid!" % self.__class__.__name__
                )
            return await fn(self, *args, **kwargs)

        return wrapper

    return real_decorator
class ModsecVendorsError(Exception):
    """
    Raised when it is impossible to get the modsec vendor
    """
|