From ae6a1a0bb8395bf9460fa1040eee187ffd5c9a3d Mon Sep 17 00:00:00 2001 From: Wilhelm Schonfeldt Date: Fri, 26 Sep 2025 09:43:06 +0200 Subject: [PATCH] Refactor code for improved readability and consistency across multiple files using black --- hyperglass/compat/_sshtunnel.py | 16 ++-- hyperglass/constants.py | 9 +- hyperglass/defaults/directives/huawei.py | 8 +- hyperglass/defaults/directives/mikrotik.py | 20 ++-- hyperglass/external/_base.py | 4 +- hyperglass/external/rpki.py | 23 +++-- hyperglass/external/tests/test_rpki.py | 8 +- hyperglass/models/config/devices.py | 6 +- hyperglass/models/config/structured.py | 1 + hyperglass/models/data/bgp_route.py | 2 +- hyperglass/models/parsing/huawei.py | 96 ++++++++++++------- hyperglass/models/parsing/mikrotik.py | 44 ++++++--- hyperglass/models/tests/test_util.py | 6 +- .../plugins/_builtin/bgp_routestr_huawei.py | 12 +-- .../_builtin/mikrotik_garbage_output.py | 13 ++- .../_builtin/mikrotik_normalize_input.py | 18 ++-- hyperglass/state/tests/test_hooks.py | 6 +- hyperglass/util/tests/test_typing.py | 1 + 18 files changed, 184 insertions(+), 109 deletions(-) diff --git a/hyperglass/compat/_sshtunnel.py b/hyperglass/compat/_sshtunnel.py index ca6547f..41dbd26 100644 --- a/hyperglass/compat/_sshtunnel.py +++ b/hyperglass/compat/_sshtunnel.py @@ -272,7 +272,9 @@ class _ForwardServer(socketserver.TCPServer): # Not Threading def handle_error(self, request, client_address): (exc_class, exc, tb) = sys.exc_info() - self.logger.bind(source=request.getsockname()).error("Could not establish connection to remote side of the tunnel") + self.logger.bind(source=request.getsockname()).error( + "Could not establish connection to remote side of the tunnel" + ) self.tunnel_ok.put(False) @property @@ -1023,7 +1025,7 @@ class SSHTunnelForwarder: msg = template.format(self.ssh_host, self.ssh_port, e.args[0]) self.logger.error(msg) return - for (rem, loc) in zip(self._remote_binds, self._local_binds): + for rem, loc in zip(self._remote_binds, self._local_binds): try: self._make_ssh_forward_server(rem, loc) except BaseSSHTunnelForwarderError as e: @@ -1053,7 +1055,7 @@ class SSHTunnelForwarder: bind_addresses = [bind_address] if not is_remote: # Add random port if missing in local bind - for (i, local_bind) in enumerate(bind_addresses): + for i, local_bind in enumerate(bind_addresses): if isinstance(local_bind, tuple) and len(local_bind) == 1: bind_addresses[i] = (local_bind[0], 0) check_addresses(bind_addresses, is_remote) @@ -1400,9 +1402,11 @@ class SSHTunnelForwarder: def __str__(self) -> str: credentials = { "password": self.ssh_password, - "pkeys": [(key.get_name(), hexlify(key.get_fingerprint())) for key in self.ssh_pkeys] - if any(self.ssh_pkeys) - else None, + "pkeys": ( + [(key.get_name(), hexlify(key.get_fingerprint())) for key in self.ssh_pkeys] + if any(self.ssh_pkeys) + else None + ), } _remove_none_values(credentials) template = os.linesep.join( diff --git a/hyperglass/constants.py b/hyperglass/constants.py index ed5a6f4..02723a1 100644 --- a/hyperglass/constants.py +++ b/hyperglass/constants.py @@ -19,7 +19,14 @@ TARGET_FORMAT_SPACE = ("huawei", "huawei_vrpv8") TARGET_JUNIPER_ASPATH = ("juniper", "juniper_junos") -SUPPORTED_STRUCTURED_OUTPUT = ("frr", "juniper", "arista_eos", "huawei", "mikrotik_routeros", "mikrotik_switchos") +SUPPORTED_STRUCTURED_OUTPUT = ( + "frr", + "juniper", + "arista_eos", + "huawei", + "mikrotik_routeros", + "mikrotik_switchos", +) CONFIG_EXTENSIONS = ("py", "yaml", "yml", "json", "toml") diff --git a/hyperglass/defaults/directives/huawei.py b/hyperglass/defaults/directives/huawei.py index b92a93e..4c14638 100644 --- a/hyperglass/defaults/directives/huawei.py +++ b/hyperglass/defaults/directives/huawei.py @@ -177,13 +177,13 @@ HuaweiBGPRouteTable = BuiltinDirective( command="", ), # Regra DENY AS PREFIXO - #RuleWithIPv4( + # RuleWithIPv4( # condition="x.x.x.x/xx", # ge="xx", # le="32", # action="deny", # command="", - #), + # ), RuleWithIPv4( condition="0.0.0.0/0", ge="8", @@ -232,13 +232,13 @@ HuaweiBGPRouteTable = BuiltinDirective( command="", ), # REGRA DENY AS PREFIXO - #RuleWithIPv6( + # RuleWithIPv6( # condition="x.x.x.x/xx", # ge="XX", # le="128", # action="deny", # command="", - #), + # ), RuleWithIPv6( condition="::/0", ge="10", diff --git a/hyperglass/defaults/directives/mikrotik.py b/hyperglass/defaults/directives/mikrotik.py index 4f05c28..56091f6 100644 --- a/hyperglass/defaults/directives/mikrotik.py +++ b/hyperglass/defaults/directives/mikrotik.py @@ -35,7 +35,7 @@ Mikrotik_BGPRoute = BuiltinDirective( # v7 command="routing route print detail without-paging where {target} in dst-address bgp and dst-address !=0.0.0.0/0", # v6 - #command="ip route print detail without-paging where {target} in dst-address bgp and dst-address !=0.0.0.0/0", + # command="ip route print detail without-paging where {target} in dst-address bgp and dst-address !=0.0.0.0/0", ), RuleWithIPv6( condition="::/0", @@ -43,7 +43,7 @@ Mikrotik_BGPRoute = BuiltinDirective( # v7 command="routing route print detail without-paging where {target} in dst-address bgp and dst-address !=::/0", # v6 - #command="ipv6 route print detail without-paging where {target} in dst-address bgp and dst-address !=::/0", + # command="ipv6 route print detail without-paging where {target} in dst-address bgp and dst-address !=::/0", ), ], field=Text(description="IP Address, Prefix, or Hostname"), @@ -66,7 +66,7 @@ Mikrotik_BGPASPath = BuiltinDirective( ) ], field=Text(description="AS Path Regular Expression"), - plugins=["mikrotik_normalize_input","mikrotik_garbage_output", "bgp_routestr_mikrotik"], + plugins=["mikrotik_normalize_input", "mikrotik_garbage_output", "bgp_routestr_mikrotik"], table_output="__hyperglass_mikrotik_bgp_aspath_table__", platforms=PLATFORMS, ) @@ -85,7 +85,7 @@ Mikrotik_BGPCommunity = BuiltinDirective( ) ], field=Text(description="BGP Community String"), - plugins=["mikrotik_normalize_input","mikrotik_garbage_output", "bgp_routestr_mikrotik"], + plugins=["mikrotik_normalize_input", "mikrotik_garbage_output", "bgp_routestr_mikrotik"], table_output="__hyperglass_mikrotik_bgp_community_table__", platforms=PLATFORMS, ) @@ -183,13 +183,13 @@ MikrotikBGPRouteTable = BuiltinDirective( command="", ), # Regra DENY AS PREFIXO - #RuleWithIPv4( + # RuleWithIPv4( # condition="x.x.x.x/x", # ge="xx", # le="32", # action="deny", # command="", - #), + # ), RuleWithIPv4( condition="0.0.0.0/0", ge="8", @@ -198,7 +198,7 @@ MikrotikBGPRouteTable = BuiltinDirective( # v7 command="routing route print detail without-paging where {target} in dst-address bgp and dst-address !=0.0.0.0/0", # v6 - #command="ip route print detail without-paging where {target} in dst-address bgp and dst-address !=0.0.0.0/0", + # command="ip route print detail without-paging where {target} in dst-address bgp and dst-address !=0.0.0.0/0", ), # REGRA DENY SITE LOCAL DEPRECIADO RFC 3879 RuleWithIPv6( @@ -241,20 +241,20 @@ MikrotikBGPRouteTable = BuiltinDirective( command="", ), # REGRA DENY AS PREFIXO - #RuleWithIPv6( + # RuleWithIPv6( # condition="xxxx:xxxx::/xx", # ge="xx", # le="128", # action="deny", # command="", - #), + # ), RuleWithIPv6( condition="::/0", action="permit", # v7 command="routing route print detail without-paging where {target} in dst-address bgp and dst-address !=::/0", # v6 - #command="ipv6 route print detail without-paging where {target} in dst-address bgp and dst-address !=::/0", + # command="ipv6 route print detail without-paging where {target} in dst-address bgp and dst-address !=::/0", ), ], field=Text(description="IP Address, Prefix, or Hostname"), diff --git a/hyperglass/external/_base.py b/hyperglass/external/_base.py index 3954c51..9aedaf5 100644 --- a/hyperglass/external/_base.py +++ b/hyperglass/external/_base.py @@ -208,7 +208,9 @@ class BaseExternal: data, timeout, response_required, - ) = itemgetter(*kwargs.keys())(kwargs) + ) = itemgetter( + *kwargs.keys() + )(kwargs) if method.upper() not in supported_methods: raise self._exception( diff --git a/hyperglass/external/rpki.py b/hyperglass/external/rpki.py index 1c46ffd..0cb37a7 100644 --- a/hyperglass/external/rpki.py +++ b/hyperglass/external/rpki.py @@ -13,25 +13,32 @@ if t.TYPE_CHECKING: from ipaddress import IPv4Address, IPv6Address RPKI_STATE_MAP = { - "Invalid": 0, "invalid": 0, - "Valid": 1, "valid": 1, - "NotFound": 2, "notfound": 2, "not_found": 2, "not-found": 2, - "Unknown": 2, "unknown": 2, - "DEFAULT": 3 + "Invalid": 0, + "invalid": 0, + "Valid": 1, + "valid": 1, + "NotFound": 2, + "notfound": 2, + "not_found": 2, + "not-found": 2, + "Unknown": 2, + "unknown": 2, + "DEFAULT": 3, } RPKI_NAME_MAP = {v: k for k, v in RPKI_STATE_MAP.items()} CACHE_KEY = "hyperglass.external.rpki" + def rpki_state( prefix: t.Union["IPv4Address", "IPv6Address", str], asn: t.Union[int, str], backend: str = "cloudflare", - rpki_server_url: str = "" + rpki_server_url: str = "", ) -> int: """Get RPKI state and map to expected integer.""" _log = log.bind(prefix=prefix, asn=asn) _log.debug("Validating RPKI State") - + cache = use_state("cache") state = 3 ro = f"{prefix!s}@{asn!s}" @@ -68,4 +75,4 @@ def rpki_state( if cached is not None: msg += " [CACHED]" log.debug(msg) - return state \ No newline at end of file + return state diff --git a/hyperglass/external/tests/test_rpki.py b/hyperglass/external/tests/test_rpki.py index 01438e7..d21d505 100644 --- a/hyperglass/external/tests/test_rpki.py +++ b/hyperglass/external/tests/test_rpki.py @@ -19,8 +19,8 @@ def test_rpki(): result = rpki_state(prefix, asn) result_name = RPKI_NAME_MAP.get(result, "No Name") expected_name = RPKI_NAME_MAP.get(expected, "No Name") - assert result == expected, ( - "RPKI State for '{}' via AS{!s} '{}' ({}) instead of '{}' ({})".format( - prefix, asn, result, result_name, expected, expected_name - ) + assert ( + result == expected + ), "RPKI State for '{}' via AS{!s} '{}' ({}) instead of '{}' ({})".format( + prefix, asn, result, result_name, expected, expected_name ) diff --git a/hyperglass/models/config/devices.py b/hyperglass/models/config/devices.py index 99d868b..0e872e2 100644 --- a/hyperglass/models/config/devices.py +++ b/hyperglass/models/config/devices.py @@ -357,9 +357,9 @@ class Devices(MultiModel, model=Device, unique_by="id"): "group": group, "id": device.id, "name": device.name, - "avatar": f"/images/{device.avatar.name}" - if device.avatar is not None - else None, + "avatar": ( + f"/images/{device.avatar.name}" if device.avatar is not None else None + ), "description": device.description, "directives": [d.frontend() for d in device.directives], } diff --git a/hyperglass/models/config/structured.py b/hyperglass/models/config/structured.py index c207d35..420f445 100644 --- a/hyperglass/models/config/structured.py +++ b/hyperglass/models/config/structured.py @@ -24,6 +24,7 @@ class StructuredRpki(HyperglassModel): backend: str = "cloudflare" rpki_server_url: str = "" + class Structured(HyperglassModel): """Control structured data responses.""" diff --git a/hyperglass/models/data/bgp_route.py b/hyperglass/models/data/bgp_route.py index 50e329b..905c1b7 100644 --- a/hyperglass/models/data/bgp_route.py +++ b/hyperglass/models/data/bgp_route.py @@ -123,4 +123,4 @@ class BGPRouteTable(HyperglassModel): if isinstance(other, BGPRouteTable): self.routes = sorted([*self.routes, *other.routes], key=lambda r: r.prefix) self.count = len(self.routes) - return self \ No newline at end of file + return self diff --git a/hyperglass/models/parsing/huawei.py b/hyperglass/models/parsing/huawei.py index a54aa43..5b7671e 100644 --- a/hyperglass/models/parsing/huawei.py +++ b/hyperglass/models/parsing/huawei.py @@ -21,10 +21,11 @@ RPKI_STATE_MAP = { "unverified": 3, } + def remove_prefix(text: str, prefix: str) -> str: """Remove prefix from text if it exists.""" if text.startswith(prefix): - return text[len(prefix):] + return text[len(prefix) :] return text @@ -38,6 +39,7 @@ class HuaweiBase(HyperglassModel, extra="ignore"): class HuaweiPaths(HuaweiBase): """BGP paths information.""" + available: int = 0 best: int = 0 select: int = 0 @@ -117,7 +119,8 @@ class HuaweiRouteEntry(HuaweiBase): def peer_rid(self) -> str: """Get peer router ID.""" return self.from_addr - + + def _extract_paths(line: str) -> HuaweiPaths: """Extract paths information from line like 'Paths: 3 available, 1 best, 1 select, 0 best-external, 0 add-path'.""" paths_data = { @@ -127,7 +130,7 @@ def _extract_paths(line: str) -> HuaweiPaths: "best_external": 0, "add_path": 0, } - + try: values = remove_prefix(line.strip(), "Paths:").strip().split(",") for value in values: @@ -139,23 +142,30 @@ def _extract_paths(line: str) -> HuaweiPaths: paths_data[name] = count except (ValueError, IndexError): log.warning(f"Failed to parse paths line: {line}") - + return HuaweiPaths(**paths_data) def _extract_route_entries(lines: t.List[str]) -> t.List[HuaweiRouteEntry]: """Extract route entries from lines.""" routes = [] - + # Split lines into route blocks using empty lines as separators size = len(lines) idx_list = [idx + 1 for idx, val in enumerate(lines) if val.strip() == ""] - entries = [lines[i:j] for i, j in zip([0] + idx_list, idx_list + ([size] if idx_list[-1] != size else []))] if idx_list else [lines] - + entries = ( + [ + lines[i:j] + for i, j in zip([0] + idx_list, idx_list + ([size] if idx_list[-1] != size else [])) + ] + if idx_list + else [lines] + ) + for route_entry in entries: if not route_entry: continue - + # Initialize route data route_data = { "prefix": "", @@ -181,14 +191,16 @@ def _extract_route_entries(lines: t.List[str]) -> t.List[HuaweiRouteEntry]: "is_selected": False, "preference": 0, } - + for info in route_entry: info = info.strip() if not info: continue - + if info.startswith("BGP routing table entry information of"): - route_data["prefix"] = remove_prefix(info, "BGP routing table entry information of ").rstrip(":") + route_data["prefix"] = remove_prefix( + info, "BGP routing table entry information of " + ).rstrip(":") elif info.startswith("From:"): route_data["from_addr"] = remove_prefix(info, "From: ").split(" (")[0] elif info.startswith("Route Duration:"): @@ -199,13 +211,15 @@ def _extract_route_entries(lines: t.List[str]) -> t.List[HuaweiRouteEntry]: h_match = re.search(r"(\d+)h", duration_str) m_match = re.search(r"(\d+)m", duration_str) s_match = re.search(r"(\d+)s", duration_str) - + days = int(d_match.group(1)) if d_match else 0 hours = int(h_match.group(1)) if h_match else 0 minutes = int(m_match.group(1)) if m_match else 0 seconds = int(s_match.group(1)) if s_match else 0 - - route_data["duration"] = days * 24 * 60 * 60 + hours * 60 * 60 + minutes * 60 + seconds + + route_data["duration"] = ( + days * 24 * 60 * 60 + hours * 60 * 60 + minutes * 60 + seconds + ) except: route_data["duration"] = 0 elif info.startswith("Direct Out-interface:"): @@ -215,23 +229,34 @@ def _extract_route_entries(lines: t.List[str]) -> t.List[HuaweiRouteEntry]: elif info.startswith("Relay IP Nexthop:"): route_data["relay_ip_next_hop"] = remove_prefix(info, "Relay IP Nexthop: ") elif info.startswith("Relay IP Out-Interface:"): - route_data["relay_ip_out_interface"] = remove_prefix(info, "Relay IP Out-Interface: ") + route_data["relay_ip_out_interface"] = remove_prefix( + info, "Relay IP Out-Interface: " + ) elif info.startswith("Qos information :"): route_data["qos"] = remove_prefix(info, "Qos information : ") elif info.startswith("Community:"): communities_str = remove_prefix(info, "Community: ") if communities_str and communities_str.lower() != "none": - communities = [c.strip().replace("<", "").replace(">", "") for c in communities_str.split(", ")] + communities = [ + c.strip().replace("<", "").replace(">", "") + for c in communities_str.split(", ") + ] route_data["communities"] = [c for c in communities if c] elif info.startswith("Large-Community:"): large_communities_str = remove_prefix(info, "Large-Community: ") if large_communities_str and large_communities_str.lower() != "none": - large_communities = [c.strip().replace("<", "").replace(">", "") for c in large_communities_str.split(", ")] + large_communities = [ + c.strip().replace("<", "").replace(">", "") + for c in large_communities_str.split(", ") + ] route_data["large_communities"] = [c for c in large_communities if c] elif info.startswith("Ext-Community:"): ext_communities_str = remove_prefix(info, "Ext-Community: ") if ext_communities_str and ext_communities_str.lower() != "none": - ext_communities = [c.strip().replace("<", "").replace(">", "") for c in ext_communities_str.split(", ")] + ext_communities = [ + c.strip().replace("<", "").replace(">", "") + for c in ext_communities_str.split(", ") + ] route_data["ext_communities"] = [c for c in ext_communities if c] elif info.startswith("AS-path"): values = info.split(",") @@ -240,7 +265,9 @@ def _extract_route_entries(lines: t.List[str]) -> t.List[HuaweiRouteEntry]: if v.startswith("AS-path"): as_path_str = remove_prefix(v, "AS-path ") try: - route_data["as_path"] = [int(a) for a in as_path_str.split() if a.isdigit()] + route_data["as_path"] = [ + int(a) for a in as_path_str.split() if a.isdigit() + ] except ValueError: route_data["as_path"] = [] elif v.startswith("origin"): @@ -275,22 +302,24 @@ def _extract_route_entries(lines: t.List[str]) -> t.List[HuaweiRouteEntry]: route_data["is_best"] = True elif v.strip() == "select": route_data["is_selected"] = True - + # Only add route if we have a valid prefix if route_data["prefix"]: try: route = HuaweiRouteEntry(**route_data) routes.append(route) except Exception as e: - log.warning(f'Failed to create route entry for prefix {{route_data.get("prefix", "unknown")}}: {{e}}') + log.warning( + f'Failed to create route entry for prefix {{route_data.get("prefix", "unknown")}}: {{e}}' + ) continue - + return routes class HuaweiBGPRouteTable(BGPRouteTable): """Custom BGP Route Table for Huawei that bypasses validation.""" - + def __init__(self, **kwargs): """Initialize without calling parent validation.""" # Set attributes directly without validation using object.__setattr__ @@ -312,33 +341,35 @@ class HuaweiBGPTable(HuaweiBase): def parse_text(cls, text: str) -> "HuaweiBGPTable": """Parse Huawei BGP text output.""" _log = log.bind(parser="HuaweiBGPTable") - + instance = cls() - + lines = text.split("\n") - + # Extract general information for line in lines: if "BGP local router ID" in line: instance.local_router_id = remove_prefix(line, "BGP local router ID : ").strip() elif "Local AS number" in line: try: - instance.local_as_number = int(remove_prefix(line, "Local AS number : ").strip()) + instance.local_as_number = int( + remove_prefix(line, "Local AS number : ").strip() + ) except ValueError: instance.local_as_number = 0 elif line.strip().startswith("Paths:"): instance.paths = _extract_paths(line) - + # Extract route entries instance.routes = _extract_route_entries(lines) - + _log.debug(f"Parsed {len(instance.routes)} Huawei routes") return instance def bgp_table(self) -> BGPRouteTable: """Convert to standard BGP table format.""" routes = [] - + for route in self.routes: route_data = { "prefix": route.prefix, @@ -353,7 +384,9 @@ class HuaweiBGPTable(HuaweiBase): "source_as": route.source_as, "source_rid": route.source_rid, "peer_rid": route.peer_rid, - "rpki_state": RPKI_STATE_MAP.get("unknown") if route.is_valid else RPKI_STATE_MAP.get("valid"), + "rpki_state": ( + RPKI_STATE_MAP.get("unknown") if route.is_valid else RPKI_STATE_MAP.get("valid") + ), } routes.append(route_data) @@ -363,4 +396,3 @@ class HuaweiBGPTable(HuaweiBase): routes=routes, winning_weight="high", ) - diff --git a/hyperglass/models/parsing/mikrotik.py b/hyperglass/models/parsing/mikrotik.py index 1275465..3ad75f5 100644 --- a/hyperglass/models/parsing/mikrotik.py +++ b/hyperglass/models/parsing/mikrotik.py @@ -21,22 +21,26 @@ RPKI_STATE_MAP = { "unverified": 3, } + def remove_prefix(text: str, prefix: str) -> str: if text.startswith(prefix): - return text[len(prefix):] + return text[len(prefix) :] return text + # Regex to find key=value pairs. The key can contain dots and hyphens. # The value can be quoted or a single word. TOKEN_RE = re.compile(r'([a-zA-Z0-9_.-]+)=(".*?"|\S+)') # Regex to find flags at the beginning of a line (e.g., "Ab dst-address=...") -FLAGS_RE = re.compile(r'^\s*([DXIAcmsroivmyH\+b]+)\s+') +FLAGS_RE = re.compile(r"^\s*([DXIAcmsroivmyH\+b]+)\s+") + class MikrotikBase(HyperglassModel, extra="ignore"): def __init__(self, **kwargs: t.Any) -> None: super().__init__(**kwargs) + class MikrotikPaths(MikrotikBase): available: int = 0 best: int = 0 @@ -44,8 +48,10 @@ class MikrotikPaths(MikrotikBase): best_external: int = 0 add_path: int = 0 + class MikrotikRouteEntry(MikrotikBase): """MikroTik Route Entry.""" + model_config = ConfigDict(validate_assignment=False) prefix: str @@ -102,6 +108,7 @@ class MikrotikRouteEntry(MikrotikBase): def peer_rid(self) -> str: return self.gateway + def _extract_paths(lines: t.List[str]) -> MikrotikPaths: """Simple count based on lines with dst/dst-address and 'A' flag.""" available = 0 @@ -114,6 +121,7 @@ def _extract_paths(lines: t.List[str]) -> MikrotikPaths: best += 1 return MikrotikPaths(available=available, best=best, select=best) + def _process_kv(route: dict, key: str, val: str): _log = log.bind(parser="MikrotikBGPTable") """Process a key-value pair and update the route dictionary.""" @@ -125,7 +133,7 @@ def _process_kv(route: dict, key: str, val: str): route["prefix"] = val elif key in ("gateway", "nexthop"): # Extract only the IP from gateway (e.g., 168.254.0.2%vlan-2000) - route["gateway"] = val.split('%')[0] + route["gateway"] = val.split("%")[0] elif key == "distance": route["distance"] = int(val) if val.isdigit() else route.get("distance", 0) elif key == "scope": @@ -155,10 +163,11 @@ def _process_kv(route: dict, key: str, val: str): if val and val.lower() != "none": route["ext_communities"] = [c.strip() for c in val.split(",") if c.strip()] elif key == "rpki": - #_log.debug(f"RPKI raw value: {val!r}") + # _log.debug(f"RPKI raw value: {val!r}") clean_val = val.strip().strip('"').lower() route["rpki_state"] = RPKI_STATE_MAP.get(clean_val, 2) + def _extract_route_entries(lines: t.List[str]) -> t.List[MikrotikRouteEntry]: """Extract route entries from a list of lines.""" routes: t.List[MikrotikRouteEntry] = [] @@ -192,6 +201,7 @@ def _extract_route_entries(lines: t.List[str]) -> t.List[MikrotikRouteEntry]: return routes + def _parse_route_block(block: t.List[str]) -> t.Optional[MikrotikRouteEntry]: """Parse a single route block and return a MikrotikRouteEntry.""" if not block: @@ -202,10 +212,21 @@ def _parse_route_block(block: t.List[str]) -> t.Optional[MikrotikRouteEntry]: return None rd = { - "prefix": "", "gateway": "", "distance": 20, "scope": 30, "target_scope": 10, - "as_path": [], "communities": [], "large_communities": [], "ext_communities": [], - "local_preference": 100, "metric": 0, "origin": "", - "is_active": False, "is_best": False, "is_valid": False, + "prefix": "", + "gateway": "", + "distance": 20, + "scope": 30, + "target_scope": 10, + "as_path": [], + "communities": [], + "large_communities": [], + "ext_communities": [], + "local_preference": 100, + "metric": 0, + "origin": "", + "is_active": False, + "is_best": False, + "is_valid": False, "rpki_state": RPKI_STATE_MAP.get("unknown", 2), } @@ -229,12 +250,14 @@ def _parse_route_block(block: t.List[str]) -> t.Optional[MikrotikRouteEntry]: class MikrotikBGPRouteTable(BGPRouteTable): """Bypass validation to align with Huawei parser.""" + def __init__(self, **kwargs): object.__setattr__(self, "vrf", kwargs.get("vrf", "default")) object.__setattr__(self, "count", kwargs.get("count", 0)) object.__setattr__(self, "routes", kwargs.get("routes", [])) object.__setattr__(self, "winning_weight", kwargs.get("winning_weight", "low")) + class MikrotikBGPTable(MikrotikBase): """MikroTik BGP Table in canonical format.""" @@ -253,10 +276,7 @@ class MikrotikBGPTable(MikrotikBase): return inst # Filter out command echoes and header lines - lines = [ - ln for ln in lines - if not ln.strip().startswith((">", "Flags:", "[", "#")) - ] + lines = [ln for ln in lines if not ln.strip().startswith((">", "Flags:", "[", "#"))] inst.paths = _extract_paths(lines) inst.routes = _extract_route_entries(lines) diff --git a/hyperglass/models/tests/test_util.py b/hyperglass/models/tests/test_util.py index f990d57..bb133d6 100644 --- a/hyperglass/models/tests/test_util.py +++ b/hyperglass/models/tests/test_util.py @@ -19,9 +19,9 @@ def test_check_legacy_fields(): test1_expected.keys() ), "legacy field not replaced" - assert set(check_legacy_fields(model="Device", data=test2).keys()) == set(test2.keys()), ( - "new field not left unmodified" - ) + assert set(check_legacy_fields(model="Device", data=test2).keys()) == set( + test2.keys() + ), "new field not left unmodified" with pytest.raises(ValueError): check_legacy_fields(model="Device", data=test3) diff --git a/hyperglass/plugins/_builtin/bgp_routestr_huawei.py b/hyperglass/plugins/_builtin/bgp_routestr_huawei.py index c824c0d..bac49de 100644 --- a/hyperglass/plugins/_builtin/bgp_routestr_huawei.py +++ b/hyperglass/plugins/_builtin/bgp_routestr_huawei.py @@ -29,21 +29,21 @@ def parse_huawei(output: Sequence[str]) -> "OutputDataModel": result = None _log = log.bind(plugin=BGPSTRRoutePluginHuawei.__name__) - + # Combine all output into a single string combined_output = "\n".join(output) _log.debug(f"Combined output length: {len(combined_output)}") - + # Debug: log the first few lines to understand the format - lines = combined_output.split('\n')[:10] + lines = combined_output.split("\n")[:10] _log.debug(f"First 10 lines: {lines}") - + for response in output: try: # Parse the text output using the Huawei parser validated = HuaweiBGPTable.parse_text(response) bgp_table = validated.bgp_table() - + _log.debug(f"Successfully parsed {len(validated.routes)} routes") if result is None: @@ -76,7 +76,7 @@ class BGPSTRRoutePluginHuawei(OutputPlugin): """Parse Huawei response if data is a string (and is therefore unparsed).""" _log = log.bind(plugin=self.__class__.__name__) _log.debug("Processing Huawei output with structured parser") - + should_process = all( ( isinstance(output, (list, tuple)), diff --git a/hyperglass/plugins/_builtin/mikrotik_garbage_output.py b/hyperglass/plugins/_builtin/mikrotik_garbage_output.py index 1e4f11d..fb13f90 100644 --- a/hyperglass/plugins/_builtin/mikrotik_garbage_output.py +++ b/hyperglass/plugins/_builtin/mikrotik_garbage_output.py @@ -38,13 +38,13 @@ class MikrotikGarbageOutput(OutputPlugin): Clean raw output from a MikroTik device. This plugin removes command echoes, prompts, flag legends, and interactive help text. """ - + # O 'output' é uma tupla de strings, onde cada string é a saída de um comando. # Vamos processar cada uma delas. cleaned_outputs = [] for raw_output in output: - + # Se a saída já estiver vazia, não há nada a fazer. if not raw_output or not raw_output.strip(): cleaned_outputs.append("") @@ -52,7 +52,7 @@ class MikrotikGarbageOutput(OutputPlugin): # 1. Dividir a saída em linhas para processamento individual. lines = raw_output.splitlines() - + # 2. Filtrar as linhas de "lixo" conhecidas. filtered_lines = [] in_flags_section = False @@ -62,7 +62,7 @@ class MikrotikGarbageOutput(OutputPlugin): # Ignorar prompts e ecos de comando if stripped_line.startswith("@") and stripped_line.endswith("] >"): continue - + # Ignorar a linha de ajuda interativa if "[Q quit|D dump|C-z pause]" in stripped_line: continue @@ -70,7 +70,7 @@ class MikrotikGarbageOutput(OutputPlugin): # Iniciar a detecção da seção de Flags if stripped_line.startswith("Flags:"): in_flags_section = True - continue # Pula a própria linha "Flags:" + continue # Pula a própria linha "Flags:" # Se estivermos na seção de flags, verificar se a linha ainda é parte dela. # Uma linha de dados de rota real geralmente começa com flags (ex: "Ab") ou é indentada. @@ -84,7 +84,7 @@ class MikrotikGarbageOutput(OutputPlugin): if "=" in stripped_line: in_flags_section = False else: - continue # Pula as linhas da legenda de flags + continue # Pula as linhas da legenda de flags filtered_lines.append(line) @@ -94,4 +94,3 @@ class MikrotikGarbageOutput(OutputPlugin): log.debug(f"MikrotikGarbageOutput cleaned {len(output)} output blocks.") return tuple(cleaned_outputs) - diff --git a/hyperglass/plugins/_builtin/mikrotik_normalize_input.py b/hyperglass/plugins/_builtin/mikrotik_normalize_input.py index a208c53..972b7d2 100644 --- a/hyperglass/plugins/_builtin/mikrotik_normalize_input.py +++ b/hyperglass/plugins/_builtin/mikrotik_normalize_input.py @@ -7,6 +7,7 @@ from pydantic import PrivateAttr # Project from hyperglass.log import log + # REMOVIDA a importação direta de Query para evitar o erro circular # from hyperglass.models.api.query import Query @@ -20,6 +21,7 @@ class MikrotikTargetNormalizerInput(InputPlugin): This ensures that queries for different IPs within the same subnet resolve to the same cache key. """ + _hyperglass_builtin: bool = PrivateAttr(False) name: str = "mikrotik_normalizer" platforms: t.Sequence[str] = ("mikrotik_routeros", "mikrotik_switchos", "mikrotik") @@ -28,27 +30,27 @@ class MikrotikTargetNormalizerInput(InputPlugin): # INÍCIO DA MODIFICAÇÃO: Usar 't.Any' em vez de 'Query' # ############################################################# def validate(self, query: t.Any) -> InputPluginValidationReturn: - # ############################################################# - # FIM DA MODIFICAÇÃO - # ############################################################# + # ############################################################# + # FIM DA MODIFICAÇÃO + # ############################################################# """ Takes the query object and modifies the target if it's a BGP Route query. """ - + # Acessamos os atributos normalmente, pois sabemos que o objeto 'query' os terá. if query.query_type != "bgp_route": return True, None try: target_ip = ip_address(query.target) - + if target_ip.version == 4: prefix_len = 24 else: prefix_len = 48 - + network = ip_network(f"{str(target_ip)}/{prefix_len}", strict=False) - + normalized_target = str(network.with_prefixlen) if query.target != normalized_target: @@ -57,7 +59,7 @@ class MikrotikTargetNormalizerInput(InputPlugin): f"'{normalized_target}' for MikroTik cache key." ) query.target = normalized_target - + except ValueError: pass diff --git a/hyperglass/state/tests/test_hooks.py b/hyperglass/state/tests/test_hooks.py index e82f1a4..aaa9d5c 100644 --- a/hyperglass/state/tests/test_hooks.py +++ b/hyperglass/state/tests/test_hooks.py @@ -96,7 +96,7 @@ def test_use_state_caching(state): instance = use_state(attr) if i == 0: first = instance - assert isinstance(instance, model), ( - f"{instance!r} is not an instance of '{model.__name__}'" - ) + assert isinstance( + instance, model + ), f"{instance!r} is not an instance of '{model.__name__}'" assert instance == first, f"{instance!r} is not equal to {first!r}" diff --git a/hyperglass/util/tests/test_typing.py b/hyperglass/util/tests/test_typing.py index 66fe8ca..44a10f6 100644 --- a/hyperglass/util/tests/test_typing.py +++ b/hyperglass/util/tests/test_typing.py @@ -1,4 +1,5 @@ """Test typing utilities.""" + # flake8: noqa # Standard Library