forked from mirrors/thatmattlove-hyperglass
WIP: switching to 100% custom exceptions for error handling
This commit is contained in:
parent
a8a6e06f45
commit
b5b1122ec6
5 changed files with 286 additions and 248 deletions
|
|
@ -26,7 +26,7 @@ from hyperglass.configuration import proxies
|
|||
from hyperglass.constants import Supported
|
||||
from hyperglass.constants import code
|
||||
from hyperglass.constants import protocol_map
|
||||
from hyperglass.exceptions import CantConnect
|
||||
from hyperglass.exceptions import AuthError, RestError, ScrapeError
|
||||
|
||||
|
||||
class Connect:
|
||||
|
|
@ -54,90 +54,99 @@ class Connect:
|
|||
connect to the remote device.
|
||||
"""
|
||||
response = None
|
||||
try:
|
||||
if self.device_config.proxy:
|
||||
device_proxy = getattr(proxies, self.device_config.proxy)
|
||||
logger.debug(
|
||||
f"Proxy: {device_proxy.address.compressed}:{device_proxy.port}"
|
||||
if self.device_config.proxy:
|
||||
device_proxy = getattr(proxies, self.device_config.proxy)
|
||||
logger.debug(
|
||||
f"Proxy: {device_proxy.address.compressed}:{device_proxy.port}"
|
||||
)
|
||||
logger.debug(
|
||||
"Connecting to {dev} via sshtunnel library...".format(
|
||||
dev=self.device_config.proxy
|
||||
)
|
||||
logger.debug(
|
||||
"Connecting to {dev} via sshtunnel library...".format(
|
||||
dev=self.device_config.proxy
|
||||
)
|
||||
)
|
||||
with sshtunnel.open_tunnel(
|
||||
device_proxy.address.compressed,
|
||||
device_proxy.port,
|
||||
ssh_username=device_proxy.username,
|
||||
ssh_password=device_proxy.password.get_secret_value(),
|
||||
remote_bind_address=(
|
||||
self.device_config.address.compressed,
|
||||
self.device_config.port,
|
||||
),
|
||||
local_bind_address=("localhost", 0),
|
||||
) as tunnel:
|
||||
logger.debug(f"Established tunnel with {self.device_config.proxy}")
|
||||
scrape_host = {
|
||||
"host": "localhost",
|
||||
"port": tunnel.local_bind_port,
|
||||
"device_type": self.device_config.nos,
|
||||
"username": self.cred.username,
|
||||
"password": self.cred.password.get_secret_value(),
|
||||
"global_delay_factor": 0.2,
|
||||
}
|
||||
logger.debug(f"Local binding: localhost:{tunnel.local_bind_port}")
|
||||
try:
|
||||
logger.debug(
|
||||
"Connecting to {dev} via Netmiko library...".format(
|
||||
dev=self.device_config.location
|
||||
)
|
||||
)
|
||||
nm_connect_direct = ConnectHandler(**scrape_host)
|
||||
response = nm_connect_direct.send_command(self.query)
|
||||
except (
|
||||
OSError,
|
||||
NetMikoAuthenticationException,
|
||||
NetMikoTimeoutException,
|
||||
NetmikoAuthError,
|
||||
NetmikoTimeoutError,
|
||||
sshtunnel.BaseSSHTunnelForwarderError,
|
||||
) as scrape_error:
|
||||
raise CantConnect(scrape_error)
|
||||
else:
|
||||
)
|
||||
with sshtunnel.open_tunnel(
|
||||
device_proxy.address.compressed,
|
||||
device_proxy.port,
|
||||
ssh_username=device_proxy.username,
|
||||
ssh_password=device_proxy.password.get_secret_value(),
|
||||
remote_bind_address=(
|
||||
self.device_config.address.compressed,
|
||||
self.device_config.port,
|
||||
),
|
||||
local_bind_address=("localhost", 0),
|
||||
) as tunnel:
|
||||
logger.debug(f"Established tunnel with {self.device_config.proxy}")
|
||||
scrape_host = {
|
||||
"host": self.device_config.address.compressed,
|
||||
"port": self.device_config.port,
|
||||
"host": "localhost",
|
||||
"port": tunnel.local_bind_port,
|
||||
"device_type": self.device_config.nos,
|
||||
"username": self.cred.username,
|
||||
"password": self.cred.password.get_secret_value(),
|
||||
"global_delay_factor": 0.2,
|
||||
}
|
||||
logger.debug(f"Local binding: localhost:{tunnel.local_bind_port}")
|
||||
try:
|
||||
logger.debug(
|
||||
"Connecting to {dev} via Netmiko library...".format(
|
||||
dev=self.device_config.location
|
||||
)
|
||||
)
|
||||
logger.debug(f"Device Parameters: {scrape_host}")
|
||||
nm_connect_direct = ConnectHandler(**scrape_host)
|
||||
response = nm_connect_direct.send_command(self.query)
|
||||
except (
|
||||
NetMikoAuthenticationException,
|
||||
OSError,
|
||||
NetMikoTimeoutException,
|
||||
NetmikoAuthError,
|
||||
NetmikoTimeoutError,
|
||||
sshtunnel.BaseSSHTunnelForwarderError,
|
||||
) as scrape_error:
|
||||
raise CantConnect(scrape_error)
|
||||
if not response:
|
||||
raise CantConnect("No response")
|
||||
status = code.valid
|
||||
logger.debug(f"Output for query: {self.query}:\n{response}")
|
||||
except CantConnect as scrape_error:
|
||||
logger.error(scrape_error)
|
||||
response = params.messages.general
|
||||
status = code.invalid
|
||||
return response, status
|
||||
raise ScrapeError(
|
||||
device=self.device_config.location,
|
||||
proxy=self.device_config.proxy,
|
||||
error_msg=scrape_error,
|
||||
) from None
|
||||
except (NetMikoAuthenticationException, NetmikoAuthError) as auth_error:
|
||||
raise AuthError(
|
||||
device=self.device_config.location,
|
||||
proxy=self.device_config.proxy,
|
||||
error_msg=auth_error,
|
||||
) from None
|
||||
else:
|
||||
scrape_host = {
|
||||
"host": self.device_config.address.compressed,
|
||||
"port": self.device_config.port,
|
||||
"device_type": self.device_config.nos,
|
||||
"username": self.cred.username,
|
||||
"password": self.cred.password.get_secret_value(),
|
||||
"global_delay_factor": 0.2,
|
||||
}
|
||||
try:
|
||||
logger.debug(
|
||||
"Connecting to {dev} via Netmiko library...".format(
|
||||
dev=self.device_config.location
|
||||
)
|
||||
)
|
||||
logger.debug(f"Device Parameters: {scrape_host}")
|
||||
nm_connect_direct = ConnectHandler(**scrape_host)
|
||||
response = nm_connect_direct.send_command(self.query)
|
||||
except (
|
||||
OSError,
|
||||
NetMikoTimeoutException,
|
||||
NetmikoTimeoutError,
|
||||
sshtunnel.BaseSSHTunnelForwarderError,
|
||||
) as scrape_error:
|
||||
raise ScrapeError(
|
||||
device=self.device_config.location, error_msg=scrape_error
|
||||
) from None
|
||||
except (NetMikoAuthenticationException, NetmikoAuthError) as auth_error:
|
||||
raise AuthError(
|
||||
device=self.device_config.location, error_msg=auth_error
|
||||
) from None
|
||||
if not response:
|
||||
raise ScrapeError(
|
||||
device=self.device_config.location, error_msg="No response"
|
||||
)
|
||||
logger.debug(f"Output for query: {self.query}:\n{response}")
|
||||
return response
|
||||
|
||||
async def rest(self):
|
||||
"""Sends HTTP POST to router running a hyperglass API agent"""
|
||||
|
|
@ -162,7 +171,6 @@ class Connect:
|
|||
endpoint, headers=headers, json=self.query, timeout=7
|
||||
)
|
||||
response = raw_response.text
|
||||
status = raw_response.status_code
|
||||
|
||||
logger.debug(f"HTTP status code: {status}")
|
||||
logger.debug(f"Output for query {self.query}:\n{response}")
|
||||
|
|
@ -186,10 +194,8 @@ class Connect:
|
|||
) as rest_error:
|
||||
logger.error(f"Error connecting to device {self.device_config.location}")
|
||||
logger.error(rest_error)
|
||||
|
||||
response = params.messages.general
|
||||
status = code.invalid
|
||||
return response, status
|
||||
raise RestError(device=self.device_config.location, error_msg=rest_error)
|
||||
return response
|
||||
|
||||
|
||||
class Execute:
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ from logzero import logger
|
|||
# Project Imports
|
||||
from hyperglass.configuration import logzero_config # noqa: F401
|
||||
from hyperglass.configuration import params
|
||||
from hyperglass.constants import code
|
||||
from hyperglass.exceptions import InputInvalid, InputNotAllowed
|
||||
|
||||
|
||||
class IPType:
|
||||
|
|
@ -82,16 +82,14 @@ class IPType:
|
|||
|
||||
def ip_validate(target):
|
||||
"""Validates if input is a valid IP address"""
|
||||
validity = False
|
||||
try:
|
||||
valid_ip = ipaddress.ip_network(target)
|
||||
if valid_ip.is_reserved or valid_ip.is_unspecified or valid_ip.is_loopback:
|
||||
raise ValueError
|
||||
validity = True
|
||||
raise InputInvalid(target=target)
|
||||
except (ipaddress.AddressValueError, ValueError):
|
||||
logger.debug(f"IP {target} is invalid")
|
||||
validity = False
|
||||
return validity
|
||||
raise InputInvalid(target=target) from None
|
||||
return valid_ip
|
||||
|
||||
|
||||
def ip_blacklist(target):
|
||||
|
|
@ -101,7 +99,6 @@ def ip_blacklist(target):
|
|||
"""
|
||||
logger.debug(f"Blacklist Enabled: {params.features.blacklist.enable}")
|
||||
target = ipaddress.ip_network(target)
|
||||
membership = False
|
||||
if params.features.blacklist.enable:
|
||||
target_ver = target.version
|
||||
user_blacklist = params.features.blacklist.networks
|
||||
|
|
@ -113,18 +110,15 @@ def ip_blacklist(target):
|
|||
logger.debug(
|
||||
f"IPv{target_ver} Blacklist Networks: {[str(n) for n in networks]}"
|
||||
)
|
||||
while not membership:
|
||||
for net in networks:
|
||||
blacklist_net = ipaddress.ip_network(net)
|
||||
if (
|
||||
blacklist_net.network_address <= target.network_address
|
||||
and blacklist_net.network_address >= target.broadcast_address
|
||||
):
|
||||
membership = True
|
||||
logger.debug(f"Blacklist Match Found for {target} in {net}")
|
||||
break
|
||||
break
|
||||
return membership
|
||||
for net in networks:
|
||||
blacklist_net = ipaddress.ip_network(net)
|
||||
if (
|
||||
blacklist_net.network_address <= target.network_address
|
||||
and blacklist_net.network_address >= target.broadcast_address
|
||||
):
|
||||
logger.debug(f"Blacklist Match Found for {target} in {net}")
|
||||
raise InputNotAllowed(target=target) from None
|
||||
return target
|
||||
|
||||
|
||||
def ip_attributes(target):
|
||||
|
|
@ -151,24 +145,24 @@ def ip_type_check(query_type, target, device):
|
|||
"""Checks multiple IP address related validation parameters"""
|
||||
prefix_attr = ip_attributes(target)
|
||||
logger.debug(f"IP Attributes:\n{prefix_attr}")
|
||||
validity = False
|
||||
msg = params.messages.not_allowed.format(i=target)
|
||||
|
||||
# If target is a member of the blacklist, return an error.
|
||||
if ip_blacklist(target):
|
||||
validity = False
|
||||
logger.debug("Failed blacklist check")
|
||||
return (validity, msg)
|
||||
pass
|
||||
|
||||
# If enable_max_prefix feature enabled, require that BGP Route
|
||||
# queries be smaller than configured size limit.
|
||||
if query_type == "bgp_route" and params.features.max_prefix.enable:
|
||||
max_length = getattr(params.features.max_prefix, prefix_attr["afi"])
|
||||
if prefix_attr["length"] > max_length:
|
||||
validity = False
|
||||
msg = params.features.max_prefixmessage.format(
|
||||
m=max_length, i=prefix_attr["network"]
|
||||
)
|
||||
logger.debug("Failed max prefix length check")
|
||||
return (validity, msg)
|
||||
raise InputNotAllowed(
|
||||
target=target,
|
||||
error_msg=params.features.max_prefixmessage.format(
|
||||
m=max_length, i=prefix_attr["network"]
|
||||
),
|
||||
)
|
||||
|
||||
# If device NOS is listed in requires_ipv6_cidr.toml, and query is
|
||||
# an IPv6 host address, return an error.
|
||||
if (
|
||||
|
|
@ -177,20 +171,21 @@ def ip_type_check(query_type, target, device):
|
|||
and device.nos in params.general.requires_ipv6_cidr
|
||||
and IPType().is_host(target)
|
||||
):
|
||||
msg = params.messages.requires_ipv6_cidr.format(d=device.display_name)
|
||||
validity = False
|
||||
logger.debug("Failed requires IPv6 CIDR check")
|
||||
return (validity, msg)
|
||||
raise InputInvalid(
|
||||
target=target,
|
||||
error_msg=params.messages.requires_ipv6_cidr.format(d=device.display_name),
|
||||
)
|
||||
|
||||
# If query type is ping or traceroute, and query target is in CIDR
|
||||
# format, return an error.
|
||||
if query_type in ("ping", "traceroute") and IPType().is_cidr(target):
|
||||
msg = params.messages.directed_cidr.format(q=query_type.capitalize())
|
||||
validity = False
|
||||
logger.debug("Failed CIDR format for ping/traceroute check")
|
||||
return (validity, msg)
|
||||
validity = True
|
||||
msg = f"{target} is a valid {query_type} query."
|
||||
return (validity, msg)
|
||||
raise InputInvalid(
|
||||
target=target,
|
||||
error_msg=params.messages.directed_cidr.format(q=query_type.capitalize()),
|
||||
)
|
||||
return target
|
||||
|
||||
|
||||
class Validate:
|
||||
|
|
@ -200,125 +195,57 @@ class Validate:
|
|||
boolean for validity, specific error message, and status code.
|
||||
"""
|
||||
|
||||
def __init__(self, device):
|
||||
def __init__(self, device, query_type, target):
|
||||
"""Initialize device parameters and error codes."""
|
||||
self.device = device
|
||||
self.query_type = query_type
|
||||
self.target = target
|
||||
|
||||
def validate_ip(self):
|
||||
"""Validates IPv4/IPv6 Input"""
|
||||
logger.debug(f"Validating {self.query_type} query for target {self.target}...")
|
||||
|
||||
def ping(self, target):
|
||||
"""Ping Query: Input Validation & Error Handling"""
|
||||
query_type = "ping"
|
||||
logger.debug(f"Validating {query_type} query for target {target}...")
|
||||
validity = False
|
||||
msg = params.messages.invalid_ip.format(i=target)
|
||||
status = code.not_allowed
|
||||
# Perform basic validation of an IP address, return error if
|
||||
# not a valid IP.
|
||||
if not ip_validate(target):
|
||||
status = code.invalid
|
||||
logger.error(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
if ip_validate(self.target):
|
||||
pass
|
||||
|
||||
# Perform further validation of a valid IP address, return an
|
||||
# error upon failure.
|
||||
valid_query, msg = ip_type_check(query_type, target, self.device)
|
||||
if valid_query:
|
||||
validity = True
|
||||
msg = f"{target} is a valid {query_type} query."
|
||||
status = code.valid
|
||||
logger.debug(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
return (validity, msg, status)
|
||||
if ip_type_check(self.query_type, self.target, self.device):
|
||||
pass
|
||||
return self.target
|
||||
|
||||
def traceroute(self, target):
|
||||
"""Traceroute Query: Input Validation & Error Handling"""
|
||||
query_type = "traceroute"
|
||||
logger.debug(f"Validating {query_type} query for target {target}...")
|
||||
validity = False
|
||||
msg = params.messages.invalid_ip.format(i=target)
|
||||
status = code.not_allowed
|
||||
# Perform basic validation of an IP address, return error if
|
||||
# not a valid IP.
|
||||
if not ip_validate(target):
|
||||
status = code.invalid
|
||||
logger.error(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
# Perform further validation of a valid IP address, return an
|
||||
# error upon failure.
|
||||
valid_query, msg = ip_type_check(query_type, target, self.device)
|
||||
if valid_query:
|
||||
validity = True
|
||||
msg = f"{target} is a valid {query_type} query."
|
||||
status = code.valid
|
||||
logger.debug(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
return (validity, msg, status)
|
||||
|
||||
def bgp_route(self, target):
|
||||
"""BGP Route Query: Input Validation & Error Handling"""
|
||||
query_type = "bgp_route"
|
||||
logger.debug(f"Validating {query_type} query for target {target}...")
|
||||
validity = False
|
||||
msg = params.messages.invalid_ip.format(i=target)
|
||||
status = code.not_allowed
|
||||
# Perform basic validation of an IP address, return error if not
|
||||
# a valid IP.
|
||||
if not ip_validate(target):
|
||||
status = code.invalid
|
||||
logger.error(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
# Perform further validation of a valid IP address, return an
|
||||
# error upon failure.
|
||||
valid_query, msg = ip_type_check(query_type, target, self.device)
|
||||
if valid_query:
|
||||
validity = True
|
||||
msg = f"{target} is a valid {query_type} query."
|
||||
status = code.valid
|
||||
logger.debug(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
return (validity, msg, status)
|
||||
|
||||
@staticmethod
|
||||
def bgp_community(target):
|
||||
"""BGP Community Query: Input Validation & Error Handling"""
|
||||
query_type = "bgp_community"
|
||||
logger.debug(f"Validating {query_type} query for target {target}...")
|
||||
validity = False
|
||||
msg = params.messages.invalid_dual.format(i=target, qt="BGP Community")
|
||||
status = code.invalid
|
||||
def validate_dual(self):
|
||||
"""Validates Dual-Stack Input"""
|
||||
logger.debug(f"Validating {self.query_type} query for target {self.target}...")
|
||||
# Validate input communities against configured or default regex
|
||||
# pattern.
|
||||
# Extended Communities, new-format
|
||||
if re.match(params.features.bgp_community.regex.extended_as, target):
|
||||
validity = True
|
||||
msg = f"{target} matched extended AS format community."
|
||||
status = code.valid
|
||||
# Extended Communities, 32 bit format
|
||||
elif re.match(params.features.bgp_community.regex.decimal, target):
|
||||
validity = True
|
||||
msg = f"{target} matched decimal format community."
|
||||
status = code.valid
|
||||
# RFC 8092 Large Community Support
|
||||
elif re.match(params.features.bgp_community.regex.large, target):
|
||||
validity = True
|
||||
msg = f"{target} matched large community."
|
||||
status = code.valid
|
||||
logger.debug(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
if self.query_type == "bgp_community":
|
||||
# Extended Communities, new-format
|
||||
if re.match(params.features.bgp_community.regex.extended_as, self.target):
|
||||
pass
|
||||
# Extended Communities, 32 bit format
|
||||
elif re.match(params.features.bgp_community.regex.decimal, self.target):
|
||||
pass
|
||||
# RFC 8092 Large Community Support
|
||||
elif re.match(params.features.bgp_community.regex.large, self.target):
|
||||
pass
|
||||
else:
|
||||
raise InputInvalid(target=self.target, query_type=self.query_type)
|
||||
elif self.query_type == "bgp_aspath":
|
||||
# Validate input AS_PATH regex pattern against configured or
|
||||
# default regex pattern.
|
||||
mode = params.features.bgp_aspath.regex.mode
|
||||
pattern = getattr(params.features.bgp_aspath.regex, mode)
|
||||
if re.match(pattern, self.target):
|
||||
pass
|
||||
else:
|
||||
raise InputInvalid(target=self.target, query_type=self.query_type)
|
||||
return self.target
|
||||
|
||||
@staticmethod
|
||||
def bgp_aspath(target):
|
||||
"""BGP AS Path Query: Input Validation & Error Handling"""
|
||||
query_type = "bgp_aspath"
|
||||
logger.debug(f"Validating {query_type} query for target {target}...")
|
||||
validity = False
|
||||
msg = params.messages.invalid_dual.format(i=target, qt="AS Path")
|
||||
status = code.invalid
|
||||
# Validate input AS_PATH regex pattern against configured or
|
||||
# default regex pattern.
|
||||
mode = params.features.bgp_aspath.regex.mode
|
||||
pattern = getattr(params.features.bgp_aspath.regex, mode)
|
||||
if re.match(pattern, target):
|
||||
validity = True
|
||||
msg = f"{target} matched AS_PATH regex."
|
||||
status = code.valid
|
||||
logger.debug(f"{msg}, {status}")
|
||||
return (validity, msg, status)
|
||||
def valdiate_query(self):
|
||||
if self.query_type in ("bgp_community", "bgp_aspath"):
|
||||
return self.validate_dual()
|
||||
else:
|
||||
return self.validate_ip()
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ from pydantic import ValidationError
|
|||
|
||||
# Project Imports
|
||||
from hyperglass.configuration import models
|
||||
from hyperglass.exceptions import ConfigError
|
||||
from hyperglass.exceptions import ConfigError, ConfigInvalid, ConfigMissing
|
||||
|
||||
# Project Directories
|
||||
working_dir = Path(__file__).resolve().parent
|
||||
|
|
@ -38,19 +38,20 @@ except FileNotFoundError:
|
|||
"Defaults will be used."
|
||||
)
|
||||
)
|
||||
except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error:
|
||||
raise ConfigError(error_msg=yaml_error) from None
|
||||
|
||||
# Import device configuration file
|
||||
try:
|
||||
with open(working_dir.joinpath("devices.yaml")) as devices_yaml:
|
||||
user_devices = yaml.safe_load(devices_yaml)
|
||||
except FileNotFoundError as no_devices_error:
|
||||
logger.error(no_devices_error)
|
||||
raise ConfigError(
|
||||
(
|
||||
f'"{working_dir.joinpath("devices.yaml")}" not found. '
|
||||
"Devices are required to start hyperglass, please consult "
|
||||
"the installation documentation."
|
||||
)
|
||||
)
|
||||
raise ConfigMissing(
|
||||
missing_item=str(working_dir.joinpath("devices.yaml"))
|
||||
) from None
|
||||
except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error:
|
||||
raise ConfigError(error_msg=yaml_error) from None
|
||||
|
||||
# Map imported user config files to expected schema:
|
||||
try:
|
||||
|
|
@ -69,9 +70,10 @@ try:
|
|||
except ValidationError as validation_errors:
|
||||
errors = validation_errors.errors()
|
||||
for error in errors:
|
||||
raise ConfigError(
|
||||
f'The value of {error["loc"][0]} field is invalid: {error["msg"]} '
|
||||
)
|
||||
raise ConfigInvalid(
|
||||
field=": ".join([str(item) for item in error["loc"]]),
|
||||
error_msg=error["msg"],
|
||||
) from None
|
||||
|
||||
# Logzero Configuration
|
||||
log_level = 20
|
||||
|
|
@ -116,7 +118,7 @@ class Networks:
|
|||
}
|
||||
]
|
||||
if not locations_dict:
|
||||
raise ConfigError("Unable to build network to device mapping")
|
||||
raise ConfigError(error_msg="Unable to build network to device mapping")
|
||||
return locations_dict
|
||||
|
||||
def networks_display(self):
|
||||
|
|
@ -132,7 +134,7 @@ class Networks:
|
|||
elif net_display not in locations_dict:
|
||||
locations_dict[net_display] = [router_params["display_name"]]
|
||||
if not locations_dict:
|
||||
raise ConfigError("Unable to build network to device mapping")
|
||||
raise ConfigError(error_msg="Unable to build network to device mapping")
|
||||
return [
|
||||
{"network_name": netname, "location_names": display_name}
|
||||
for (netname, display_name) in locations_dict.items()
|
||||
|
|
|
|||
|
|
@ -344,6 +344,7 @@ class Messages(BaseSettings):
|
|||
invalid_dual: str = "<b>{i}</b> is an invalid {qt}."
|
||||
general: str = "An error occurred."
|
||||
directed_cidr: str = "<b>{q}</b> queries can not be in CIDR format."
|
||||
request_timeout: str = "Request timed out."
|
||||
|
||||
|
||||
class Features(BaseSettings):
|
||||
|
|
|
|||
|
|
@ -2,35 +2,137 @@
|
|||
Custom exceptions for hyperglass
|
||||
"""
|
||||
|
||||
import string
|
||||
from typing import Union, List
|
||||
|
||||
from hyperglass.constants import code
|
||||
|
||||
|
||||
class HyperglassError(Exception):
|
||||
"""
|
||||
hyperglass base exception.
|
||||
hyperglass base exception
|
||||
"""
|
||||
|
||||
message: str = ""
|
||||
formatter: string.Formatter = string.Formatter()
|
||||
|
||||
def __init__(self, **kwargs: Union[str, int]) -> None:
|
||||
"""
|
||||
Exception arguments are accepted as kwargs, but a check is
|
||||
performed to ensure that all format string parameters are passed
|
||||
in, and no extras are given.
|
||||
"""
|
||||
self._kwargs = kwargs
|
||||
self._error_check()
|
||||
super().__init__(str(self))
|
||||
|
||||
def _error_check(self) -> None:
|
||||
required = set(
|
||||
arg for _, arg, _, _ in self.formatter.parse(self.message) if arg
|
||||
)
|
||||
given = set(self._kwargs.keys())
|
||||
|
||||
missing = required.difference(given)
|
||||
if missing:
|
||||
raise TypeError(
|
||||
"{name} missing requred arguments: {missing}".format(
|
||||
name=self.__class__.__name__, missing=missing
|
||||
)
|
||||
)
|
||||
|
||||
extra = given.difference(required)
|
||||
if extra:
|
||||
raise TypeError(
|
||||
"{name} given extra arguments: {extra}".format(
|
||||
name=self.__class__.__name__, extra=extra
|
||||
)
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.formatter.format(self.message, **self._kwargs)
|
||||
|
||||
def __getattr__(self, key: str) -> str:
|
||||
"""
|
||||
Any exception kwargs arguments are accessible by name on the
|
||||
object.
|
||||
"""
|
||||
remind = ""
|
||||
if "_kwargs" not in self.__dict__:
|
||||
remind = "(Did you forget to call super().__init__(**kwargs)?)"
|
||||
|
||||
elif key in self._kwargs:
|
||||
return self._kwargs[key]
|
||||
|
||||
raise AttributeError(
|
||||
"{name!r} object has no attribute {key!r} {remind}".format(
|
||||
name=self.__class__.__name__, key=key, remind=remind
|
||||
).strip()
|
||||
)
|
||||
|
||||
|
||||
class ConfigError(HyperglassError):
|
||||
"""
|
||||
Raised for user-inflicted configuration issues. Examples:
|
||||
- Fat fingered NOS in device definition
|
||||
- Used invalid type (str, int, etc.) in hyperglass.yaml
|
||||
Raised for generic user-config issues.
|
||||
"""
|
||||
|
||||
def __init__(self, message):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return self.message
|
||||
message: str = "{error_msg}"
|
||||
|
||||
|
||||
class CantConnect(HyperglassError):
|
||||
def __init__(self, message):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
class ConfigInvalid(HyperglassError):
|
||||
"""Raised when a config item fails type or option validation"""
|
||||
|
||||
def __str__(self):
|
||||
return self.message
|
||||
message: str = 'The value field "{field}" is invalid: {error_msg}'
|
||||
|
||||
|
||||
class ConfigMissing(HyperglassError):
|
||||
"""
|
||||
Raised when a required config file or item is missing or undefined
|
||||
"""
|
||||
|
||||
message: str = (
|
||||
"{missing_item} is missing or undefined and is required to start "
|
||||
"hyperglass. Please consult the installation documentation."
|
||||
)
|
||||
|
||||
|
||||
class ScrapeError(HyperglassError):
|
||||
"""Raised upon a scrape/netmiko error"""
|
||||
|
||||
message: str = ""
|
||||
status: int = code.target_error
|
||||
|
||||
|
||||
class AuthError(HyperglassError):
|
||||
"""Raised when authentication to a device fails"""
|
||||
|
||||
message: str = ""
|
||||
status: int = code.target_error
|
||||
|
||||
|
||||
class RestError(HyperglassError):
|
||||
"""Raised upon a rest API client error"""
|
||||
|
||||
message: str = ""
|
||||
status: int = code.target_error
|
||||
|
||||
|
||||
class InputInvalid(HyperglassError):
|
||||
"""Raised when input validation fails"""
|
||||
|
||||
message: str = ""
|
||||
status: int = code.invalid
|
||||
keywords: List[str] = []
|
||||
|
||||
|
||||
class InputNotAllowed(HyperglassError):
|
||||
"""
|
||||
Raised when input validation fails due to a blacklist or
|
||||
requires_ipv6_cidr check
|
||||
"""
|
||||
|
||||
message: str = ""
|
||||
status: int = code.not_allowed
|
||||
keywords: List[str] = []
|
||||
|
||||
|
||||
class ParseError(HyperglassError):
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue