diff --git a/hyperglass/external/bgptools.py b/hyperglass/external/bgptools.py index b6c67bb..87b9c38 100644 --- a/hyperglass/external/bgptools.py +++ b/hyperglass/external/bgptools.py @@ -1,72 +1,77 @@ -"""Query & parse data from bgp.tools.""" +"""Query & parse data from bgp.tools. + +- See https://bgp.tools/credits for acknowledgements and licensing. +- See https://bgp.tools/kb/api for query documentation. +""" # Standard Library import re import socket import asyncio +from typing import Dict, List # Project from hyperglass.log import log +from hyperglass.cache import SyncCache, AsyncCache +from hyperglass.configuration import REDIS_CONFIG, params -REPLACE_KEYS = { - "AS": "asn", - "IP": "ip", - "BGP Prefix": "prefix", - "CC": "country", - "Registry": "rir", - "Allocated": "allocated", - "AS Name": "org", -} +DEFAULT_KEYS = ("asn", "ip", "prefix", "country", "rir", "allocated", "org") + +CACHE_KEY = "hyperglass.external.bgptools" -def parse_whois(output: str): +def parse_whois(output: str, targets: List[str]) -> Dict[str, str]: """Parse raw whois output from bgp.tools. Sample output: - AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name # noqa: E501 - 13335 | 1.1.1.1 | 1.1.1.0/24 | US | ARIN | 2010-07-14 | Cloudflare, Inc. + AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name + 13335 | 1.1.1.1 | 1.1.1.0/24 | US | ARIN | 2010-07-14 | Cloudflare, Inc. """ - # Each new line is a row. - rawlines = output.split("\n") + def lines(raw): + """Generate clean string values for each column.""" + for r in (r for r in raw.split("\n") if r): + fields = ( + re.sub(r"(\n|\r)", "", field).strip(" ") for field in r.split("|") + ) + yield fields - lines = () - for rawline in rawlines: - - # Split each row into fields, separated by a pipe. - line = () - rawfields = rawline.split("|") - - for rawfield in rawfields: - - # Remove newline and leading/trailing whitespaces. - field = re.sub(r"(\n|\r)", "", rawfield).strip(" ") - line += (field,) - - lines += (line,) - - headers = lines[0] - row = lines[1] data = {} - for i, header in enumerate(headers): - # Try to replace bgp.tools key names with easier to parse key names - key = REPLACE_KEYS.get(header, header) - data.update({key: row[i]}) + for line in lines(output): + + # Unpack each line's parsed values. + asn, ip, prefix, country, rir, allocated, org = line + + # Match the line to the item in the list of resources to query. + if ip in targets: + i = targets.index(ip) + data[targets[i]] = { + "asn": asn, + "ip": ip, + "prefix": prefix, + "country": country, + "rir": rir, + "allocated": allocated, + "org": org, + } log.debug("Parsed bgp.tools data: {}", data) return data -async def run_whois(resource: str): +async def run_whois(targets: List[str]) -> str: """Open raw socket to bgp.tools and execute query.""" + # Construct bulk query + query = "\n".join(("begin", *targets, "end\n")).encode() + # Open the socket to bgp.tools log.debug("Opening connection to bgp.tools") reader, writer = await asyncio.open_connection("bgp.tools", port=43) # Send the query - writer.write(str(resource).encode()) + writer.write(query) if writer.can_write_eof(): writer.write_eof() await writer.drain() @@ -85,14 +90,17 @@ async def run_whois(resource: str): return response.decode() -def run_whois_sync(resource: str): +def run_whois_sync(targets: List[str]) -> str: """Open raw socket to bgp.tools and execute query.""" + # Construct bulk query + query = "\n".join(("begin", *targets, "end\n")).encode() + # Open the socket to bgp.tools log.debug("Opening connection to bgp.tools") sock = socket.socket() sock.connect(("bgp.tools", 43)) - sock.send(f"{resource}\n".encode()) + sock.send(query) # Read the response response = b"" @@ -110,19 +118,42 @@ def run_whois_sync(resource: str): return response.decode() -async def network_info(resource: str): +async def network_info(*targets: str) -> Dict[str, Dict[str, str]]: """Get ASN, Containing Prefix, and other info about an internet resource.""" - data = {v: "" for v in REPLACE_KEYS.values()} + targets = [str(t) for t in targets] + cache = AsyncCache(db=params.cache.database, **REDIS_CONFIG) + + # Set default data structure. + data = {t: {k: "" for k in DEFAULT_KEYS} for t in targets} + + # Get all cached bgp.tools data. + cached = await cache.get_dict(CACHE_KEY) + + # Try to use cached data for each of the items in the list of + # resources. + for t in targets: + + if t in cached: + # Reassign the cached network info to the matching resource. + data[t] = cached[t] + log.debug("Using cached network info for {}", t) + + # Remove cached items from the resource list so they're not queried. + targets = [t for t in targets if t not in cached] try: - - if resource is not None: - whoisdata = await run_whois(resource) + if targets: + whoisdata = await run_whois(targets) if whoisdata: # If the response is not empty, parse it. - data = parse_whois(whoisdata) + data.update(parse_whois(whoisdata, targets)) + + # Cache the response + for t in targets: + await cache.set_dict(CACHE_KEY, t, data[t]) + log.debug("Cached network info for {}", t) except Exception as err: log.error(str(err)) @@ -130,19 +161,42 @@ async def network_info(resource: str): return data -def network_info_sync(resource: str): +def network_info_sync(*targets: str) -> Dict[str, Dict[str, str]]: """Get ASN, Containing Prefix, and other info about an internet resource.""" - data = {v: "" for v in REPLACE_KEYS.values()} + targets = [str(t) for t in targets] + cache = SyncCache(db=params.cache.database, **REDIS_CONFIG) + + # Set default data structure. + data = {t: {k: "" for k in DEFAULT_KEYS} for t in targets} + + # Get all cached bgp.tools data. + cached = cache.get_dict(CACHE_KEY) + + # Try to use cached data for each of the items in the list of + # resources. + for t in targets: + + if t in cached: + # Reassign the cached network info to the matching resource. + data[t] = cached[t] + log.debug("Using cached network info for {}", t) + + # Remove cached items from the resource list so they're not queried. + targets = [t for t in targets if t not in cached] try: - - if resource is not None: - whoisdata = run_whois_sync(resource) + if targets: + whoisdata = run_whois_sync(targets) if whoisdata: # If the response is not empty, parse it. - data = parse_whois(whoisdata) + data.update(parse_whois(whoisdata, targets)) + + # Cache the response + for t in targets: + cache.set_dict(CACHE_KEY, t, data[t]) + log.debug("Cached network info for {}", t) except Exception as err: log.error(str(err))