diff --git a/hyperglass/command/validate.py b/hyperglass/command/validate.py index a3a6051..b50d65c 100644 --- a/hyperglass/command/validate.py +++ b/hyperglass/command/validate.py @@ -37,25 +37,26 @@ class IPType: self.ipv6_host = ( r"^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:)" r"{1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}" - r"(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|" - r"([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA\-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}" - r"(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:(" - r"(:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::" - r"(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25" - r"[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]" - r"|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))?$" + r"(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}" + r"|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA\-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:)" + r"{1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})" + r"|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]" + r"{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]" + r")\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:)" + r"{1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|" + r"1{0,1}[0-9]){0,1}[0-9]))?$" ) self.ipv6_cidr = ( r"^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|" - r"([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]" - r"{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}" - r":){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}" - r"|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:" - r"(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|" - r"(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])" - r"|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25" - r"[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\/((1(1[0-9]|2[0-8]))|([0-9][0-9])|([0-9]" - r"))?$" + r"([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:" + r"[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|" + r"([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}" + r"(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:(" + r"(:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}" + r"|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.)" + r"{3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((" + r"25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}" + r"[0-9]){0,1}[0-9]))\/((1(1[0-9]|2[0-8]))|([0-9][0-9])|([0-9]))?$" ) def is_host(self, target): @@ -158,8 +159,8 @@ def ip_type_check(query_type, target, device): validity = False logger.debug(f"Failed blacklist check") return (validity, msg) - # If enable_max_prefix feature enabled, require that BGP Route queries be smaller than\ - # configured size limit. + # If enable_max_prefix feature enabled, require that BGP Route + # queries be smaller than configured size limit. if query_type == "bgp_route" and params.features.max_prefix.enable: max_length = getattr(params.features.max_prefix, prefix_attr["afi"]) if prefix_attr["length"] > max_length: @@ -169,8 +170,8 @@ def ip_type_check(query_type, target, device): ) logger.debug(f"Failed max prefix length check") return (validity, msg) - # If device NOS is listed in requires_ipv6_cidr.toml, and query is an IPv6 host address, \ - # return an error. + # If device NOS is listed in requires_ipv6_cidr.toml, and query is + # an IPv6 host address, return an error. if ( query_type == "bgp_route" and prefix_attr["version"] == 6 @@ -181,7 +182,8 @@ def ip_type_check(query_type, target, device): validity = False logger.debug(f"Failed requires IPv6 CIDR check") return (validity, msg) - # If query type is ping or traceroute, and query target is in CIDR format, return an error. + # If query type is ping or traceroute, and query target is in CIDR + # format, return an error. if query_type in ("ping", "traceroute") and IPType().is_cidr(target): msg = params.messages.directed_cidr.format(q=query_type.capitalize()) validity = False @@ -210,12 +212,14 @@ class Validate: validity = False msg = params.messages.invalid_ip.format(i=target) status = code.not_allowed - # Perform basic validation of an IP address, return error if not a valid IP. + # Perform basic validation of an IP address, return error if + # not a valid IP. if not ip_validate(target): status = code.invalid logger.error(f"{msg}, {status}") return (validity, msg, status) - # Perform further validation of a valid IP address, return an error upon failure. + # Perform further validation of a valid IP address, return an + # error upon failure. valid_query, msg = ip_type_check(query_type, target, self.device) if valid_query: validity = True @@ -232,12 +236,14 @@ class Validate: validity = False msg = params.messages.invalid_ip.format(i=target) status = code.not_allowed - # Perform basic validation of an IP address, return error if not a valid IP. + # Perform basic validation of an IP address, return error if + # not a valid IP. if not ip_validate(target): status = code.invalid logger.error(f"{msg}, {status}") return (validity, msg, status) - # Perform further validation of a valid IP address, return an error upon failure. + # Perform further validation of a valid IP address, return an + # error upon failure. valid_query, msg = ip_type_check(query_type, target, self.device) if valid_query: validity = True @@ -254,12 +260,14 @@ class Validate: validity = False msg = params.messages.invalid_ip.format(i=target) status = code.not_allowed - # Perform basic validation of an IP address, return error if not a valid IP. + # Perform basic validation of an IP address, return error if not + # a valid IP. if not ip_validate(target): status = code.invalid logger.error(f"{msg}, {status}") return (validity, msg, status) - # Perform further validation of a valid IP address, return an error upon failure. + # Perform further validation of a valid IP address, return an + # error upon failure. valid_query, msg = ip_type_check(query_type, target, self.device) if valid_query: validity = True @@ -277,7 +285,8 @@ class Validate: validity = False msg = params.messages.invalid_dual.format(i=target, qt="BGP Community") status = code.invalid - # Validate input communities against configured or default regex pattern + # Validate input communities against configured or default regex + # pattern. # Extended Communities, new-format if re.match(params.features.bgp_community.regex.extended_as, target): validity = True @@ -306,7 +315,8 @@ class Validate: validity = False msg = params.messages.invalid_dual.format(i=target, qt="AS Path") status = code.invalid - # Validate input AS_PATH regex pattern against configured or default regex pattern + # Validate input AS_PATH regex pattern against configured or + # default regex pattern. mode = getattr(params.features.bgp_aspath.regex, "mode") pattern = getattr(params.features.bgp_aspath.regex, mode) if re.match(pattern, target):