Edit: limits.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """Kernel-side LVP/LVE operations via pylve bindings.""" from lve_utils.pylve_wrapper import PyLve from .exceptions import LvdError try: import pylve except ImportError: pylve = None def get_pylve(): """Initialize and return a PyLve instance.""" if pylve is None: raise LvdError("pylve module not available") py = PyLve(pylve) py.initialize() if not py.domains_supported(): raise LvdError("kernel does not support per-domain limits " "(requires lve_lvp_create2)") return py def settings_from_limits(py, limits): """Create liblve_settings from a limits dict.""" settings = py.liblve_settings() if 'cpu' in limits: settings.ls_cpu = int(limits['cpu']) if 'pmem' in limits: settings.ls_memory_phy = int(limits['pmem']) if 'io' in limits: settings.ls_io = int(limits['io']) if 'nproc' in limits: settings.ls_nproc = int(limits['nproc']) if 'iops' in limits: settings.ls_iops = int(limits['iops']) return settings def settings_to_dict(settings): """Convert liblve_settings to a dict of limit fields.""" return { 'cpu': settings.ls_cpu, 'pmem': settings.ls_memory_phy, 'io': settings.ls_io, 'nproc': settings.ls_nproc, 'iops': settings.ls_iops, } def lvp_setup_direct(domain_id, settings): """Call pylve.lve_lvp_setup directly, bypassing the lveapi wrapper. The lveapi wrapper iterates /proc/lve/resellers/ which doesn't exist for nested domain LVPs (they are leaf containers).""" rc = pylve.lve_lvp_setup(domain_id, settings) return rc
Cancel