1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-01-17 08:48:05 +00:00

Start output plugin core implementation

This commit is contained in:
thatmattlove 2021-09-12 15:06:34 -07:00
parent 52b7cbdd3c
commit a62785227e
13 changed files with 143 additions and 90 deletions

View file

@ -1,5 +1,5 @@
[flake8]
max-line-length=88
max-line-length=100
count=True
show-source=False
statistics=True
@ -16,7 +16,7 @@ per-file-ignores=
# Disable unused import warning for modules
hyperglass/*/__init__.py:F401
hyperglass/models/*/__init__.py:F401
ignore=W503,C0330,R504,D202,S403,S301,S404
ignore=W503,C0330,R504,D202,S403,S301,S404,E731
select=B, BLK, C, D, E, F, I, II, N, P, PIE, S, R, W
disable-noqa=False
hang-closing=False

View file

@ -762,7 +762,7 @@ class SSHTunnelForwarder:
gateway_timeout=None,
*args,
**kwargs, # for backwards compatibility
):
) -> None:
self.logger = logger or log
self.ssh_host_key = ssh_host_key
self.set_keepalive = set_keepalive
@ -1181,7 +1181,7 @@ class SSHTunnelForwarder:
return ssh_pkey
def _check_tunnel(self, _srv):
def _check_tunnel(self, _srv) -> None:
"""Check if tunnel is already established."""
if self.skip_tunnel_checkup:
self.tunnel_is_up[_srv.local_address] = True
@ -1216,7 +1216,7 @@ class SSHTunnelForwarder:
finally:
s.close()
def check_tunnels(self):
def check_tunnels(self) -> None:
"""Check that if all tunnels are established and populates.
:attr:`.tunnel_is_up`
@ -1224,7 +1224,7 @@ class SSHTunnelForwarder:
for _srv in self._server_list:
self._check_tunnel(_srv)
def start(self):
def start(self) -> None:
"""Start the SSH tunnels."""
if self.is_alive:
self.logger.warning("Already started!")
@ -1251,7 +1251,7 @@ class SSHTunnelForwarder:
"An error occurred while opening tunnels.",
)
def stop(self):
def stop(self) -> None:
"""Shut the tunnel down.
.. note:: This **had** to be handled with care before ``0.1.0``:
@ -1278,16 +1278,16 @@ class SSHTunnelForwarder:
self._server_list = [] # reset server list
self.tunnel_is_up = {} # reset tunnel status
def close(self):
def close(self) -> None:
"""Stop the an active tunnel, alias to :meth:`.stop`."""
self.stop()
def restart(self):
def restart(self) -> None:
"""Restart connection to the gateway and tunnels."""
self.stop()
self.start()
def _connect_to_gateway(self):
def _connect_to_gateway(self) -> None:
"""Open connection to SSH gateway.
- First try with all keys loaded from an SSH agent (if allowed)
@ -1330,7 +1330,7 @@ class SSHTunnelForwarder:
self.logger.error("Could not open connection to gateway")
def _serve_forever_wrapper(self, _srv, poll_interval=0.1):
def _serve_forever_wrapper(self, _srv, poll_interval=0.1) -> None:
"""Wrapper for the server created for a SSH forward."""
self.logger.info(
"Opening tunnel: {0} <> {1}".format(
@ -1345,7 +1345,7 @@ class SSHTunnelForwarder:
)
)
def _stop_transport(self):
def _stop_transport(self) -> None:
"""Close the underlying transport when nothing more is needed."""
try:
@ -1444,13 +1444,13 @@ class SSHTunnelForwarder:
)
@property
def is_active(self):
def is_active(self) -> bool:
""" Return True if the underlying SSH transport is up """
if "_transport" in self.__dict__ and self._transport.is_active():
return True
return False
def _check_is_started(self):
def _check_is_started(self) -> None:
if not self.is_active: # underlying transport not alive
msg = "Server is not started. Please .start() first!"
raise BaseSSHTunnelForwarderError(msg)
@ -1458,7 +1458,7 @@ class SSHTunnelForwarder:
msg = "Tunnels are not started. Please .start() first!"
raise HandlerSSHTunnelForwarderError(msg)
def __str__(self):
def __str__(self) -> str:
credentials = {
"password": self.ssh_password,
"pkeys": [
@ -1507,21 +1507,21 @@ class SSHTunnelForwarder:
self._remote_binds,
)
def __repr__(self):
def __repr__(self) -> str:
return self.__str__()
def __enter__(self):
def __enter__(self) -> "SSHTunnelForwarder":
try:
self.start()
return self
except KeyboardInterrupt:
self.__exit__()
def __exit__(self, *args):
def __exit__(self, *args) -> None:
self._stop_transport()
def open_tunnel(*args, **kwargs):
def open_tunnel(*args, **kwargs) -> "SSHTunnelForwarder":
"""Open an SSH Tunnel, wrapper for :class:`SSHTunnelForwarder`.
Arguments:

