1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-01-17 08:48:05 +00:00

Implement plugin manager and abstract plugin definition

This commit is contained in:
thatmattlove 2021-09-11 11:17:38 -07:00
parent 3c012f7ed1
commit a89d08ba58
8 changed files with 227 additions and 68 deletions

View file

@ -92,3 +92,7 @@ class ParsingError(PrivateHyperglassError):
class DependencyError(PrivateHyperglassError):
"""Raised when a dependency is missing, not running, or on the wrong version."""
class PluginError(PrivateHyperglassError):
"""Raised when a plugin error occurs."""

View file

@ -2,11 +2,14 @@
# Local
from .main import init_plugins
from ._input import InputPlugin
from ._output import OutputPlugin
from ._register import register_output_plugin
from ._manager import InputPluginManager, OutputPluginManager
__all__ = (
"OutputPlugin",
"register_output_plugin",
"init_plugins",
"InputPlugin",
"InputPluginManager",
"OutputPlugin",
"OutputPluginManager",
)

View file

@ -0,0 +1,50 @@
"""Base Plugin Definition."""
# Standard Library
from abc import ABC
from typing import Any, Union, Literal
from inspect import Signature
# Third Party
from pydantic import BaseModel
PluginType = Union[Literal["output"], Literal["input"]]
class HyperglassPlugin(BaseModel, ABC):
"""Plugin to interact with device command output."""
name: str
@property
def _signature(self) -> Signature:
"""Get this instance's class signature."""
return self.__class__.__signature__
def __eq__(self, other: "HyperglassPlugin"):
"""Other plugin is equal to this plugin."""
return other and self._signature == other._signature
def __ne__(self, other: "HyperglassPlugin"):
"""Other plugin is not equal to this plugin."""
return not self.__eq__(other)
def __hash__(self) -> int:
"""Create a hashed representation of this plugin's name."""
return hash(self._signature)
def __str__(self) -> str:
"""Represent plugin by its name."""
return self.name
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
"""Initialize plugin object."""
name = kwargs.pop("name", None) or cls.__name__
cls._name = name
super().__init_subclass__()
def __init__(self, **kwargs: Any) -> None:
"""Initialize plugin instance."""
name = kwargs.pop("name", None) or self.__class__.__name__
super().__init__(name=name, **kwargs)

View file

@ -0,0 +1,19 @@
"""Input validation plugins."""
# Standard Library
from abc import abstractmethod
# Project
from hyperglass.models.api.query import Query
# Local
from ._base import HyperglassPlugin
class InputPlugin(HyperglassPlugin):
"""Plugin to validate user input prior to running commands."""
@abstractmethod
def process(self, device_output: str, query: Query) -> str:
"""Validate input from hyperglass UI/API."""
pass

View file

@ -0,0 +1,134 @@
"""Plugin manager definition."""
# Standard Library
import json
import codecs
import pickle
from typing import List, Generic, TypeVar, Callable, Generator
from inspect import isclass
# Project
from hyperglass.cache import SyncCache
from hyperglass.configuration import REDIS_CONFIG, params
from hyperglass.exceptions.private import PluginError
# Local
from ._base import PluginType, HyperglassPlugin
from ._input import InputPlugin
from ._output import OutputPlugin
PluginT = TypeVar("PluginT")
class PluginManager(Generic[PluginT]):
"""Manage all plugins."""
_type: PluginType
_cache: SyncCache
_index: int = 0
_cache_key: str
def __init__(self: "PluginManager") -> None:
"""Initialize plugin manager."""
self._cache = SyncCache(db=params.cache.database, **REDIS_CONFIG)
self._cache_key = f"hyperglass.plugins.{self._type}"
def __init_subclass__(cls: "PluginManager", **kwargs: PluginType) -> None:
"""Set this plugin manager's type on subclass initialization."""
_type = kwargs.get("type", None) or cls._type
if _type is None:
raise PluginError(
"Plugin '{}' is missing a 'type', keyword argument", repr(cls)
)
cls._type = _type
return super().__init_subclass__()
def __iter__(self: "PluginManager") -> "PluginManager":
"""Plugin manager iterator."""
return self
def __next__(self: "PluginManager") -> PluginT:
"""Plugin manager iteration."""
if self._index <= len(self.plugins):
result = self.plugins[self._index - 1]
self._index += 1
return result
self._index = 0
raise StopIteration
def _get_plugins(self: "PluginManager") -> List[PluginT]:
"""Retrieve plugins from cache."""
cached = self._cache.get(self._cache_key)
return list(
{
pickle.loads(codecs.decode(plugin.encode(), "base64"))
for plugin in cached
}
)
def _clear_plugins(self: "PluginManager") -> None:
"""Remove all plugins."""
self._cache.set(self._cache_key, json.dumps([]))
@property
def plugins(self: "PluginManager") -> List[PluginT]:
"""Get all plugins."""
return self._get_plugins()
def methods(self: "PluginManager", name: str) -> Generator[Callable, None, None]:
"""Get methods of all registered plugins matching `name`."""
for plugin in self.plugins:
if hasattr(plugin, name):
method = getattr(plugin, name)
if callable(method):
yield method
def reset(self: "PluginManager") -> None:
"""Remove all plugins."""
self._index = 0
self._cache = SyncCache(db=params.cache.database, **REDIS_CONFIG)
return self._clear_plugins()
def unregister(self: "PluginManager", plugin: PluginT) -> None:
"""Remove a plugin from currently active plugins."""
if isclass(plugin):
if issubclass(plugin, HyperglassPlugin):
plugins = {
# Create a base64 representation of a picked plugin.
codecs.encode(pickle.dumps(p), "base64").decode()
# Merge current plugins with the new plugin.
for p in self._get_plugins()
if p != plugin
}
# Add plugins from cache.
self._cache.set(
f"hyperglass.plugins.{self._type}", json.dumps(list(plugins))
)
return
raise PluginError("Plugin '{}' is not a valid hyperglass plugin", repr(plugin))
def register(self: "PluginManager", plugin: PluginT) -> None:
"""Add a plugin to currently active plugins."""
# Create a set of plugins so duplicate plugins are not mistakenly added.
if isclass(plugin):
if issubclass(plugin, HyperglassPlugin):
plugins = {
# Create a base64 representation of a picked plugin.
codecs.encode(pickle.dumps(p), "base64").decode()
# Merge current plugins with the new plugin.
for p in [*self._get_plugins(), plugin()]
}
# Add plugins from cache.
self._cache.set(
f"hyperglass.plugins.{self._type}", json.dumps(list(plugins))
)
return
raise PluginError("Plugin '{}' is not a valid hyperglass plugin", repr(plugin))
class InputPluginManager(PluginManager[InputPlugin], type="input"):
"""Manage Input Validation Plugins."""
class OutputPluginManager(PluginManager[OutputPlugin], type="output"):
"""Manage Output Processing Plugins."""

