diff --git a/hyperglass/api/models/query.py b/hyperglass/api/models/query.py index e2e3698..60decbf 100644 --- a/hyperglass/api/models/query.py +++ b/hyperglass/api/models/query.py @@ -86,6 +86,17 @@ class Query(BaseModel): """Create SHA256 hash digest of model representation.""" return hashlib.sha256(repr(self).encode()).hexdigest() + @property + def summary(self): + """Create abbreviated representation of instance.""" + items = ( + f"query_location={self.query_location}", + f"query_type={self.query_type}", + f"query_vrf={self.query_vrf.name}", + f"query_target={str(self.query_target)}", + ) + return f'Query({", ".join(items)})' + @validator("query_type") def validate_query_type(cls, value): """Ensure query_type is enabled. diff --git a/hyperglass/api/routes.py b/hyperglass/api/routes.py index 2bb4add..38db7dc 100644 --- a/hyperglass/api/routes.py +++ b/hyperglass/api/routes.py @@ -5,13 +5,13 @@ import os import time # Third Party -import aredis from fastapi import HTTPException from starlette.requests import Request from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # Project from hyperglass.util import log, clean_name, import_public_key +from hyperglass.cache import Cache from hyperglass.encode import jwt_decode from hyperglass.exceptions import HyperglassError from hyperglass.configuration import REDIS_CONFIG, params, devices @@ -19,14 +19,16 @@ from hyperglass.api.models.query import Query from hyperglass.execution.execute import Execute from hyperglass.api.models.cert_import import EncodedRequest -Cache = aredis.StrictRedis(db=params.cache.database, **REDIS_CONFIG) - APP_PATH = os.environ["hyperglass_directory"] async def query(query_data: Query, request: Request): """Ingest request data pass it to the backend application to perform the query.""" + # Initialize cache + cache = Cache(db=params.cache.database, **REDIS_CONFIG) + log.debug("Initialized cache {}", repr(cache)) + # Use hashed query_data string as key for for k/v cache store so # each command output value is unique. cache_key = query_data.digest() @@ -35,10 +37,13 @@ async def query(query_data: Query, request: Request): cache_timeout = params.cache.timeout log.debug(f"Cache Timeout: {cache_timeout}") + log.info(f"Starting query execution for query {query_data.summary}") # Check if cached entry exists - if not await Cache.get(cache_key): - log.debug(f"Created new cache key {cache_key} entry for query {query_data}") - log.debug("Beginning query execution...") + if not await cache.get(cache_key): + log.debug(f"No existing cache entry for query {cache_key}") + log.debug( + f"Created new cache key {cache_key} entry for query {query_data.summary}" + ) # Pass request to execution module starttime = time.time() @@ -51,16 +56,16 @@ async def query(query_data: Query, request: Request): raise HyperglassError(message=params.messages.general, alert="danger") # Create a cache entry - await Cache.set(cache_key, str(cache_value)) - await Cache.expire(cache_key, cache_timeout) + await cache.set(cache_key, str(cache_value)) + await cache.expire(cache_key, seconds=cache_timeout) log.debug(f"Added cache entry for query: {cache_key}") # If it does, return the cached entry - cache_response = await Cache.get(cache_key) + cache_response = await cache.get(cache_key) - log.debug(f"Cache match for: {cache_key}, returning cached entry") - log.debug(f"Cache Output: {cache_response}") + log.debug(f"Cache match for {cache_key}:\n {cache_response}") + log.success(f"Completed query execution for {query_data.summary}") return {"output": cache_response, "level": "success", "keywords": []} diff --git a/hyperglass/cache.py b/hyperglass/cache.py new file mode 100644 index 0000000..d7fb769 --- /dev/null +++ b/hyperglass/cache.py @@ -0,0 +1,112 @@ +"""Redis cache handler.""" + +# Standard Library +import time +import asyncio + +# Third Party +from aredis import StrictRedis + + +class Cache: + """Redis cache handler.""" + + def __init__( + self, db, host="localhost", port=6379, decode_responses=True, **kwargs + ): + """Initialize Redis connection.""" + self.db: int = db + self.host: str = host + self.port: int = port + self.instance: StrictRedis = StrictRedis( + db=self.db, + host=self.host, + port=self.port, + decode_responses=decode_responses, + **kwargs, + ) + + def __repr__(self): + """Represent class state.""" + return f"ConfigCache(db={self.db}, host={self.host}, port={self.port})" + + def __getitem__(self, item): + """Enable subscriptable syntax.""" + return self.get(item) + + @staticmethod + async def _parse_types(value): + """Parse a string to standard python types.""" + import re + + async def _parse_string(str_value): + + is_float = (re.compile(r"^(\d+\.\d+)$"), float) + is_int = (re.compile(r"^(\d+)$"), int) + is_bool = (re.compile(r"^(True|true|False|false)$"), bool) + is_none = (re.compile(r"^(None|none|null|nil|\(nil\))$"), lambda v: None) + + for pattern, factory in (is_float, is_int, is_bool, is_none): + if isinstance(str_value, str) and bool(re.match(pattern, str_value)): + str_value = factory(str_value) + break + return str_value + + if isinstance(value, str): + value = await _parse_string(value) + elif isinstance(value, bytes): + value = await _parse_string(value.decode("utf-8")) + elif isinstance(value, list): + value = [await _parse_string(i) for i in value] + elif isinstance(value, tuple): + value = tuple(await _parse_string(i) for i in value) + + return value + + async def get(self, *args): + """Get item(s) from cache.""" + if len(args) == 1: + raw = await self.instance.get(args[0]) + else: + raw = await self.instance.mget(args) + return await self._parse_types(raw) + + async def set(self, key, value): + """Set cache values.""" + return await self.instance.set(key, value) + + async def wait(self, pubsub, timeout=30, **kwargs): + """Wait for pub/sub messages & return posted message.""" + now = time.time() + timeout = now + timeout + while now < timeout: + message = await pubsub.get_message(ignore_subscribe_messages=True, **kwargs) + if message is not None and message["type"] == "message": + data = message["data"] + return await self._parse_types(data) + + await asyncio.sleep(0.01) + now = time.time() + return None + + async def pubsub(self): + """Provide an aredis.pubsub.Pubsub instance.""" + return self.instance.pubsub() + + async def pub(self, key, value): + """Publish a value.""" + await asyncio.sleep(1) + await self.instance.publish(key, value) + + async def clear(self): + """Clear the cache.""" + await self.instance.flushdb() + + async def delete(self, *keys): + """Delete a cache key.""" + await self.instance.delete(*keys) + + async def expire(self, *keys, seconds): + """Set timeout of key in seconds.""" + for key in keys: + await self.instance.expire(key, seconds) diff --git a/hyperglass/configuration/models/cache.py b/hyperglass/configuration/models/cache.py index 72f6ad7..429823f 100644 --- a/hyperglass/configuration/models/cache.py +++ b/hyperglass/configuration/models/cache.py @@ -18,7 +18,7 @@ class Cache(HyperglassModel): ) port: StrictInt = Field(6379, title="Port", description="Redis server TCP port.") database: StrictInt = Field( - 0, title="Database ID", description="Redis server database ID." + 1, title="Database ID", description="Redis server database ID." ) timeout: StrictInt = Field( 120, @@ -36,4 +36,3 @@ class Cache(HyperglassModel): title = "Cache" description = "Redis server & cache timeout configuration." - schema_extra = {"level": 2}