View file

@ -1,7 +1,8 @@
"""Base Connection Class."""
# Standard Library
from typing import Dict, Union, Sequence
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, Union, Sequence
# Project
from hyperglass.log import log
@ -14,8 +15,12 @@ from hyperglass.models.config.devices import Device
# Local
from ._construct import Construct
if TYPE_CHECKING:
# Project
from hyperglass.compat._sshtunnel import SSHTunnelForwarder
class Connection:
class Connection(ABC):
"""Base transport driver class."""
def __init__(self, device: Device, query_data: Query) -> None:
@ -28,6 +33,11 @@ class Connection:
self.query = self._query.queries()
self.plugin_manager = OutputPluginManager()
@abstractmethod
def setup_proxy(self: "Connection") -> "SSHTunnelForwarder":
"""Return a preconfigured sshtunnel.SSHTunnelForwarder instance."""
pass
async def parsed_response( # noqa: C901 ("too complex")
self, output: Sequence[str]
) -> Union[str, Sequence[Dict]]:

View file

@ -8,7 +8,7 @@ hyperglass-frr API calls, returns the output back to the front end.
# Standard Library
from ssl import CertificateError
from typing import Iterable
from typing import TYPE_CHECKING, Iterable
# Third Party
import httpx
@ -23,10 +23,18 @@ from hyperglass.exceptions.public import RestError, ResponseEmpty
# Local
from ._common import Connection
if TYPE_CHECKING:
# Project
from hyperglass.compat._sshtunnel import SSHTunnelForwarder
class AgentConnection(Connection):
"""Connect to target device via hyperglass-agent."""
def setup_proxy(self: "Connection") -> "SSHTunnelForwarder":
"""Return a preconfigured sshtunnel.SSHTunnelForwarder instance."""
raise NotImplementedError("AgentConnection does not implement an SSH proxy.")
async def collect(self) -> Iterable: # noqa: C901
"""Connect to a device running hyperglass-agent via HTTP."""
log.debug("Query parameters: {}", self.query)

View file

@ -1,7 +1,7 @@
"""Common Classes or Utilities for SSH Drivers."""
# Standard Library
from typing import Callable
from typing import TYPE_CHECKING
# Project
from hyperglass.log import log
@ -12,11 +12,15 @@ from hyperglass.exceptions.public import ScrapeError
# Local
from ._common import Connection
if TYPE_CHECKING:
# Project
from hyperglass.compat._sshtunnel import SSHTunnelForwarder
class SSHConnection(Connection):
"""Base class for SSH drivers."""
def setup_proxy(self) -> Callable:
def setup_proxy(self) -> "SSHTunnelForwarder":
"""Return a preconfigured sshtunnel.SSHTunnelForwarder instance."""
proxy = self.device.proxy

View file

