diff --git a/hyperglass/constants.py b/hyperglass/constants.py index 6d0718f..e903d74 100644 --- a/hyperglass/constants.py +++ b/hyperglass/constants.py @@ -78,4 +78,6 @@ SCRAPE_HELPERS = { DRIVER_MAP = { "frr_legacy": "hyperglass_agent", "bird_legacy": "hyperglass_agent", + "bird": "netmiko", + "frr": "netmiko", } diff --git a/hyperglass/exceptions/private.py b/hyperglass/exceptions/private.py index 86fb1a0..80b5c8d 100644 --- a/hyperglass/exceptions/private.py +++ b/hyperglass/exceptions/private.py @@ -31,8 +31,8 @@ class UnsupportedDevice(PrivateHyperglassError): # Project from hyperglass.constants import DRIVER_MAP - drivers = ("", *[*DRIVER_MAP.keys(), *CLASS_MAPPER.keys()].sort()) - driver_list = "\n - ".join(drivers) + sorted_drivers = sorted([*DRIVER_MAP.keys(), *CLASS_MAPPER.keys()]) + driver_list = "\n - ".join(("", *sorted_drivers)) super().__init__(message=f"'{platform}' is not supported. Must be one of:{driver_list}") diff --git a/hyperglass/models/config/devices.py b/hyperglass/models/config/devices.py index 708add1..11f1d88 100644 --- a/hyperglass/models/config/devices.py +++ b/hyperglass/models/config/devices.py @@ -8,18 +8,14 @@ from ipaddress import IPv4Address, IPv6Address # Third Party from pydantic import FilePath, StrictInt, StrictStr, StrictBool, validator +from netmiko.ssh_dispatcher import CLASS_MAPPER # type: ignore # Project from hyperglass.log import log -from hyperglass.util import ( - get_driver, - get_fmt_keys, - resolve_hostname, - validate_platform, -) +from hyperglass.util import get_driver, get_fmt_keys, resolve_hostname from hyperglass.state import use_state from hyperglass.settings import Settings -from hyperglass.constants import SCRAPE_HELPERS, SUPPORTED_STRUCTURED_OUTPUT +from hyperglass.constants import DRIVER_MAP, SCRAPE_HELPERS, SUPPORTED_STRUCTURED_OUTPUT from hyperglass.exceptions.private import ConfigError, UnsupportedDevice # Local @@ -31,6 +27,8 @@ from ..fields import SupportedDriver from ..directive import Directives from .credential import Credential +ALL_DEVICE_TYPES = {*DRIVER_MAP.keys(), *CLASS_MAPPER.keys()} + class DirectiveOptions(HyperglassModel, extra="ignore"): """Per-device directive options.""" @@ -181,8 +179,29 @@ class Device(HyperglassModelWithId, extra="allow"): src.save(target) return value + @validator("platform", pre=True, always=True) + def validate_platform(cls: "Device", value: Any, values: Dict[str, Any]) -> str: + """Validate & rewrite device platform, set default `directives`.""" + + if value is None: + # Ensure device platform is defined. + raise ConfigError( + "Device '{device}' is missing a 'platform' (Network Operating System) property", + device=values["name"], + ) + + if value in SCRAPE_HELPERS.keys(): + # Rewrite platform to helper value if needed. + value = SCRAPE_HELPERS[value] + + # Verify device platform is supported by hyperglass. + if value not in ALL_DEVICE_TYPES: + raise UnsupportedDevice(value) + + return value + @validator("structured_output", pre=True, always=True) - def validate_structured_output(cls, value: bool, values: Dict) -> bool: + def validate_structured_output(cls, value: bool, values: Dict[str, Any]) -> bool: """Validate structured output is supported on the device & set a default.""" if value is True: @@ -213,27 +232,6 @@ class Device(HyperglassModelWithId, extra="allow"): value.cert = cert_file return value - @validator("platform", pre=True, always=True) - def validate_platform(cls: "Device", value: Any, values: Dict[str, Any]) -> str: - """Validate & rewrite device platform, set default `directives`.""" - - if value is None: - # Ensure device platform is defined. - raise ConfigError( - "Device '{device}' is missing a 'platform' (Network Operating System) property", - device={values["name"]}, - ) - - if value in SCRAPE_HELPERS.keys(): - # Rewrite platform to helper value if needed. - value = SCRAPE_HELPERS[value] - - # Verify device platform is supported by hyperglass. - supported, _ = validate_platform(value) - if not supported: - raise UnsupportedDevice(value) - return value - @validator("directives", pre=True, always=True) def validate_directives(cls: "Device", value, values) -> "Directives": """Associate directive IDs to loaded directive objects.""" diff --git a/hyperglass/util/__init__.py b/hyperglass/util/__init__.py index 1dc9b47..16ba04c 100644 --- a/hyperglass/util/__init__.py +++ b/hyperglass/util/__init__.py @@ -47,68 +47,7 @@ def check_python() -> str: return platform.python_version() -async def write_env(variables: t.Dict) -> str: - """Write environment variables to temporary JSON file.""" - env_file = Path("/tmp/hyperglass.env.json") # noqa: S108 - env_vars = json.dumps(variables) - - try: - with env_file.open("w+") as ef: - ef.write(env_vars) - except Exception as e: - raise RuntimeError(str(e)) - - return f"Wrote {env_vars} to {str(env_file)}" - - -def set_app_path(required: bool = False) -> Path: - """Find app directory and set value to environment variable.""" - - # Standard Library - from getpass import getuser - - matched_path = None - - config_paths = (Path.home() / "hyperglass", Path("/etc/hyperglass/")) - - # Ensure only one app directory exists to reduce confusion. - if all((p.exists() for p in config_paths)): - raise RuntimeError( - "Both '{}' and '{}' exist. ".format(*(p.as_posix() for p in config_paths)) - + "Please choose only one configuration directory and delete the other." - ) - - for path in config_paths: - try: - if path.exists(): - tmp = path / "test.tmp" - tmp.touch() - if tmp.exists(): - matched_path = path - tmp.unlink() - break - except Exception: - matched_path = None - - if required and matched_path is None: - # Only raise an error if required is True - raise RuntimeError( - """ -No configuration directories were determined to both exist and be readable -by hyperglass. hyperglass is running as user '{un}' (UID '{uid}'), and tried -to access the following directories: -{dir}""".format( - un=getuser(), - uid=os.getuid(), - dir="\n".join(["\t - " + str(p) for p in config_paths]), - ) - ) - - os.environ["hyperglass_directory"] = str(matched_path) - return matched_path - - -def split_on_uppercase(s): +def split_on_uppercase(s: str) -> t.List[str]: """Split characters by uppercase letters. From: https://stackoverflow.com/a/40382663 @@ -127,7 +66,7 @@ def split_on_uppercase(s): return parts -def parse_exception(exc): +def parse_exception(exc: BaseException) -> str: """Parse an exception and its direct cause.""" if not isinstance(exc, BaseException): @@ -157,31 +96,6 @@ def parse_exception(exc): return ", caused by ".join(parsed) -def set_cache_env(host, port, db): - """Set basic cache config parameters to environment variables. - - Functions using Redis to access the pickled config need to be able - to access Redis without reading the config. - """ - - os.environ["HYPERGLASS_CACHE_HOST"] = str(host) - os.environ["HYPERGLASS_CACHE_PORT"] = str(port) - os.environ["HYPERGLASS_CACHE_DB"] = str(db) - return True - - -def get_cache_env(): - """Get basic cache config from environment variables.""" - - host = os.environ.get("HYPERGLASS_CACHE_HOST") - port = os.environ.get("HYPERGLASS_CACHE_PORT") - db = os.environ.get("HYPERGLASS_CACHE_DB") - for i in (host, port, db): - if i is None: - raise LookupError("Unable to find cache configuration in environment variables") - return host, port, db - - def make_repr(_class): """Create a user-friendly represention of an object.""" @@ -331,7 +245,10 @@ def deep_convert_keys(_dict: t.Type[DeepConvert], predicate: t.Callable[[str], s return converted -def at_least(minimum: int, value: int,) -> int: +def at_least( + minimum: int, + value: int, +) -> int: """Get a number value that is at least a specified minimum.""" if value < minimum: return minimum diff --git a/validate_examples.py b/validate_examples.py deleted file mode 100644 index 4a08214..0000000 --- a/validate_examples.py +++ /dev/null @@ -1,115 +0,0 @@ -"""Validate example files.""" - -# Standard Library -import re -import sys -from pathlib import Path - -# Third Party -import yaml - -# Project -from hyperglass.util import set_app_path - -EXAMPLES = Path(__file__).parent.parent / "hyperglass" / "examples" - -DEVICES = EXAMPLES / "devices.yaml" -COMMANDS = EXAMPLES / "commands.yaml" -MAIN = EXAMPLES / "hyperglass.yaml" - - -def _uncomment_files(): - """Uncomment out files.""" - for file in (MAIN, COMMANDS): - output = [] - with file.open("r") as f: - for line in f.readlines(): - commented = re.compile(r"^(#\s*#?\s?).*$") - if re.match(commented, line): - output.append(re.sub(r"^#\s*#?\s?$", "", line)) - else: - output.append(line) - with file.open("w") as f: - f.write("".join(output)) - return True - - -def _comment_optional_files(): - """Comment out files.""" - for file in (MAIN, COMMANDS): - output = [] - with file.open("r") as f: - for line in f.readlines(): - if not re.match(r"^(#\s*#?\s?).*$|(^\-{3})", line): - output.append("# " + line) - else: - output.append(line) - with file.open("w") as f: - f.write("".join(output)) - return True - - -def _validate_devices(): - # Project - from hyperglass.models.config.devices import Devices - - with DEVICES.open() as raw: - devices_dict = yaml.safe_load(raw.read()) or {} - try: - Devices(devices_dict.get("routers", [])) - except Exception as e: - raise ValueError(str(e)) - return True - - -def _validate_commands(): - # Project - from hyperglass.models.commands import Commands - - with COMMANDS.open() as raw: - commands_dict = yaml.safe_load(raw.read()) or {} - try: - Commands.import_params(**commands_dict) - except Exception as e: - raise ValueError(str(e)) - return True - - -def _validate_main(): - # Project - from hyperglass.models.config.params import Params - - with MAIN.open() as raw: - main_dict = yaml.safe_load(raw.read()) or {} - try: - Params(**main_dict) - except Exception as e: - raise - raise ValueError(str(e)) - return True - - -def validate_all(): - """Validate all example configs against configuration models.""" - _uncomment_files() - for validator in (_validate_main, _validate_commands, _validate_devices): - try: - validator() - except ValueError as e: - raise RuntimeError(str(e)) - return True - - -if __name__ == "__main__": - set_app_path(required=True) - try: - all_passed = validate_all() - message = "All tests passed" - status = 0 - except RuntimeError as e: - message = str(e) - status = 1 - if status == 0: - _comment_optional_files() - print(message) - sys.exit(status)