From 4a1057651fc3e19aba5cb7cd140dc645f7d707a2 Mon Sep 17 00:00:00 2001 From: Wilhelm Schonfeldt Date: Sun, 5 Oct 2025 21:25:58 +0200 Subject: [PATCH] feat: comprehensive IP enrichment and traceroute improvements MAJOR ENHANCEMENTS: IP Enrichment Service (hyperglass/external/ip_enrichment.py): - Increase IXP data cache duration from 24 hours to 7 days (604800s) for better performance - Fix critical cache refresh logic: ensure_data_loaded() now properly checks expiry before using existing pickle files - Remove 'force' refresh parameters from public APIs and admin endpoints to prevent potential abuse/DDOS - Implement automatic refresh based on file timestamps and cache duration - Add comprehensive debug logging gated by Settings.debug throughout the module - Clean up verbose comments and improve code readability - Update configuration model to enforce 7-day minimum cache timeout MikroTik Traceroute Processing: - Refactor trace_route_mikrotik plugin to use garbage cleaner before structured parsing - Only log raw router output when Settings.debug is enabled to reduce log verbosity - Simplify MikrotikTracerouteTable parser to expect pre-cleaned input from garbage cleaner - Remove complex multi-table detection, format detection, and deduplication logic (handled by cleaner) - Add concise debug messages for processing decisions and configuration states Traceroute IP Enrichment (traceroute_ip_enrichment.py): - Implement concurrent reverse DNS lookups using asyncio.to_thread and asyncio.gather - Add async wrapper for reverse DNS with proper error handling and fallbacks - Significant performance improvement for multi-hop traceroutes (parallel vs sequential DNS) - Proper debug logging gates: only detailed logs when Settings.debug=True - Upgrade operational messages to log.info level (start/completion status) - Maintain compatibility with different event loop contexts and runtime environments Configuration Updates: - Update structured.ip_enrichment.cache_timeout default to 604800 seconds - Update documentation to reflect new cache defaults and behavior - Remove force refresh options from admin API endpoints MIGRATION NOTES: - Operators should ensure /etc/hyperglass/ip_enrichment directory is writable - Any code relying on force refresh parameters must be updated - Monitor logs for automatic refresh behavior and performance improvements - The 7-day cache significantly reduces PeeringDB API load PERFORMANCE BENEFITS: - Faster traceroute enrichment due to concurrent DNS lookups - Reduced external API calls with longer IXP cache duration - More reliable refresh logic prevents stale cache usage - Cleaner, more focused debug output when debug mode is disabled TECHNICAL DETAILS: - Uses asyncio.to_thread for non-blocking DNS operations - Implements process-wide file locking for safe concurrent cache updates - Robust fallbacks for various asyncio execution contexts - Maintains backward compatibility while improving performance FILES MODIFIED: - hyperglass/external/ip_enrichment.py - hyperglass/models/config/structured.py - hyperglass/api/routes.py - hyperglass/plugins/_builtin/trace_route_mikrotik.py - hyperglass/models/parsing/mikrotik.py - hyperglass/plugins/_builtin/traceroute_ip_enrichment.py - docs/pages/configuration/config/structured-output.mdx --- .../config/structured-output.mdx | 2 +- hyperglass/api/routes.py | 6 +- hyperglass/external/ip_enrichment.py | 572 +++++++----------- hyperglass/models/config/structured.py | 8 +- hyperglass/models/parsing/mikrotik.py | 399 +++--------- .../_builtin/mikrotik_garbage_output.py | 27 +- .../plugins/_builtin/trace_route_mikrotik.py | 122 ++-- .../_builtin/traceroute_ip_enrichment.py | 129 +++- 8 files changed, 482 insertions(+), 783 deletions(-) diff --git a/docs/pages/configuration/config/structured-output.mdx b/docs/pages/configuration/config/structured-output.mdx index ae53d9c..c9924e1 100644 --- a/docs/pages/configuration/config/structured-output.mdx +++ b/docs/pages/configuration/config/structured-output.mdx @@ -31,7 +31,7 @@ For devices with structured traceroute support (Arista EOS, FRRouting, Huawei VR | `structured.communities.mode` | String | deny | Use `deny` to deny any communities listed, `permit` to _only_ permit communities listed, or `name` to append friendly names. | | `structured.communities.items` | List of Strings | | List of communities to match (used by `deny` and `permit` modes). | | `structured.communities.names` | Dict | | Dictionary mapping BGP community codes to friendly names (used by `name` mode). | -| `structured.ip_enrichment.cache_timeout` | Integer | 86400 | Cache timeout in seconds for IP enrichment data (minimum 24 hours/86400 seconds). | +| `structured.ip_enrichment.cache_timeout` | Integer | 604800 | Cache timeout in seconds for IP enrichment data (minimum 7 days/604800 seconds). | | `structured.ip_enrichment.enrich_traceroute`| Boolean | true | When `structured:` is present, enable IP enrichment of traceroute hops (ASN, org, IXP). This must be true for enrichment to run. | | `structured.enable_for_traceroute`| Boolean | (when structured present) true | When `structured:` is present this controls whether the structured traceroute table output is shown. Set to false to force raw router output. | | `structured.enable_for_bgp_route`| Boolean | (when structured present) true | When `structured:` is present this controls whether the structured BGP route table output is shown. Set to false to force raw router output. | diff --git a/hyperglass/api/routes.py b/hyperglass/api/routes.py index a9ce00a..1a8292c 100644 --- a/hyperglass/api/routes.py +++ b/hyperglass/api/routes.py @@ -204,7 +204,7 @@ async def query(_state: HyperglassState, request: Request, data: Query) -> Query async def _bg_refresh(): try: - await refresh_ip_enrichment_data(force=False) + await refresh_ip_enrichment_data() except Exception as e: _log.debug("Background IP enrichment refresh failed: {}", e) @@ -337,7 +337,7 @@ async def ip_enrichment_status() -> dict: @post("/api/admin/ip-enrichment/refresh") -async def ip_enrichment_refresh(force: bool = False) -> dict: +async def ip_enrichment_refresh() -> dict: """Manually refresh IP enrichment data.""" try: from hyperglass.external.ip_enrichment import refresh_ip_enrichment_data @@ -357,7 +357,7 @@ async def ip_enrichment_refresh(force: bool = False) -> dict: # If config can't be read, proceed with refresh call and let it decide pass - success = await refresh_ip_enrichment_data(force=force) + success = await refresh_ip_enrichment_data() return { "success": success, "message": ( diff --git a/hyperglass/external/ip_enrichment.py b/hyperglass/external/ip_enrichment.py index 5e49fab..572b1f7 100644 --- a/hyperglass/external/ip_enrichment.py +++ b/hyperglass/external/ip_enrichment.py @@ -18,82 +18,68 @@ import socket from hyperglass.log import log from hyperglass.state import use_state +from hyperglass.settings import Settings -# Process-wide lock to coordinate downloads across worker processes. -# Uses an on-disk lock directory so separate processes don't simultaneously -# download enrichment data and cause rate limits. +# Process-wide lock to coordinate downloads across worker processes +_download_lock: t.Optional["_ProcessFileLock"] = None class _ProcessFileLock: - """Async-friendly, process-wide filesystem lock. - - Provides an async context manager that runs blocking mkdir/remove - operations in an executor so multiple processes can coordinate. - """ + """Async-friendly, process-wide filesystem lock.""" def __init__(self, lock_path: Path, timeout: int = 300, poll_interval: float = 0.1): self.lock_path = lock_path self.timeout = timeout self.poll_interval = poll_interval self._lock_dir: t.Optional[str] = None - # Small startup jitter (seconds) to reduce thundering herd on many - # worker processes starting at the same time. - self._startup_jitter = 0.25 + self._startup_jitter = 0.25 # Reduce thundering herd on startup def _acquire_blocking(self) -> None: - # Use atomic mkdir on a .lck directory as the lock primitive. + """Acquire lock using atomic mkdir.""" import os import random import json import shutil lock_dir = str(self.lock_path) + ".lck" - - # Small jitter before first attempt to reduce concurrent mkdirs time.sleep(random.uniform(0, self._startup_jitter)) start = time.time() + if Settings.debug: + log.debug(f"Attempting to acquire process lock {lock_dir}") + while True: try: - # Try to create the lock directory atomically; on success we - # hold the lock. If it exists, retry until timeout. - os.mkdir(lock_dir) + os.mkdir(lock_dir) # Atomic lock acquisition - # Write a small owner metadata file to help debugging stale locks + # Write metadata for debugging try: owner = {"pid": os.getpid(), "created": datetime.now().isoformat()} with open(os.path.join(lock_dir, "owner.json"), "w") as f: json.dump(owner, f) except Exception: - # Not critical; proceed even if writing metadata fails - pass + pass # Not critical self._lock_dir = lock_dir - log.debug(f"Acquired process lock {lock_dir} (pid={os.getpid()})") + if Settings.debug: + log.debug(f"Acquired process lock {lock_dir} (pid={os.getpid()})") return + except FileExistsError: - # If the lock appears stale (older than timeout), try cleanup. + # Check for stale locks try: owner_file = os.path.join(lock_dir, "owner.json") - mtime = None - if os.path.exists(owner_file): - mtime = os.path.getmtime(owner_file) - else: - mtime = os.path.getmtime(lock_dir) + mtime = os.path.getmtime(owner_file if os.path.exists(owner_file) else lock_dir) - # If owner file/dir mtime is older than timeout, remove it if (time.time() - mtime) >= self.timeout: - log.warning(f"Removing stale lock directory {lock_dir}") + if Settings.debug: + log.debug(f"Removing stale lock directory {lock_dir}") try: shutil.rmtree(lock_dir) except Exception: - # If we can't remove it, we'll continue to wait until - # the timeout is reached by this acquisition attempt. pass - # After attempted cleanup, loop and try mkdir again - continue + continue # Try again after cleanup except Exception: - # Ignore issues during stale-check and continue waiting pass if (time.time() - start) >= self.timeout: @@ -101,41 +87,43 @@ class _ProcessFileLock: time.sleep(self.poll_interval) def _release_blocking(self) -> None: + """Release the lock by removing the lock directory.""" import os import shutil + if not self._lock_dir: + return + try: - if self._lock_dir: + # Clean up metadata file + owner_file = os.path.join(self._lock_dir, "owner.json") + if os.path.exists(owner_file): try: - owner_file = os.path.join(self._lock_dir, "owner.json") - if os.path.exists(owner_file): - try: - os.remove(owner_file) - except Exception: - pass - - # Attempt to remove the directory. If it's empty, rmdir will - # succeed; if not, fall back to recursive removal as a best-effort. - try: - os.rmdir(self._lock_dir) - except Exception: - try: - shutil.rmtree(self._lock_dir) - except Exception: - log.debug(f"Failed to fully remove lock dir {self._lock_dir}") - - log.debug(f"Released process lock {self._lock_dir}") - self._lock_dir = None + os.remove(owner_file) except Exception: - # Best-effort; ignore errors removing the lock dir pass + + # Remove lock directory + try: + os.rmdir(self._lock_dir) + except Exception: + try: + shutil.rmtree(self._lock_dir) + except Exception: + if Settings.debug: + log.debug(f"Failed to remove lock dir {self._lock_dir}") + + if Settings.debug: + log.debug(f"Released process lock {self._lock_dir}") + self._lock_dir = None + except Exception: - # Nothing we can do on release failure + if Settings.debug: + log.debug(f"Error releasing lock {self._lock_dir}") pass async def __aenter__(self): loop = asyncio.get_running_loop() - # Run blocking acquire in executor await loop.run_in_executor(None, self._acquire_blocking) return self @@ -144,11 +132,7 @@ class _ProcessFileLock: await loop.run_in_executor(None, self._release_blocking) -# Instantiate a process-global lock file in the data dir. The data dir may not yet -# exist at import time; the constant path is defined below and we'll initialize -# the actual _download_lock after the paths are declared. (See below.) - -# Optional dependencies - graceful fallback if not available +# Optional dependencies try: import httpx except ImportError: @@ -161,24 +145,18 @@ except ImportError: log.warning("aiofiles not available - IP enrichment will use slower sync I/O") aiofiles = None -# File paths for persistent storage +# File paths and constants IP_ENRICHMENT_DATA_DIR = Path("/etc/hyperglass/ip_enrichment") IXP_PICKLE_FILE = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" LAST_UPDATE_FILE = IP_ENRICHMENT_DATA_DIR / "last_update.txt" +DEFAULT_CACHE_DURATION = 7 * 24 * 60 * 60 # 7 days -# Cache duration (seconds). Default: 24 hours. Can be overridden in config. -DEFAULT_CACHE_DURATION = 24 * 60 * 60 - - -# Lazily-created process-wide download lock. Create this after the data -# directory is ensured to exist to avoid open() failing due to a missing -# parent directory and to ensure the lock file lives under the same path -# for all workers. +# Global download lock (initialized when data directory exists) _download_lock: t.Optional[_ProcessFileLock] = None def get_cache_duration() -> int: - """Get cache duration from config, ensuring minimum of 24 hours.""" + """Get cache duration from config, ensuring minimum of 7 days.""" try: from hyperglass.state import use_state @@ -190,46 +168,34 @@ def get_cache_duration() -> int: return DEFAULT_CACHE_DURATION -def should_refresh_data(force_refresh: bool = False) -> tuple[bool, str]: - """Decide whether to refresh IXP data. Only PeeringDB IXP prefixes are - considered relevant for startup refresh; BGP.tools bulk files are not used. - """ - if force_refresh: - return True, "Force refresh requested" - - # No persistent backoff marker; decide refresh purely by file age / config - # and any transient network errors will be handled by the downloader's - # retry logic. - - # If an IXP file exists, prefer it and do not perform automatic refreshes - # unless the caller explicitly requested a force refresh. - if IXP_PICKLE_FILE.exists() and not force_refresh: - return False, "ixp_data.json exists; skipping automatic refresh" - - # If IXP file is missing, refresh is needed +def should_refresh_data() -> tuple[bool, str]: + """Check if IXP data needs refreshing based on cache age.""" if not IXP_PICKLE_FILE.exists(): - return True, "No ixp_data.json present" + return True, "No ixp_data.pickle present" - # Otherwise check timestamp age try: with open(LAST_UPDATE_FILE, "r") as f: cached_time = datetime.fromisoformat(f.read().strip()) age_seconds = (datetime.now() - cached_time).total_seconds() cache_duration = get_cache_duration() + if age_seconds >= cache_duration: age_hours = age_seconds / 3600 - return True, f"Data expired (age: {age_hours:.1f}h, max: {cache_duration/3600:.1f}h)" + reason = f"Data expired (age: {age_hours:.1f}h, max: {cache_duration/3600:.1f}h)" + if Settings.debug: + log.debug(f"IP enrichment cache check: {reason}") + return True, reason + except Exception as e: - # If reading timestamp fails, prefer a refresh so we don't rely on stale data + if Settings.debug: + log.debug(f"Failed to read cache timestamp: {e}") return True, f"Failed to read timestamp: {e}" + if Settings.debug: + log.debug("IP enrichment cache is fresh") return False, "Data is fresh" -# validate_data_files removed - legacy BGP.tools bulk files are no longer used - - -# Simple result classes class IPInfo: """Result of IP lookup.""" @@ -274,23 +240,21 @@ class IPEnrichmentService: ) # (net_int, mask_bits, asn, cidr) self._lookup_optimized = False - # Combined cache for ultra-fast loading + # Runtime caches self._combined_cache: t.Optional[t.Dict[str, t.Any]] = None - # Per-IP in-memory cache for bgp.tools lookups: ip -> (asn, asn_name, prefix, expires_at) self._per_ip_cache: t.Dict[ str, t.Tuple[t.Optional[int], t.Optional[str], t.Optional[str], float] ] = {} - # Small in-memory cache for per-IP lookups to avoid repeated websocket - # queries during runtime. Maps ip_str -> (asn, asn_name, prefix) self._ip_cache: t.Dict[str, t.Tuple[t.Optional[int], t.Optional[str], t.Optional[str]]] = {} - # Lock to serialize data load so concurrent callers don't duplicate work self._ensure_lock = asyncio.Lock() def _optimize_lookups(self): """Convert IP networks to integer format for faster lookups.""" if self._lookup_optimized: return - log.debug("Optimizing IP lookup structures...") + + if Settings.debug: + log.debug("Optimizing IP lookup structures...") optimize_start = datetime.now() self._ipv4_networks = [] @@ -310,182 +274,82 @@ class IPEnrichmentService: self._ipv6_networks.sort(key=lambda x: x[1]) optimize_time = (datetime.now() - optimize_start).total_seconds() - log.debug( - f"Optimized lookups: {len(self._ipv4_networks)} IPv4, {len(self._ipv6_networks)} IPv6 (took {optimize_time:.2f}s)" - ) + if Settings.debug: + log.debug( + f"Optimized lookups: {len(self._ipv4_networks)} IPv4, {len(self._ipv6_networks)} IPv6 (took {optimize_time:.2f}s)" + ) self._lookup_optimized = True def _try_load_pickle(self) -> bool: - """Attempt to load the optimized pickle from disk without triggering downloads. - - This is a best-effort, non-blocking load used during runtime lookups so - we don't attempt network refreshes or acquire process locks while - serving user requests. - """ + """Best-effort load of IXP data from pickle without blocking.""" try: pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" if pickle_path.exists(): - try: - with open(pickle_path, "rb") as f: - parsed = pickle.load(f) - if parsed and isinstance(parsed, list) and len(parsed) > 0: - self.ixp_networks = [ - (ip_address(net), prefixlen, name) for net, prefixlen, name in parsed - ] + with open(pickle_path, "rb") as f: + parsed = pickle.load(f) + if parsed and isinstance(parsed, list) and len(parsed) > 0: + self.ixp_networks = [ + (ip_address(net), prefixlen, name) for net, prefixlen, name in parsed + ] + if Settings.debug: log.debug( - "Loaded {} IXP prefixes from optimized pickle (non-blocking)", - len(self.ixp_networks), + f"Loaded {len(self.ixp_networks)} IXP prefixes from pickle (non-blocking)" ) - return True - except Exception as e: - log.debug("Non-blocking pickle load failed: {}", e) - except Exception: - pass + return True + except Exception as e: + if Settings.debug: + log.debug(f"Non-blocking pickle load failed: {e}") return False - async def ensure_data_loaded(self, force_refresh: bool = False) -> bool: - """Ensure data is loaded and fresh from persistent files. - - New behavior: only load PeeringDB IXP prefixes at startup. Do NOT bulk - download BGP.tools CIDR or ASN data. Per-IP ASN lookups will query the - bgp.tools API (websocket preferred) on-demand. - """ - - # Create data directory if it doesn't exist + async def ensure_data_loaded(self) -> bool: + """Ensure IXP data is loaded. Downloads from PeeringDB if needed.""" IP_ENRICHMENT_DATA_DIR.mkdir(parents=True, exist_ok=True) - # Lazily instantiate the process-wide download lock now that the - # data directory exists and is guaranteed to be the same path for - # all worker processes. + # Initialize download lock global _download_lock if _download_lock is None: _download_lock = _ProcessFileLock(IP_ENRICHMENT_DATA_DIR / "download.lock") - # Fast-path: if already loaded in memory, return immediately if self.ixp_networks: return True - # Serialize loads to avoid duplicate file reads when multiple callers - # call ensure_data_loaded concurrently. async with self._ensure_lock: - # Double-check after acquiring the lock if self.ixp_networks: return True - # Fast-path: if an optimized pickle exists and the caller did not - # request a forced refresh, load it (fastest). Fall back to the - # legacy JSON IXP file or downloads if the pickle is missing or - # invalid. This ensures the pickle is the preferred on-disk cache - # for faster startup. - try: - pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" - if pickle_path.exists() and not force_refresh: - try: - with open(pickle_path, "rb") as f: - parsed = pickle.load(f) - if parsed and isinstance(parsed, list) and len(parsed) > 0: - self.ixp_networks = [ - (ip_address(net), prefixlen, name) - for net, prefixlen, name in parsed - ] - log.info( - f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (fast-path)" - ) - return True - else: - log.warning( - "Optimized pickle exists but appears empty or invalid; falling back to JSON/load or refresh" - ) - except Exception as e: - log.warning( - f"Failed to load optimized pickle {pickle_path}: {e}; falling back" - ) - except Exception: - # Non-fatal; continue to JSON/download logic - pass + # Check if refresh is needed first + should_refresh, reason = should_refresh_data() - # Immediate guard: if an optimized pickle exists on disk and the - # caller did not request a forced refresh, prefer it and skip any - # network downloads. This keeps startup fast by loading the already - # generated optimized mapping. - try: - pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" - if pickle_path.exists() and not force_refresh: - try: - with open(pickle_path, "rb") as f: - parsed = pickle.load(f) - if parsed and isinstance(parsed, list) and len(parsed) > 0: - self.ixp_networks = [ - (ip_address(net), prefixlen, name) - for net, prefixlen, name in parsed - ] - log.info( - f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (early guard)" - ) - return True - else: - log.warning( - "Optimized pickle exists but appears empty or invalid; will attempt to refresh" - ) - except Exception as e: - log.warning( - f"Failed to read optimized pickle: {e}; will attempt to refresh" - ) - except Exception: - # Ignore filesystem errors and continue to refresh logic - pass - - # No operator raw-dump conversion: rely on endpoint JSON files (ixpfx.json, - # ixlan.json, ix.json) in the data directory or download them from - # PeeringDB when a refresh is required. Determine whether we should - # refresh based on the backoff marker / cache duration. - should_refresh, reason = should_refresh_data(force_refresh) - - # If an optimized pickle exists, prefer it and avoid downloads unless forced. - try: - pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" - if pickle_path.exists(): + if not should_refresh: + # Try to load existing pickle file if data is fresh try: - st = pickle_path.stat() - size = getattr(st, "st_size", None) - except Exception: - size = None - - # If file size indicates non-empty file try to load - if size is not None and size > 0: - try: + pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" + if pickle_path.exists(): with open(pickle_path, "rb") as f: parsed = pickle.load(f) - except Exception as e: - log.warning(f"Failed to parse existing optimized IXP pickle: {e}") - parsed = None + if parsed and isinstance(parsed, list) and len(parsed) > 0: + self.ixp_networks = [ + (ip_address(net), prefixlen, name) + for net, prefixlen, name in parsed + ] + if Settings.debug: + log.debug( + f"Loaded {len(self.ixp_networks)} IXP prefixes from pickle" + ) + return True + else: + if Settings.debug: + log.debug("Pickle exists but appears empty, will refresh") + should_refresh = True + reason = "Pickle file is empty" + except Exception as e: + if Settings.debug: + log.debug(f"Failed to load pickle: {e}") + should_refresh = True + reason = f"Failed to load pickle: {e}" - if parsed and isinstance(parsed, list) and len(parsed) > 0: - self.ixp_networks = [ - (ip_address(net), prefixlen, name) for net, prefixlen, name in parsed - ] - log.info( - f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (size={size})" - ) - return True - else: - log.warning( - "Existing optimized pickle appears empty or invalid (size={}) ; will attempt to refresh", - size, - ) - else: - log.debug( - f"Optimized pickle exists but size indicates empty or very small (size={size})" - ) - except Exception as e: - log.warning(f"Failed to load existing optimized IXP data: {e}") - - # If we're currently under a backoff or refresh is not required, skip downloading if not should_refresh: - # If the optimized pickle is missing but the raw PeeringDB JSON files - # are present and the last_update timestamp is still within the - # configured cache duration, attempt to build the optimized pickle - # from the existing JSON files instead of downloading. + # Try to build pickle from existing JSON files try: pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle" json_paths = [ @@ -496,73 +360,65 @@ class IPEnrichmentService: have_all_json = all(p.exists() for p in json_paths) if not pickle_path.exists() and have_all_json and LAST_UPDATE_FILE.exists(): - try: - with open(LAST_UPDATE_FILE, "r") as f: - cached_time = datetime.fromisoformat(f.read().strip()) - age_seconds = (datetime.now() - cached_time).total_seconds() - cache_duration = get_cache_duration() - if age_seconds < cache_duration: - log.info("Building optimized pickle from existing PeeringDB JSON files") - loop = asyncio.get_running_loop() - ok = await loop.run_in_executor(None, self._combine_peeringdb_files) - if ok and pickle_path.exists(): - # Load the generated pickle into memory - try: - with open(pickle_path, "rb") as f: - parsed = pickle.load(f) - self.ixp_networks = [ - (ip_address(net), prefixlen, name) - for net, prefixlen, name in parsed - ] - log.info( - "Loaded %d IXP prefixes from generated pickle", - len(self.ixp_networks), - ) - return True - except Exception as e: - log.warning(f"Failed to load generated pickle: {e}") - except Exception: - # If reading last_update fails, fall through to skipping refresh - pass + with open(LAST_UPDATE_FILE, "r") as f: + cached_time = datetime.fromisoformat(f.read().strip()) + age_seconds = (datetime.now() - cached_time).total_seconds() + cache_duration = get_cache_duration() + if age_seconds < cache_duration: + if Settings.debug: + log.debug("Building pickle from existing JSON files") + loop = asyncio.get_running_loop() + ok = await loop.run_in_executor(None, self._combine_peeringdb_files) + if ok and pickle_path.exists(): + with open(pickle_path, "rb") as f: + parsed = pickle.load(f) + self.ixp_networks = [ + (ip_address(net), prefixlen, name) + for net, prefixlen, name in parsed + ] + if Settings.debug: + log.debug( + f"Loaded {len(self.ixp_networks)} IXP prefixes from generated pickle" + ) + return True + except Exception as e: + if Settings.debug: + log.debug(f"Failed to build pickle from JSON: {e}") - except Exception: - # Non-fatal; proceed to skip refresh - pass - - log.info("Skipping IXP refresh: {}", reason) + if Settings.debug: + log.debug(f"Skipping IXP refresh: {reason}") return False # Acquire lock and refresh IXP list only async with _download_lock: - # Double-check in case another worker refreshed - try: - # Double-check: if another worker already refreshed the IXP file - # while we were waiting for the lock, load it regardless of the - # general should_refresh flag. - if IXP_PICKLE_FILE.exists(): + # Double-check: if another worker already refreshed the IXP file + # while we were waiting for the lock, verify it's actually fresh + if IXP_PICKLE_FILE.exists(): + # Re-check if data is still expired after waiting for lock + should_refresh_postlock, reason_postlock = should_refresh_data() + if not should_refresh_postlock: try: with open(IXP_PICKLE_FILE, "rb") as f: parsed = pickle.load(f) + if parsed and isinstance(parsed, list) and len(parsed) > 0: + self.ixp_networks = [ + (ip_address(net), prefixlen, name) + for net, prefixlen, name in parsed + ] + if Settings.debug: + log.debug( + f"Another worker refreshed data while waiting for lock: {reason_postlock}" + ) + log.info( + f"Loaded {len(self.ixp_networks)} IXP prefixes from fresh pickle (post-lock)" + ) + return True except Exception as e: - log.warning( - f"Existing optimized pickle is invalid after lock wait: {e}; will attempt to refresh" - ) - parsed = None - - if not parsed or (isinstance(parsed, list) and len(parsed) == 0): - log.warning( - "Existing optimized pickle is empty after lock wait; will attempt to refresh", - ) - else: - self.ixp_networks = [ - (ip_address(net), prefixlen, name) for net, prefixlen, name in parsed - ] - log.info( - f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (post-lock)" - ) - return True - except Exception: - pass + if Settings.debug: + log.debug(f"Failed to load post-lock pickle: {e}") + else: + if Settings.debug: + log.debug(f"Data still expired after lock wait: {reason_postlock}") if not httpx: log.error("httpx not available: cannot download PeeringDB prefixes") @@ -1142,103 +998,93 @@ class IPEnrichmentService: async def lookup_ip(self, ip_str: str) -> IPInfo: """Lookup an IP address and return ASN or IXP information.""" - # Try to load IXP data, but continue even if the load fails. We still - # want to perform on-demand bgp.tools lookups for IPs when local data - # is missing; failing to load the IXP file should not prevent remote - # lookups. try: if not self.ixp_networks: - # Attempt a non-blocking pickle load only; don't trigger - # downloads or acquire locks while handling requests. self._try_load_pickle() except Exception: - log.debug("Non-blocking data load failed; continuing with on-demand lookups") + if Settings.debug: + log.debug("Non-blocking data load failed; continuing with on-demand lookups") - # Ensure lookup optimization is done self._optimize_lookups() - log.debug( - f"Looking up IP {ip_str} - have {len(self.cidr_networks)} CIDR entries, {len(self.asn_info)} ASN entries" - ) + if Settings.debug: + log.debug( + f"Looking up IP {ip_str} - have {len(self.cidr_networks)} CIDR entries, {len(self.asn_info)} ASN entries" + ) try: target_ip = ip_address(ip_str) except ValueError: - log.debug(f"Invalid IP address: {ip_str}") + if Settings.debug: + log.debug(f"Invalid IP address: {ip_str}") return IPInfo(ip_str) - # Check if it's a private/reserved/loopback address + # Check private/reserved addresses if target_ip.is_private or target_ip.is_reserved or target_ip.is_loopback: - log.debug(f"IP {ip_str} is in private/reserved range - returning AS0 'Private'") + if Settings.debug: + log.debug(f"IP {ip_str} is in private/reserved range") return IPInfo(ip_str, asn=0, asn_name="Private", prefix="Private Network") - # First check IXP networks (more specific usually) + # Check IXP networks first for net_addr, prefixlen, ixp_name in self.ixp_networks: try: network = ip_network(f"{net_addr}/{prefixlen}", strict=False) if target_ip in network: - log.debug(f"Found IXP match for {ip_str}: {ixp_name}") + if Settings.debug: + log.debug(f"Found IXP match for {ip_str}: {ixp_name}") return IPInfo(ip_str, is_ixp=True, ixp_name=ixp_name) except Exception: continue - # Fast integer-based lookup for ASN + # Fast integer-based ASN lookup target_int = int(target_ip) if isinstance(target_ip, IPv4Address): - # Use optimized IPv4 lookup for net_int, mask_bits, asn, cidr_string in self._ipv4_networks: if (target_int >> mask_bits) == (net_int >> mask_bits): asn_data = self.asn_info.get(asn, {}) asn_name = asn_data.get("name", f"AS{asn}") country = asn_data.get("country", "") - log.debug( - f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}" - ) + if Settings.debug: + log.debug( + f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}" + ) return IPInfo( ip_str, asn=asn, asn_name=asn_name, prefix=cidr_string, country=country ) - # Not found in local tables - do an on-demand query to bgp.tools - try: - asn, asn_name, prefix = await self._query_bgp_tools_for_ip(ip_str) - if asn: - # Update asn_info cache (best-effort) - try: - self.asn_info[int(asn)] = {"name": asn_name or f"AS{asn}", "country": ""} - except Exception: - pass - return IPInfo(ip_str, asn=asn, asn_name=asn_name, prefix=prefix) - except Exception: - pass - # Not found locally - try one-off query - try: - asn, asn_name, prefix = asyncio.get_event_loop().run_until_complete( - self._query_bgp_tools_for_ip(ip_str) - ) - if asn: - try: - self.asn_info[int(asn)] = {"name": asn_name or f"AS{asn}", "country": ""} - except Exception: - pass - return IPInfo(ip_str, asn=asn, asn_name=asn_name, prefix=prefix) - except Exception: - pass else: - # Use optimized IPv6 lookup for net_int, mask_bits, asn, cidr_string in self._ipv6_networks: if (target_int >> mask_bits) == (net_int >> mask_bits): asn_data = self.asn_info.get(asn, {}) asn_name = asn_data.get("name", f"AS{asn}") country = asn_data.get("country", "") - log.debug( - f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}" - ) + if Settings.debug: + log.debug( + f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}" + ) return IPInfo( ip_str, asn=asn, asn_name=asn_name, prefix=cidr_string, country=country ) - # No match found - return AS0 with "Unknown" to indicate missing data - log.debug(f"No enrichment data found for {ip_str} - returning AS0 'Unknown'") + # Query bgp.tools for unknown IPs + try: + asn, asn_name, prefix = await self._query_bgp_tools_for_ip(ip_str) + if asn: + # Cache result + try: + self.asn_info[int(asn)] = {"name": asn_name or f"AS{asn}", "country": ""} + except Exception: + pass + if Settings.debug: + log.debug(f"BGP.tools lookup for {ip_str}: AS{asn} ({asn_name})") + return IPInfo(ip_str, asn=asn, asn_name=asn_name, prefix=prefix) + except Exception as e: + if Settings.debug: + log.debug(f"BGP.tools lookup failed for {ip_str}: {e}") + + # No match found + if Settings.debug: + log.debug(f"No enrichment data found for {ip_str}") return IPInfo(ip_str, asn=0, asn_name="Unknown") async def lookup_asn_name(self, asn: int) -> str: @@ -1447,9 +1293,9 @@ async def lookup_asns_bulk(asns: t.List[t.Union[str, int]]) -> t.Dict[str, t.Dic return results -async def refresh_ip_enrichment_data(force: bool = False) -> bool: +async def refresh_ip_enrichment_data() -> bool: """Manually refresh IP enrichment data.""" - log.info(f"Manual refresh requested (force={force})") + log.info("Manual refresh requested") # Respect configuration: if IP enrichment is disabled, do not attempt # to refresh or download PeeringDB data. This prevents manual or UI- # triggered refreshes from hitting the network when the feature is @@ -1470,7 +1316,7 @@ async def refresh_ip_enrichment_data(force: bool = False) -> bool: # avoid silently ignoring an admin's request. pass - return await _service.ensure_data_loaded(force_refresh=force) + return await _service.ensure_data_loaded() def get_data_status() -> dict: diff --git a/hyperglass/models/config/structured.py b/hyperglass/models/config/structured.py index 63c86b3..3ed9685 100644 --- a/hyperglass/models/config/structured.py +++ b/hyperglass/models/config/structured.py @@ -46,13 +46,13 @@ class StructuredIpEnrichment(HyperglassModel): to explicitly disable them. """ - cache_timeout: int = 86400 # 24 hours in seconds (minimum) + cache_timeout: int = 604800 # 7 days in seconds (minimum) @field_validator("cache_timeout") def validate_cache_timeout(cls, value: int) -> int: - """Ensure cache timeout is at least 24 hours (86400 seconds).""" - if value < 86400: - return 86400 + """Ensure cache timeout is at least 7 days (604800 seconds).""" + if value < 604800: + return 604800 return value enrich_traceroute: bool = True diff --git a/hyperglass/models/parsing/mikrotik.py b/hyperglass/models/parsing/mikrotik.py index 82517c4..b440cda 100644 --- a/hyperglass/models/parsing/mikrotik.py +++ b/hyperglass/models/parsing/mikrotik.py @@ -320,374 +320,117 @@ class MikrotikTracerouteTable(MikrotikBase): @classmethod def parse_text(cls, text: str, target: str, source: str) -> "MikrotikTracerouteTable": - """Parse MikroTik traceroute output. + """Parse a cleaned MikroTik traceroute table. - MikroTik shows multiple complete tables over time as it builds the traceroute: + The input is expected to be a single, clean traceroute table that has already + been processed by the garbage cleaner to remove paging artifacts and duplicates. - Columns: ADDRESS, LOSS, SENT, LAST, AVG, BEST, WORST, STD-DEV - # ADDRESS LOSS SENT LAST AVG BEST WORST STD-DEV - 1 10.0.0.41 0% 1 0.5ms 0.5 0.5 0.5 0 - 2 185.73.201.193 0% 1 0.4ms 0.4 0.4 0.4 0 - 3 46.31.76.111 0% 1 0.5ms 0.5 0.5 0.5 0 - 4 0% 1 0ms - -- [Q quit|C-z pause] - Columns: ADDRESS, LOSS, SENT, LAST, AVG, BEST, WORST, STD-DEV - # ADDRESS LOSS SENT LAST AVG BEST WORST STD-DEV - 1 10.0.0.41 0% 1 0.5ms 0.5 0.5 0.5 0 - 2 185.73.201.193 0% 1 0.4ms 0.4 0.4 0.4 0 - ...more tables... - - We need to find the LAST/NEWEST table and use that as the final result. + Expected format: + ADDRESS LOSS SENT LAST AVG BEST WORST STD-DEV STATUS + 197.157.67.233 0% 3 0.4ms 0.2 0.1 0.4 0.1 + 100% 3 timeout + 41.78.188.153 0% 3 210.8ms 210.8 210.8 210.9 0 """ _log = log.bind(parser="MikrotikTracerouteTable") - # Minimal input summary to avoid excessive logs while keeping context _log.debug( - "Parsing MikroTik traceroute", + "Parsing cleaned MikroTik traceroute table", target=target, source=source, lines=len(text.splitlines()), ) - # Try to extract target from the traceroute command in the output - # Look for patterns like: "tool traceroute src-address=192.168.1.1 timeout=1 duration=30 count=3 8.8.8.8" - lines = text.split("\n") - extracted_target = target # Default to passed target + lines = text.strip().split("\n") + hops = [] + hop_number = 1 - for line in lines[:10]: # Check first 10 lines for command - line = line.strip() - if line.startswith("tool traceroute") or "traceroute" in line: - # Extract target from command line - it's typically the last argument - parts = line.split() - for part in reversed(parts): - # Skip parameters with = signs and common flags - if ( - "=" not in part - and not part.startswith("-") - and not part.startswith("[") - and part - not in ["tool", "traceroute", "src-address", "timeout", "duration", "count"] - ): - # This looks like a target (IP or hostname) - if len(part) > 3: # Reasonable minimum length - extracted_target = part - break + # Find the table header to start parsing from + header_found = False + data_start_index = 0 + + for i, line in enumerate(lines): + if "ADDRESS" in line and "LOSS" in line and "SENT" in line: + header_found = True + data_start_index = i + 1 break - # Use extracted target if found, otherwise keep the passed target - if extracted_target != target: - _log.info( - f"Updated target from '{target}' to '{extracted_target}' based on command output" - ) - target = extracted_target - - lines = text.strip().split("\n") - - # Find all table starts - handle both formats: - # Format 1: "Columns: ADDRESS, LOSS, SENT..." (newer format with hop numbers) - # Format 2: "ADDRESS LOSS SENT..." (older format, no hop numbers) - table_starts = [] - for i, line in enumerate(lines): - if ("Columns:" in line and "ADDRESS" in line) or ( - "ADDRESS" in line - and "LOSS" in line - and "SENT" in line - and not line.strip().startswith(("1", "2", "3", "4", "5", "6", "7", "8", "9")) - ): - table_starts.append(i) - - if not table_starts: - _log.warning("No traceroute table headers found in output") + if not header_found: + _log.warning("No traceroute table header found in cleaned output") return MikrotikTracerouteTable(target=target, source=source, hops=[]) - # Take the LAST table (newest/final results) - last_table_start = table_starts[-1] - _log.debug( - "Found traceroute tables", - tables_found=len(table_starts), - last_table_start=last_table_start, - ) - - # Determine format by checking the header line - header_line = lines[last_table_start].strip() - is_columnar_format = "Columns:" in header_line - _log.debug("Header determined", header=header_line, columnar=is_columnar_format) - - # Parse only the last table - hops = [] - in_data_section = False - current_hop_number = 1 # Track the current hop number - hop_counter = 1 # For old format without hop numbers - - # Start from the last table header - for i in range(last_table_start, len(lines)): - original_line = lines[i] # Keep original line with whitespace - line = original_line.strip() # Stripped version for most processing + # Parse data rows + for i in range(data_start_index, len(lines)): + line = lines[i].strip() # Skip empty lines if not line: continue - # Skip the column header lines - if ( - ("Columns:" in line) - or ("ADDRESS" in line and "LOSS" in line and "SENT" in line) - or line.startswith("#") - ): - in_data_section = True - continue - - # Skip paging prompts + # Stop at any remaining paging markers (shouldn't happen with cleaned input) if "-- [Q quit|C-z pause]" in line: - break # End of this table + break - if in_data_section and line: - # Process data line - try: - # Define helper function for RTT parsing - def parse_rtt(rtt_str: str) -> t.Optional[float]: - if rtt_str in ("timeout", "-", "0ms"): - return None - # Remove 'ms' suffix and convert to float - rtt_clean = re.sub(r"ms$", "", rtt_str) - try: - return float(rtt_clean) - except ValueError: - return None + try: + # Parse data line + parts = line.split() + if len(parts) < 3: + continue - # Check if this is a timeout/continuation line (starts with whitespace, has % and numbers) - # Use original_line to check for leading whitespace - if ( - (original_line.startswith(" ") or original_line.startswith("\t")) - and "%" in line - and ("timeout" in line or "0ms" in line) - ): - # This is a timeout/continuation hop - parts = line.split() + # Check if this is a timeout line (starts with percentage) + if parts[0].endswith("%"): + # Timeout hop: "100% 3 timeout" + ip_address = None + loss_pct = int(parts[0].rstrip("%")) + sent_count = int(parts[1]) + last_rtt = None + avg_rtt = None + best_rtt = None + worst_rtt = None + else: + # Normal hop: "197.157.67.233 0% 3 0.4ms 0.2 0.1 0.4 0.1" + ip_address = parts[0] + if len(parts) < 7: + continue - if len(parts) >= 2 and parts[0].endswith("%"): - ip_address = None - loss_pct = int(parts[0].rstrip("%")) - sent_count = int(parts[1]) + loss_pct = int(parts[1].rstrip("%")) + sent_count = int(parts[2]) - if "timeout" in parts: - last_rtt_str = "timeout" - avg_rtt_str = "timeout" - best_rtt_str = "timeout" - worst_rtt_str = "timeout" - else: - last_rtt_str = parts[2] if len(parts) > 2 else "0ms" - avg_rtt_str = "0" - best_rtt_str = "0" - worst_rtt_str = "0" - - # Create timeout hop - hop = MikrotikTracerouteHop( - hop_number=current_hop_number, - ip_address=ip_address, - hostname=None, - loss_pct=loss_pct, - sent_count=sent_count, - last_rtt=parse_rtt(last_rtt_str), - avg_rtt=parse_rtt(avg_rtt_str), - best_rtt=parse_rtt(best_rtt_str), - worst_rtt=parse_rtt(worst_rtt_str), - ) - hops.append(hop) - current_hop_number += 1 - continue - - if is_columnar_format: - # New format: "1 10.0.0.41 0% 1 0.5ms 0.5 0.5 0.5 0" - parts = line.split() - if len(parts) < 3: - continue - continue - - hop_number = int(parts[0]) - - # Check if there's an IP address or if it's empty (timeout hop) - if len(parts) >= 8 and not parts[1].endswith("%"): - # Normal hop with IP address - ip_address = parts[1] if parts[1] else None - loss_pct = int(parts[2].rstrip("%")) - sent_count = int(parts[3]) - last_rtt_str = parts[4] - avg_rtt_str = parts[5] - best_rtt_str = parts[6] - worst_rtt_str = parts[7] - elif len(parts) >= 4 and parts[1].endswith("%"): - # Timeout hop without IP address - ip_address = None - loss_pct = int(parts[1].rstrip("%")) - sent_count = int(parts[2]) - last_rtt_str = parts[3] if len(parts) > 3 else "timeout" - avg_rtt_str = "timeout" - best_rtt_str = "timeout" - worst_rtt_str = "timeout" - else: - continue - continue - else: - # Old format: "196.60.8.198 0% 1 17.1ms 17.1 17.1 17.1 0" - # We need to deduplicate by taking the LAST occurrence of each IP - parts = line.split() - if len(parts) < 6: - continue - continue - - ip_address = parts[0] if not parts[0].endswith("%") else None - - # Check for truncated IPv6 addresses - if ip_address and (ip_address.endswith("...") or ip_address.endswith("..")): - _log.warning( - "Truncated IP address detected, setting to None", - line=i, - ip=ip_address, - ) - ip_address = None - - if ip_address: - loss_pct = int(parts[1].rstrip("%")) - sent_count = int(parts[2]) - last_rtt_str = parts[3] - avg_rtt_str = parts[4] - best_rtt_str = parts[5] - worst_rtt_str = parts[6] if len(parts) > 6 else parts[5] - else: - # Timeout line or truncated address - if parts[0].endswith("%"): - # Normal timeout line - loss_pct = int(parts[0].rstrip("%")) - sent_count = int(parts[1]) - else: - # Truncated address - extract stats from remaining parts - loss_pct = int(parts[1].rstrip("%")) - sent_count = int(parts[2]) - last_rtt_str = "timeout" - avg_rtt_str = "timeout" - best_rtt_str = "timeout" - worst_rtt_str = "timeout" - - # Convert timing values + # Parse RTT values def parse_rtt(rtt_str: str) -> t.Optional[float]: if rtt_str in ("timeout", "-", "0ms", "*"): return None - # Remove 'ms' suffix and convert to float rtt_clean = re.sub(r"ms$", "", rtt_str) try: return float(rtt_clean) except ValueError: return None - if is_columnar_format: - # Use hop number from the data and update our tracker - final_hop_number = hop_number - current_hop_number = max(current_hop_number, hop_number + 1) - else: - # Use sequential numbering for old format - final_hop_number = hop_counter - hop_counter += 1 + last_rtt = parse_rtt(parts[3]) + avg_rtt = parse_rtt(parts[4]) + best_rtt = parse_rtt(parts[5]) + worst_rtt = parse_rtt(parts[6]) - hop_obj = MikrotikTracerouteHop( - hop_number=final_hop_number, - ip_address=ip_address, - hostname=None, # MikroTik doesn't do reverse DNS by default - loss_pct=loss_pct, - sent_count=sent_count, - last_rtt=parse_rtt(last_rtt_str), - avg_rtt=parse_rtt(avg_rtt_str), - best_rtt=parse_rtt(best_rtt_str), - worst_rtt=parse_rtt(worst_rtt_str), - ) + hop = MikrotikTracerouteHop( + hop_number=hop_number, + ip_address=ip_address, + hostname=None, # MikroTik doesn't do reverse DNS by default + loss_pct=loss_pct, + sent_count=sent_count, + last_rtt=last_rtt, + avg_rtt=avg_rtt, + best_rtt=best_rtt, + worst_rtt=worst_rtt, + ) - hops.append(hop_obj) + hops.append(hop) + hop_number += 1 - except (ValueError, IndexError) as e: - _log.debug("Failed to parse traceroute data line", line=line, error=str(e)) - continue - - # Snapshot before deduplication - orig_hop_count = len(hops) - - # For old format, we need to deduplicate by IP and take only final stats - if not is_columnar_format and hops: - _log.debug("Old format detected - deduplicating entries", total_entries=len(hops)) - - # Group by IP address and take the HIGHEST SENT count (final stats) - ip_to_final_hop = {} - ip_to_max_sent = {} - hop_order = [] - - for hop in hops: - # Use IP address if available, otherwise use hop position for timeouts - if hop.ip_address: - ip_key = hop.ip_address - else: - # No IP address means timeout hop - ip_key = f"timeout_{hop.hop_number}" - - # Track first appearance order - if ip_key not in hop_order: - hop_order.append(ip_key) - ip_to_max_sent[ip_key] = 0 - - # Keep hop with highest SENT count (most recent/final stats) - if hop.sent_count and hop.sent_count >= ip_to_max_sent[ip_key]: - ip_to_max_sent[ip_key] = hop.sent_count - ip_to_final_hop[ip_key] = hop - - # Rebuild hops list with final stats and correct hop numbers - final_hops = [] - for i, ip_key in enumerate(hop_order, 1): - final_hop = ip_to_final_hop[ip_key] - final_hop.hop_number = i # Correct hop numbering - final_hops.append(final_hop) - - hops = final_hops - _log.debug( - "Deduplication complete", - before=orig_hop_count, - after=len(hops), - ) - - # Filter excessive timeout hops ONLY at the end (no more valid hops after) - # Find the last hop with a valid IP address - last_valid_hop_index = -1 - for i, hop in enumerate(hops): - if hop.ip_address is not None and hop.loss_pct < 100: - last_valid_hop_index = i - - filtered_hops = [] - trailing_timeouts = 0 - - for i, hop in enumerate(hops): - if i > last_valid_hop_index and hop.ip_address is None and hop.loss_pct == 100: - # This is a trailing timeout hop (after the last valid hop) - trailing_timeouts += 1 - if trailing_timeouts <= 3: # Only keep first 3 trailing timeouts - filtered_hops.append(hop) - else: - # drop extra trailing timeouts - continue - else: - # This is either a valid hop or a timeout hop with valid hops after it - filtered_hops.append(hop) - - # Renumber the filtered hops - for i, hop in enumerate(filtered_hops, 1): - hop.hop_number = i - - hops = filtered_hops - if last_valid_hop_index >= 0: - _log.debug( - "Filtered trailing timeouts", - last_valid_index=last_valid_hop_index, - trailing_timeouts_removed=max(0, orig_hop_count - len(hops)), - ) + except (ValueError, IndexError) as e: + _log.debug("Failed to parse traceroute data line", line=line, error=str(e)) + continue result = MikrotikTracerouteTable(target=target, source=source, hops=hops) - _log.info("Parsed traceroute final table", hops=len(hops)) + _log.info("Parsed cleaned traceroute table", hops=len(hops)) return result def traceroute_result(self): diff --git a/hyperglass/plugins/_builtin/mikrotik_garbage_output.py b/hyperglass/plugins/_builtin/mikrotik_garbage_output.py index 4c67e24..175911d 100644 --- a/hyperglass/plugins/_builtin/mikrotik_garbage_output.py +++ b/hyperglass/plugins/_builtin/mikrotik_garbage_output.py @@ -51,7 +51,11 @@ class MikrotikGarbageOutput(OutputPlugin): stripped = line.strip() # Skip empty lines and interactive paging prompts - if not stripped or "-- [Q quit|C-z pause]" in stripped or "-- [Q quit|D dump|C-z pause]" in stripped: + if ( + not stripped + or "-- [Q quit|C-z pause]" in stripped + or "-- [Q quit|D dump|C-z pause]" in stripped + ): continue # Skip command echo lines @@ -126,7 +130,9 @@ class MikrotikGarbageOutput(OutputPlugin): else: sent = 0 - is_timeout = "timeout" in row.lower() or ("100%" in row and "timeout" in row.lower()) + is_timeout = "timeout" in row.lower() or ( + "100%" in row and "timeout" in row.lower() + ) # Prefer higher SENT, then prefer non-timeout, then later table (higher ti) pick = False @@ -150,7 +156,9 @@ class MikrotikGarbageOutput(OutputPlugin): # Collapse excessive trailing timeouts into an aggregation line trailing_timeouts = 0 for line in reversed(processed_lines): - if ("timeout" in line.lower()) or (sent_re.search(line) and sent_re.search(line).group(1) == "100"): + if ("timeout" in line.lower()) or ( + sent_re.search(line) and sent_re.search(line).group(1) == "100" + ): trailing_timeouts += 1 else: break @@ -158,13 +166,20 @@ class MikrotikGarbageOutput(OutputPlugin): if trailing_timeouts > 3: non_trailing = len(processed_lines) - trailing_timeouts # Keep first 2 of trailing timeouts and aggregate the rest - aggregated = processed_lines[:non_trailing] + processed_lines[non_trailing:non_trailing + 2] + aggregated = ( + processed_lines[:non_trailing] + processed_lines[non_trailing : non_trailing + 2] + ) remaining = trailing_timeouts - 2 - aggregated.append(f" ... ({remaining} more timeout hops)") + aggregated.append( + f" ... ({remaining} more timeout hops)" + ) processed_lines = aggregated # Prepend header line if we have one - header_to_use = header_line or "ADDRESS LOSS SENT LAST AVG BEST WORST STD-DEV STATUS" + header_to_use = ( + header_line + or "ADDRESS LOSS SENT LAST AVG BEST WORST STD-DEV STATUS" + ) cleaned = [header_to_use] + processed_lines return "\n".join(cleaned) diff --git a/hyperglass/plugins/_builtin/trace_route_mikrotik.py b/hyperglass/plugins/_builtin/trace_route_mikrotik.py index d0b93a4..7df6c18 100644 --- a/hyperglass/plugins/_builtin/trace_route_mikrotik.py +++ b/hyperglass/plugins/_builtin/trace_route_mikrotik.py @@ -8,6 +8,7 @@ from pydantic import PrivateAttr, ValidationError # Project from hyperglass.log import log, log as _log +from hyperglass.settings import Settings from hyperglass.exceptions.private import ParsingError from hyperglass.models.parsing.mikrotik import MikrotikTracerouteTable from hyperglass.state import use_state @@ -27,15 +28,11 @@ def _normalize_output(output: t.Union[str, t.Sequence[str]]) -> t.List[str]: return [output] return list(output) + def _clean_traceroute_only( output: t.Union[str, t.Sequence[str]], query: "Query" ) -> t.Union[str, t.Tuple[str, ...]]: - """Run only the traceroute-specific cleaner and return same-shaped result. - - This calls the internal _clean_traceroute_output method on the - MikrotikGarbageOutput plugin so the cleaned traceroute text is used - as the 'raw' output exposed to clients. - """ + """Clean traceroute output using MikrotikGarbageOutput plugin.""" from .mikrotik_garbage_output import MikrotikGarbageOutput out_list = _normalize_output(output) @@ -46,7 +43,6 @@ def _clean_traceroute_only( try: cleaned_piece = cleaner._clean_traceroute_output(piece) except Exception: - # If cleaner fails for any piece, fall back to the original piece cleaned_piece = piece cleaned_list.append(cleaned_piece) @@ -58,42 +54,32 @@ def _clean_traceroute_only( def parse_mikrotik_traceroute( output: t.Union[str, t.Sequence[str]], target: str, source: str ) -> "OutputDataModel": - """Parse a MikroTik traceroute text response.""" - result = None + """Parse a cleaned MikroTik traceroute text response.""" out_list = _normalize_output(output) - _log = log.bind(plugin=TraceroutePluginMikrotik.__name__) combined_output = "\n".join(out_list) - # Minimal summary of the input - avoid dumping full raw output to logs - contains_paging = "-- [Q quit|C-z pause]" in combined_output - contains_multiple_tables = combined_output.count("ADDRESS") > 1 - _log.debug( - "Received traceroute plugin input", - target=target, - source=source, - pieces=len(out_list), - combined_len=len(combined_output), - contains_paging=contains_paging, - multiple_tables=contains_multiple_tables, - ) + if Settings.debug: + _log.debug( + "Parsing cleaned traceroute input", + target=target, + source=source, + pieces=len(out_list), + combined_len=len(combined_output), + ) try: - # Pass the entire combined output to the parser at once validated = MikrotikTracerouteTable.parse_text(combined_output, target, source) result = validated.traceroute_result() - - # Store the CLEANED output (after garbage removal) for "Copy Raw" functionality - # This is the processed output from MikrotikGarbageOutput plugin, not the original raw router output result.raw_output = combined_output - # Concise structured logging for result - _log.debug( - "Parsed traceroute result", - hops=len(validated.hops), - target=result.target, - source=result.source, - ) + if Settings.debug: + _log.debug( + "Parsed traceroute result", + hops=len(validated.hops), + target=result.target, + source=result.source, + ) except ValidationError as err: _log.critical(err) @@ -114,38 +100,31 @@ class TraceroutePluginMikrotik(OutputPlugin): def process(self, *, output: "OutputType", query: "Query") -> "OutputDataModel": """Process the MikroTik traceroute output.""" - # Extract target from query target = getattr(query, "target", "unknown") source = getattr(query, "source", "unknown") - # Try to get target from query_target which is more reliable if hasattr(query, "query_target") and query.query_target: target = str(query.query_target) if hasattr(query, "device") and query.device: source = getattr(query.device, "name", source) - + _log = log.bind(plugin=TraceroutePluginMikrotik.__name__) - # Debug: emit the raw response exactly as returned by the router. - # Do not transform, join, or normalize the output — log it verbatim. - try: - # Ensure the router output is embedded in the log message body so it - # is visible regardless of the logger's formatter configuration. - if isinstance(output, (tuple, list)): - try: - combined_raw = "\n".join(output) - except Exception: - # Fall back to repr if join fails for non-string elements - combined_raw = repr(output) - else: - combined_raw = output if isinstance(output, str) else repr(output) + # Log raw router output only when debug is enabled + if Settings.debug: + try: + if isinstance(output, (tuple, list)): + try: + combined_raw = "\n".join(output) + except Exception: + combined_raw = repr(output) + else: + combined_raw = output if isinstance(output, str) else repr(output) - # Log the full verbatim router response (DEBUG level). - _log.debug("Router raw output:\n{}", combined_raw) - except Exception: - # Don't let logging interfere with normal processing - _log.exception("Failed to log router raw output") + _log.debug("Router raw output:\n{}", combined_raw) + except Exception: + _log.exception("Failed to log router raw output") try: params = use_state("params") @@ -154,14 +133,33 @@ class TraceroutePluginMikrotik(OutputPlugin): device = getattr(query, "device", None) + # Check if structured output is enabled if device is None: + if Settings.debug: + _log.debug("No device found, using cleanup-only mode") return _clean_traceroute_only(output, query) - else: - if params is None: - return _clean_traceroute_only(output, query) - if not getattr(params, "structured", None): - return _clean_traceroute_only(output, query) - if getattr(params.structured, "enable_for_traceroute", None) is False: - return _clean_traceroute_only(output, query) - return parse_mikrotik_traceroute(output, target, source) + if params is None: + if Settings.debug: + _log.debug("No params found, using cleanup-only mode") + return _clean_traceroute_only(output, query) + + if not getattr(params, "structured", None): + if Settings.debug: + _log.debug("Structured output not configured, using cleanup-only mode") + return _clean_traceroute_only(output, query) + + if getattr(params.structured, "enable_for_traceroute", None) is False: + if Settings.debug: + _log.debug("Structured output disabled for traceroute, using cleanup-only mode") + return _clean_traceroute_only(output, query) + + if Settings.debug: + _log.debug("Processing traceroute with structured output enabled") + + # Clean the output first using garbage cleaner before parsing + cleaned_output = _clean_traceroute_only(output, query) + if Settings.debug: + _log.debug("Applied garbage cleaning before structured parsing") + + return parse_mikrotik_traceroute(cleaned_output, target, source) diff --git a/hyperglass/plugins/_builtin/traceroute_ip_enrichment.py b/hyperglass/plugins/_builtin/traceroute_ip_enrichment.py index 33bb36e..fb260b3 100644 --- a/hyperglass/plugins/_builtin/traceroute_ip_enrichment.py +++ b/hyperglass/plugins/_builtin/traceroute_ip_enrichment.py @@ -36,22 +36,70 @@ class ZTracerouteIpEnrichment(OutputPlugin): def _reverse_dns_lookup(self, ip: str) -> t.Optional[str]: """Perform reverse DNS lookup for an IP address.""" + from hyperglass.settings import Settings + try: hostname = socket.gethostbyaddr(ip)[0] - log.debug(f"Reverse DNS for {ip}: {hostname}") + if Settings.debug: + log.debug(f"Reverse DNS for {ip}: {hostname}") return hostname except (socket.herror, socket.gaierror, socket.timeout) as e: - log.debug(f"Reverse DNS lookup failed for {ip}: {e}") + if Settings.debug: + log.debug(f"Reverse DNS lookup failed for {ip}: {e}") + return None + + async def _reverse_dns_lookup_async(self, ip: str) -> t.Optional[str]: + """Async wrapper around synchronous reverse DNS lookup using a thread. + + Uses asyncio.to_thread to avoid blocking the event loop and allows + multiple lookups to be scheduled concurrently. + """ + try: + return await asyncio.to_thread(self._reverse_dns_lookup, ip) + except Exception as e: + from hyperglass.settings import Settings + + if Settings.debug: + log.debug(f"Reverse DNS async lookup error for {ip}: {e}") return None async def _enrich_async(self, output: TracerouteResult) -> None: - """Async helper to enrich traceroute data.""" + """Async helper to enrich traceroute data. + + This performs IP enrichment (ASN lookups), ASN organization lookups, + and then runs reverse DNS lookups concurrently for hops missing hostnames. + """ # First enrich with IP information (ASN numbers) await output.enrich_with_ip_enrichment() # Then enrich ASN numbers with organization names await output.enrich_asn_organizations() + # Concurrent reverse DNS for hops missing hostnames + ips_to_lookup: list[str] = [ + hop.ip_address for hop in output.hops if hop.ip_address and hop.hostname is None + ] + if not ips_to_lookup: + return + + # Schedule lookups concurrently + tasks = [asyncio.create_task(self._reverse_dns_lookup_async(ip)) for ip in ips_to_lookup] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Apply results back to hops in order + idx = 0 + for hop in output.hops: + if hop.ip_address and hop.hostname is None: + res = results[idx] + idx += 1 + if isinstance(res, Exception): + from hyperglass.settings import Settings + + if Settings.debug: + log.debug(f"Reverse DNS lookup raised for {hop.ip_address}: {res}") + else: + hop.hostname = res + def process(self, *, output: "OutputDataModel", query: "Query") -> "OutputDataModel": """Enrich structured traceroute data with IP enrichment and reverse DNS information.""" @@ -59,9 +107,13 @@ class ZTracerouteIpEnrichment(OutputPlugin): return output _log = log.bind(plugin=self.__class__.__name__) - _log.debug(f"Starting IP enrichment for {len(output.hops)} traceroute hops") - # Check if IP enrichment is enabled in config + # Import Settings for debug gating + from hyperglass.settings import Settings + + _log.info( + f"Starting IP enrichment for {len(output.hops)} traceroute hops" + ) # Check if IP enrichment is enabled in config try: from hyperglass.state import use_state @@ -73,14 +125,62 @@ class ZTracerouteIpEnrichment(OutputPlugin): or not params.structured.ip_enrichment.enrich_traceroute or getattr(params.structured, "enable_for_traceroute", None) is False ): - _log.debug("IP enrichment for traceroute disabled in configuration") + if Settings.debug: + _log.debug("IP enrichment for traceroute disabled in configuration") # Still do reverse DNS if enrichment is disabled - for hop in output.hops: - if hop.ip_address and hop.hostname is None: - hop.hostname = self._reverse_dns_lookup(hop.ip_address) + # Perform concurrent reverse DNS lookups for hops needing hostnames + ips = [ + hop.ip_address for hop in output.hops if hop.ip_address and hop.hostname is None + ] + if ips: + try: + # Run lookups in an event loop + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # We're inside an event loop; run tasks via asyncio.run in thread + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + lambda: asyncio.run( + asyncio.gather( + *[self._reverse_dns_lookup_async(ip) for ip in ips] + ) + ) + ) + results = future.result() + else: + results = loop.run_until_complete( + asyncio.gather( + *[self._reverse_dns_lookup_async(ip) for ip in ips] + ) + ) + except RuntimeError: + results = asyncio.run( + asyncio.gather(*[self._reverse_dns_lookup_async(ip) for ip in ips]) + ) + + # Apply results + idx = 0 + for hop in output.hops: + if hop.ip_address and hop.hostname is None: + res = results[idx] + idx += 1 + if not isinstance(res, Exception): + hop.hostname = res + except Exception as e: + if Settings.debug: + _log.debug( + f"Concurrent reverse DNS failed (fallback to sequential): {e}" + ) + for hop in output.hops: + if hop.ip_address and hop.hostname is None: + hop.hostname = self._reverse_dns_lookup(hop.ip_address) return output except Exception as e: - _log.debug(f"Could not check IP enrichment config: {e}") + if Settings.debug: + _log.debug(f"Could not check IP enrichment config: {e}") # Use the built-in enrichment method from TracerouteResult try: @@ -100,14 +200,11 @@ class ZTracerouteIpEnrichment(OutputPlugin): except RuntimeError: # No event loop, create one asyncio.run(self._enrich_async(output)) - _log.debug("IP enrichment completed successfully") + _log.info("IP enrichment completed successfully") except Exception as e: _log.error(f"IP enrichment failed: {e}") - # Add reverse DNS lookups for any hops that don't have hostnames - for hop in output.hops: - if hop.ip_address and hop.hostname is None: - hop.hostname = self._reverse_dns_lookup(hop.ip_address) + # Reverse DNS lookups already handled in _enrich_async for missing hostnames - _log.debug(f"Completed enrichment for traceroute to {output.target}") + _log.info(f"Completed enrichment for traceroute to {output.target}") return output