""" Accepts raw input data from execute.py, passes it through specific filters based on query type, returns validity boolean and specific error message. """ # Standard Library Imports import ipaddress import re # Third Party Imports from logzero import logger as log # Project Imports from hyperglass.configuration import logzero_config # noqa: F401 from hyperglass.configuration import params from hyperglass.exceptions import HyperglassError from hyperglass.exceptions import InputInvalid from hyperglass.exceptions import InputNotAllowed class IPType: """ Passes input through IPv4/IPv6 regex patterns to determine if input is formatted as a host (e.g. 192.0.2.1), or as CIDR (e.g. 192.0.2.0/24). is_host() and is_cidr() return a boolean. """ def __init__(self): self.ipv4_host = ( r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4]" r"[0-9]|[01]?[0-9][0-9]?)?$" ) self.ipv4_cidr = ( r"^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4]" r"[0-9]|[01]?[0-9][0-9]?)\/(3[0-2]|2[0-9]|1[0-9]|[0-9])?$" ) self.ipv6_host = ( r"^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:)" r"{1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}" r"(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}" r"|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA\-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:)" r"{1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})" r"|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]" r"{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]" r")\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:)" r"{1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|" r"1{0,1}[0-9]){0,1}[0-9]))?$" ) self.ipv6_cidr = ( r"^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|" r"([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:" r"[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|" r"([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}" r"(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:(" r"(:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}" r"|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.)" r"{3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((" r"25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}" r"[0-9]){0,1}[0-9]))\/((1(1[0-9]|2[0-8]))|([0-9][0-9])|([0-9]))?$" ) def is_host(self, target): """Tests input to see if formatted as host""" ip_version = ipaddress.ip_network(target).version state = False if ip_version == 4 and re.match(self.ipv4_host, target): log.debug(f"{target} is an IPv{ip_version} host.") state = True if ip_version == 6 and re.match(self.ipv6_host, target): log.debug(f"{target} is an IPv{ip_version} host.") state = True return state def is_cidr(self, target): """Tests input to see if formatted as CIDR""" ip_version = ipaddress.ip_network(target).version state = False if ip_version == 4 and re.match(self.ipv4_cidr, target): state = True if ip_version == 6 and re.match(self.ipv6_cidr, target): state = True return state def ip_validate(target): """Validates if input is a valid IP address""" try: valid_ip = ipaddress.ip_network(target) if valid_ip.is_reserved or valid_ip.is_unspecified or valid_ip.is_loopback: _exception = ValueError(params.messages.invalid_input) _exception.details = {} raise _exception except (ipaddress.AddressValueError, ValueError) as ip_error: log.debug(f"IP {target} is invalid") _exception = ValueError(ip_error) _exception.details = {} raise _exception return valid_ip def ip_access_list(query_data, device): """ Check VRF access list for matching prefixes, returns an error if a match is found. """ log.debug(f'Checking Access List for: {query_data["query_target"]}') def member_of(target, network): """ Returns boolean if an input target IP is a member of an input network. """ log.debug(f"Checking membership of {target} for {network}") membership = False if ( network.network_address <= target.network_address and network.broadcast_address >= target.broadcast_address # NOQA: W503 ): log.debug(f"{target} is a member of {network}") membership = True return membership target = ipaddress.ip_network(query_data["query_target"]) vrf_acl = None for vrf in device.vrfs: if vrf.name == query_data["query_vrf"]: vrf_acl = vrf.access_list if not vrf_acl: raise HyperglassError( message="Unable to match query VRF to any configured VRFs", alert="danger", keywords=[query_data["query_vrf"]], ) target_ver = target.version log.debug(f"Access List: {vrf_acl}") for ace in vrf_acl: for action, net in { a: n for a, n in ace.items() for ace in vrf_acl if n.version == target_ver }.items(): # If the target is a member of an allowed network, exit successfully. if member_of(target, net) and action == "allow": log.debug(f"{target} is specifically allowed") return target # If the target is a member of a denied network, return an error. elif member_of(target, net) and action == "deny": log.debug(f"{target} is specifically denied") _exception = ValueError(params.messages.acl_denied) _exception.details = {"denied_network": str(net)} raise _exception # Implicitly deny queries if an allow statement does not exist. log.debug(f"{target} is implicitly denied") _exception = ValueError(params.messages.acl_not_allowed) _exception.details = {"denied_network": ""} raise _exception def ip_attributes(target): """ Construct dictionary of validated IP attributes for repeated use. """ network = ipaddress.ip_network(target) addr = network.network_address ip_version = addr.version afi = f"ipv{ip_version}" afi_pretty = f"IPv{ip_version}" length = network.prefixlen return { "prefix": target, "network": network, "version": ip_version, "length": length, "afi": afi, "afi_pretty": afi_pretty, } def ip_type_check(query_type, target, device): """Checks multiple IP address related validation parameters""" prefix_attr = ip_attributes(target) log.debug(f"IP Attributes:\n{prefix_attr}") # If enable_max_prefix feature enabled, require that BGP Route # queries be smaller than configured size limit. if query_type == "bgp_route" and params.features.max_prefix.enable: max_length = getattr(params.features.max_prefix, prefix_attr["afi"]) if prefix_attr["length"] > max_length: log.debug("Failed max prefix length check") _exception = ValueError(params.messages.max_prefix) _exception.details = {"max_length": max_length} raise _exception # If device NOS is listed in requires_ipv6_cidr.toml, and query is # an IPv6 host address, return an error. if ( query_type == "bgp_route" and prefix_attr["version"] == 6 and device.nos in params.general.requires_ipv6_cidr and IPType().is_host(target) ): log.debug("Failed requires IPv6 CIDR check") _exception = ValueError(params.messages.requires_ipv6_cidr) _exception.details = {"device_name": device.display_name} raise _exception # If query type is ping or traceroute, and query target is in CIDR # format, return an error. if query_type in ("ping", "traceroute") and IPType().is_cidr(target): log.debug("Failed CIDR format for ping/traceroute check") _exception = ValueError(params.messages.directed_cidr) _exception.details = {"query_type": getattr(params.branding.text, query_type)} raise _exception return target class Validate: """ Accepts raw input and associated device parameters from execute.py and validates the input based on specific query type. Returns boolean for validity, specific error message, and status code. """ def __init__(self, device, query_data, target): """Initialize device parameters and error codes.""" self.device = device self.query_data = query_data self.query_type = self.query_data["query_type"] self.target = target def validate_ip(self): """Validates IPv4/IPv6 Input""" log.debug(f"Validating {self.query_type} query for target {self.target}...") # Perform basic validation of an IP address, return error if # not a valid IP. try: ip_validate(self.target) except ValueError as unformatted_error: raise InputInvalid( params.messages.invalid_input, target=self.target, query_type=getattr(params.branding.text, self.query_type), **unformatted_error.details, ) # If target is a not allowed, return an error. try: ip_access_list(self.query_data, self.device) except ValueError as unformatted_error: raise InputNotAllowed( str(unformatted_error), target=self.target, **unformatted_error.details ) # Perform further validation of a valid IP address, return an # error upon failure. try: ip_type_check(self.query_type, self.target, self.device) except ValueError as unformatted_error: raise InputNotAllowed( str(unformatted_error), target=self.target, **unformatted_error.details ) return self.target def validate_dual(self): """Validates Dual-Stack Input""" log.debug(f"Validating {self.query_type} query for target {self.target}...") if self.query_type == "bgp_community": # Validate input communities against configured or default regex # pattern. # Extended Communities, new-format if re.match(params.features.bgp_community.regex.extended_as, self.target): pass # Extended Communities, 32 bit format elif re.match(params.features.bgp_community.regex.decimal, self.target): pass # RFC 8092 Large Community Support elif re.match(params.features.bgp_community.regex.large, self.target): pass else: raise InputInvalid( params.messages.invalid_input, target=self.target, query_type=getattr(params.branding.text, self.query_type), ) elif self.query_type == "bgp_aspath": # Validate input AS_PATH regex pattern against configured or # default regex pattern. mode = params.features.bgp_aspath.regex.mode pattern = getattr(params.features.bgp_aspath.regex, mode) if re.match(pattern, self.target): pass else: raise InputInvalid( params.messages.invalid_input, target=self.target, query_type=getattr(params.branding.text, self.query_type), ) return self.target def validate_query(self): if self.query_type in ("bgp_community", "bgp_aspath"): return self.validate_dual() else: return self.validate_ip()