start vrf data model; move models to dedicated module

This commit is contained in:
checktheroads 2019-09-11 01:35:31 -07:00
parent 094cfbc82b
commit ad3e368246
15 changed files with 916 additions and 807 deletions

View file

@ -4,7 +4,6 @@ default values if undefined.
"""
# Standard Library Imports
import operator
from pathlib import Path
# Third Party Imports
@ -14,7 +13,15 @@ from logzero import logger
from pydantic import ValidationError
# Project Imports
from hyperglass.configuration import models
from hyperglass.configuration.models import (
params as _params,
commands as _commands,
routers as _routers,
proxies as _proxies,
networks as _networks,
vrfs as _vrfs,
credentials as _credentials,
)
from hyperglass.exceptions import ConfigError, ConfigInvalid, ConfigMissing
# Project Directories
@ -59,19 +66,19 @@ except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error:
# Map imported user config files to expected schema:
try:
if user_config:
params = models.Params(**user_config)
params = _params.Params(**user_config)
elif not user_config:
params = models.Params()
params = _params.Params()
if user_commands:
commands = models.Commands.import_params(user_commands)
commands = _commands.Commands.import_params(user_commands)
elif not user_commands:
commands = models.Commands()
commands = _commands.Commands()
devices = models.Routers.import_params(user_devices["router"])
credentials = models.Credentials.import_params(user_devices["credential"])
proxies = models.Proxies.import_params(user_devices["proxy"])
_networks = models.Networks.import_params(user_devices["network"])
vrfs = models.Vrfs.import_params(user_devices.get("vrf"))
devices = _routers.Routers.import_params(user_devices["router"])
credentials = _credentials.Credentials.import_params(user_devices["credential"])
proxies = _proxies.Proxies.import_params(user_devices["proxy"])
imported_networks = _networks.Networks.import_params(user_devices["network"])
vrfs = _vrfs.Vrfs.import_params(user_devices.get("vrf"))
except ValidationError as validation_errors:
@ -109,7 +116,7 @@ logzero_config = logzero.setup_default_logger(
class Networks:
def __init__(self):
self.routers = devices.routers
self.networks = _networks.networks
self.networks = imported_networks.networks
def networks_verbose(self):
locations_dict = {}
@ -141,8 +148,8 @@ class Networks:
def networks_display(self):
locations_dict = {}
for (router, router_params) in devices.routers.items():
for (netname, net_params) in _networks.networks.items():
for (router, router_params) in self.routers.items():
for (netname, net_params) in self.networks.items():
if router_params["network"] == netname:
net_display = net_params["display_name"]
if net_display in locations_dict:

View file

@ -1,793 +0,0 @@
"""
Defines models for all config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Standard Library Imports
import re
from ipaddress import IPv4Address
from ipaddress import IPv6Address
from math import ceil
from typing import List
from typing import Union
# Third Party Imports
from pydantic import BaseSettings
from pydantic import IPvAnyAddress
from pydantic import IPvAnyNetwork
from pydantic import SecretStr
from pydantic import constr
from pydantic import validator
from pydantic.color import Color
# Project Imports
from hyperglass.constants import Supported
from hyperglass.exceptions import ConfigInvalid
from hyperglass.exceptions import UnsupportedDevice
def clean_name(_name):
"""
Converts any "desirable" seperators to underscore, then
removes all characters that are unsupported in Python class
variable names. Also removes leading numbers underscores.
"""
_replaced = re.sub(r"[\-|\.|\@|\~|\:\/|\s]", "_", _name)
_scrubbed = "".join(re.findall(r"([a-zA-Z]\w+|\_+)", _replaced))
return _scrubbed.lower()
class Vrf(BaseSettings):
"""Model for per VRF/afi config in devices.yaml"""
display_name: str
name: str
afis: List[str]
class Vrfs(BaseSettings):
"""Base model for vrfs class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from VRF names, dynamically sets attributes for
the Vrfs class.
"""
vrfs: Vrf = {
"default": {
"display_name": "Default",
"name": "default",
"afis": ["ipv4, ipv6"],
}
}
names: List[str] = ["default"]
_all: List[str] = ["default"]
for (vrf_key, params) in input_params.items():
vrf = clean_name(vrf_key)
vrf_params = Vrf(**params)
vrfs.update({vrf: vrf_params.dict()})
names.append(params.get("name"))
_all.append(vrf_key)
for (vrf_key, params) in vrfs.items():
setattr(Vrfs, vrf_key, params)
names: List[str] = list(set(names))
_all: List[str] = list(set(_all))
Vrfs.vrfs = vrfs
Vrfs.names = names
Vrfs._all = _all
return Vrfs()
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True
class Router(BaseSettings):
"""Model for per-router config in devices.yaml."""
address: Union[IPvAnyAddress, str]
network: str
src_addr_ipv4: IPv4Address
src_addr_ipv6: IPv6Address
credential: str
proxy: Union[str, None] = None
location: str
display_name: str
port: int
nos: str
commands: Union[str, None] = None
vrfs: List[str] = ["default"]
@validator("nos")
def supported_nos(cls, v): # noqa: N805
"""Validates that passed nos string is supported by hyperglass"""
if not Supported.is_supported(v):
raise UnsupportedDevice(f'"{v}" device type is not supported.')
return v
@validator("credential", "proxy", "location")
def clean_name(cls, v): # noqa: N805
"""Remove or replace unsupported characters from field values"""
return clean_name(v)
# @validator("vrfs")
# def validate_vrfs(cls, v):
# configured_vrfs = Vrfs().names
# if v not in configured_vrfs:
# raise ConfigInvalid(
# field=v, error_msg=f"VRF must be in {str(configured_vrfs)}"
# )
# @validator("afis")
# def validate_afi(cls, v): # noqa: N805
# """Validates that configured AFI is supported"""
# supported_afis = ("ipv4", "ipv6", "vpnv4", "vpnv6")
# if v.lower() not in supported_afis:
# raise ConfigInvalid(
# field=v, error_msg=f"AFI must be one of: {str(supported_afis)}"
# )
# return v.lower()
@validator("commands", always=True)
def validate_commands(cls, v, values): # noqa: N805
if v is None:
v = values["nos"]
return v
class Routers(BaseSettings):
"""Base model for devices class."""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the Routers class.
"""
routers = {}
hostnames = []
vrfs = set()
for (devname, params) in input_params.items():
dev = clean_name(devname)
router_params = Router(**params)
setattr(Routers, dev, router_params)
routers.update({dev: router_params.dict()})
hostnames.append(dev)
for vrf in router_params.dict()["vrfs"]:
vrfs.add(vrf)
Routers.routers = routers
Routers.hostnames = hostnames
Routers.vrfs = list(vrfs)
return Routers()
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True
class Network(BaseSettings):
"""Model for per-network/asn config in devices.yaml"""
display_name: str
class Networks(BaseSettings):
"""Base model for networks class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the credentials class.
"""
obj = Networks()
networks = {}
for (netname, params) in input_params.items():
netname = clean_name(netname)
setattr(Networks, netname, Network(**params))
networks.update({netname: Network(**params).dict()})
Networks.networks = networks
return obj
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True
class Credential(BaseSettings):
"""Model for per-credential config in devices.yaml"""
username: str
password: SecretStr
class Credentials(BaseSettings):
"""Base model for credentials class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the credentials class.
"""
obj = Credentials()
for (credname, params) in input_params.items():
cred = clean_name(credname)
setattr(Credentials, cred, Credential(**params))
return obj
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True
class Proxy(BaseSettings):
"""Model for per-proxy config in devices.yaml"""
address: Union[IPvAnyAddress, str]
port: int = 22
username: str
password: SecretStr
nos: str
ssh_command: str
@validator("nos")
def supported_nos(cls, v): # noqa: N805
"""
Validates that passed nos string is supported by hyperglass.
"""
if not v == "linux_ssh":
raise UnsupportedDevice(f'"{v}" device type is not supported.')
return v
class Proxies(BaseSettings):
"""Base model for proxies class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the proxies class.
"""
obj = Proxies()
for (devname, params) in input_params.items():
dev = clean_name(devname)
setattr(Proxies, dev, Proxy(**params))
return obj
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True
class General(BaseSettings):
"""Class model for params.general"""
debug: bool = False
primary_asn: str = "65001"
org_name: str = "The Company"
google_analytics: Union[str, None] = None
redis_host: Union[str, IPvAnyNetwork] = "localhost"
redis_port: int = 6379
requires_ipv6_cidr: List[str] = ["cisco_ios", "cisco_nxos"]
request_timeout: int = 15
class Branding(BaseSettings):
"""Class model for params.branding"""
site_name: str = "hyperglass"
class Colors(BaseSettings):
"""Class model for params.colors"""
primary: Color = "#40798c"
secondary: Color = "#330036"
danger: Color = "#a21024"
warning: Color = "#eec643"
light: Color = "#fbfffe"
dark: Color = "#383541"
background: Color = "#fbfffe"
class Credit(BaseSettings):
"""Class model for params.branding.credit"""
enable: bool = True
class Font(BaseSettings):
"""Class model for params.branding.font"""
primary: str = "Nunito"
mono: str = "Fira Code"
class HelpMenu(BaseSettings):
"""Class model for params.branding.help_menu"""
enable: bool = True
class Logo(BaseSettings):
"""Class model for params.branding.logo"""
path: str = "ui/images/hyperglass-dark.png"
width: int = 384
favicons: str = "ui/images/favicons/"
class PeeringDb(BaseSettings):
"""Class model for params.branding.peering_db"""
enable: bool = True
class Terms(BaseSettings):
"""Class model for params.branding.terms"""
enable: bool = True
class Text(BaseSettings):
"""Class model for params.branding.text"""
title_mode: str = "logo_only"
title: str = "hyperglass"
subtitle: str = "AS{primary_asn}"
query_location: str = "Location"
query_type: str = "Query"
query_target: str = "Target"
terms: str = "Terms"
info: str = "Help"
peeringdb = "PeeringDB"
bgp_route: str = "BGP Route"
bgp_community: str = "BGP Community"
bgp_aspath: str = "BGP AS Path"
ping: str = "Ping"
traceroute: str = "Traceroute"
vrf: str = "VRF"
class Error404(BaseSettings):
"""Class model for 404 Error Page"""
title: str = "Error"
subtitle: str = "{uri} isn't a thing"
button: str = "Home"
class Error500(BaseSettings):
"""Class model for 500 Error Page"""
title: str = "Error"
subtitle: str = "Something Went Wrong"
button: str = "Home"
class Error504(BaseSettings):
"""Class model for 504 Error Element"""
message: str = "Unable to reach {target}"
error404: Error404 = Error404()
error500: Error500 = Error500()
error504: Error504 = Error504()
@validator("title_mode")
def check_title_mode(cls, v):
"""Verifies title_mode matches supported values"""
supported_modes = ["logo_only", "text_only", "logo_title", "all"]
if v not in supported_modes:
raise ValueError("title_mode must be one of {}".format(supported_modes))
return v
colors: Colors = Colors()
credit: Credit = Credit()
font: Font = Font()
help_menu: HelpMenu = HelpMenu()
logo: Logo = Logo()
peering_db: PeeringDb = PeeringDb()
terms: Terms = Terms()
text: Text = Text()
class Messages(BaseSettings):
"""Class model for params.messages"""
no_query_type: str = "A query type must be specified."
no_location: str = "A location must be selected."
no_input: str = "{field} must be specified."
blacklist: str = "{target} a member of {blacklisted_net}, which is not allowed."
max_prefix: str = (
"Prefix length must be shorter than /{max_length}. {target} is too specific."
)
requires_ipv6_cidr: str = (
"{device_name} requires IPv6 BGP lookups to be in CIDR notation."
)
invalid_input: str = "{target} is not a valid {query_type} target."
invalid_field: str = "{input} is an invalid {field}."
general: str = "Something went wrong."
directed_cidr: str = "{query_type} queries can not be in CIDR format."
request_timeout: str = "Request timed out."
connection_error: str = "Error connecting to {device_name}: {error}"
authentication_error: str = "Authentication error occurred."
noresponse_error: str = "No response."
vrf_not_associated: str = "{vrf} is not associated with {device_name}."
no_matching_vrfs: str = "No VRFs Match"
class Features(BaseSettings):
"""Class model for params.features"""
class Vrf(BaseSettings):
"""Class model for params.features.vrf"""
enable: bool = False
class BgpRoute(BaseSettings):
"""Class model for params.features.bgp_route"""
enable: bool = True
class BgpCommunity(BaseSettings):
"""Class model for params.features.bgp_community"""
enable: bool = True
class Regex(BaseSettings):
"""Class model for params.features.bgp_community.regex"""
decimal: str = r"^[0-9]{1,10}$"
extended_as: str = r"^([0-9]{0,5})\:([0-9]{1,5})$"
large: str = r"^([0-9]{1,10})\:([0-9]{1,10})\:[0-9]{1,10}$"
regex: Regex = Regex()
class BgpAsPath(BaseSettings):
"""Class model for params.features.bgp_aspath"""
enable: bool = True
class Regex(BaseSettings):
"""Class model for params.bgp_aspath.regex"""
mode: constr(regex="asplain|asdot") = "asplain"
asplain: str = r"^(\^|^\_)(\d+\_|\d+\$|\d+\(\_\.\+\_\))+$"
asdot: str = (
r"^(\^|^\_)((\d+\.\d+)\_|(\d+\.\d+)\$|(\d+\.\d+)\(\_\.\+\_\))+$"
)
regex: Regex = Regex()
class Ping(BaseSettings):
"""Class model for params.features.ping"""
enable: bool = True
class Traceroute(BaseSettings):
"""Class model for params.features.traceroute"""
enable: bool = True
class Blacklist(BaseSettings):
"""Class model for params.features.blacklist"""
enable: bool = True
networks: List[IPvAnyNetwork] = [
"198.18.0.0/15",
"100.64.0.0/10",
"2001:db8::/32",
"10.0.0.0/8",
"192.168.0.0/16",
"172.16.0.0/12",
]
class Cache(BaseSettings):
"""Class model for params.features.cache"""
redis_id: int = 0
timeout: int = 120
show_text: bool = True
text: str = "Results will be cached for {timeout} minutes.".format(
timeout=ceil(timeout / 60)
)
class MaxPrefix(BaseSettings):
"""Class model for params.features.max_prefix"""
enable: bool = False
ipv4: int = 24
ipv6: int = 64
message: str = (
"Prefix length must be smaller than /{m}. <b>{i}</b> is too specific."
)
class RateLimit(BaseSettings):
"""Class model for params.features.rate_limit"""
redis_id: int = 1
class Query(BaseSettings):
"""Class model for params.features.rate_limit.query"""
rate: int = 5
period: str = "minute"
title: str = "Query Limit Reached"
message: str = (
"Query limit of {rate} per {period} reached. "
"Please wait one minute and try again."
).format(rate=rate, period=period)
button: str = "Try Again"
class Site(BaseSettings):
"""Class model for params.features.rate_limit.site"""
rate: int = 60
period: str = "minute"
title: str = "Limit Reached"
subtitle: str = (
"You have accessed this site more than {rate} "
"times in the last {period}."
).format(rate=rate, period=period)
button: str = "Try Again"
query: Query = Query()
site: Site = Site()
bgp_route: BgpRoute = BgpRoute()
bgp_community: BgpCommunity = BgpCommunity()
bgp_aspath: BgpAsPath = BgpAsPath()
ping: Ping = Ping()
traceroute: Traceroute = Traceroute()
blacklist: Blacklist = Blacklist()
cache: Cache = Cache()
max_prefix: MaxPrefix = MaxPrefix()
rate_limit: RateLimit = RateLimit()
vrf: Vrf = Vrf()
class Params(BaseSettings):
"""Base model for params"""
general: General = General()
features: Features = Features()
branding: Branding = Branding()
messages: Messages = Messages()
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True
class NosModel(BaseSettings):
"""Class model for non-default commands"""
class Dual(BaseSettings):
"""Class model for non-default dual afi commands"""
bgp_aspath: str = None
bgp_community: str = None
class IPv4(BaseSettings):
"""Class model for non-default ipv4 commands"""
bgp_route: str = None
ping: str = None
traceroute: str = None
class IPv6(BaseSettings):
"""Class model for non-default ipv6 commands"""
bgp_route: str = None
ping: str = None
traceroute: str = None
dual: Dual = Dual()
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
class Commands(BaseSettings):
"""Base class for commands class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, dynamically sets
attributes for the commands class.
"""
obj = Commands()
for (nos, cmds) in input_params.items():
setattr(Commands, nos, NosModel(**cmds))
return obj
# class CiscoIOS(BaseSettings):
# """Class model for default cisco_ios commands"""
# class Dual(BaseSettings):
# """Default commands for dual afi commands"""
# bgp_community: str = (
# "show bgp all community {target} | section {afis}Network"
# )
# bgp_aspath: str = (
# 'show bgp all quote-regexp "{target}" | section {afis}Network'
# )
# class IPv4(BaseSettings):
# """Default commands for ipv4 commands"""
# bgp_route: str = "show bgp ipv4 unicast {target} | exclude pathid:|Epoch"
# ping: str = "ping {target} repeat 5 source {source} | exclude Type escape"
# traceroute: str = (
# "traceroute {target} timeout 1 probe 2 source {source} "
# "| exclude Type escape"
# )
# class IPv6(BaseSettings):
# """Default commands for ipv6 commands"""
# bgp_route: str = "show bgp ipv6 unicast {target} | exclude pathid:|Epoch"
# ping: str = (
# "ping ipv6 {target} repeat 5 source {source} | exclude Type escape"
# )
# traceroute: str = (
# "traceroute ipv6 {target} timeout 1 probe 2 source {source} "
# "| exclude Type escape"
# )
# dual: Dual = Dual()
# ipv4: IPv4 = IPv4()
# ipv6: IPv6 = IPv6()
class CiscoIOS(BaseSettings):
"""Class model for default cisco_ios commands"""
class VPNv4(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community: str = "show bgp {afi} unicast vrf {vrf} community {target}"
bgp_aspath: str = 'show bgp {afi} unicast vrf {vrf} quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast vrf {vrf} {target}"
ping: str = "ping vrf {vrf} {target} repeat 5 source {source}"
traceroute: str = (
"traceroute vrf {vrf} {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
class VPNv6(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community: str = "show bgp {afi} unicast vrf {vrf} community {target}"
bgp_aspath: str = 'show bgp {afi} unicast vrf {vrf} quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast vrf {vrf} {target}"
ping: str = "ping vrf {vrf} {target} repeat 5 source {source}"
traceroute: str = (
"traceroute vrf {vrf} {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
class IPv4(BaseSettings):
"""Default commands for ipv4 commands"""
bgp_community: str = "show bgp {afi} unicast community {target}"
bgp_aspath: str = 'show bgp {afi} unicast quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast {target} | exclude pathid:|Epoch"
ping: str = "ping {target} repeat 5 source {source} | exclude Type escape"
traceroute: str = (
"traceroute {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
class IPv6(BaseSettings):
"""Default commands for ipv6 commands"""
bgp_community: str = "show bgp {afi} unicast community {target}"
bgp_aspath: str = 'show bgp {afi} unicast quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast {target} | exclude pathid:|Epoch"
ping: str = (
"ping {afi} {target} repeat 5 source {source} | exclude Type escape"
)
traceroute: str = (
"traceroute ipv6 {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
vpnv4: VPNv4 = VPNv4()
vpnv6: VPNv6 = VPNv6()
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
class CiscoXR(BaseSettings):
"""Class model for default cisco_xr commands"""
class Dual(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community: str = (
"show bgp all unicast community {target} | utility egrep -v "
'"\\(BGP |Table |Non-stop\\)"'
)
bgp_aspath: str = (
"show bgp all unicast regexp {target} | utility egrep -v "
'"\\(BGP |Table |Non-stop\\)"'
)
class IPv4(BaseSettings):
"""Default commands for ipv4 commands"""
bgp_route: str = (
"show bgp ipv4 unicast {target} | util egrep \\(BGP routing table "
"entry|Path \\#|aggregated by|Origin |Community:|validity| from \\)"
)
ping: str = "ping ipv4 {target} count 5 source {src_addr_ipv4}"
traceroute: str = (
"traceroute ipv4 {target} timeout 1 probe 2 source {source}"
)
class IPv6(BaseSettings):
"""Default commands for ipv6 commands"""
bgp_route: str = (
"show bgp ipv6 unicast {target} | util egrep \\(BGP routing table "
"entry|Path \\#|aggregated by|Origin |Community:|validity| from \\)"
)
ping: str = "ping ipv6 {target} count 5 source {src_addr_ipv6}"
traceroute: str = (
"traceroute ipv6 {target} timeout 1 probe 2 source {source}"
)
dual: Dual = Dual()
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
class Juniper(BaseSettings):
"""Class model for default juniper commands"""
class Dual(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community = "show route protocol bgp community {target}"
bgp_aspath = "show route protocol bgp aspath-regex {target}"
class IPv4(BaseSettings):
"""Default commands for ipv4 commands"""
bgp_route = "show route protocol bgp table inet.0 {target} detail"
ping = "ping inet {target} count 5 source {src_addr_ipv4}"
traceroute = "traceroute inet {target} wait 1 source {source}"
class IPv6(BaseSettings):
"""Default commands for ipv6 commands"""
bgp_route = "show route protocol bgp table inet6.0 {target} detail"
ping = "ping inet6 {target} count 5 source {src_addr_ipv6}"
traceroute = "traceroute inet6 {target} wait 1 source {source}"
dual: Dual = Dual()
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
cisco_ios: NosModel = CiscoIOS()
cisco_xr: NosModel = CiscoXR()
juniper: NosModel = Juniper()
class Config:
"""Pydantic Config"""
validate_all = False
validate_assignment = True

View file

@ -0,0 +1,7 @@
"""
Defines models for all config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""

