From 7bd23e58c8302f97f08603cfc22360bf83f4f6eb Mon Sep 17 00:00:00 2001 From: thatmattlove Date: Mon, 13 Sep 2021 02:39:37 -0700 Subject: [PATCH] Implement Juniper BGP Route plugin and tests --- .../plugins/_builtin/bgp_route_juniper.py | 128 ++++++++++++++++++ hyperglass/plugins/tests/__init__.py | 1 + .../plugins/tests/test_bgp_route_juniper.py | 57 ++++++++ 3 files changed, 186 insertions(+) create mode 100644 hyperglass/plugins/_builtin/bgp_route_juniper.py create mode 100644 hyperglass/plugins/tests/__init__.py create mode 100644 hyperglass/plugins/tests/test_bgp_route_juniper.py diff --git a/hyperglass/plugins/_builtin/bgp_route_juniper.py b/hyperglass/plugins/_builtin/bgp_route_juniper.py new file mode 100644 index 0000000..128a386 --- /dev/null +++ b/hyperglass/plugins/_builtin/bgp_route_juniper.py @@ -0,0 +1,128 @@ +"""Coerce a Juniper route table in XML format to a standard BGP Table structure.""" + +# Standard Library +import re +from typing import TYPE_CHECKING, List, Sequence, Generator + +# Third Party +import xmltodict # type: ignore +from pydantic import PrivateAttr, ValidationError + +# Project +from hyperglass.log import log +from hyperglass.exceptions.private import ParsingError +from hyperglass.models.parsing.juniper import JuniperBGPTable + +# Local +from .._output import OutputPlugin + +if TYPE_CHECKING: + # Standard Library + from collections import OrderedDict + + # Project + from hyperglass.models.data import OutputDataModel + from hyperglass.models.config.devices import Device + + # Local + from .._output import OutputType + + +REMOVE_PATTERNS = ( + # The XML response can a CLI banner appended to the end of the XML + # string. For example: + # ``` + # + # ... + # + # {master} + # + # + # + # {master} noqa: E800 + # ``` + # + # This pattern will remove anything inside braces, including the braces. + r"\{.+\}", +) + + +def clean_xml_output(output: str) -> str: + """Remove Juniper-specific patterns from output.""" + + def scrub(lines: List[str]) -> Generator[str, None, None]: + """Clean & remove each pattern from each line.""" + for pattern in REMOVE_PATTERNS: + for line in lines: + # Remove the pattern & strip extra newlines + scrubbed = re.sub(pattern, "", line.strip()) + # Only return non-empty and non-newline lines + if scrubbed and scrubbed != "\n": + yield scrubbed + + lines = scrub(output.splitlines()) + + return "\n".join(lines) + + +def parse_juniper(output: Sequence[str]) -> "OutputDataModel": # noqa: C901 + """Parse a Juniper BGP XML response.""" + result = None + + for response in output: + cleaned = clean_xml_output(response) + + try: + parsed: "OrderedDict" = xmltodict.parse( + cleaned, force_list=("rt", "rt-entry", "community") + ) + + log.debug("Initially Parsed Response: \n{}", parsed) + + if "rpc-reply" in parsed.keys(): + if "xnm:error" in parsed["rpc-reply"]: + if "message" in parsed["rpc-reply"]["xnm:error"]: + err = parsed["rpc-reply"]["xnm:error"]["message"] + raise ParsingError('Error from device: "{}"', err) + + parsed_base = parsed["rpc-reply"]["route-information"] + elif "route-information" in parsed.keys(): + parsed_base = parsed["route-information"] + + if "route-table" not in parsed_base: + return result + + if "rt" not in parsed_base["route-table"]: + return result + + parsed = parsed_base["route-table"] + validated = JuniperBGPTable(**parsed) + bgp_table = validated.bgp_table() + + if result is None: + result = bgp_table + else: + result += bgp_table + + except xmltodict.expat.ExpatError as err: + raise ParsingError("Error parsing response data") from err + + except KeyError as err: + raise ParsingError("{key} was not found in the response", key=str(err)) + + except ValidationError as err: + raise ParsingError(err) + + return result + + +class BGPRoutePluginJuniper(OutputPlugin): + """Coerce a Juniper route table in XML format to a standard BGP Table structure.""" + + __hyperglass_builtin__: bool = PrivateAttr(True) + + def process(self, output: "OutputType", device: "Device") -> "OutputType": + """Parse Juniper response if data is a string (and is therefore unparsed).""" + if isinstance(output, (list, tuple)) and device.structured_output: + return parse_juniper(output) + return output diff --git a/hyperglass/plugins/tests/__init__.py b/hyperglass/plugins/tests/__init__.py new file mode 100644 index 0000000..30d0597 --- /dev/null +++ b/hyperglass/plugins/tests/__init__.py @@ -0,0 +1 @@ +"""Plugin tests.""" diff --git a/hyperglass/plugins/tests/test_bgp_route_juniper.py b/hyperglass/plugins/tests/test_bgp_route_juniper.py new file mode 100644 index 0000000..b368465 --- /dev/null +++ b/hyperglass/plugins/tests/test_bgp_route_juniper.py @@ -0,0 +1,57 @@ +"""Juniper BGP Route Parsing Tests.""" + +# flake8: noqa +# Standard Library +from pathlib import Path + +# Third Party +import py + +# Project +from hyperglass.log import log +from hyperglass.models.config.devices import Device +from hyperglass.models.data.bgp_route import BGPRouteTable + +# Local +from .._builtin.bgp_route_juniper import BGPRoutePluginJuniper + +DIRECT = Path(__file__).parent.parent.parent.parent / ".samples" / "juniper_route_direct.xml" +INDIRECT = Path(__file__).parent.parent.parent.parent / ".samples" / "juniper_route_indirect.xml" +AS_PATH = Path(__file__).parent.parent.parent.parent / ".samples" / "juniper_route_aspath.xml" + + +def _tester(sample: str): + plugin = BGPRoutePluginJuniper() + + device = Device( + name="Test Device", + address="127.0.0.1", + network={"name": "Test Network", "display_name": "Test Network"}, + credential={"username": "", "password": ""}, + nos="juniper", + structured_output=True, + commands=[{"id": "test", "name": "Test", "rules": []}], + ) + + result = plugin.process((sample,), device) + assert isinstance(result, BGPRouteTable), "Invalid parsed result" + assert hasattr(result, "count"), "BGP Table missing count" + assert result.count > 0, "BGP Table count is 0" + + +def test_juniper_bgp_route_direct(): + with DIRECT.open("r") as file: + sample = file.read() + return _tester(sample) + + +def test_juniper_bgp_route_indirect(): + with INDIRECT.open("r") as file: + sample = file.read() + return _tester(sample) + + +def test_juniper_bgp_route_aspath(): + with AS_PATH.open("r") as file: + sample = file.read() + return _tester(sample)