diff --git a/hyperglass/api/routes.py b/hyperglass/api/routes.py index 9ea46c9..a3cf268 100644 --- a/hyperglass/api/routes.py +++ b/hyperglass/api/routes.py @@ -2,7 +2,6 @@ # Standard Library import os -import json import time # Third Party @@ -11,10 +10,11 @@ from starlette.requests import Request from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # Project -from hyperglass.log import log, query_hook -from hyperglass.util import adonothing, clean_name, get_network_info, import_public_key +from hyperglass.log import log +from hyperglass.util import clean_name, process_headers, import_public_key from hyperglass.cache import Cache from hyperglass.encode import jwt_decode +from hyperglass.external import Webhook, RIPEStat from hyperglass.exceptions import HyperglassError from hyperglass.configuration import REDIS_CONFIG, params, devices from hyperglass.api.models.query import Query @@ -23,39 +23,25 @@ from hyperglass.api.models.cert_import import EncodedRequest APP_PATH = os.environ["hyperglass_directory"] -if params.logging.http is not None and params.logging.http.enable: - log_query = query_hook -else: - log_query = adonothing - async def query(query_data: Query, request: Request): """Ingest request data pass it to the backend application to perform the query.""" - network_info = get_network_info(request.client.host, serialize=True) + headers = await process_headers(headers=request.headers) - header_keys = ( - "content-length", - "accept", - "user-agent", - "content-type", - "referer", - "accept-encoding", - "accept-language", - ) + async with RIPEStat() as ripe: + network_info = await ripe.network_info(request.client.host, serialize=True) - await log_query( - { - **json.loads(query_data.export_json()), - "headers": { - k: v for k, v in dict(request.headers).items() if k in header_keys + async with Webhook(params.logging.http.provider) as hook: + await hook.send( + query={ + **query_data.export_dict(), + "headers": headers, + "source": request.client.host, + "network": network_info, }, - "source": request.client.host, - "network": network_info, - }, - params.logging.http, - log, - ) + provider=params.logging.http, + ) # Initialize cache cache = Cache(db=params.cache.database, **REDIS_CONFIG) diff --git a/hyperglass/external/__init__.py b/hyperglass/external/__init__.py new file mode 100644 index 0000000..2140f09 --- /dev/null +++ b/hyperglass/external/__init__.py @@ -0,0 +1,5 @@ +"""Functions & handlers for external data.""" + +# Project +from hyperglass.external.ripestat import RIPEStat # noqa: F401 +from hyperglass.external.webhooks import Webhook # noqa: F401 diff --git a/hyperglass/external/_base.py b/hyperglass/external/_base.py new file mode 100644 index 0000000..ccf5a37 --- /dev/null +++ b/hyperglass/external/_base.py @@ -0,0 +1,196 @@ +"""Session handler for RIPEStat Data API.""" + +# Standard Library +import re +import json as _json +import asyncio +from json import JSONDecodeError +from socket import gaierror + +# Third Party +import httpx +from httpx.status_codes import StatusCode + +# Project +from hyperglass.log import log +from hyperglass.util import make_repr, parse_exception +from hyperglass.exceptions import HyperglassError + + +def _prepare_dict(_dict): + return _json.loads(_json.dumps(_dict, default=str)) + + +def _parse_response(response): + parsed = {} + try: + parsed = response.json() + except JSONDecodeError: + try: + parsed = _json.loads(response) + except JSONDecodeError: + log.error("Error parsing JSON for response {}", repr(response)) + parsed = {"data": response.text} + return parsed + + +class BaseExternal: + """Base session handler.""" + + def __init__( + self, base_url, uri_prefix="", uri_suffix="", verify_ssl=True, timeout=10, + ): + """Initialize connection instance.""" + self.__name__ = self.name + self.base_url = base_url.strip("/") + self.uri_prefix = uri_prefix.strip("/") + self.uri_suffix = uri_suffix.strip("/") + self.verify_ssl = verify_ssl + self.timeout = timeout + self._session = httpx.AsyncClient( + verify=self.verify_ssl, base_url=self.base_url, timeout=self.timeout + ) + + @classmethod + def __init_subclass__(cls, name=None, **kwargs): + """Set correct subclass name.""" + super().__init_subclass__(**kwargs) + cls.name = name or cls.__name__ + + async def __aenter__(self): + """Test connection on entry.""" + available = await self._test() + + if available: + log.debug("Initialized session with {}", self.base_url) + return self + else: + raise self._exception(f"Unable to create session to {self.name}") + + async def __aexit__(self, exc_type=None, exc_value=None, traceback=None): + """Close connection on exit.""" + log.debug("Closing session with {}", self.base_url) + + await self._session.aclose() + return True + + def __repr__(self): + """Return user friendly representation of instance.""" + return make_repr(self) + + def _exception(self, message, exc=None, level="warning", **kwargs): + """Add stringified exception to message if passed.""" + if exc is not None: + message = f"{str(message)}: {str(exc)}" + + return HyperglassError(message, str(level), **kwargs) + + async def _test(self): + """Open a low-level connection to the base URL to ensure its port is open.""" + log.debug("Testing connection to {}", self.base_url) + + try: + test_host = re.sub(r"http(s)?\:\/\/", "", self.base_url) + _reader, _writer = await asyncio.open_connection(test_host, 443) + + except gaierror as err: + raise self._exception( + f"{self.name} appears to be unreachable", err + ) from None + + if _reader or _writer: + return True + else: + return False + + async def _request( # noqa: C901 + self, + method, + endpoint, + item=None, + params=None, + data=None, + timeout=None, + response_required=False, + ): + """Run HTTP POST operation.""" + + supported_methods = ("GET", "POST", "PUT", "DELETE", "HEAD", "PATCH") + + if method.upper() not in supported_methods: + raise self._exception( + f'Method must be one of {", ".join(supported_methods)}. ' + f"Got: {str(method)}" + ) + + endpoint = "/".join( + i + for i in ( + "", + self.uri_prefix.strip("/"), + endpoint.strip("/"), + self.uri_suffix.strip("/"), + item, + ) + if i + ) + + request = { + "method": method, + "url": endpoint, + } + + if params is not None: + params = {str(k): str(v) for k, v in params.items() if v is not None} + request["params"] = params + + if data is not None: + if not isinstance(data, dict): + raise self._exception(f"Data must be a dict, got: {str(data)}") + request["json"] = _prepare_dict(data) + + if timeout is not None: + if not isinstance(timeout, int): + try: + timeout = int(timeout) + except TypeError: + raise self._exception( + f"Timeout must be an int, got: {str(timeout)}" + ) + request["timeout"] = timeout + + log.debug("Constructed url {}", "".join((self.base_url, endpoint))) + log.debug("Constructed request parameters {}", request) + + try: + response = await self._session.request(**request) + + if response.status_code not in range(200, 300): + status = StatusCode(response.status_code) + error = _parse_response(response) + raise self._exception( + f'{status.name.replace("_", " ")}: {error}', level="danger" + ) from None + + except httpx.HTTPError as http_err: + raise self._exception(parse_exception(http_err), level="danger") from None + + return _parse_response(response) + + async def _get(self, endpoint, **kwargs): + return await self._request(method="GET", endpoint=endpoint, **kwargs) + + async def _post(self, endpoint, **kwargs): + return await self._request(method="POST", endpoint=endpoint, **kwargs) + + async def _put(self, endpoint, **kwargs): + return await self._request(method="PUT", endpoint=endpoint, **kwargs) + + async def _delete(self, endpoint, **kwargs): + return await self._request(method="DELETE", endpoint=endpoint, **kwargs) + + async def _patch(self, endpoint, **kwargs): + return await self._request(method="PATCH", endpoint=endpoint, **kwargs) + + async def _head(self, endpoint, **kwargs): + return await self._request(method="HEAD", endpoint=endpoint, **kwargs) diff --git a/hyperglass/external/ripestat.py b/hyperglass/external/ripestat.py new file mode 100644 index 0000000..79b017f --- /dev/null +++ b/hyperglass/external/ripestat.py @@ -0,0 +1,49 @@ +"""Session handler for RIPEStat Data API.""" + +# Standard Library +from ipaddress import ip_address, ip_network + +# Project +from hyperglass.log import log +from hyperglass.external._base import BaseExternal + + +class RIPEStat(BaseExternal, name="RIPEStat"): + """RIPEStat session handler.""" + + def __init__(self): + """Initialize external base class with RIPEStat connection details.""" + + super().__init__( + base_url="https://stat.ripe.net", uri_prefix="/data", uri_suffix="data.json" + ) + + async def network_info(self, resource, serialize=False): + """Get network info via RIPE's Network Info API endpoint. + + See: https://stat.ripe.net/docs/data_api#network-info + """ + try: + valid_ip = ip_address(resource) + + if not valid_ip.is_global: + log.debug("IP {ip} is not a global address", ip=str(valid_ip)) + return {"prefix": None, "asn": None} + + except ValueError: + log.debug("'{resource}' is not a valid IP address", resource=resource) + return {"prefix": None, "asn": None} + + raw = await self._get(endpoint="network-info", params={"resource": valid_ip}) + + data = { + "asns": raw["data"]["asns"], + "prefix": ip_network(raw["data"]["prefix"]), + } + + if serialize: + data["prefix"] = str(data["prefix"]) + data["asns"] = data["asns"][0] + + log.debug("Collected network info from RIPEState: {i}", i=str(data)) + return data diff --git a/hyperglass/external/slack.py b/hyperglass/external/slack.py new file mode 100644 index 0000000..c4719d3 --- /dev/null +++ b/hyperglass/external/slack.py @@ -0,0 +1,24 @@ +"""Session handler for Slack API.""" + +# Project +from hyperglass.log import log +from hyperglass.models import Webhook +from hyperglass.external._base import BaseExternal + + +class SlackHook(BaseExternal, name="Slack"): + """Slack session handler.""" + + def __init__(self): + """Initialize external base class with Slack connection details.""" + + super().__init__(base_url="https://hooks.slack.com") + + async def send(self, query, provider): + """Send an incoming webhook to Slack.""" + + payload = Webhook(**query) + + log.debug("Sending query data to Slack:\n{}", payload) + + return await self._post(endpoint=provider.host.path, data=payload.slack()) diff --git a/hyperglass/external/webhooks.py b/hyperglass/external/webhooks.py new file mode 100644 index 0000000..5d9805b --- /dev/null +++ b/hyperglass/external/webhooks.py @@ -0,0 +1,24 @@ +"""Convenience functions for webhooks.""" + +# Project +from hyperglass.exceptions import HyperglassError +from hyperglass.external._base import BaseExternal +from hyperglass.external.slack import SlackHook + +PROVIDER_MAP = { + "slack": SlackHook, +} + + +class Webhook(BaseExternal): + """Get webhook for provider name.""" + + def __new__(cls, provider): + """Return instance for correct provider handler.""" + try: + provider_class = PROVIDER_MAP[provider] + return provider_class() + except KeyError: + raise HyperglassError( + f"{provider} is not yet supported as a webhook target." + ) diff --git a/hyperglass/log.py b/hyperglass/log.py index 8aab21c..611428a 100644 --- a/hyperglass/log.py +++ b/hyperglass/log.py @@ -100,31 +100,3 @@ def enable_syslog_logging(logger, syslog_host, syslog_port): p=str(syslog_port), ) return True - - -async def query_hook(query, http_logging, log): - """Log a query to an http server.""" - import httpx - - from hyperglass.models import Webhook - from hyperglass.util import parse_exception - - valid_webhook = Webhook(**query) - - format_map = {"generic": valid_webhook.export_dict, "slack": valid_webhook.slack} - format_func = format_map[http_logging.provider] - - async with httpx.AsyncClient(**http_logging.decoded()) as client: - payload = format_func() - log.debug("Sending query data to webhook:\n{}", payload) - try: - response = await client.post(str(http_logging.host), json=payload) - - if response.status_code not in range(200, 300): - log.error(f"{response.status_code} error: {response.text}") - - except httpx.HTTPError as err: - parsed = parse_exception(err) - log.error(parsed) - - return True diff --git a/hyperglass/models.py b/hyperglass/models.py index 1db5335..bd3b680 100644 --- a/hyperglass/models.py +++ b/hyperglass/models.py @@ -147,6 +147,8 @@ class WebhookHeaders(HyperglassModel): referer: Optional[StrictStr] accept_encoding: Optional[StrictStr] accept_language: Optional[StrictStr] + x_real_ip: Optional[StrictStr] + x_forwarded_for: Optional[StrictStr] class Config: """Pydantic model config.""" @@ -157,6 +159,8 @@ class WebhookHeaders(HyperglassModel): "content_type": "content-type", "accept_encoding": "accept-encoding", "accept_language": "accept-language", + "x_real_ip": "x-real-ip", + "x_forwarded_for": "x-forwarded-for", } diff --git a/hyperglass/util.py b/hyperglass/util.py index 5f24824..4b6c2b5 100644 --- a/hyperglass/util.py +++ b/hyperglass/util.py @@ -695,64 +695,6 @@ def parse_exception(exc): return ", caused by ".join(parsed) -def get_network_info(ip, serialize=False): - """Get containing prefix for an IP host query from RIPEstat API. - - Arguments: - valid_ip {IPv4Address|IPv6Address} -- Valid IP Address object - - Raises: - InputInvalid: Raised if an http error occurs - InputInvalid: Raised if RIPEstat response doesn't contain a prefix. - - Returns: - {IPv4Network|IPv6Network} -- Valid IP Network object - """ - import httpx - from ipaddress import ip_network, ip_address - from hyperglass.exceptions import InputInvalid - - log.debug("Attempting to find network details for {ip}", ip=str(ip)) - - try: - valid_ip = ip_address(ip) - if not valid_ip.is_global: - return {"prefix": None, "asn": None} - except ValueError: - return {"prefix": None, "asn": None} - - try: - response = httpx.get( - "https://stat.ripe.net/data/network-info/data.json", - params={"resource": str(valid_ip)}, - ) - except httpx.HTTPError as error: - msg = parse_exception(error) - raise InputInvalid(msg) - - network_info = response.json().get("data", {}) - - if not network_info.get("prefix", ""): - raise InputInvalid(f"{str(valid_ip)} has no containing prefix") - elif not network_info.get("asns", []): - raise InputInvalid(f"{str(valid_ip)} is not announced") - - log.debug( - "Network info for IP '{i}': Announced as '{p}' from AS {a}", - p=network_info.get("prefix"), - a=", ".join(network_info.get("asns")), - i=str(valid_ip), - ) - - if not serialize: - network_info["prefix"] = ip_network(network_info["prefix"]) - - if serialize: - network_info["asns"] = network_info["asns"][0] - - return network_info - - def set_cache_env(host, port, db): """Set basic cache config parameters to environment variables. @@ -782,11 +724,42 @@ def get_cache_env(): return host, port, db -def donothing(*args, **kwargs): - """Do nothing.""" - pass +async def process_headers(headers): + """Filter out unwanted headers and return as a dictionary.""" + headers = dict(headers) + header_keys = ( + "content-length", + "accept", + "user-agent", + "content-type", + "referer", + "accept-encoding", + "accept-language", + "x-real-ip", + "x-forwarded-for", + ) + return {k: headers.get(k) for k in header_keys} -async def adonothing(*args, **kwargs): - """Do nothing.""" - pass +def make_repr(_class): + """Create a user-friendly represention of an object.""" + from asyncio import iscoroutine + + def _process_attrs(_dir): + for attr in _dir: + if not attr.startswith("_"): + attr_val = getattr(_class, attr) + + if callable(attr_val): + yield f'{attr}=' + + elif iscoroutine(attr_val): + yield f'{attr}=' + + elif isinstance(attr_val, str): + yield f'{attr}="{attr_val}"' + + else: + yield f"{attr}={str(attr_val)}" + + return f'{_class.__name__}({", ".join(_process_attrs(dir(_class)))})'