@ -8,19 +8,22 @@ hyperglass-frr API calls, returns the output back to the front end.
# Standard Library
import signal
from typing import Any, Dict, Union, Callable, Sequence
from typing import Any, Dict, Union, Callable, Sequence, TYPE_CHECKING
# Project
from hyperglass.log import log
from hyperglass.models.api import Query
from hyperglass.configuration import params
from hyperglass.exceptions.public import DeviceTimeout, ResponseEmpty
if TYPE_CHECKING:
from hyperglass.models.api import Query
from .drivers import Connection
# Local
from .drivers import Connection, AgentConnection, NetmikoConnection, ScrapliConnection
from .drivers import AgentConnection, NetmikoConnection, ScrapliConnection
def map_driver(driver_name: str) -> Connection:
def map_driver(driver_name: str) -> "Connection":
"""Get the correct driver class based on the driver name."""
if driver_name == "scrapli":
@ -41,7 +44,7 @@ def handle_timeout(**exc_args: Any) -> Callable:
return handler
async def execute(query: Query) -> Union[str, Sequence[Dict]]:
async def execute(query: "Query") -> Union[str, Sequence[Dict]]:
"""Initiate query validation and execution."""
output = params.messages.general
@ -50,19 +53,15 @@ async def execute(query: Query) -> Union[str, Sequence[Dict]]:
log.debug("Matched device config: {}", query.device)
mapped_driver = map_driver(query.device.driver)
driver = mapped_driver(query.device, query)
driver: "Connection" = mapped_driver(query.device, query)
signal.signal(
signal.SIGALRM, handle_timeout(error=TimeoutError(), device=query.device)
)
signal.signal(signal.SIGALRM, handle_timeout(error=TimeoutError(), device=query.device))
signal.alarm(params.request_timeout - 1)
if query.device.proxy:
proxy = driver.setup_proxy()
with proxy() as tunnel:
response = await driver.collect(
tunnel.local_bind_host, tunnel.local_bind_port
)
response = await driver.collect(tunnel.local_bind_host, tunnel.local_bind_port)
else:
response = await driver.collect()

View file

@ -2,18 +2,20 @@
# Standard Library
from abc import ABC
from typing import Any, Union, Literal
from typing import Any, Union, Literal, TypeVar
from inspect import Signature
# Third Party
from pydantic import BaseModel
from pydantic import BaseModel, PrivateAttr
PluginType = Union[Literal["output"], Literal["input"]]
SupportedMethod = TypeVar("SupportedMethod")
class HyperglassPlugin(BaseModel, ABC):
"""Plugin to interact with device command output."""
__hyperglass_builtin__: bool = PrivateAttr(False)
name: str
@property
@ -21,11 +23,13 @@ class HyperglassPlugin(BaseModel, ABC):
"""Get this instance's class signature."""
return self.__class__.__signature__
def __eq__(self, other: "HyperglassPlugin"):
def __eq__(self, other: "HyperglassPlugin") -> bool:
"""Other plugin is equal to this plugin."""
return other and self._signature == other._signature
if hasattr(other, "_signature"):
return other and self._signature == other._signature
return False
def __ne__(self, other: "HyperglassPlugin"):
def __ne__(self, other: "HyperglassPlugin") -> bool:
"""Other plugin is not equal to this plugin."""
return not self.__eq__(other)

View file

@ -3,6 +3,9 @@
# Standard Library
from typing import TYPE_CHECKING
# Third Party
from pydantic import PrivateAttr
# Local
from .._output import OutputPlugin
@ -14,6 +17,8 @@ if TYPE_CHECKING:
class RemoveCommand(OutputPlugin):
"""Remove anything before the command if found in output."""
__hyperglass_builtin__: bool = PrivateAttr(True)
def process(self, device_output: str, device: "Device") -> str:
"""Remove anything before the command if found in output."""
output = device_output.strip().split("\n")

View file

@ -1,8 +1,7 @@
"""Input validation plugins."""
# Standard Library
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union
# Local
from ._base import HyperglassPlugin
@ -11,11 +10,12 @@ if TYPE_CHECKING:
# Project
from hyperglass.models.api.query import Query
InputPluginReturn = Union[None, bool]
class InputPlugin(HyperglassPlugin):
"""Plugin to validate user input prior to running commands."""
@abstractmethod
def process(self, device_output: str, query: "Query") -> str:
def validate(self, query: "Query") -> InputPluginReturn:
"""Validate input from hyperglass UI/API."""
pass
return None

View file

