1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-01-17 08:48:05 +00:00

add sync api to external handler

This commit is contained in:
checktheroads 2020-04-19 00:22:01 -07:00
parent 21c636a273
commit ffbbfffe33
4 changed files with 173 additions and 35 deletions

View file

@ -6,9 +6,9 @@ from ipaddress import ip_network
# Project
from hyperglass.log import log
from hyperglass.util import get_network_info
from hyperglass.exceptions import InputInvalid, InputNotAllowed
from hyperglass.configuration import params
from hyperglass.external.ripestat import RIPEStat
def _member_of(target, network):
@ -138,7 +138,10 @@ def validate_ip(value, query_type, query_vrf): # noqa: C901
# query.
elif query_type in ("bgp_route",) and vrf_afi.force_cidr:
valid_ip = get_network_info(valid_ip.network_address).get("prefix")
with RIPEStat() as ripe:
valid_ip = ripe.network_info_sync(valid_ip.network_address).get(
"prefix"
)
# For a host query with bgp_route query type and force_cidr
# disabled, convert the host query to a single IP address.

View file

@ -5,6 +5,7 @@ import re
import json as _json
import asyncio
from json import JSONDecodeError
import socket
from socket import gaierror
# Third Party
@ -47,9 +48,14 @@ class BaseExternal:
self.uri_suffix = uri_suffix.strip("/")
self.verify_ssl = verify_ssl
self.timeout = timeout
self._session = httpx.AsyncClient(
verify=self.verify_ssl, base_url=self.base_url, timeout=self.timeout
)
session_args = {
"verify": self.verify_ssl,
"base_url": self.base_url,
"timeout": self.timeout,
}
self._session = httpx.Client(**session_args)
self._asession = httpx.AsyncClient(**session_args)
@classmethod
def __init_subclass__(cls, name=None, **kwargs):
@ -59,7 +65,7 @@ class BaseExternal:
async def __aenter__(self):
"""Test connection on entry."""
available = await self._test()
available = await self._atest()
if available:
log.debug("Initialized session with {}", self.base_url)
@ -71,9 +77,23 @@ class BaseExternal:
"""Close connection on exit."""
log.debug("Closing session with {}", self.base_url)
await self._session.aclose()
await self._asession.aclose()
return True
def __enter__(self):
available = self._test()
if available:
log.debug("Initialized session with {}", self.base_url)
return self
else:
raise self._exception(f"Unable to create session to {self.name}")
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
if exc_type is not None:
log.error(traceback)
self._session.close()
def __repr__(self):
"""Return user friendly representation of instance."""
return make_repr(self)
@ -85,7 +105,22 @@ class BaseExternal:
return HyperglassError(message, str(level), **kwargs)
async def _test(self):
def _test(self):
"""Open a low-level connection to the base URL to ensure its port is open."""
log.debug("Testing connection to {}", self.base_url)
try:
test_host = re.sub(r"http(s)?\:\/\/", "", self.base_url)
socket.socket().connect((test_host, 443))
except gaierror as err:
raise self._exception(
f"{self.name} appears to be unreachable", err
) from None
return True
async def _atest(self):
"""Open a low-level connection to the base URL to ensure its port is open."""
log.debug("Testing connection to {}", self.base_url)
@ -103,20 +138,15 @@ class BaseExternal:
else:
return False
async def _request( # noqa: C901
self,
method,
endpoint,
item=None,
params=None,
data=None,
timeout=None,
response_required=False,
):
"""Run HTTP POST operation."""
def build_request(self, **kwargs):
from operator import itemgetter
supported_methods = ("GET", "POST", "PUT", "DELETE", "HEAD", "PATCH")
method, endpoint, item, params, data, timeout, response_required = itemgetter(
*kwargs.keys()
)(kwargs)
if method.upper() not in supported_methods:
raise self._exception(
f'Method must be one of {", ".join(supported_methods)}. '
@ -159,11 +189,32 @@ class BaseExternal:
)
request["timeout"] = timeout
log.debug("Constructed url {}", "".join((self.base_url, endpoint)))
log.debug("Constructed request parameters {}", request)
return request
async def _arequest( # noqa: C901
self,
method,
endpoint,
item=None,
params=None,
data=None,
timeout=None,
response_required=False,
):
"""Run HTTP POST operation."""
request = self.build_request(
method=method,
endpoint=endpoint,
item=item,
params=params,
data=data,
timeout=timeout,
response_required=response_required,
)
try:
response = await self._session.request(**request)
response = await self._asession.request(**request)
if response.status_code not in range(200, 300):
status = StatusCode(response.status_code)
@ -177,20 +228,74 @@ class BaseExternal:
return _parse_response(response)
async def _get(self, endpoint, **kwargs):
return await self._request(method="GET", endpoint=endpoint, **kwargs)
async def _aget(self, endpoint, **kwargs):
return await self._arequest(method="GET", endpoint=endpoint, **kwargs)
async def _post(self, endpoint, **kwargs):
return await self._request(method="POST", endpoint=endpoint, **kwargs)
async def _apost(self, endpoint, **kwargs):
return await self._arequest(method="POST", endpoint=endpoint, **kwargs)
async def _put(self, endpoint, **kwargs):
return await self._request(method="PUT", endpoint=endpoint, **kwargs)
async def _aput(self, endpoint, **kwargs):
return await self._arequest(method="PUT", endpoint=endpoint, **kwargs)
async def _delete(self, endpoint, **kwargs):
return await self._request(method="DELETE", endpoint=endpoint, **kwargs)
async def _adelete(self, endpoint, **kwargs):
return await self._arequest(method="DELETE", endpoint=endpoint, **kwargs)
async def _patch(self, endpoint, **kwargs):
return await self._request(method="PATCH", endpoint=endpoint, **kwargs)
async def _apatch(self, endpoint, **kwargs):
return await self._arequest(method="PATCH", endpoint=endpoint, **kwargs)
async def _head(self, endpoint, **kwargs):
return await self._request(method="HEAD", endpoint=endpoint, **kwargs)
async def _ahead(self, endpoint, **kwargs):
return await self._arequest(method="HEAD", endpoint=endpoint, **kwargs)
def _request( # noqa: C901
self,
method,
endpoint,
item=None,
params=None,
data=None,
timeout=None,
response_required=False,
):
"""Run HTTP POST operation."""
request = self.build_request(
method=method,
endpoint=endpoint,
item=item,
params=params,
data=data,
timeout=timeout,
response_required=response_required,
)
try:
response = self._session.request(**request)
if response.status_code not in range(200, 300):
status = StatusCode(response.status_code)
error = _parse_response(response)
raise self._exception(
f'{status.name.replace("_", " ")}: {error}', level="danger"
) from None
except httpx.HTTPError as http_err:
raise self._exception(parse_exception(http_err), level="danger") from None
return _parse_response(response)
def _get(self, endpoint, **kwargs):
return self._request(method="GET", endpoint=endpoint, **kwargs)
def _post(self, endpoint, **kwargs):
return self._request(method="POST", endpoint=endpoint, **kwargs)
def _put(self, endpoint, **kwargs):
return self._request(method="PUT", endpoint=endpoint, **kwargs)
def _delete(self, endpoint, **kwargs):
return self._request(method="DELETE", endpoint=endpoint, **kwargs)
def _patch(self, endpoint, **kwargs):
return self._request(method="PATCH", endpoint=endpoint, **kwargs)
def _head(self, endpoint, **kwargs):
return self._request(method="HEAD", endpoint=endpoint, **kwargs)

