1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-01-17 08:48:05 +00:00

format and lint

This commit is contained in:
Jason Hall 2025-06-14 21:44:20 -04:00
parent 7b06930e1b
commit cf137bd7e8
No known key found for this signature in database
GPG key ID: 8F4072E405CC6117
23 changed files with 60 additions and 50 deletions

View file

@ -1,4 +1,5 @@
"""hyperglass API."""
# Standard Library
import logging

View file

@ -1,4 +1,5 @@
"""Helper functions for CLI message printing."""
# Standard Library
import typing as t

View file

@ -33,7 +33,7 @@ def run():
def _version(
version: t.Optional[bool] = typer.Option(
None, "--version", help="hyperglass version", callback=_version
)
),
) -> None:
"""hyperglass"""
pass
@ -77,7 +77,6 @@ def _build_ui(timeout: int = typer.Option(180, help="Timeout in seconds")) -> No
with echo._console.status(
f"Starting new UI build with a {timeout} second timeout...", spinner="aesthetic"
):
_build_ui(timeout=120)
@ -140,7 +139,7 @@ def _clear_cache():
@cli.command(name="devices")
def _devices(
search: t.Optional[str] = typer.Argument(None, help="Device ID or Name Search Pattern")
search: t.Optional[str] = typer.Argument(None, help="Device ID or Name Search Pattern"),
):
"""Show all configured devices"""
# Third Party
@ -189,7 +188,7 @@ def _devices(
@cli.command(name="directives")
def _directives(
search: t.Optional[str] = typer.Argument(None, help="Directive ID or Name Search Pattern")
search: t.Optional[str] = typer.Argument(None, help="Directive ID or Name Search Pattern"),
):
"""Show all configured devices"""
# Third Party
@ -280,7 +279,7 @@ def _plugins(
def _params(
path: t.Optional[str] = typer.Argument(
None, help="Parameter Object Path, for example 'messages.no_input'"
)
),
):
"""Show configuration parameters"""
# Standard Library
@ -312,7 +311,7 @@ def _params(
)
raise typer.Exit(0)
except AttributeError:
echo.error(f"{'params.'+path!r} does not exist")
echo.error(f"{'params.' + path!r} does not exist")
raise typer.Exit(1)
panel = Inspect(

View file

@ -2,7 +2,7 @@
# Standard Library
import json as _json
from typing import Any, Dict, List, Union, Literal, Optional, Set
from typing import Any, Set, Dict, List, Union, Literal, Optional
# Third Party
from pydantic import ValidationError
@ -72,7 +72,7 @@ class HyperglassError(Exception):
for err in errors:
loc = "".join(str(loc) for loc in err["loc"])
errs += (f'Field: {loc}\n Error: {err["msg"]}\n',)
errs += (f"Field: {loc}\n Error: {err['msg']}\n",)
return "\n".join(errs)

View file

@ -94,7 +94,7 @@ class Construct:
for key in [k for k in keys if k != "target" and k != "mask"]:
if key not in attrs:
raise ConfigError(
("Command '{c}' has attribute '{k}', " "which is missing from device '{d}'"),
("Command '{c}' has attribute '{k}', which is missing from device '{d}'"),
level="danger",
c=self.directive.name,
k=key,
@ -224,4 +224,4 @@ class Formatter:
def _bird_bgp_community(self, target: str) -> str:
"""Convert from standard community format to BIRD format."""
parts = target.split(":")
return f'({",".join(parts)})'
return f"({','.join(parts)})"

View file

@ -44,9 +44,9 @@ class SSHConnection(Connection):
if proxy.credential._method == "encrypted_key":
# If the key is encrypted, use the password field as the
# private key password.
tunnel_kwargs[
"ssh_private_key_password"
] = proxy.credential.password.get_secret_value()
tunnel_kwargs["ssh_private_key_password"] = (
proxy.credential.password.get_secret_value()
)
try:
return open_tunnel(proxy._target, proxy.port, **tunnel_kwargs)

View file

@ -212,7 +212,7 @@ class BaseExternal:
if method.upper() not in supported_methods:
raise self._exception(
f'Method must be one of {", ".join(supported_methods)}. ' f"Got: {str(method)}"
f"Method must be one of {', '.join(supported_methods)}. Got: {str(method)}"
)
endpoint = "/".join(
@ -284,7 +284,7 @@ class BaseExternal:
status = httpx.codes(response.status_code)
error = self._parse_response(response)
raise self._exception(
f'{status.name.replace("_", " ")}: {error}', level="danger"
f"{status.name.replace('_', ' ')}: {error}", level="danger"
) from None
except httpx.HTTPError as http_err:
@ -340,7 +340,7 @@ class BaseExternal:
status = httpx.codes(response.status_code)
error = self._parse_response(response)
raise self._exception(
f'{status.name.replace("_", " ")}: {error}', level="danger"
f"{status.name.replace('_', ' ')}: {error}", level="danger"
) from None
except httpx.HTTPError as http_err:

View file

@ -35,7 +35,7 @@ def default_ip_targets(*targets: str) -> t.Tuple[TargetData, t.Tuple[str, ...]]:
default_data = {}
query = ()
for target in targets:
detail: TargetDetail = {k: "None" for k in DEFAULT_KEYS}
detail: TargetDetail = dict.fromkeys(DEFAULT_KEYS, "None")
try:
valid: t.Union[IPv4Address, IPv6Address] = ip_address(target)
@ -139,7 +139,7 @@ async def network_info(*targets: str) -> TargetData:
cache = use_state("cache")
# Set default data structure.
query_data = {t: {k: "" for k in DEFAULT_KEYS} for t in query_targets}
query_data = {t: dict.fromkeys(DEFAULT_KEYS, "") for t in query_targets}
# Get all cached bgp.tools data.
cached = cache.get_map(CACHE_KEY) or {}

View file

@ -1,4 +1,5 @@
"""Test external http client."""
# Standard Library
import asyncio

View file

@ -16,7 +16,6 @@ WHOIS_OUTPUT = """AS | IP | BGP Prefix | CC | Registry | Allocated | AS
# Ignore asyncio deprecation warning about loop
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_network_info():
checks = (
("192.0.2.1", {"asn": "None", "rir": "Private Address"}),
("127.0.0.1", {"asn": "None", "rir": "Loopback Address"}),

View file

@ -1,4 +1,5 @@
"""Test RPKI data fetching."""
# Third Party
import pytest
@ -18,8 +19,8 @@ def test_rpki():
result = rpki_state(prefix, asn)
result_name = RPKI_NAME_MAP.get(result, "No Name")
expected_name = RPKI_NAME_MAP.get(expected, "No Name")
assert (
result == expected
), "RPKI State for '{}' via AS{!s} '{}' ({}) instead of '{}' ({})".format(
prefix, asn, result, result_name, expected, expected_name
assert result == expected, (
"RPKI State for '{}' via AS{!s} '{}' ({}) instead of '{}' ({})".format(
prefix, asn, result, result_name, expected, expected_name
)
)

View file

@ -1,4 +1,5 @@
"""Query & Response Validation Models."""
# Local
from .query import Query
from .response import (

View file

@ -69,7 +69,7 @@ class Query(BaseModel):
self.validate_query_target()
except InputValidationError as err:
raise InputInvalid(**err.kwargs) from err
self.query_target = self.transform_query_target()
def summary(self) -> SimpleQuery:

View file

@ -1,6 +1,5 @@
"""Validation model for cache config."""
# Local
from ..main import HyperglassModel

View file

@ -6,7 +6,7 @@ import typing as t
from ipaddress import ip_network
# Third Party
from pydantic import field_validator, ValidationInfo
from pydantic import ValidationInfo, field_validator
# Project
from hyperglass.state import use_state

View file

@ -50,7 +50,7 @@ class FRRPath(_FRRBase):
aspath: t.List[int]
aggregator_as: int = 0
aggregator_id: str = ""
loc_prf: int = 100 # 100 is the default value for local preference
loc_prf: int = 100 # 100 is the default value for local preference
metric: int = 0
med: int = 0
weight: int = 0
@ -66,7 +66,7 @@ class FRRPath(_FRRBase):
"""Extract meaningful data from FRR response."""
new = values.copy()
new["aspath"] = values["aspath"]["segments"][0]["list"]
community = values.get("community", {'list': []})
community = values.get("community", {"list": []})
new["community"] = community["list"]
new["lastUpdate"] = values["lastUpdate"]["epoch"]
bestpath = values.get("bestpath", {})

View file

@ -19,9 +19,9 @@ def test_check_legacy_fields():
test1_expected.keys()
), "legacy field not replaced"
assert set(check_legacy_fields(model="Device", data=test2).keys()) == set(
test2.keys()
), "new field not left unmodified"
assert set(check_legacy_fields(model="Device", data=test2).keys()) == set(test2.keys()), (
"new field not left unmodified"
)
with pytest.raises(ValueError):
check_legacy_fields(model="Device", data=test3)

View file

@ -1,12 +1,12 @@
"""Built-in hyperglass plugins."""
# Local
from .bgp_route_frr import BGPRoutePluginFrr
from .remove_command import RemoveCommand
from .bgp_route_arista import BGPRoutePluginArista
from .bgp_route_frr import BGPRoutePluginFrr
from .bgp_route_huawei import BGPRoutePluginHuawei
from .bgp_route_juniper import BGPRoutePluginJuniper
from .mikrotik_garbage_output import MikrotikGarbageOutput
from .bgp_route_huawei import BGPRoutePluginHuawei
__all__ = (
"BGPRoutePluginArista",

View file

@ -62,14 +62,13 @@ def parse_frr(output: t.Sequence[str]) -> "OutputDataModel":
return result
class BGPRoutePluginFrr(OutputPlugin):
"""Coerce a FRR route table in JSON format to a standard BGP Table structure."""
_hyperglass_builtin: bool = PrivateAttr(True)
platforms: t.Sequence[str] = ("frr",)
directives: t.Sequence[str] = (
"__hyperglass_frr_bgp_route_table__",
)
directives: t.Sequence[str] = ("__hyperglass_frr_bgp_route_table__",)
def process(self, *, output: "OutputType", query: "Query") -> "OutputType":
"""Parse FRR response if data is a string (and is therefore unparsed)."""

View file

@ -1,36 +1,41 @@
from ipaddress import ip_network
from .._input import InputPlugin
# Standard Library
import typing as t
from ipaddress import ip_network
# Third Party
from pydantic import PrivateAttr
# Local
from .._input import InputPlugin
if t.TYPE_CHECKING:
# Project
from hyperglass.models.api.query import Query
InputPluginTransformReturn = t.Union[t.Sequence[str], str]
class BGPRoutePluginHuawei(InputPlugin):
_hyperglass_builtin: bool = PrivateAttr(True)
platforms: t.Sequence[str] =("huawei", "huawei_vrpv8",)
platforms: t.Sequence[str] = (
"huawei",
"huawei_vrpv8",
)
directives: t.Sequence[str] = ("__hyperglass_huawei_bgp_route__",)
"""
Huawei BGP Route Input Plugin
This plugin transforms a query target into a network address and prefix length
ex.: 192.0.2.0/24 -> 192.0.2.0 24
ex.: 2001:db8::/32 -> 2001:db8:: 32
"""
def transform(self, query: "Query") -> InputPluginTransformReturn:
(target := query.query_target)
target = query.query_target
if not target or not isinstance(target, list) or len(target) == 0:
return None
target = target[0].strip()
# Check for the / in the query target
@ -38,5 +43,5 @@ class BGPRoutePluginHuawei(InputPlugin):
return target
target_network = ip_network(target)
return f"{target_network.network_address!s} {target_network.prefixlen!s}"
return f"{target_network.network_address!s} {target_network.prefixlen!s}"

View file

@ -1,4 +1,5 @@
"""Test BGP Community validation."""
# Standard Library
import typing as t

View file

@ -96,7 +96,7 @@ def test_use_state_caching(state):
instance = use_state(attr)
if i == 0:
first = instance
assert isinstance(
instance, model
), f"{instance!r} is not an instance of '{model.__name__}'"
assert isinstance(instance, model), (
f"{instance!r} is not an instance of '{model.__name__}'"
)
assert instance == first, f"{instance!r} is not equal to {first!r}"

View file

@ -0,0 +1,3 @@
onlyBuiltDependencies:
- '@biomejs/biome'
- esbuild