@ -4,7 +4,7 @@
import json
import codecs
import pickle
from typing import List, Generic, TypeVar, Callable, Generator
from typing import TYPE_CHECKING, List, Generic, TypeVar, Callable, Generator
from inspect import isclass
# Project
@ -15,8 +15,13 @@ from hyperglass.exceptions.private import PluginError
# Local
from ._base import PluginType, HyperglassPlugin
from ._input import InputPlugin
from ._output import OutputPlugin
from ._input import InputPlugin, InputPluginReturn
from ._output import OutputPlugin, OutputPluginReturn
if TYPE_CHECKING:
# Project
from hyperglass.models.api.query import Query
from hyperglass.models.config.devices import Device
PluginT = TypeVar("PluginT")
@ -38,9 +43,7 @@ class PluginManager(Generic[PluginT]):
"""Set this plugin manager's type on subclass initialization."""
_type = kwargs.get("type", None) or cls._type
if _type is None:
raise PluginError(
"Plugin '{}' is missing a 'type', keyword argument", repr(cls)
)
raise PluginError("Plugin '{}' is missing a 'type', keyword argument", repr(cls))
cls._type = _type
return super().__init_subclass__()
@ -60,12 +63,7 @@ class PluginManager(Generic[PluginT]):
def _get_plugins(self: "PluginManager") -> List[PluginT]:
"""Retrieve plugins from cache."""
cached = self._cache.get(self._cache_key)
return list(
{
pickle.loads(codecs.decode(plugin.encode(), "base64"))
for plugin in cached
}
)
return list({pickle.loads(codecs.decode(plugin.encode(), "base64")) for plugin in cached})
def _clear_plugins(self: "PluginManager") -> None:
"""Remove all plugins."""
@ -73,8 +71,17 @@ class PluginManager(Generic[PluginT]):
@property
def plugins(self: "PluginManager") -> List[PluginT]:
"""Get all plugins."""
return self._get_plugins()
"""Get all plugins, with built-in plugins last."""
return sorted(
self.plugins,
key=lambda p: -1 if p.__hyperglass_builtin__ else 1, # flake8: noqa IF100
reverse=True,
)
@property
def name(self: PluginT) -> str:
"""Get this plugin manager's name."""
return self.__class__.__name__
def methods(self: "PluginManager", name: str) -> Generator[Callable, None, None]:
"""Get methods of all registered plugins matching `name`."""
@ -84,6 +91,10 @@ class PluginManager(Generic[PluginT]):
if callable(method):
yield method
def execute(self, *args, **kwargs) -> None:
"""Gather all plugins and execute in order."""
raise NotImplementedError(f"Plugin Manager '{self.name}' is missing an 'execute()' method.")
def reset(self: "PluginManager") -> None:
"""Remove all plugins."""
self._index = 0
@ -102,9 +113,7 @@ class PluginManager(Generic[PluginT]):
if p != plugin
}
# Add plugins from cache.
self._cache.set(
f"hyperglass.plugins.{self._type}", json.dumps(list(plugins))
)
self._cache.set(f"hyperglass.plugins.{self._type}", json.dumps(list(plugins)))
return
raise PluginError("Plugin '{}' is not a valid hyperglass plugin", repr(plugin))
@ -121,9 +130,7 @@ class PluginManager(Generic[PluginT]):
for p in [*self._get_plugins(), instance]
}
# Add plugins from cache.
self._cache.set(
f"hyperglass.plugins.{self._type}", json.dumps(list(plugins))
)
self._cache.set(f"hyperglass.plugins.{self._type}", json.dumps(list(plugins)))
log.success("Registered plugin '{}'", instance.name)
return
except TypeError:
@ -132,14 +139,37 @@ class PluginManager(Generic[PluginT]):
"Please consult the hyperglass documentation.",
p=repr(plugin),
)
raise PluginError(
"Plugin '{p}' is not a valid hyperglass plugin", p=repr(plugin)
)
raise PluginError("Plugin '{p}' is not a valid hyperglass plugin", p=repr(plugin))
class InputPluginManager(PluginManager[InputPlugin], type="input"):
"""Manage Input Validation Plugins."""
def execute(self: "InputPluginManager", query: "Query") -> InputPluginReturn:
"""Execute all input validation plugins.
If any plugin returns `False`, execution is halted.
"""
result = None
for plugin in self.plugins:
if result is False:
return result
result = plugin.validate(query)
return result
class OutputPluginManager(PluginManager[OutputPlugin], type="output"):
"""Manage Output Processing Plugins."""
def execute(self: "OutputPluginManager", output: str, device: "Device") -> OutputPluginReturn:
"""Execute all output parsing plugins.
The result of each plugin is passed to the next plugin.
"""
result = output
for plugin in self.plugins:
if result is False:
return result
# Pass the result of each plugin to the next plugin.
result = plugin.process(result, device)
return result