View file

@ -18,6 +18,36 @@ class RIPEStat(BaseExternal, name="RIPEStat"):
base_url="https://stat.ripe.net", uri_prefix="/data", uri_suffix="data.json"
)
def network_info_sync(self, resource, serialize=False):
"""Get network info via RIPE's Network Info API endpoint (synchronously).
See: https://stat.ripe.net/docs/data_api#network-info
"""
try:
valid_ip = ip_address(resource)
if not valid_ip.is_global:
log.debug("IP {ip} is not a global address", ip=str(valid_ip))
return {"prefix": None, "asn": None}
except ValueError:
log.debug("'{resource}' is not a valid IP address", resource=resource)
return {"prefix": None, "asn": None}
raw = self._get(endpoint="network-info", params={"resource": valid_ip})
data = {
"asns": raw["data"]["asns"],
"prefix": ip_network(raw["data"]["prefix"]),
}
if serialize:
data["prefix"] = str(data["prefix"])
data["asns"] = data["asns"][0]
log.debug("Collected network info from RIPEState: {i}", i=str(data))
return data
async def network_info(self, resource, serialize=False):
"""Get network info via RIPE's Network Info API endpoint.
@ -34,7 +64,7 @@ class RIPEStat(BaseExternal, name="RIPEStat"):
log.debug("'{resource}' is not a valid IP address", resource=resource)
return {"prefix": None, "asn": None}
raw = await self._get(endpoint="network-info", params={"resource": valid_ip})
raw = await self._aget(endpoint="network-info", params={"resource": valid_ip})
data = {
"asns": raw["data"]["asns"],

View file

@ -21,4 +21,4 @@ class SlackHook(BaseExternal, name="Slack"):
log.debug("Sending query data to Slack:\n{}", payload)
return await self._post(endpoint=provider.host.path, data=payload.slack())
return await self._apost(endpoint=provider.host.path, data=payload.slack())