Improve bgp.tools data handling/validation

This commit is contained in:
thatmattlove 2021-09-16 16:46:56 -07:00
parent 6bdfa9e645
commit 446534d839
2 changed files with 76 additions and 22 deletions

View file

@ -6,8 +6,9 @@
# Standard Library
import re
import typing as t
import asyncio
from typing import Dict, List
from ipaddress import IPv4Address, IPv6Address, ip_address
# Project
from hyperglass.log import log
@ -17,8 +18,54 @@ DEFAULT_KEYS = ("asn", "ip", "prefix", "country", "rir", "allocated", "org")
CACHE_KEY = "hyperglass.external.bgptools"
# Per-target network details. The keys mirror DEFAULT_KEYS (including "prefix") so that a
# record built from DEFAULT_KEYS type-checks as a TargetDetail.
TargetDetail = t.TypedDict(
    "TargetDetail",
    {
        "asn": str,
        "ip": str,
        "prefix": str,
        "country": str,
        "rir": str,
        "allocated": str,
        "org": str,
    },
)
def parse_whois(output: str, targets: List[str]) -> Dict[str, str]:
# Mapping of a target (as a string) to its TargetDetail record.
TargetData = t.Dict[str, TargetDetail]
def default_ip_targets(*targets: str) -> t.Tuple[TargetData, t.Tuple[str, ...]]:
    """Split targets into locally-answerable defaults and targets that must be queried.

    Targets in the returned mapping don't need to be queried: they already carry default
    values (every DEFAULT_KEYS field set to the string "None", plus "ip" and, where a
    category matches, a descriptive "rir"). Targets in the returned tuple should be queried.

    A target is queried when its address is global, unspecified, or reserved. Targets that
    are not valid IP addresses appear in neither result; they are logged at debug level.

    Returns:
        A `(default_data, query)` pair, where `default_data` is keyed by `str(target)` and
        `query` preserves the input order of the targets to look up.
    """
    default_data: TargetData = {}
    query: t.List[str] = []

    for target in targets:
        key = str(target)

        # Only the parse itself can raise ValueError, so keep the try body minimal.
        try:
            valid: t.Union[IPv4Address, IPv6Address] = ip_address(target)
        except ValueError:
            log.debug("Skipping network info lookup for invalid IP address {}", key)
            continue

        if any((valid.is_global, valid.is_unspecified, valid.is_reserved)):
            query.append(key)
            continue

        detail: TargetDetail = {k: "None" for k in DEFAULT_KEYS}
        detail["ip"] = key

        # Ordered from most to least specific; the first matching category wins.
        # `is_site_local` only exists on IPv6 addresses, hence the version guard.
        checks = (
            (valid.version == 6 and valid.is_site_local, "Site Local Address"),
            (valid.is_loopback, "Loopback Address"),
            (valid.is_multicast, "Multicast Address"),
            (valid.is_link_local, "Link Local Address"),
            (valid.is_private, "Private Address"),
        )
        for matched, rir in checks:
            if matched:
                detail["rir"] = rir
                break

        default_data[key] = detail

    return default_data, tuple(query)
def parse_whois(output: str, targets: t.List[str]) -> TargetDetail:
"""Parse raw whois output from bgp.tools.
Sample output:
@ -56,7 +103,7 @@ def parse_whois(output: str, targets: List[str]) -> Dict[str, str]:
return data
async def run_whois(targets: List[str]) -> str:
async def run_whois(targets: t.List[str]) -> str:
"""Open raw socket to bgp.tools and execute query."""
# Construct bulk query
@ -86,27 +133,28 @@ async def run_whois(targets: List[str]) -> str:
return response.decode()
async def network_info(*targets: str) -> Dict[str, Dict[str, str]]:
async def network_info(*targets: str) -> TargetData:
"""Get ASN, Containing Prefix, and other info about an internet resource."""
targets = [str(t) for t in targets]
default_data, query_targets = default_ip_targets(*targets)
cache = use_state("cache")
# Set default data structure.
data = {t: {k: "" for k in DEFAULT_KEYS} for t in targets}
query_data = {t: {k: "" for k in DEFAULT_KEYS} for t in query_targets}
# Get all cached bgp.tools data.
cached = cache.get_map(CACHE_KEY) or {}
# Try to use cached data for each of the items in the list of
# resources.
for t in (t for t in targets if t in cached):
for target in (t for t in query_targets if t in cached):
# Reassign the cached network info to the matching resource.
data[t] = cached[t]
log.debug("Using cached network info for {}", t)
query_data[target] = cached[target]
log.debug("Using cached network info for {}", target)
# Remove cached items from the resource list so they're not queried.
targets = [t for t in targets if t not in cached]
targets = [t for t in query_targets if t not in cached]
try:
if targets:
@ -114,19 +162,19 @@ async def network_info(*targets: str) -> Dict[str, Dict[str, str]]:
if whoisdata:
# If the response is not empty, parse it.
data.update(parse_whois(whoisdata, targets))
query_data.update(parse_whois(whoisdata, targets))
# Cache the response
for t in targets:
cache.set_map_item(CACHE_KEY, t, data[t])
for target in targets:
cache.set_map_item(CACHE_KEY, target, query_data[target])
log.debug("Cached network info for {}", t)
except Exception as err:
log.error(str(err))
return data
return {**default_data, **query_data}
def network_info_sync(*targets: str) -> Dict[str, Dict[str, str]]:
def network_info_sync(*targets: str) -> TargetData:
    """Synchronous wrapper that runs `network_info` for the given targets and returns its result."""
    lookup = network_info(*targets)
    return asyncio.run(lookup)

View file

@ -16,13 +16,19 @@ WHOIS_OUTPUT = """AS | IP | BGP Prefix | CC | Registry | Allocated | AS
# Ignore asyncio deprecation warning about loop
@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_network_info():
    """Check `network_info` results for a table of addresses and expected fields.

    Each entry pairs an address with the subset of fields that must match exactly.
    """
    checks = (
        ("192.0.2.1", {"asn": "None", "rir": "Private Address"}),
        ("127.0.0.1", {"asn": "None", "rir": "Loopback Address"}),
        ("fe80:dead:beef::1", {"asn": "None", "rir": "Link Local Address"}),
        ("2001:db8::1", {"asn": "None", "rir": "Private Address"}),
        ("1.1.1.1", {"asn": "13335", "rir": "ARIN"}),
    )
    for addr, fields in checks:
        info = asyncio.run(network_info(addr))
        assert isinstance(info, dict)
        assert addr in info, f"Address {addr} missing"
        for key, expected in fields.items():
            assert key in info[addr], f"{key} missing for {addr}"
            assert info[addr][key] == expected, f"Unexpected {key} for {addr}"
# Ignore asyncio deprecation warning about loop