diff --git a/hyperglass/command/validate.py b/hyperglass/command/validate.py index 5976a80..b1ff50b 100644 --- a/hyperglass/command/validate.py +++ b/hyperglass/command/validate.py @@ -85,10 +85,10 @@ def ip_validate(target): try: valid_ip = ipaddress.ip_network(target) if valid_ip.is_reserved or valid_ip.is_unspecified or valid_ip.is_loopback: - raise InputInvalid(target=target) + raise ValueError except (ipaddress.AddressValueError, ValueError): logger.debug(f"IP {target} is invalid") - raise InputInvalid(target=target) from None + raise ValueError return valid_ip @@ -117,7 +117,7 @@ def ip_blacklist(target): and blacklist_net.network_address >= target.broadcast_address ): logger.debug(f"Blacklist Match Found for {target} in {net}") - raise InputNotAllowed(target=target) from None + raise ValueError(params.messages.blacklist) return target @@ -146,22 +146,15 @@ def ip_type_check(query_type, target, device): prefix_attr = ip_attributes(target) logger.debug(f"IP Attributes:\n{prefix_attr}") - # If target is a member of the blacklist, return an error. - if ip_blacklist(target): - pass - # If enable_max_prefix feature enabled, require that BGP Route # queries be smaller than configured size limit. if query_type == "bgp_route" and params.features.max_prefix.enable: max_length = getattr(params.features.max_prefix, prefix_attr["afi"]) if prefix_attr["length"] > max_length: logger.debug("Failed max prefix length check") - raise InputNotAllowed( - target=target, - error_msg=params.features.max_prefixmessage.format( - m=max_length, i=prefix_attr["network"] - ), - ) + _exception = ValueError(params.messages.max_prefix) + _exception.details = {"max_length": params.features.max_prefix} + raise _exception # If device NOS is listed in requires_ipv6_cidr.toml, and query is # an IPv6 host address, return an error. @@ -172,19 +165,17 @@ def ip_type_check(query_type, target, device): and IPType().is_host(target) ): logger.debug("Failed requires IPv6 CIDR check") - raise InputInvalid( - target=target, - error_msg=params.messages.requires_ipv6_cidr.format(d=device.display_name), - ) + _exception = ValueError(params.messages.requires_ipv6_cidr) + _exception.details = {"device_name": device.location} + raise _exception # If query type is ping or traceroute, and query target is in CIDR # format, return an error. if query_type in ("ping", "traceroute") and IPType().is_cidr(target): logger.debug("Failed CIDR format for ping/traceroute check") - raise InputInvalid( - target=target, - error_msg=params.messages.directed_cidr.format(q=query_type.capitalize()), - ) + _exception = ValueError(params.messages.directed_cidr) + _exception.details = {} + raise _exception return target @@ -207,21 +198,43 @@ class Validate: # Perform basic validation of an IP address, return error if # not a valid IP. - if ip_validate(self.target): - pass + try: + ip_validate(self.target) + except ValueError as unformatted_error: + raise InputInvalid( + unformatted_error, + target=self.target, + query_type=getattr(params.branding.text, self.query_type), + **unformatted_error.details, + ) from None + + # If target is a member of the blacklist, return an error. + try: + ip_blacklist(self.target) + except ValueError as unformatted_error: + raise InputNotAllowed( + unformatted_error, target=self.target, **unformatted_error.details + ) # Perform further validation of a valid IP address, return an # error upon failure. - if ip_type_check(self.query_type, self.target, self.device): - pass + try: + ip_type_check(self.query_type, self.target, self.device) + except ValueError as unformatted_error: + raise InputNotAllowed( + unformatted_error, target=self.target, **unformatted_error.details + ) + return self.target def validate_dual(self): """Validates Dual-Stack Input""" logger.debug(f"Validating {self.query_type} query for target {self.target}...") - # Validate input communities against configured or default regex - # pattern. + if self.query_type == "bgp_community": + # Validate input communities against configured or default regex + # pattern. + # Extended Communities, new-format if re.match(params.features.bgp_community.regex.extended_as, self.target): pass @@ -232,16 +245,26 @@ class Validate: elif re.match(params.features.bgp_community.regex.large, self.target): pass else: - raise InputInvalid(target=self.target, query_type=self.query_type) + raise InputInvalid( + params.messages.invalid_input, + target=self.target, + query_type=getattr(params.branding.text, self.query_type), + ) + elif self.query_type == "bgp_aspath": # Validate input AS_PATH regex pattern against configured or # default regex pattern. + mode = params.features.bgp_aspath.regex.mode pattern = getattr(params.features.bgp_aspath.regex, mode) if re.match(pattern, self.target): pass else: - raise InputInvalid(target=self.target, query_type=self.query_type) + raise InputInvalid( + params.messages.invalid_input, + target=self.target, + query_type=getattr(params.branding.text, self.query_type), + ) return self.target def valdiate_query(self):