View file

@ -1,8 +1,7 @@
"""Device output plugins."""
# Standard Library
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union
# Local
from ._base import HyperglassPlugin
@ -10,12 +9,14 @@ from ._base import HyperglassPlugin
if TYPE_CHECKING:
# Project
from hyperglass.models.config.devices import Device
from hyperglass.models.parsing.serialized import ParsedRoutes
OutputPluginReturn = Union[None, "ParsedRoutes", str]
class OutputPlugin(HyperglassPlugin):
"""Plugin to interact with device command output."""
@abstractmethod
def process(self, device_output: str, device: "Device") -> str:
"""Process/manipulate output from a device."""
pass
def process(self, output: Union["ParsedRoutes", str], device: "Device") -> OutputPluginReturn:
"""Process or manipulate output from a device."""
return None

View file

@ -6,8 +6,7 @@ import sys
import json
import string
import platform
from queue import Queue
from typing import Dict, Union, Optional, Sequence, Generator
from typing import Any, Dict, Union, Optional, Sequence, Generator
from asyncio import iscoroutine
from pathlib import Path
from ipaddress import IPv4Address, IPv6Address, ip_address
@ -144,10 +143,7 @@ def format_listen_address(listen_address: Union[IPv4Address, IPv6Address, str])
log.error(err)
pass
if (
isinstance(listen_address, (IPv4Address, IPv6Address))
and listen_address.version == 6
):
if isinstance(listen_address, (IPv4Address, IPv6Address)) and listen_address.version == 6:
fmt = f"[{str(listen_address)}]"
return fmt
@ -159,9 +155,7 @@ def split_on_uppercase(s):
From: https://stackoverflow.com/a/40382663
"""
string_length = len(s)
is_lower_around = (
lambda: s[i - 1].islower() or string_length > (i + 1) and s[i + 1].islower()
)
is_lower_around = lambda: s[i - 1].islower() or string_length > (i + 1) and s[i + 1].islower()
start = 0
parts = []
@ -225,9 +219,7 @@ def get_cache_env():
db = os.environ.get("HYPERGLASS_CACHE_DB")
for i in (host, port, db):
if i is None:
raise LookupError(
"Unable to find cache configuration in environment variables"
)
raise LookupError("Unable to find cache configuration in environment variables")
return host, port, db

View file

@ -83,7 +83,7 @@ stackprinter = "^0.2.3"
taskipy = "^1.8.2"
[tool.black]
line-length = 88
line-length = 100
[tool.pyright]
exclude = ["**/node_modules", "**/ui", "**/__pycache__"]
@ -96,8 +96,8 @@ reportMissingTypeStubs = true
check = {cmd = "task lint && task ui-lint", help = "Run all lint checks"}
lint = {cmd = "flake8 hyperglass", help = "Run Flake8"}
sort = {cmd = "isort hyperglass", help = "Run iSort"}
start = {cmd = "uvicorn hyperglass.api:app", help = "Start hyperglass via Uvicorn"}
start-direct = {cmd = "python3 -m hyperglass.console start", help = "Start hyperglass via hyperglass.console.CLI"}
start = {cmd = "python3 -m hyperglass.console start", help = "Start hyperglass"}
start-asgi = {cmd = "uvicorn hyperglass.api:app", help = "Start hyperglass via Uvicorn"}
ui-build = {cmd = "python3 -m hyperglass.console build-ui", help = "Run a UI Build"}
ui-dev = {cmd = "yarn --cwd ./hyperglass/ui/ dev", help = "Start the Next.JS dev server"}
ui-format = {cmd = "yarn --cwd ./hyperglass/ui/ format", help = "Run Prettier"}