forked from mirrors/thatmattlove-hyperglass
165 lines
4.8 KiB
Python
165 lines
4.8 KiB
Python
"""API Routes."""
|
|
|
|
# Standard Library
|
|
import time
|
|
import typing as t
|
|
from datetime import UTC, datetime
|
|
|
|
# Third Party
|
|
from litestar import Request, Response, get, post
|
|
from litestar.di import Provide
|
|
from litestar.background_tasks import BackgroundTask
|
|
|
|
# Project
|
|
from hyperglass.log import log
|
|
from hyperglass.state import HyperglassState
|
|
from hyperglass.exceptions import HyperglassError
|
|
from hyperglass.models.api import Query
|
|
from hyperglass.models.data import OutputDataModel
|
|
from hyperglass.util.typing import is_type
|
|
from hyperglass.execution.main import execute
|
|
from hyperglass.models.api.response import QueryResponse
|
|
from hyperglass.models.config.params import Params, APIParams
|
|
from hyperglass.models.config.devices import Devices, APIDevice
|
|
|
|
# Local
|
|
from .state import get_state, get_params, get_devices
|
|
from .tasks import send_webhook
|
|
from .fake_output import fake_output
|
|
|
|
__all__ = (
|
|
"device",
|
|
"devices",
|
|
"queries",
|
|
"info",
|
|
"query",
|
|
)
|
|
|
|
|
|
@get("/api/devices/{id:str}", dependencies={"devices": Provide(get_devices)})
|
|
async def device(devices: Devices, id: str) -> APIDevice:
|
|
"""Retrieve a device by ID."""
|
|
return devices[id].export_api()
|
|
|
|
|
|
@get("/api/devices", dependencies={"devices": Provide(get_devices)})
|
|
async def devices(devices: Devices) -> t.List[APIDevice]:
|
|
"""Retrieve all devices."""
|
|
return devices.export_api()
|
|
|
|
|
|
@get("/api/queries", dependencies={"devices": Provide(get_devices)})
|
|
async def queries(devices: Devices) -> t.List[str]:
|
|
"""Retrieve all directive names."""
|
|
return devices.directive_names()
|
|
|
|
|
|
@get("/api/info", dependencies={"params": Provide(get_params)})
|
|
async def info(params: Params) -> APIParams:
|
|
"""Retrieve looking glass parameters."""
|
|
return params.export_api()
|
|
|
|
|
|
@post("/api/query", dependencies={"_state": Provide(get_state)})
|
|
async def query(_state: HyperglassState, request: Request, data: Query) -> QueryResponse:
|
|
"""Ingest request data pass it to the backend application to perform the query."""
|
|
|
|
timestamp = datetime.now(UTC)
|
|
|
|
# Initialize cache
|
|
cache = _state.redis
|
|
|
|
# Use hashed `data` string as key for for k/v cache store so
|
|
# each command output value is unique.
|
|
cache_key = f"hyperglass.query.{data.digest()}"
|
|
|
|
_log = log.bind(query=data.summary())
|
|
|
|
_log.info("Starting query execution")
|
|
|
|
cache_response = cache.get_map(cache_key, "output")
|
|
json_output = False
|
|
cached = False
|
|
runtime = 65535
|
|
|
|
if cache_response:
|
|
_log.bind(cache_key=cache_key).debug("Cache hit")
|
|
|
|
# If a cached response exists, reset the expiration time.
|
|
cache.expire(cache_key, expire_in=_state.params.cache.timeout)
|
|
|
|
cached = True
|
|
runtime = 0
|
|
timestamp = cache.get_map(cache_key, "timestamp")
|
|
|
|
elif not cache_response:
|
|
_log.bind(cache_key=cache_key).debug("Cache miss")
|
|
|
|
timestamp = data.timestamp
|
|
|
|
starttime = time.time()
|
|
|
|
if _state.params.fake_output:
|
|
# Return fake, static data for development purposes, if enabled.
|
|
output = await fake_output(
|
|
query_type=data.query_type,
|
|
structured=data.device.structured_output or False,
|
|
)
|
|
else:
|
|
# Pass request to execution module
|
|
output = await execute(data)
|
|
|
|
endtime = time.time()
|
|
elapsedtime = round(endtime - starttime, 4)
|
|
_log.debug("Runtime: {!s} seconds", elapsedtime)
|
|
|
|
if output is None:
|
|
raise HyperglassError(message=_state.params.messages.general, alert="danger")
|
|
|
|
json_output = is_type(output, OutputDataModel)
|
|
|
|
if json_output:
|
|
raw_output = output.export_dict()
|
|
else:
|
|
raw_output = str(output)
|
|
|
|
cache.set_map_item(cache_key, "output", raw_output)
|
|
cache.set_map_item(cache_key, "timestamp", timestamp)
|
|
cache.expire(cache_key, expire_in=_state.params.cache.timeout)
|
|
|
|
_log.bind(cache_timeout=_state.params.cache.timeout).debug("Response cached")
|
|
|
|
runtime = int(round(elapsedtime, 0))
|
|
|
|
# If it does, return the cached entry
|
|
cache_response = cache.get_map(cache_key, "output")
|
|
|
|
json_output = is_type(cache_response, t.Dict)
|
|
response_format = "text/plain"
|
|
|
|
if json_output:
|
|
response_format = "application/json"
|
|
_log.info("Execution completed")
|
|
|
|
response = {
|
|
"output": cache_response,
|
|
"id": cache_key,
|
|
"cached": cached,
|
|
"runtime": runtime,
|
|
"timestamp": timestamp,
|
|
"format": response_format,
|
|
"random": data.random(),
|
|
"level": "success",
|
|
"keywords": [],
|
|
}
|
|
|
|
return Response(
|
|
response,
|
|
background=BackgroundTask(
|
|
send_webhook,
|
|
params=_state.params,
|
|
data=data,
|
|
request=request,
|
|
timestamp=timestamp,
|
|
),
|
|
)
|