switch bgp.tools to bulk API with caching

checktheroads 2020-07-13 01:55:09 -07:00
parent b5a67e7c0e
commit 1e4a2b1512


@@ -1,72 +1,77 @@
-"""Query & parse data from bgp.tools."""
+"""Query & parse data from bgp.tools.
+
+- See https://bgp.tools/credits for acknowledgements and licensing.
+- See https://bgp.tools/kb/api for query documentation.
+"""
 # Standard Library
 import re
 import socket
 import asyncio
+from typing import Dict, List
 # Project
 from hyperglass.log import log
+from hyperglass.cache import SyncCache, AsyncCache
+from hyperglass.configuration import REDIS_CONFIG, params
 REPLACE_KEYS = {
     "AS": "asn",
     "IP": "ip",
     "BGP Prefix": "prefix",
     "CC": "country",
     "Registry": "rir",
     "Allocated": "allocated",
     "AS Name": "org",
 }
+DEFAULT_KEYS = ("asn", "ip", "prefix", "country", "rir", "allocated", "org")
+CACHE_KEY = "hyperglass.external.bgptools"
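The two new constants work together: REPLACE_KEYS translates the column headers bgp.tools returns into short keys, DEFAULT_KEYS lists those same keys in column order, and CACHE_KEY namespaces the Redis hash used below. A minimal sketch of the relationship (illustrative, not part of the commit):

# Illustrative check, not part of the commit: REPLACE_KEYS values, read in
# column order, are exactly DEFAULT_KEYS, and each target starts empty.
assert tuple(REPLACE_KEYS.values()) == DEFAULT_KEYS
empty_record = {k: "" for k in DEFAULT_KEYS}
# {'asn': '', 'ip': '', 'prefix': '', 'country': '', 'rir': '', 'allocated': '', 'org': ''}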
-def parse_whois(output: str):
+def parse_whois(output: str, targets: List[str]) -> Dict[str, str]:
     """Parse raw whois output from bgp.tools.
     Sample output:
-    AS    | IP      | BGP Prefix | CC | Registry | Allocated  | AS Name  # noqa: E501
-    13335 | 1.1.1.1 | 1.1.1.0/24 | US | ARIN     | 2010-07-14 | Cloudflare, Inc.
+    AS    | IP      | BGP Prefix | CC | Registry | Allocated  | AS Name
+    13335 | 1.1.1.1 | 1.1.1.0/24 | US | ARIN     | 2010-07-14 | Cloudflare, Inc.
     """
-    # Each new line is a row.
-    rawlines = output.split("\n")
+    def lines(raw):
+        """Generate clean string values for each column."""
+        for r in (r for r in raw.split("\n") if r):
+            fields = (
+                re.sub(r"(\n|\r)", "", field).strip(" ") for field in r.split("|")
+            )
+            yield fields
-    lines = ()
-    for rawline in rawlines:
-        # Split each row into fields, separated by a pipe.
-        line = ()
-        rawfields = rawline.split("|")
-        for rawfield in rawfields:
-            # Remove newline and leading/trailing whitespaces.
-            field = re.sub(r"(\n|\r)", "", rawfield).strip(" ")
-            line += (field,)
-        lines += (line,)
-    headers = lines[0]
-    row = lines[1]
     data = {}
-    for i, header in enumerate(headers):
-        # Try to replace bgp.tools key names with easier to parse key names
-        key = REPLACE_KEYS.get(header, header)
-        data.update({key: row[i]})
+    for line in lines(output):
+        # Unpack each line's parsed values.
+        asn, ip, prefix, country, rir, allocated, org = line
+        # Match the line to the item in the list of resources to query.
+        if ip in targets:
+            i = targets.index(ip)
+            data[targets[i]] = {
+                "asn": asn,
+                "ip": ip,
+                "prefix": prefix,
+                "country": country,
+                "rir": rir,
+                "allocated": allocated,
+                "org": org,
+            }
     log.debug("Parsed bgp.tools data: {}", data)
     return data
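A quick sketch of what the reworked parse_whois returns for the docstring's sample output (module path inferred from CACHE_KEY; the target list is an example):

from hyperglass.external.bgptools import parse_whois

sample = (
    "AS    | IP      | BGP Prefix | CC | Registry | Allocated  | AS Name\n"
    "13335 | 1.1.1.1 | 1.1.1.0/24 | US | ARIN     | 2010-07-14 | Cloudflare, Inc.\n"
)
result = parse_whois(sample, ["1.1.1.1"])
# The header row is skipped implicitly: its IP column ("IP") matches no target.
# result == {
#     "1.1.1.1": {
#         "asn": "13335", "ip": "1.1.1.1", "prefix": "1.1.1.0/24",
#         "country": "US", "rir": "ARIN", "allocated": "2010-07-14",
#         "org": "Cloudflare, Inc.",
#     }
# }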
-async def run_whois(resource: str):
+async def run_whois(targets: List[str]) -> str:
     """Open raw socket to bgp.tools and execute query."""
+    # Construct bulk query
+    query = "\n".join(("begin", *targets, "end\n")).encode()
     # Open the socket to bgp.tools
     log.debug("Opening connection to bgp.tools")
     reader, writer = await asyncio.open_connection("bgp.tools", port=43)
     # Send the query
-    writer.write(str(resource).encode())
+    writer.write(query)
     if writer.can_write_eof():
         writer.write_eof()
     await writer.drain()
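For reference, the bulk framing built into `query` is just the targets wrapped in begin/end markers, per the bgp.tools API docs linked in the module docstring. A sketch with two example addresses:

targets = ["1.1.1.1", "8.8.8.8"]  # example targets
query = "\n".join(("begin", *targets, "end\n")).encode()
# b"begin\n1.1.1.1\n8.8.8.8\nend\n"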
@@ -85,14 +90,17 @@ async def run_whois(resource: str):
     return response.decode()
-def run_whois_sync(resource: str):
+def run_whois_sync(targets: List[str]) -> str:
     """Open raw socket to bgp.tools and execute query."""
+    # Construct bulk query
+    query = "\n".join(("begin", *targets, "end\n")).encode()
     # Open the socket to bgp.tools
     log.debug("Opening connection to bgp.tools")
     sock = socket.socket()
     sock.connect(("bgp.tools", 43))
-    sock.send(f"{resource}\n".encode())
+    sock.send(query)
     # Read the response
     response = b""
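The diff skips the response-reading loop at this point. A plausible shape for it, consistent with `response = b""` above, is a blocking read until the server closes the connection; this is an assumption for illustration, not the commit's code:

# Hypothetical sketch, not taken from the commit: read until bgp.tools
# closes the connection (recv() returns b"" at EOF).
while True:
    chunk = sock.recv(4096)
    if not chunk:
        break
    response += chunk
sock.close()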
@@ -110,19 +118,42 @@ def run_whois_sync(resource: str):
     return response.decode()
-async def network_info(resource: str):
+async def network_info(*targets: str) -> Dict[str, Dict[str, str]]:
     """Get ASN, Containing Prefix, and other info about an internet resource."""
-    data = {v: "" for v in REPLACE_KEYS.values()}
+    targets = [str(t) for t in targets]
+    cache = AsyncCache(db=params.cache.database, **REDIS_CONFIG)
+    # Set default data structure.
+    data = {t: {k: "" for k in DEFAULT_KEYS} for t in targets}
+    # Get all cached bgp.tools data.
+    cached = await cache.get_dict(CACHE_KEY)
+    # Try to use cached data for each of the items in the list of
+    # resources.
+    for t in targets:
+        if t in cached:
+            # Reassign the cached network info to the matching resource.
+            data[t] = cached[t]
+            log.debug("Using cached network info for {}", t)
+    # Remove cached items from the resource list so they're not queried.
+    targets = [t for t in targets if t not in cached]
     try:
-        if resource is not None:
-            whoisdata = await run_whois(resource)
+        if targets:
+            whoisdata = await run_whois(targets)
             if whoisdata:
                 # If the response is not empty, parse it.
-                data = parse_whois(whoisdata)
+                data.update(parse_whois(whoisdata, targets))
+                # Cache the response
+                for t in targets:
+                    await cache.set_dict(CACHE_KEY, t, data[t])
+                    log.debug("Cached network info for {}", t)
     except Exception as err:
         log.error(str(err))
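Usage, as a sketch: callers of the async path now pass any number of targets positionally and get back a dict keyed by target (module path inferred from CACHE_KEY; addresses are examples):

import asyncio

from hyperglass.external.bgptools import network_info

async def main():
    # One bulk whois query (plus cache lookups) covers both targets.
    info = await network_info("1.1.1.1", "192.0.2.1")
    print(info["1.1.1.1"]["asn"])    # e.g. "13335"
    print(info["192.0.2.1"]["org"])  # "" when bgp.tools returns no row for it

asyncio.run(main())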
@@ -130,19 +161,42 @@ async def network_info(resource: str):
     return data
-def network_info_sync(resource: str):
+def network_info_sync(*targets: str) -> Dict[str, Dict[str, str]]:
     """Get ASN, Containing Prefix, and other info about an internet resource."""
-    data = {v: "" for v in REPLACE_KEYS.values()}
+    targets = [str(t) for t in targets]
+    cache = SyncCache(db=params.cache.database, **REDIS_CONFIG)
+    # Set default data structure.
+    data = {t: {k: "" for k in DEFAULT_KEYS} for t in targets}
+    # Get all cached bgp.tools data.
+    cached = cache.get_dict(CACHE_KEY)
+    # Try to use cached data for each of the items in the list of
+    # resources.
+    for t in targets:
+        if t in cached:
+            # Reassign the cached network info to the matching resource.
+            data[t] = cached[t]
+            log.debug("Using cached network info for {}", t)
+    # Remove cached items from the resource list so they're not queried.
+    targets = [t for t in targets if t not in cached]
     try:
-        if resource is not None:
-            whoisdata = run_whois_sync(resource)
+        if targets:
+            whoisdata = run_whois_sync(targets)
             if whoisdata:
                 # If the response is not empty, parse it.
-                data = parse_whois(whoisdata)
+                data.update(parse_whois(whoisdata, targets))
+                # Cache the response
+                for t in targets:
+                    cache.set_dict(CACHE_KEY, t, data[t])
+                    log.debug("Cached network info for {}", t)
     except Exception as err:
         log.error(str(err))
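The net effect of the cache: the first lookup for a target queries bgp.tools and stores the parsed record under CACHE_KEY; repeat lookups are served from Redis and removed from the query list before any socket is opened. A behavioral sketch (assumes a reachable Redis per REDIS_CONFIG; module path inferred from CACHE_KEY):

from hyperglass.external.bgptools import network_info_sync

# First call: "1.1.1.1" is absent from the cached dict, so it is queried,
# parsed, and written back via cache.set_dict(CACHE_KEY, ...).
first = network_info_sync("1.1.1.1")

# Second call: the target is found in cache.get_dict(CACHE_KEY), copied into
# the result, and dropped from `targets`, so no connection is opened.
second = network_info_sync("1.1.1.1")
assert first == second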