1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-01-22 02:08:07 +00:00
thatmattlove-hyperglass/hyperglass/external/_base.py
Jason Hall 830f300822 Upgraded tooling and testing
Due to changes in tooling from the originals used file formats have changed.

pnpm	10.10.0
rye	0.44.0
ruff	0.11.8

CI is now testing on a matrix of pnpm, node, and python versions. This
will hopefully cover edgecases where users are running various version.
Still needs update to use python version in matrix with `rye`.

Installs OS deps in workflow

Adds 'packages' key in workspace form pnpm 9

Makes testing for BaseExternal configurable

Adds redis and httpbin as service containers

ruff lint changed dictionary comprehensions

adds environment variables for httpbin

Fixes runner to docker communications
2025-05-13 17:55:56 -04:00

378 lines
13 KiB
Python

"""Session handler for external http data sources."""
# Standard Library
from urllib.parse import urlparse
import json as _json
import socket
import typing as t
from json import JSONDecodeError
from socket import gaierror
# Third Party
import httpx
# Project
from hyperglass.log import log
from hyperglass.util import parse_exception, repr_from_attrs
from hyperglass.settings import Settings
from hyperglass.constants import __version__
from hyperglass.models.fields import JsonValue, HttpMethod, Primitives
from hyperglass.exceptions.private import ExternalError
if t.TYPE_CHECKING:
# Standard Library
from types import TracebackType
# Project
from hyperglass.exceptions._common import ErrorLevel
from hyperglass.models.config.logging import Http
D = t.TypeVar("D", bound=t.Dict)
def _prepare_dict(_dict: D) -> D:
return _json.loads(_json.dumps(_dict, default=str))
class BaseExternal:
"""Base session handler."""
def __init__(
self,
base_url: str,
config: t.Optional["Http"] = None,
uri_prefix: str = "",
uri_suffix: str = "",
verify_ssl: bool = True,
timeout: int = 10,
parse: bool = True,
) -> None:
"""Initialize connection instance."""
self.__name__ = getattr(self, "name", "BaseExternal")
self.name = self.__name__
self.config = config
self.base_url = base_url.strip("/")
self.uri_prefix = uri_prefix.strip("/")
self.uri_suffix = uri_suffix.strip("/")
self.verify_ssl = verify_ssl
self.timeout = timeout
self.parse = parse
context = httpx.create_ssl_context(verify=verify_ssl)
if Settings.ca_cert is not None:
context.load_verify_locations(cafile=str(Settings.ca_cert))
client_kwargs = {
"base_url": self.base_url,
"timeout": self.timeout,
"verify": context,
}
self._session = httpx.Client(**client_kwargs)
self._asession = httpx.AsyncClient(**client_kwargs)
@classmethod
def __init_subclass__(
cls: "BaseExternal", name: t.Optional[str] = None, **kwargs: t.Any
) -> None:
"""Set correct subclass name."""
super().__init_subclass__(**kwargs)
cls.name = name or cls.__name__
async def __aenter__(self: "BaseExternal") -> "BaseExternal":
"""Test connection on entry."""
available = await self._atest()
if available:
log.bind(url=self.base_url).debug("Initialized session")
return self
raise self._exception(f"Unable to create session to {self.name}")
async def __aexit__(
self: "BaseExternal",
exc_type: t.Optional[t.Type[BaseException]] = None,
exc_value: t.Optional[BaseException] = None,
traceback: t.Optional["TracebackType"] = None,
) -> True:
"""Close connection on exit."""
log.bind(url=self.base_url).debug("Closing session")
if exc_type is not None:
log.error(str(exc_value))
await self._asession.aclose()
if exc_value is not None:
raise exc_value
return True
def __enter__(self: "BaseExternal") -> "BaseExternal":
"""Test connection on entry."""
available = self._test()
if available:
log.bind(url=self.base_url).debug("Initialized session")
return self
raise self._exception(f"Unable to create session to {self.name}")
def __exit__(
self: "BaseExternal",
exc_type: t.Optional[t.Type[BaseException]] = None,
exc_value: t.Optional[BaseException] = None,
exc_traceback: t.Optional["TracebackType"] = None,
) -> bool:
"""Close connection on exit."""
if exc_type is not None:
log.error(str(exc_value))
self._session.close()
if exc_value is not None:
raise exc_value
return True
def __repr__(self: "BaseExternal") -> str:
"""Return user friendly representation of instance."""
return repr_from_attrs(self, ("name", "base_url", "config", "parse"))
def _exception(
self: "BaseExternal",
message: str,
exc: t.Optional[BaseException] = None,
level: "ErrorLevel" = "warning",
**kwargs: t.Any,
) -> ExternalError:
"""Add stringified exception to message if passed."""
if exc is not None:
message = f"{message!s}: {exc!s}"
return ExternalError(message=message, level=level, **kwargs)
def _parse_response(self: "BaseExternal", response: httpx.Response) -> t.Any:
if self.parse:
parsed = {}
try:
parsed = response.json()
except JSONDecodeError:
try:
parsed = _json.loads(response)
except (JSONDecodeError, TypeError):
parsed = {"data": response.text}
else:
parsed = response
return parsed
def _test(self: "BaseExternal") -> bool:
"""Open a low-level connection to the base URL to ensure its port is open."""
log.bind(url=self.base_url).debug("Testing connection")
try:
# Parse out just the hostname from a URL string.
# E.g. `https://www.example.com` becomes `www.example.com`
parsed = urlparse(self.base_url)
host_parts = parsed.netloc.split(":")
try:
test_host, test_port = host_parts
test_port = int(test_port)
except ValueError:
test_host = host_parts[0]
match parsed.scheme:
case "http":
test_port = 80
case "https":
test_port = 443
# Create a generic socket object
test_socket = socket.socket()
# Try opening a low-level socket to make sure it's even
# listening on the port prior to trying to use it.
test_socket.connect((test_host, test_port))
# Properly shutdown & close the socket.
test_socket.shutdown(1)
test_socket.close()
except gaierror as err:
# Raised if the target isn't listening on the port
raise self._exception(
f"{self.name!r} appears to be unreachable at {self.base_url!r}", err
) from None
return True
async def _atest(self: "BaseExternal") -> bool:
"""Open a low-level connection to the base URL to ensure its port is open."""
return self._test()
def _build_request(self: "BaseExternal", **kwargs: t.Any) -> t.Dict[str, t.Any]:
"""Process requests parameters into structure usable by http library."""
# Standard Library
from operator import itemgetter
supported_methods = ("GET", "POST", "PUT", "DELETE", "HEAD", "PATCH")
(
method,
endpoint,
item,
headers,
params,
data,
timeout,
response_required,
) = itemgetter(*kwargs.keys())(kwargs)
if method.upper() not in supported_methods:
raise self._exception(
f"Method must be one of {', '.join(supported_methods)}. Got: {str(method)}"
)
endpoint = "/".join(
i
for i in (
"",
self.uri_prefix.strip("/"),
endpoint.strip("/"),
self.uri_suffix.strip("/"),
item,
)
if i
)
request = {
"method": method,
"url": endpoint,
"headers": {"user-agent": f"hyperglass/{__version__}"},
}
if headers is not None:
request.update({"headers": headers})
if params is not None:
params = {str(k): str(v) for k, v in params.items() if v is not None}
request["params"] = params
if data is not None:
if not isinstance(data, dict):
raise self._exception(f"Data must be a dict, got: {str(data)}")
request["json"] = _prepare_dict(data)
if timeout is not None:
if not isinstance(timeout, int):
try:
timeout = int(timeout)
except TypeError as err:
raise self._exception(f"Timeout must be an int, got: {str(timeout)}") from err
request["timeout"] = timeout
return request
async def _arequest( # noqa: C901
self: "BaseExternal",
method: HttpMethod,
endpoint: str,
item: t.Union[str, int, None] = None,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
response_required: bool = False,
) -> t.Any:
"""Run HTTP POST operation."""
request = self._build_request(
method=method,
endpoint=endpoint,
item=item,
headers=None,
params=params,
data=data,
timeout=timeout,
response_required=response_required,
)
try:
response = await self._asession.request(**request)
if response.status_code not in range(200, 300):
status = httpx.codes(response.status_code)
error = self._parse_response(response)
raise self._exception(
f"{status.name.replace('_', ' ')}: {error}", level="danger"
) from None
except httpx.HTTPError as http_err:
raise self._exception(parse_exception(http_err), level="danger") from None
return self._parse_response(response)
async def _aget(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return await self._arequest(method="GET", endpoint=endpoint, **kwargs)
async def _apost(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return await self._arequest(method="POST", endpoint=endpoint, **kwargs)
async def _aput(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return await self._arequest(method="PUT", endpoint=endpoint, **kwargs)
async def _adelete(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return await self._arequest(method="DELETE", endpoint=endpoint, **kwargs)
async def _apatch(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return await self._arequest(method="PATCH", endpoint=endpoint, **kwargs)
async def _ahead(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return await self._arequest(method="HEAD", endpoint=endpoint, **kwargs)
def _request( # noqa: C901
self: "BaseExternal",
method: HttpMethod,
endpoint: str,
item: t.Union[str, int, None] = None,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
response_required: bool = False,
) -> t.Any:
"""Run HTTP POST operation."""
request = self._build_request(
method=method,
endpoint=endpoint,
item=item,
headers=None,
params=params,
data=data,
timeout=timeout,
response_required=response_required,
)
try:
response = self._session.request(**request)
if response.status_code not in range(200, 300):
status = httpx.codes(response.status_code)
error = self._parse_response(response)
raise self._exception(
f"{status.name.replace('_', ' ')}: {error}", level="danger"
) from None
except httpx.HTTPError as http_err:
raise self._exception(parse_exception(http_err), level="danger") from None
return self._parse_response(response)
def _get(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return self._request(method="GET", endpoint=endpoint, **kwargs)
def _post(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return self._request(method="POST", endpoint=endpoint, **kwargs)
def _put(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return self._request(method="PUT", endpoint=endpoint, **kwargs)
def _delete(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return self._request(method="DELETE", endpoint=endpoint, **kwargs)
def _patch(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return self._request(method="PATCH", endpoint=endpoint, **kwargs)
def _head(self: "BaseExternal", endpoint: str, **kwargs: t.Any) -> t.Any:
return self._request(method="HEAD", endpoint=endpoint, **kwargs)