View file

@ -0,0 +1,16 @@
"""
Utility Functions for Pydantic Models
"""
import re
def clean_name(_name):
"""
Converts any "desirable" seperators to underscore, then
removes all characters that are unsupported in Python class
variable names. Also removes leading numbers underscores.
"""
_replaced = re.sub(r"[\-|\.|\@|\~|\:\/|\s]", "_", _name)
_scrubbed = "".join(re.findall(r"([a-zA-Z]\w+|\_+)", _replaced))
return _scrubbed.lower()

View file

@ -0,0 +1,121 @@
"""
Defines models for all Branding variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Third Party Imports
from pydantic import BaseSettings
from pydantic import validator
from pydantic.color import Color
class Branding(BaseSettings):
"""Class model for params.branding"""
site_name: str = "hyperglass"
class Colors(BaseSettings):
"""Class model for params.colors"""
primary: Color = "#40798c"
secondary: Color = "#330036"
danger: Color = "#a21024"
warning: Color = "#eec643"
light: Color = "#fbfffe"
dark: Color = "#383541"
background: Color = "#fbfffe"
class Credit(BaseSettings):
"""Class model for params.branding.credit"""
enable: bool = True
class Font(BaseSettings):
"""Class model for params.branding.font"""
primary: str = "Nunito"
mono: str = "Fira Code"
class HelpMenu(BaseSettings):
"""Class model for params.branding.help_menu"""
enable: bool = True
class Logo(BaseSettings):
"""Class model for params.branding.logo"""
path: str = "ui/images/hyperglass-dark.png"
width: int = 384
favicons: str = "ui/images/favicons/"
class PeeringDb(BaseSettings):
"""Class model for params.branding.peering_db"""
enable: bool = True
class Terms(BaseSettings):
"""Class model for params.branding.terms"""
enable: bool = True
class Text(BaseSettings):
"""Class model for params.branding.text"""
title_mode: str = "logo_only"
title: str = "hyperglass"
subtitle: str = "AS{primary_asn}"
query_location: str = "Location"
query_type: str = "Query"
query_target: str = "Target"
terms: str = "Terms"
info: str = "Help"
peeringdb = "PeeringDB"
bgp_route: str = "BGP Route"
bgp_community: str = "BGP Community"
bgp_aspath: str = "BGP AS Path"
ping: str = "Ping"
traceroute: str = "Traceroute"
vrf: str = "VRF"
class Error404(BaseSettings):
"""Class model for 404 Error Page"""
title: str = "Error"
subtitle: str = "{uri} isn't a thing"
button: str = "Home"
class Error500(BaseSettings):
"""Class model for 500 Error Page"""
title: str = "Error"
subtitle: str = "Something Went Wrong"
button: str = "Home"
class Error504(BaseSettings):
"""Class model for 504 Error Element"""
message: str = "Unable to reach {target}"
error404: Error404 = Error404()
error500: Error500 = Error500()
error504: Error504 = Error504()
@validator("title_mode")
def check_title_mode(cls, v):
"""Verifies title_mode matches supported values"""
supported_modes = ["logo_only", "text_only", "logo_title", "all"]
if v not in supported_modes:
raise ValueError("title_mode must be one of {}".format(supported_modes))
return v
colors: Colors = Colors()
credit: Credit = Credit()
font: Font = Font()
help_menu: HelpMenu = HelpMenu()
logo: Logo = Logo()
peering_db: PeeringDb = PeeringDb()
terms: Terms = Terms()
text: Text = Text()

View file

@ -0,0 +1,208 @@
"""
Defines models for all config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Third Party Imports
from pydantic import BaseSettings
class Command(BaseSettings):
"""Class model for non-default commands"""
class IPv4(BaseSettings):
"""Class model for non-default dual afi commands"""
bgp_route: str = ""
bgp_aspath: str = ""
bgp_community: str = ""
ping: str = ""
traceroute: str = ""
class IPv6(BaseSettings):
"""Class model for non-default ipv4 commands"""
bgp_route: str = ""
bgp_aspath: str = ""
bgp_community: str = ""
ping: str = ""
traceroute: str = ""
class VPNIPv4(BaseSettings):
"""Class model for non-default ipv6 commands"""
bgp_route: str = ""
bgp_aspath: str = ""
bgp_community: str = ""
ping: str = ""
traceroute: str = ""
class VPNIPv6(BaseSettings):
"""Class model for non-default ipv6 commands"""
bgp_route: str = ""
bgp_aspath: str = ""
bgp_community: str = ""
ping: str = ""
traceroute: str = ""
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
vpn_ipv4: VPNIPv4 = VPNIPv4()
vpn_ipv6: VPNIPv6 = VPNIPv6()
class Commands(BaseSettings):
"""Base class for commands class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, dynamically sets
attributes for the commands class.
"""
obj = Commands()
for (nos, cmds) in input_params.items():
setattr(Commands, nos, Command(**cmds))
return obj
class CiscoIOS(BaseSettings):
"""Class model for default cisco_ios commands"""
class VPNv4IPv4(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community: str = "show bgp {afi} unicast vrf {vrf} community {target}"
bgp_aspath: str = 'show bgp {afi} unicast vrf {vrf} quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast vrf {vrf} {target}"
ping: str = "ping vrf {vrf} {target} repeat 5 source {source}"
traceroute: str = (
"traceroute vrf {vrf} {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
class VPNv6IPv6(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community: str = "show bgp {afi} unicast vrf {vrf} community {target}"
bgp_aspath: str = 'show bgp {afi} unicast vrf {vrf} quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast vrf {vrf} {target}"
ping: str = "ping vrf {vrf} {target} repeat 5 source {source}"
traceroute: str = (
"traceroute vrf {vrf} {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
class IPv4(BaseSettings):
"""Default commands for ipv4 commands"""
bgp_community: str = "show bgp {afi} unicast community {target}"
bgp_aspath: str = 'show bgp {afi} unicast quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast {target} | exclude pathid:|Epoch"
ping: str = "ping {target} repeat 5 source {source} | exclude Type escape"
traceroute: str = (
"traceroute {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
class IPv6(BaseSettings):
"""Default commands for ipv6 commands"""
bgp_community: str = "show bgp {afi} unicast community {target}"
bgp_aspath: str = 'show bgp {afi} unicast quote-regexp "{target}"'
bgp_route: str = "show bgp {afi} unicast {target} | exclude pathid:|Epoch"
ping: str = (
"ping {afi} {target} repeat 5 source {source} | exclude Type escape"
)
traceroute: str = (
"traceroute ipv6 {target} timeout 1 probe 2 source {source} "
"| exclude Type escape"
)
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
vpn_ipv4: VPNv4IPv4 = VPNv4IPv4()
vpn_ipv6: VPNv6IPv6 = VPNv6IPv6()
class CiscoXR(BaseSettings):
"""Class model for default cisco_xr commands"""
class Dual(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community: str = (
"show bgp all unicast community {target} | utility egrep -v "
'"\\(BGP |Table |Non-stop\\)"'
)
bgp_aspath: str = (
"show bgp all unicast regexp {target} | utility egrep -v "
'"\\(BGP |Table |Non-stop\\)"'
)
class IPv4(BaseSettings):
"""Default commands for ipv4 commands"""
bgp_route: str = (
"show bgp ipv4 unicast {target} | util egrep \\(BGP routing table "
"entry|Path \\#|aggregated by|Origin |Community:|validity| from \\)"
)
ping: str = "ping ipv4 {target} count 5 source {src_addr_ipv4}"
traceroute: str = (
"traceroute ipv4 {target} timeout 1 probe 2 source {source}"
)
class IPv6(BaseSettings):
"""Default commands for ipv6 commands"""
bgp_route: str = (
"show bgp ipv6 unicast {target} | util egrep \\(BGP routing table "
"entry|Path \\#|aggregated by|Origin |Community:|validity| from \\)"
)
ping: str = "ping ipv6 {target} count 5 source {src_addr_ipv6}"
traceroute: str = (
"traceroute ipv6 {target} timeout 1 probe 2 source {source}"
)
dual: Dual = Dual()
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
class Juniper(BaseSettings):
"""Class model for default juniper commands"""
class Dual(BaseSettings):
"""Default commands for dual afi commands"""
bgp_community = "show route protocol bgp community {target}"
bgp_aspath = "show route protocol bgp aspath-regex {target}"
class IPv4(BaseSettings):
"""Default commands for ipv4 commands"""
bgp_route = "show route protocol bgp table inet.0 {target} detail"
ping = "ping inet {target} count 5 source {src_addr_ipv4}"
traceroute = "traceroute inet {target} wait 1 source {source}"
class IPv6(BaseSettings):
"""Default commands for ipv6 commands"""
bgp_route = "show route protocol bgp table inet6.0 {target} detail"
ping = "ping inet6 {target} count 5 source {src_addr_ipv6}"
traceroute = "traceroute inet6 {target} wait 1 source {source}"
dual: Dual = Dual()
ipv4: IPv4 = IPv4()
ipv6: IPv6 = IPv6()
cisco_ios: Command = CiscoIOS()
cisco_xr: Command = CiscoXR()
juniper: Command = Juniper()
class Config:
"""Pydantic Config"""
validate_all = False
validate_assignment = True

View file

@ -0,0 +1,44 @@
"""
Defines models for Credential config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Third Party Imports
from pydantic import BaseSettings
from pydantic import SecretStr
# Project Imports
from hyperglass.configuration.models._utils import clean_name
class Credential(BaseSettings):
"""Model for per-credential config in devices.yaml"""
username: str
password: SecretStr
class Credentials(BaseSettings):
"""Base model for credentials class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the credentials class.
"""
obj = Credentials()
for (credname, params) in input_params.items():
cred = clean_name(credname)
setattr(Credentials, cred, Credential(**params))
return obj
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True

View file

@ -0,0 +1,145 @@
"""
Defines models for all Features variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Standard Library Imports
from math import ceil
from typing import List
# Third Party Imports
from pydantic import BaseSettings
from pydantic import IPvAnyNetwork
from pydantic import constr
class Features(BaseSettings):
"""Class model for params.features"""
class Vrf(BaseSettings):
"""Class model for params.features.vrf"""
enable: bool = False
class BgpRoute(BaseSettings):
"""Class model for params.features.bgp_route"""
enable: bool = True
class BgpCommunity(BaseSettings):
"""Class model for params.features.bgp_community"""
enable: bool = True
class Regex(BaseSettings):
"""Class model for params.features.bgp_community.regex"""
decimal: str = r"^[0-9]{1,10}$"
extended_as: str = r"^([0-9]{0,5})\:([0-9]{1,5})$"
large: str = r"^([0-9]{1,10})\:([0-9]{1,10})\:[0-9]{1,10}$"
regex: Regex = Regex()
class BgpAsPath(BaseSettings):
"""Class model for params.features.bgp_aspath"""
enable: bool = True
class Regex(BaseSettings):
"""Class model for params.bgp_aspath.regex"""
mode: constr(regex="asplain|asdot") = "asplain"
asplain: str = r"^(\^|^\_)(\d+\_|\d+\$|\d+\(\_\.\+\_\))+$"
asdot: str = (
r"^(\^|^\_)((\d+\.\d+)\_|(\d+\.\d+)\$|(\d+\.\d+)\(\_\.\+\_\))+$"
)
regex: Regex = Regex()
class Ping(BaseSettings):
"""Class model for params.features.ping"""
enable: bool = True
class Traceroute(BaseSettings):
"""Class model for params.features.traceroute"""
enable: bool = True
class Blacklist(BaseSettings):
"""Class model for params.features.blacklist"""
enable: bool = True
networks: List[IPvAnyNetwork] = [
"198.18.0.0/15",
"100.64.0.0/10",
"2001:db8::/32",
"10.0.0.0/8",
"192.168.0.0/16",
"172.16.0.0/12",
]
class Cache(BaseSettings):
"""Class model for params.features.cache"""
redis_id: int = 0
timeout: int = 120
show_text: bool = True
text: str = "Results will be cached for {timeout} minutes.".format(
timeout=ceil(timeout / 60)
)
class MaxPrefix(BaseSettings):
"""Class model for params.features.max_prefix"""
enable: bool = False
ipv4: int = 24
ipv6: int = 64
message: str = (
"Prefix length must be smaller than /{m}. <b>{i}</b> is too specific."
)
class RateLimit(BaseSettings):
"""Class model for params.features.rate_limit"""
redis_id: int = 1
class Query(BaseSettings):
"""Class model for params.features.rate_limit.query"""
rate: int = 5
period: str = "minute"
title: str = "Query Limit Reached"
message: str = (
"Query limit of {rate} per {period} reached. "
"Please wait one minute and try again."
).format(rate=rate, period=period)
button: str = "Try Again"
class Site(BaseSettings):
"""Class model for params.features.rate_limit.site"""
rate: int = 60
period: str = "minute"
title: str = "Limit Reached"
subtitle: str = (
"You have accessed this site more than {rate} "
"times in the last {period}."
).format(rate=rate, period=period)
button: str = "Try Again"
query: Query = Query()
site: Site = Site()
bgp_route: BgpRoute = BgpRoute()
bgp_community: BgpCommunity = BgpCommunity()
bgp_aspath: BgpAsPath = BgpAsPath()
ping: Ping = Ping()
traceroute: Traceroute = Traceroute()
blacklist: Blacklist = Blacklist()
cache: Cache = Cache()
max_prefix: MaxPrefix = MaxPrefix()
rate_limit: RateLimit = RateLimit()
vrf: Vrf = Vrf()

View file

@ -0,0 +1,27 @@
"""
Defines models for General config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Standard Library Imports
from typing import List
from typing import Union
# Third Party Imports
from pydantic import BaseSettings
from pydantic import IPvAnyNetwork
class General(BaseSettings):
"""Class model for params.general"""
debug: bool = False
primary_asn: str = "65001"
org_name: str = "The Company"
google_analytics: Union[str, None] = None
redis_host: Union[str, IPvAnyNetwork] = "localhost"
redis_port: int = 6379
requires_ipv6_cidr: List[str] = ["cisco_ios", "cisco_nxos"]
request_timeout: int = 15

View file

@ -0,0 +1,35 @@
"""
Defines models for Messages config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Third Party Imports
from pydantic import BaseSettings
class Messages(BaseSettings):
"""Class model for params.messages"""
no_query_type: str = "A query type must be specified."
no_location: str = "A location must be selected."
no_input: str = "{field} must be specified."
blacklist: str = "{target} a member of {blacklisted_net}, which is not allowed."
max_prefix: str = (
"Prefix length must be shorter than /{max_length}. {target} is too specific."
)
requires_ipv6_cidr: str = (
"{device_name} requires IPv6 BGP lookups to be in CIDR notation."
)
invalid_input: str = "{target} is not a valid {query_type} target."
invalid_field: str = "{input} is an invalid {field}."
general: str = "Something went wrong."
directed_cidr: str = "{query_type} queries can not be in CIDR format."
request_timeout: str = "Request timed out."
connection_error: str = "Error connecting to {device_name}: {error}"
authentication_error: str = "Authentication error occurred."
noresponse_error: str = "No response."
vrf_not_associated: str = "{vrf} is not associated with {device_name}."
no_matching_vrfs: str = "No VRFs Match"

View file

@ -0,0 +1,45 @@
"""
Defines models for Networks config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Third Party Imports
from pydantic import BaseSettings
# Project Imports
from hyperglass.configuration.models._utils import clean_name
class Network(BaseSettings):
"""Model for per-network/asn config in devices.yaml"""
display_name: str
class Networks(BaseSettings):
"""Base model for networks class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the credentials class.
"""
obj = Networks()
networks = {}
for (netname, params) in input_params.items():
netname = clean_name(netname)
setattr(Networks, netname, Network(**params))
networks.update({netname: Network(**params).dict()})
Networks.networks = networks
return obj
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True

View file

@ -0,0 +1,29 @@
"""
Defines models for all Params variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Third Party Imports
from pydantic import BaseSettings
from hyperglass.configuration.models.branding import Branding
from hyperglass.configuration.models.features import Features
from hyperglass.configuration.models.general import General
from hyperglass.configuration.models.messages import Messages
class Params(BaseSettings):
"""Base model for params"""
general: General = General()
features: Features = Features()
branding: Branding = Branding()
messages: Messages = Messages()
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True

View file

@ -0,0 +1,62 @@
"""
Defines models for Router config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Standard Library Imports
from typing import Union
# Third Party Imports
from pydantic import BaseSettings
from pydantic import IPvAnyAddress
from pydantic import SecretStr
from pydantic import validator
# Project Imports
from hyperglass.configuration.models._utils import clean_name
from hyperglass.exceptions import UnsupportedDevice
class Proxy(BaseSettings):
"""Model for per-proxy config in devices.yaml"""
address: Union[IPvAnyAddress, str]
port: int = 22
username: str
password: SecretStr
nos: str
ssh_command: str
@validator("nos")
def supported_nos(cls, v): # noqa: N805
"""
Validates that passed nos string is supported by hyperglass.
"""
if not v == "linux_ssh":
raise UnsupportedDevice(f'"{v}" device type is not supported.')
return v
class Proxies(BaseSettings):
"""Base model for proxies class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the proxies class.
"""
obj = Proxies()
for (devname, params) in input_params.items():
dev = clean_name(devname)
setattr(Proxies, dev, Proxy(**params))
return obj
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True

View file

@ -0,0 +1,90 @@
"""
Defines models for Router config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Standard Library Imports
from ipaddress import IPv4Address
from ipaddress import IPv6Address
from typing import List
from typing import Union
# Third Party Imports
from pydantic import BaseSettings
from pydantic import IPvAnyAddress
from pydantic import validator
# Project Imports
from hyperglass.configuration.models._utils import clean_name
from hyperglass.constants import Supported
from hyperglass.exceptions import UnsupportedDevice
class Router(BaseSettings):
"""Model for per-router config in devices.yaml."""
address: Union[IPvAnyAddress, str]
network: str
src_addr_ipv4: IPv4Address
src_addr_ipv6: IPv6Address
credential: str
proxy: Union[str, None] = None
location: str
display_name: str
port: int
nos: str
commands: Union[str, None] = None
vrfs: List[str] = ["default"]
@validator("nos")
def supported_nos(cls, v): # noqa: N805
"""Validates that passed nos string is supported by hyperglass"""
if not Supported.is_supported(v):
raise UnsupportedDevice(f'"{v}" device type is not supported.')
return v
@validator("credential", "proxy", "location")
def clean_name(cls, v): # noqa: N805
"""Remove or replace unsupported characters from field values"""
return clean_name(v)
@validator("commands", always=True)
def validate_commands(cls, v, values): # noqa: N805
if v is None:
v = values["nos"]
return v
class Routers(BaseSettings):
"""Base model for devices class."""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from device names, dynamically sets attributes for
the Routers class.
"""
routers = {}
hostnames = []
vrfs = set()
for (devname, params) in input_params.items():
dev = clean_name(devname)
router_params = Router(**params)
setattr(Routers, dev, router_params)
routers.update({dev: router_params.dict()})
hostnames.append(dev)
for vrf in router_params.dict()["vrfs"]:
vrfs.add(vrf)
Routers.routers = routers
Routers.hostnames = hostnames
Routers.vrfs = list(vrfs)
return Routers()
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True

View file

@ -0,0 +1,66 @@
"""
Defines models for VRF config variables.
Imports config variables and overrides default class attributes.
Validates input for overridden parameters.
"""
# Standard Library Imports
from typing import List
# Third Party Imports
from pydantic import BaseSettings
# Project Imports
from hyperglass.configuration.models._utils import clean_name
class Vrf(BaseSettings):
"""Model for per VRF/afi config in devices.yaml"""
display_name: str
name: str
afis: List[str]
class Vrfs(BaseSettings):
"""Base model for vrfs class"""
@classmethod
def import_params(cls, input_params):
"""
Imports passed dict from YAML config, removes unsupported
characters from VRF names, dynamically sets attributes for
the Vrfs class.
"""
vrfs: Vrf = {
"default": {
"display_name": "Default",
"name": "default",
"afis": ["ipv4, ipv6"],
}
}
names: List[str] = ["default"]
_all: List[str] = ["default"]
for (vrf_key, params) in input_params.items():
vrf = clean_name(vrf_key)
vrf_params = Vrf(**params)
vrfs.update({vrf: vrf_params.dict()})
names.append(params.get("name"))
_all.append(vrf_key)
for (vrf_key, params) in vrfs.items():
setattr(Vrfs, vrf_key, params)
names: List[str] = list(set(names))
_all: List[str] = list(set(_all))
Vrfs.vrfs = vrfs
Vrfs.names = names
Vrfs._all = _all
return Vrfs()
class Config:
"""Pydantic Config"""
validate_all = True
validate_assignment = True