View file

@ -1,29 +1,19 @@
"""Device output plugins."""
# Standard Library
import abc
from abc import abstractmethod
# Project
from hyperglass.models import HyperglassModel
from hyperglass.models.config.devices import Device
# Local
from ._base import HyperglassPlugin
class OutputPlugin(HyperglassModel, abc.ABC):
class OutputPlugin(HyperglassPlugin):
"""Plugin to interact with device command output."""
def __eq__(self, other: "OutputPlugin"):
"""Other plugin is equal to this plugin."""
return other and self.__repr_name__ == other.__repr_name__
def __ne__(self, other: "OutputPlugin"):
"""Other plugin is not equal to this plugin."""
return not self.__eq__(other)
def __hash__(self) -> int:
"""Create a hashed representation of this plugin's name."""
return hash(self.__repr_name__)
@abc.abstractmethod
@abstractmethod
def process(self, device_output: str, device: Device) -> str:
"""Process/manipulate output from a device."""
pass

View file

@ -1,47 +0,0 @@
"""Plugin registration."""
# Standard Library
import json
import codecs
import pickle
from typing import List, Generator
# Project
from hyperglass.log import log
from hyperglass.cache import SyncCache
from hyperglass.configuration import REDIS_CONFIG, params
# Local
from ._output import OutputPlugin
CACHE = SyncCache(db=params.cache.database, **REDIS_CONFIG)
def get_plugins() -> Generator[OutputPlugin, None, None]:
"""Retrieve plugins from cache."""
# Retrieve plugins from cache.
raw = CACHE.get("hyperglass.plugins.output")
if isinstance(raw, List):
for plugin in raw:
yield pickle.loads(codecs.decode(plugin.encode(), "base64"))
def add_plugin(plugin: OutputPlugin) -> None:
"""Add a plugin to currently active plugins."""
# Create a set of plugins so duplicate plugins are not mistakenly added.
plugins = {
# Create a base64 representation of a picked plugin.
codecs.encode(pickle.dumps(p), "base64").decode()
# Merge current plugins with the new plugin.
for p in [*get_plugins(), plugin]
}
# Add plugins from cache.
CACHE.set("hyperglass.plugins.output", json.dumps(list(plugins)))
def register_output_plugin(plugin: OutputPlugin):
"""Register an output plugin."""
if issubclass(plugin, OutputPlugin):
plugin = plugin()
add_plugin(plugin)
log.info("Registered output plugin '{}'", plugin.__class__.__name__)

View file

@ -5,8 +5,9 @@ from inspect import isclass
# Local
from . import _builtin
from ._input import InputPlugin
from ._output import OutputPlugin
from ._register import register_output_plugin
from ._manager import InputPluginManager, OutputPluginManager
def init_plugins() -> None:
@ -15,4 +16,9 @@ def init_plugins() -> None:
plugin = getattr(_builtin, name)
if isclass(plugin):
if issubclass(plugin, OutputPlugin):
register_output_plugin(plugin)
manager = OutputPluginManager()
elif issubclass(plugin, InputPlugin):
manager = InputPluginManager()
else:
continue
manager.register(plugin)