Mirror of https://github.com/thatmattlove/hyperglass.git (synced 2026-04-17 13:28:27 +00:00)

feat: comprehensive IP enrichment and traceroute improvements

MAJOR ENHANCEMENTS:

IP Enrichment Service (hyperglass/external/ip_enrichment.py):
- Increase IXP data cache duration from 24 hours to 7 days (604800s) for better performance
- Fix critical cache refresh logic: ensure_data_loaded() now properly checks expiry before using existing pickle files
- Remove 'force' refresh parameters from public APIs and admin endpoints to prevent potential abuse/DoS
- Implement automatic refresh based on file timestamps and cache duration
- Add comprehensive debug logging gated by Settings.debug throughout the module
- Clean up verbose comments and improve code readability
- Update configuration model to enforce 7-day minimum cache timeout
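
The timestamp-based refresh check described above can be sketched as follows; the function signature and file layout here are illustrative, not the module's exact API:

```python
from datetime import datetime
from pathlib import Path

def should_refresh(update_file: Path, cache_duration: int = 604800) -> tuple[bool, str]:
    """Return (refresh_needed, reason) based on the age of a timestamp file."""
    if not update_file.exists():
        return True, "no cached data present"
    try:
        cached_time = datetime.fromisoformat(update_file.read_text().strip())
    except Exception as err:
        # An unreadable timestamp should trigger a refresh rather than
        # silently trusting potentially stale data.
        return True, f"failed to read timestamp: {err}"
    age = (datetime.now() - cached_time).total_seconds()
    if age >= cache_duration:
        return True, f"data expired (age: {age / 3600:.1f}h)"
    return False, "data is fresh"
```

The key behavioral change is that an existing pickle no longer short-circuits this check: expiry is evaluated first, so stale files are replaced automatically.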

MikroTik Traceroute Processing:
- Refactor trace_route_mikrotik plugin to use garbage cleaner before structured parsing
- Only log raw router output when Settings.debug is enabled to reduce log verbosity
- Simplify MikrotikTracerouteTable parser to expect pre-cleaned input from garbage cleaner
- Remove complex multi-table detection, format detection, and deduplication logic (handled by cleaner)
- Add concise debug messages for processing decisions and configuration states
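
As a rough illustration of what such cleaning involves (a standalone sketch, not the actual garbage-cleaner plugin), interactive paging footers are stripped and only the last, most complete table snapshot is kept:

```python
import re

# MikroTik appends an interactive paging footer like "-- [Q quit|C-z pause]".
PAGING_RE = re.compile(r"--\s*\[Q quit\|C-z pause\]")

def clean_mikrotik_traceroute(raw: str) -> str:
    """Reduce repeated MikroTik traceroute output to one clean table."""
    # Drop paging artifacts first.
    lines = [line for line in raw.splitlines() if not PAGING_RE.search(line)]
    text = "\n".join(lines)
    # MikroTik re-prints the whole table as probes complete; the last
    # occurrence of the header starts the most complete snapshot.
    idx = text.rfind("# ADDRESS")
    return text[idx:] if idx != -1 else text
```

With input guaranteed clean, the downstream parser no longer needs its own multi-table detection or deduplication.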

Traceroute IP Enrichment (traceroute_ip_enrichment.py):
- Implement concurrent reverse DNS lookups using asyncio.to_thread and asyncio.gather
- Add async wrapper for reverse DNS with proper error handling and fallbacks
- Significant performance improvement for multi-hop traceroutes (parallel vs sequential DNS)
- Proper debug logging gates: only detailed logs when Settings.debug=True
- Upgrade operational messages to log.info level (start/completion status)
- Maintain compatibility with different event loop contexts and runtime environments
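
A minimal sketch of the concurrent lookup pattern (assuming per-hop PTR lookups via the standard library; the plugin's actual helpers and timeouts may differ):

```python
import asyncio
import socket

async def reverse_dns(ip: str, timeout: float = 2.0) -> str:
    """Resolve one PTR record off the event loop, falling back to the raw IP."""
    try:
        host, _, _ = await asyncio.wait_for(
            asyncio.to_thread(socket.gethostbyaddr, ip), timeout
        )
        return host
    except Exception:
        # NXDOMAIN, timeout, etc.: keep the address so enrichment never fails a hop.
        return ip

async def resolve_hops(ips: list[str]) -> list[str]:
    """Resolve all traceroute hops in parallel instead of one at a time."""
    return await asyncio.gather(*(reverse_dns(ip) for ip in ips))
```

Because `asyncio.gather` runs the lookups concurrently, total wall time approaches that of the slowest single hop rather than the sum of all hops.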

Configuration Updates:
- Update structured.ip_enrichment.cache_timeout default to 604800 seconds
- Update documentation to reflect new cache defaults and behavior
- Remove force refresh options from admin API endpoints

MIGRATION NOTES:
- Operators should ensure /etc/hyperglass/ip_enrichment directory is writable
- Any code relying on force refresh parameters must be updated
- Monitor logs for automatic refresh behavior and performance improvements
- The 7-day cache significantly reduces PeeringDB API load

PERFORMANCE BENEFITS:
- Faster traceroute enrichment due to concurrent DNS lookups
- Reduced external API calls with longer IXP cache duration
- More reliable refresh logic prevents stale cache usage
- Cleaner, more focused debug output when debug mode is disabled

TECHNICAL DETAILS:
- Uses asyncio.to_thread for non-blocking DNS operations
- Implements process-wide file locking for safe concurrent cache updates
- Robust fallbacks for various asyncio execution contexts
- Maintains backward compatibility while improving performance
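
The mkdir-based locking approach can be sketched synchronously like this (a simplified stand-in for the async `_ProcessFileLock` shown in the diff below; stale-lock reaping and jitter are reduced to their essentials):

```python
import os
import shutil
import time

class MkdirLock:
    """Cross-process lock built on the atomicity of os.mkdir."""

    def __init__(self, lock_dir: str, timeout: float = 300.0, poll: float = 0.1):
        self.lock_dir = lock_dir
        self.timeout = timeout
        self.poll = poll

    def acquire(self) -> None:
        start = time.time()
        while True:
            try:
                os.mkdir(self.lock_dir)  # Atomic: exactly one process succeeds.
                return
            except FileExistsError:
                # Reap locks left behind by crashed workers once stale.
                try:
                    if time.time() - os.path.getmtime(self.lock_dir) >= self.timeout:
                        shutil.rmtree(self.lock_dir, ignore_errors=True)
                        continue
                except OSError:
                    pass
                if time.time() - start >= self.timeout:
                    raise TimeoutError(f"could not acquire {self.lock_dir}")
                time.sleep(self.poll)

    def release(self) -> None:
        shutil.rmtree(self.lock_dir, ignore_errors=True)
```

This coordinates separate worker processes through the filesystem, so only one worker downloads enrichment data at a time.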

FILES MODIFIED:
- hyperglass/external/ip_enrichment.py
- hyperglass/models/config/structured.py
- hyperglass/api/routes.py
- hyperglass/plugins/_builtin/trace_route_mikrotik.py
- hyperglass/models/parsing/mikrotik.py
- hyperglass/plugins/_builtin/traceroute_ip_enrichment.py
- docs/pages/configuration/config/structured-output.mdx
Wilhelm Schonfeldt 2025-10-05 21:25:58 +02:00
parent 93edf34ccf
commit 4a1057651f
GPG key ID: 9A15BF796D5C3F1E (no known key found for this signature in database)
8 changed files with 482 additions and 783 deletions


@@ -31,7 +31,7 @@ For devices with structured traceroute support (Arista EOS, FRRouting, Huawei VR
| `structured.communities.mode` | String | deny | Use `deny` to deny any communities listed, `permit` to _only_ permit communities listed, or `name` to append friendly names. |
| `structured.communities.items` | List of Strings | | List of communities to match (used by `deny` and `permit` modes). |
| `structured.communities.names` | Dict | | Dictionary mapping BGP community codes to friendly names (used by `name` mode). |
| `structured.ip_enrichment.cache_timeout` | Integer | 86400 | Cache timeout in seconds for IP enrichment data (minimum 24 hours/86400 seconds). |
| `structured.ip_enrichment.cache_timeout` | Integer | 604800 | Cache timeout in seconds for IP enrichment data (minimum 7 days/604800 seconds). |
| `structured.ip_enrichment.enrich_traceroute`| Boolean | true | When `structured:` is present, enable IP enrichment of traceroute hops (ASN, org, IXP). This must be true for enrichment to run. |
| `structured.enable_for_traceroute`| Boolean | (when structured present) true | When `structured:` is present this controls whether the structured traceroute table output is shown. Set to false to force raw router output. |
| `structured.enable_for_bgp_route`| Boolean | (when structured present) true | When `structured:` is present this controls whether the structured BGP route table output is shown. Set to false to force raw router output. |


@@ -204,7 +204,7 @@ async def query(_state: HyperglassState, request: Request, data: Query) -> Query
async def _bg_refresh():
try:
await refresh_ip_enrichment_data(force=False)
await refresh_ip_enrichment_data()
except Exception as e:
_log.debug("Background IP enrichment refresh failed: {}", e)
@@ -337,7 +337,7 @@ async def ip_enrichment_status() -> dict:
@post("/api/admin/ip-enrichment/refresh")
async def ip_enrichment_refresh(force: bool = False) -> dict:
async def ip_enrichment_refresh() -> dict:
"""Manually refresh IP enrichment data."""
try:
from hyperglass.external.ip_enrichment import refresh_ip_enrichment_data
@@ -357,7 +357,7 @@ async def ip_enrichment_refresh(force: bool = False) -> dict:
# If config can't be read, proceed with refresh call and let it decide
pass
success = await refresh_ip_enrichment_data(force=force)
success = await refresh_ip_enrichment_data()
return {
"success": success,
"message": (


@@ -18,82 +18,68 @@ import socket
from hyperglass.log import log
from hyperglass.state import use_state
from hyperglass.settings import Settings
# Process-wide lock to coordinate downloads across worker processes.
# Uses an on-disk lock directory so separate processes don't simultaneously
# download enrichment data and cause rate limits.
# Process-wide lock to coordinate downloads across worker processes
_download_lock: t.Optional["_ProcessFileLock"] = None
class _ProcessFileLock:
"""Async-friendly, process-wide filesystem lock.
Provides an async context manager that runs blocking mkdir/remove
operations in an executor so multiple processes can coordinate.
"""
"""Async-friendly, process-wide filesystem lock."""
def __init__(self, lock_path: Path, timeout: int = 300, poll_interval: float = 0.1):
self.lock_path = lock_path
self.timeout = timeout
self.poll_interval = poll_interval
self._lock_dir: t.Optional[str] = None
# Small startup jitter (seconds) to reduce thundering herd on many
# worker processes starting at the same time.
self._startup_jitter = 0.25
self._startup_jitter = 0.25 # Reduce thundering herd on startup
def _acquire_blocking(self) -> None:
# Use atomic mkdir on a .lck directory as the lock primitive.
"""Acquire lock using atomic mkdir."""
import os
import random
import json
import shutil
lock_dir = str(self.lock_path) + ".lck"
# Small jitter before first attempt to reduce concurrent mkdirs
time.sleep(random.uniform(0, self._startup_jitter))
start = time.time()
if Settings.debug:
log.debug(f"Attempting to acquire process lock {lock_dir}")
while True:
try:
# Try to create the lock directory atomically; on success we
# hold the lock. If it exists, retry until timeout.
os.mkdir(lock_dir)
os.mkdir(lock_dir) # Atomic lock acquisition
# Write a small owner metadata file to help debugging stale locks
# Write metadata for debugging
try:
owner = {"pid": os.getpid(), "created": datetime.now().isoformat()}
with open(os.path.join(lock_dir, "owner.json"), "w") as f:
json.dump(owner, f)
except Exception:
# Not critical; proceed even if writing metadata fails
pass
pass # Not critical
self._lock_dir = lock_dir
log.debug(f"Acquired process lock {lock_dir} (pid={os.getpid()})")
if Settings.debug:
log.debug(f"Acquired process lock {lock_dir} (pid={os.getpid()})")
return
except FileExistsError:
# If the lock appears stale (older than timeout), try cleanup.
# Check for stale locks
try:
owner_file = os.path.join(lock_dir, "owner.json")
mtime = None
if os.path.exists(owner_file):
mtime = os.path.getmtime(owner_file)
else:
mtime = os.path.getmtime(lock_dir)
mtime = os.path.getmtime(owner_file if os.path.exists(owner_file) else lock_dir)
# If owner file/dir mtime is older than timeout, remove it
if (time.time() - mtime) >= self.timeout:
log.warning(f"Removing stale lock directory {lock_dir}")
if Settings.debug:
log.debug(f"Removing stale lock directory {lock_dir}")
try:
shutil.rmtree(lock_dir)
except Exception:
# If we can't remove it, we'll continue to wait until
# the timeout is reached by this acquisition attempt.
pass
# After attempted cleanup, loop and try mkdir again
continue
continue # Try again after cleanup
except Exception:
# Ignore issues during stale-check and continue waiting
pass
if (time.time() - start) >= self.timeout:
@@ -101,41 +87,43 @@ class _ProcessFileLock:
time.sleep(self.poll_interval)
def _release_blocking(self) -> None:
"""Release the lock by removing the lock directory."""
import os
import shutil
if not self._lock_dir:
return
try:
if self._lock_dir:
# Clean up metadata file
owner_file = os.path.join(self._lock_dir, "owner.json")
if os.path.exists(owner_file):
try:
owner_file = os.path.join(self._lock_dir, "owner.json")
if os.path.exists(owner_file):
try:
os.remove(owner_file)
except Exception:
pass
# Attempt to remove the directory. If it's empty, rmdir will
# succeed; if not, fall back to recursive removal as a best-effort.
try:
os.rmdir(self._lock_dir)
except Exception:
try:
shutil.rmtree(self._lock_dir)
except Exception:
log.debug(f"Failed to fully remove lock dir {self._lock_dir}")
log.debug(f"Released process lock {self._lock_dir}")
self._lock_dir = None
os.remove(owner_file)
except Exception:
# Best-effort; ignore errors removing the lock dir
pass
# Remove lock directory
try:
os.rmdir(self._lock_dir)
except Exception:
try:
shutil.rmtree(self._lock_dir)
except Exception:
if Settings.debug:
log.debug(f"Failed to remove lock dir {self._lock_dir}")
if Settings.debug:
log.debug(f"Released process lock {self._lock_dir}")
self._lock_dir = None
except Exception:
# Nothing we can do on release failure
if Settings.debug:
log.debug(f"Error releasing lock {self._lock_dir}")
pass
async def __aenter__(self):
loop = asyncio.get_running_loop()
# Run blocking acquire in executor
await loop.run_in_executor(None, self._acquire_blocking)
return self
@@ -144,11 +132,7 @@ class _ProcessFileLock:
await loop.run_in_executor(None, self._release_blocking)
# Instantiate a process-global lock file in the data dir. The data dir may not yet
# exist at import time; the constant path is defined below and we'll initialize
# the actual _download_lock after the paths are declared. (See below.)
# Optional dependencies - graceful fallback if not available
# Optional dependencies
try:
import httpx
except ImportError:
@@ -161,24 +145,18 @@ except ImportError:
log.warning("aiofiles not available - IP enrichment will use slower sync I/O")
aiofiles = None
# File paths for persistent storage
# File paths and constants
IP_ENRICHMENT_DATA_DIR = Path("/etc/hyperglass/ip_enrichment")
IXP_PICKLE_FILE = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
LAST_UPDATE_FILE = IP_ENRICHMENT_DATA_DIR / "last_update.txt"
DEFAULT_CACHE_DURATION = 7 * 24 * 60 * 60 # 7 days
# Cache duration (seconds). Default: 24 hours. Can be overridden in config.
DEFAULT_CACHE_DURATION = 24 * 60 * 60
# Lazily-created process-wide download lock. Create this after the data
# directory is ensured to exist to avoid open() failing due to a missing
# parent directory and to ensure the lock file lives under the same path
# for all workers.
# Global download lock (initialized when data directory exists)
_download_lock: t.Optional[_ProcessFileLock] = None
def get_cache_duration() -> int:
"""Get cache duration from config, ensuring minimum of 24 hours."""
"""Get cache duration from config, ensuring minimum of 7 days."""
try:
from hyperglass.state import use_state
@@ -190,46 +168,34 @@ def get_cache_duration() -> int:
return DEFAULT_CACHE_DURATION
def should_refresh_data(force_refresh: bool = False) -> tuple[bool, str]:
"""Decide whether to refresh IXP data. Only PeeringDB IXP prefixes are
considered relevant for startup refresh; BGP.tools bulk files are not used.
"""
if force_refresh:
return True, "Force refresh requested"
# No persistent backoff marker; decide refresh purely by file age / config
# and any transient network errors will be handled by the downloader's
# retry logic.
# If an IXP file exists, prefer it and do not perform automatic refreshes
# unless the caller explicitly requested a force refresh.
if IXP_PICKLE_FILE.exists() and not force_refresh:
return False, "ixp_data.json exists; skipping automatic refresh"
# If IXP file is missing, refresh is needed
def should_refresh_data() -> tuple[bool, str]:
"""Check if IXP data needs refreshing based on cache age."""
if not IXP_PICKLE_FILE.exists():
return True, "No ixp_data.json present"
return True, "No ixp_data.pickle present"
# Otherwise check timestamp age
try:
with open(LAST_UPDATE_FILE, "r") as f:
cached_time = datetime.fromisoformat(f.read().strip())
age_seconds = (datetime.now() - cached_time).total_seconds()
cache_duration = get_cache_duration()
if age_seconds >= cache_duration:
age_hours = age_seconds / 3600
return True, f"Data expired (age: {age_hours:.1f}h, max: {cache_duration/3600:.1f}h)"
reason = f"Data expired (age: {age_hours:.1f}h, max: {cache_duration/3600:.1f}h)"
if Settings.debug:
log.debug(f"IP enrichment cache check: {reason}")
return True, reason
except Exception as e:
# If reading timestamp fails, prefer a refresh so we don't rely on stale data
if Settings.debug:
log.debug(f"Failed to read cache timestamp: {e}")
return True, f"Failed to read timestamp: {e}"
if Settings.debug:
log.debug("IP enrichment cache is fresh")
return False, "Data is fresh"
# validate_data_files removed - legacy BGP.tools bulk files are no longer used
# Simple result classes
class IPInfo:
"""Result of IP lookup."""
@@ -274,23 +240,21 @@ class IPEnrichmentService:
) # (net_int, mask_bits, asn, cidr)
self._lookup_optimized = False
# Combined cache for ultra-fast loading
# Runtime caches
self._combined_cache: t.Optional[t.Dict[str, t.Any]] = None
# Per-IP in-memory cache for bgp.tools lookups: ip -> (asn, asn_name, prefix, expires_at)
self._per_ip_cache: t.Dict[
str, t.Tuple[t.Optional[int], t.Optional[str], t.Optional[str], float]
] = {}
# Small in-memory cache for per-IP lookups to avoid repeated websocket
# queries during runtime. Maps ip_str -> (asn, asn_name, prefix)
self._ip_cache: t.Dict[str, t.Tuple[t.Optional[int], t.Optional[str], t.Optional[str]]] = {}
# Lock to serialize data load so concurrent callers don't duplicate work
self._ensure_lock = asyncio.Lock()
def _optimize_lookups(self):
"""Convert IP networks to integer format for faster lookups."""
if self._lookup_optimized:
return
log.debug("Optimizing IP lookup structures...")
if Settings.debug:
log.debug("Optimizing IP lookup structures...")
optimize_start = datetime.now()
self._ipv4_networks = []
@@ -310,182 +274,82 @@ class IPEnrichmentService:
self._ipv6_networks.sort(key=lambda x: x[1])
optimize_time = (datetime.now() - optimize_start).total_seconds()
log.debug(
f"Optimized lookups: {len(self._ipv4_networks)} IPv4, {len(self._ipv6_networks)} IPv6 (took {optimize_time:.2f}s)"
)
if Settings.debug:
log.debug(
f"Optimized lookups: {len(self._ipv4_networks)} IPv4, {len(self._ipv6_networks)} IPv6 (took {optimize_time:.2f}s)"
)
self._lookup_optimized = True
def _try_load_pickle(self) -> bool:
"""Attempt to load the optimized pickle from disk without triggering downloads.
This is a best-effort, non-blocking load used during runtime lookups so
we don't attempt network refreshes or acquire process locks while
serving user requests.
"""
"""Best-effort load of IXP data from pickle without blocking."""
try:
pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
if pickle_path.exists():
try:
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name) for net, prefixlen, name in parsed
]
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name) for net, prefixlen, name in parsed
]
if Settings.debug:
log.debug(
"Loaded {} IXP prefixes from optimized pickle (non-blocking)",
len(self.ixp_networks),
f"Loaded {len(self.ixp_networks)} IXP prefixes from pickle (non-blocking)"
)
return True
except Exception as e:
log.debug("Non-blocking pickle load failed: {}", e)
except Exception:
pass
return True
except Exception as e:
if Settings.debug:
log.debug(f"Non-blocking pickle load failed: {e}")
return False
async def ensure_data_loaded(self, force_refresh: bool = False) -> bool:
"""Ensure data is loaded and fresh from persistent files.
New behavior: only load PeeringDB IXP prefixes at startup. Do NOT bulk
download BGP.tools CIDR or ASN data. Per-IP ASN lookups will query the
bgp.tools API (websocket preferred) on-demand.
"""
# Create data directory if it doesn't exist
async def ensure_data_loaded(self) -> bool:
"""Ensure IXP data is loaded. Downloads from PeeringDB if needed."""
IP_ENRICHMENT_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Lazily instantiate the process-wide download lock now that the
# data directory exists and is guaranteed to be the same path for
# all worker processes.
# Initialize download lock
global _download_lock
if _download_lock is None:
_download_lock = _ProcessFileLock(IP_ENRICHMENT_DATA_DIR / "download.lock")
# Fast-path: if already loaded in memory, return immediately
if self.ixp_networks:
return True
# Serialize loads to avoid duplicate file reads when multiple callers
# call ensure_data_loaded concurrently.
async with self._ensure_lock:
# Double-check after acquiring the lock
if self.ixp_networks:
return True
# Fast-path: if an optimized pickle exists and the caller did not
# request a forced refresh, load it (fastest). Fall back to the
# legacy JSON IXP file or downloads if the pickle is missing or
# invalid. This ensures the pickle is the preferred on-disk cache
# for faster startup.
try:
pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
if pickle_path.exists() and not force_refresh:
try:
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name)
for net, prefixlen, name in parsed
]
log.info(
f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (fast-path)"
)
return True
else:
log.warning(
"Optimized pickle exists but appears empty or invalid; falling back to JSON/load or refresh"
)
except Exception as e:
log.warning(
f"Failed to load optimized pickle {pickle_path}: {e}; falling back"
)
except Exception:
# Non-fatal; continue to JSON/download logic
pass
# Check if refresh is needed first
should_refresh, reason = should_refresh_data()
# Immediate guard: if an optimized pickle exists on disk and the
# caller did not request a forced refresh, prefer it and skip any
# network downloads. This keeps startup fast by loading the already
# generated optimized mapping.
try:
pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
if pickle_path.exists() and not force_refresh:
try:
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name)
for net, prefixlen, name in parsed
]
log.info(
f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (early guard)"
)
return True
else:
log.warning(
"Optimized pickle exists but appears empty or invalid; will attempt to refresh"
)
except Exception as e:
log.warning(
f"Failed to read optimized pickle: {e}; will attempt to refresh"
)
except Exception:
# Ignore filesystem errors and continue to refresh logic
pass
# No operator raw-dump conversion: rely on endpoint JSON files (ixpfx.json,
# ixlan.json, ix.json) in the data directory or download them from
# PeeringDB when a refresh is required. Determine whether we should
# refresh based on the backoff marker / cache duration.
should_refresh, reason = should_refresh_data(force_refresh)
# If an optimized pickle exists, prefer it and avoid downloads unless forced.
try:
pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
if pickle_path.exists():
if not should_refresh:
# Try to load existing pickle file if data is fresh
try:
st = pickle_path.stat()
size = getattr(st, "st_size", None)
except Exception:
size = None
# If file size indicates non-empty file try to load
if size is not None and size > 0:
try:
pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
if pickle_path.exists():
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
except Exception as e:
log.warning(f"Failed to parse existing optimized IXP pickle: {e}")
parsed = None
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name)
for net, prefixlen, name in parsed
]
if Settings.debug:
log.debug(
f"Loaded {len(self.ixp_networks)} IXP prefixes from pickle"
)
return True
else:
if Settings.debug:
log.debug("Pickle exists but appears empty, will refresh")
should_refresh = True
reason = "Pickle file is empty"
except Exception as e:
if Settings.debug:
log.debug(f"Failed to load pickle: {e}")
should_refresh = True
reason = f"Failed to load pickle: {e}"
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name) for net, prefixlen, name in parsed
]
log.info(
f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (size={size})"
)
return True
else:
log.warning(
"Existing optimized pickle appears empty or invalid (size={}) ; will attempt to refresh",
size,
)
else:
log.debug(
f"Optimized pickle exists but size indicates empty or very small (size={size})"
)
except Exception as e:
log.warning(f"Failed to load existing optimized IXP data: {e}")
# If we're currently under a backoff or refresh is not required, skip downloading
if not should_refresh:
# If the optimized pickle is missing but the raw PeeringDB JSON files
# are present and the last_update timestamp is still within the
# configured cache duration, attempt to build the optimized pickle
# from the existing JSON files instead of downloading.
# Try to build pickle from existing JSON files
try:
pickle_path = IP_ENRICHMENT_DATA_DIR / "ixp_data.pickle"
json_paths = [
@@ -496,73 +360,65 @@ class IPEnrichmentService:
have_all_json = all(p.exists() for p in json_paths)
if not pickle_path.exists() and have_all_json and LAST_UPDATE_FILE.exists():
try:
with open(LAST_UPDATE_FILE, "r") as f:
cached_time = datetime.fromisoformat(f.read().strip())
age_seconds = (datetime.now() - cached_time).total_seconds()
cache_duration = get_cache_duration()
if age_seconds < cache_duration:
log.info("Building optimized pickle from existing PeeringDB JSON files")
loop = asyncio.get_running_loop()
ok = await loop.run_in_executor(None, self._combine_peeringdb_files)
if ok and pickle_path.exists():
# Load the generated pickle into memory
try:
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
self.ixp_networks = [
(ip_address(net), prefixlen, name)
for net, prefixlen, name in parsed
]
log.info(
"Loaded %d IXP prefixes from generated pickle",
len(self.ixp_networks),
)
return True
except Exception as e:
log.warning(f"Failed to load generated pickle: {e}")
except Exception:
# If reading last_update fails, fall through to skipping refresh
pass
with open(LAST_UPDATE_FILE, "r") as f:
cached_time = datetime.fromisoformat(f.read().strip())
age_seconds = (datetime.now() - cached_time).total_seconds()
cache_duration = get_cache_duration()
if age_seconds < cache_duration:
if Settings.debug:
log.debug("Building pickle from existing JSON files")
loop = asyncio.get_running_loop()
ok = await loop.run_in_executor(None, self._combine_peeringdb_files)
if ok and pickle_path.exists():
with open(pickle_path, "rb") as f:
parsed = pickle.load(f)
self.ixp_networks = [
(ip_address(net), prefixlen, name)
for net, prefixlen, name in parsed
]
if Settings.debug:
log.debug(
f"Loaded {len(self.ixp_networks)} IXP prefixes from generated pickle"
)
return True
except Exception as e:
if Settings.debug:
log.debug(f"Failed to build pickle from JSON: {e}")
except Exception:
# Non-fatal; proceed to skip refresh
pass
log.info("Skipping IXP refresh: {}", reason)
if Settings.debug:
log.debug(f"Skipping IXP refresh: {reason}")
return False
# Acquire lock and refresh IXP list only
async with _download_lock:
# Double-check in case another worker refreshed
try:
# Double-check: if another worker already refreshed the IXP file
# while we were waiting for the lock, load it regardless of the
# general should_refresh flag.
if IXP_PICKLE_FILE.exists():
# Double-check: if another worker already refreshed the IXP file
# while we were waiting for the lock, verify it's actually fresh
if IXP_PICKLE_FILE.exists():
# Re-check if data is still expired after waiting for lock
should_refresh_postlock, reason_postlock = should_refresh_data()
if not should_refresh_postlock:
try:
with open(IXP_PICKLE_FILE, "rb") as f:
parsed = pickle.load(f)
if parsed and isinstance(parsed, list) and len(parsed) > 0:
self.ixp_networks = [
(ip_address(net), prefixlen, name)
for net, prefixlen, name in parsed
]
if Settings.debug:
log.debug(
f"Another worker refreshed data while waiting for lock: {reason_postlock}"
)
log.info(
f"Loaded {len(self.ixp_networks)} IXP prefixes from fresh pickle (post-lock)"
)
return True
except Exception as e:
log.warning(
f"Existing optimized pickle is invalid after lock wait: {e}; will attempt to refresh"
)
parsed = None
if not parsed or (isinstance(parsed, list) and len(parsed) == 0):
log.warning(
"Existing optimized pickle is empty after lock wait; will attempt to refresh",
)
else:
self.ixp_networks = [
(ip_address(net), prefixlen, name) for net, prefixlen, name in parsed
]
log.info(
f"Loaded {len(self.ixp_networks)} IXP prefixes from optimized pickle (post-lock)"
)
return True
except Exception:
pass
if Settings.debug:
log.debug(f"Failed to load post-lock pickle: {e}")
else:
if Settings.debug:
log.debug(f"Data still expired after lock wait: {reason_postlock}")
if not httpx:
log.error("httpx not available: cannot download PeeringDB prefixes")
@@ -1142,103 +998,93 @@ class IPEnrichmentService:
async def lookup_ip(self, ip_str: str) -> IPInfo:
"""Lookup an IP address and return ASN or IXP information."""
# Try to load IXP data, but continue even if the load fails. We still
# want to perform on-demand bgp.tools lookups for IPs when local data
# is missing; failing to load the IXP file should not prevent remote
# lookups.
try:
if not self.ixp_networks:
# Attempt a non-blocking pickle load only; don't trigger
# downloads or acquire locks while handling requests.
self._try_load_pickle()
except Exception:
log.debug("Non-blocking data load failed; continuing with on-demand lookups")
if Settings.debug:
log.debug("Non-blocking data load failed; continuing with on-demand lookups")
# Ensure lookup optimization is done
self._optimize_lookups()
log.debug(
f"Looking up IP {ip_str} - have {len(self.cidr_networks)} CIDR entries, {len(self.asn_info)} ASN entries"
)
if Settings.debug:
log.debug(
f"Looking up IP {ip_str} - have {len(self.cidr_networks)} CIDR entries, {len(self.asn_info)} ASN entries"
)
try:
target_ip = ip_address(ip_str)
except ValueError:
log.debug(f"Invalid IP address: {ip_str}")
if Settings.debug:
log.debug(f"Invalid IP address: {ip_str}")
return IPInfo(ip_str)
# Check if it's a private/reserved/loopback address
# Check private/reserved addresses
if target_ip.is_private or target_ip.is_reserved or target_ip.is_loopback:
log.debug(f"IP {ip_str} is in private/reserved range - returning AS0 'Private'")
if Settings.debug:
log.debug(f"IP {ip_str} is in private/reserved range")
return IPInfo(ip_str, asn=0, asn_name="Private", prefix="Private Network")
# First check IXP networks (more specific usually)
# Check IXP networks first
for net_addr, prefixlen, ixp_name in self.ixp_networks:
try:
network = ip_network(f"{net_addr}/{prefixlen}", strict=False)
if target_ip in network:
log.debug(f"Found IXP match for {ip_str}: {ixp_name}")
if Settings.debug:
log.debug(f"Found IXP match for {ip_str}: {ixp_name}")
return IPInfo(ip_str, is_ixp=True, ixp_name=ixp_name)
except Exception:
continue
# Fast integer-based lookup for ASN
# Fast integer-based ASN lookup
target_int = int(target_ip)
if isinstance(target_ip, IPv4Address):
# Use optimized IPv4 lookup
for net_int, mask_bits, asn, cidr_string in self._ipv4_networks:
if (target_int >> mask_bits) == (net_int >> mask_bits):
asn_data = self.asn_info.get(asn, {})
asn_name = asn_data.get("name", f"AS{asn}")
country = asn_data.get("country", "")
log.debug(
f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}"
)
if Settings.debug:
log.debug(
f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}"
)
return IPInfo(
ip_str, asn=asn, asn_name=asn_name, prefix=cidr_string, country=country
)
# Not found in local tables - do an on-demand query to bgp.tools
try:
asn, asn_name, prefix = await self._query_bgp_tools_for_ip(ip_str)
if asn:
# Update asn_info cache (best-effort)
try:
self.asn_info[int(asn)] = {"name": asn_name or f"AS{asn}", "country": ""}
except Exception:
pass
return IPInfo(ip_str, asn=asn, asn_name=asn_name, prefix=prefix)
except Exception:
pass
# Not found locally - try one-off query
try:
asn, asn_name, prefix = asyncio.get_event_loop().run_until_complete(
self._query_bgp_tools_for_ip(ip_str)
)
if asn:
try:
self.asn_info[int(asn)] = {"name": asn_name or f"AS{asn}", "country": ""}
except Exception:
pass
return IPInfo(ip_str, asn=asn, asn_name=asn_name, prefix=prefix)
except Exception:
pass
else:
# Use optimized IPv6 lookup
for net_int, mask_bits, asn, cidr_string in self._ipv6_networks:
if (target_int >> mask_bits) == (net_int >> mask_bits):
asn_data = self.asn_info.get(asn, {})
asn_name = asn_data.get("name", f"AS{asn}")
country = asn_data.get("country", "")
log.debug(
f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}"
)
if Settings.debug:
log.debug(
f"Found ASN match for {ip_str}: AS{asn} ({asn_name}) in {cidr_string}"
)
return IPInfo(
ip_str, asn=asn, asn_name=asn_name, prefix=cidr_string, country=country
)
# No match found - return AS0 with "Unknown" to indicate missing data
log.debug(f"No enrichment data found for {ip_str} - returning AS0 'Unknown'")
# Query bgp.tools for unknown IPs
try:
asn, asn_name, prefix = await self._query_bgp_tools_for_ip(ip_str)
if asn:
# Cache result
try:
self.asn_info[int(asn)] = {"name": asn_name or f"AS{asn}", "country": ""}
except Exception:
pass
if Settings.debug:
log.debug(f"BGP.tools lookup for {ip_str}: AS{asn} ({asn_name})")
return IPInfo(ip_str, asn=asn, asn_name=asn_name, prefix=prefix)
except Exception as e:
if Settings.debug:
log.debug(f"BGP.tools lookup failed for {ip_str}: {e}")
# No match found
if Settings.debug:
log.debug(f"No enrichment data found for {ip_str}")
return IPInfo(ip_str, asn=0, asn_name="Unknown")
async def lookup_asn_name(self, asn: int) -> str:
@@ -1447,9 +1293,9 @@ async def lookup_asns_bulk(asns: t.List[t.Union[str, int]]) -> t.Dict[str, t.Dic
return results
async def refresh_ip_enrichment_data() -> bool:
"""Manually refresh IP enrichment data."""
log.info("Manual refresh requested")
# Respect configuration: if IP enrichment is disabled, do not attempt
# to refresh or download PeeringDB data. This prevents manual or UI-
# triggered refreshes from hitting the network when the feature is
@@ -1470,7 +1316,7 @@ async def refresh_ip_enrichment_data(force: bool = False) -> bool:
# avoid silently ignoring an admin's request.
pass
return await _service.ensure_data_loaded()
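With the `force` parameter removed, freshness is decided by file age. The timestamp-based expiry check described in the commit message can be sketched like this (`cache_is_fresh` is a hypothetical helper for illustration, not the actual `ensure_data_loaded` implementation):

```python
import os
import time

SEVEN_DAYS = 604800  # default cache_timeout in seconds

def cache_is_fresh(path: str, max_age: int = SEVEN_DAYS) -> bool:
    """True if the pickle at `path` exists and is younger than max_age seconds."""
    try:
        age = time.time() - os.path.getmtime(path)
    except OSError:
        # Missing or unreadable file forces a refresh
        return False
    return age < max_age
```

A stale or missing file triggers a re-download; a fresh file short-circuits it, which is what prevents repeated refreshes from hammering upstream data sources.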
def get_data_status() -> dict:


@@ -46,13 +46,13 @@ class StructuredIpEnrichment(HyperglassModel):
to explicitly disable them.
"""
cache_timeout: int = 604800  # 7 days in seconds (minimum)
@field_validator("cache_timeout")
def validate_cache_timeout(cls, value: int) -> int:
"""Ensure cache timeout is at least 7 days (604800 seconds)."""
if value < 604800:
return 604800
return value
enrich_traceroute: bool = True
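The validator clamps rather than rejects: out-of-range values are silently raised to the floor. A pydantic-free sketch of that rule:

```python
SEVEN_DAYS = 604800  # 7 days in seconds

def clamp_cache_timeout(value: int) -> int:
    # Values below the 7-day floor are raised to it rather than rejected
    return value if value >= SEVEN_DAYS else SEVEN_DAYS

print(clamp_cache_timeout(86400))   # → 604800
print(clamp_cache_timeout(900000))  # → 900000
```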


@@ -320,374 +320,117 @@ class MikrotikTracerouteTable(MikrotikBase):
@classmethod
def parse_text(cls, text: str, target: str, source: str) -> "MikrotikTracerouteTable":
"""Parse a cleaned MikroTik traceroute table.
The input is expected to be a single, clean traceroute table that has already
been processed by the garbage cleaner to remove paging artifacts and duplicates.

Expected format:

ADDRESS          LOSS  SENT  LAST     AVG    BEST   WORST  STD-DEV  STATUS
197.157.67.233   0%    3     0.4ms    0.2    0.1    0.4    0.1
                 100%  3     timeout
41.78.188.153    0%    3     210.8ms  210.8  210.8  210.9  0
"""
_log = log.bind(parser="MikrotikTracerouteTable")
# Minimal input summary to avoid excessive logs while keeping context
_log.debug(
"Parsing cleaned MikroTik traceroute table",
target=target,
source=source,
lines=len(text.splitlines()),
)
lines = text.strip().split("\n")
hops = []
hop_number = 1
# Find the table header to start parsing from
header_found = False
data_start_index = 0
for i, line in enumerate(lines):
if "ADDRESS" in line and "LOSS" in line and "SENT" in line:
header_found = True
data_start_index = i + 1
break
if not header_found:
_log.warning("No traceroute table header found in cleaned output")
return MikrotikTracerouteTable(target=target, source=source, hops=[])
# Parse data rows
for i in range(data_start_index, len(lines)):
line = lines[i].strip()
# Skip empty lines
if not line:
continue
# Stop at any remaining paging markers (shouldn't happen with cleaned input)
if "-- [Q quit|C-z pause]" in line:
break
try:
# Parse data line
parts = line.split()
if len(parts) < 3:
continue
# Check if this is a timeout line (starts with percentage)
if parts[0].endswith("%"):
# Timeout hop: "100% 3 timeout"
ip_address = None
loss_pct = int(parts[0].rstrip("%"))
sent_count = int(parts[1])
last_rtt = None
avg_rtt = None
best_rtt = None
worst_rtt = None
else:
# Normal hop: "197.157.67.233 0% 3 0.4ms 0.2 0.1 0.4 0.1"
ip_address = parts[0]
if len(parts) < 7:
continue
loss_pct = int(parts[1].rstrip("%"))
sent_count = int(parts[2])
# Parse RTT values
def parse_rtt(rtt_str: str) -> t.Optional[float]:
if rtt_str in ("timeout", "-", "0ms", "*"):
return None
rtt_clean = re.sub(r"ms$", "", rtt_str)
try:
return float(rtt_clean)
except ValueError:
return None
last_rtt = parse_rtt(parts[3])
avg_rtt = parse_rtt(parts[4])
best_rtt = parse_rtt(parts[5])
worst_rtt = parse_rtt(parts[6])
hop = MikrotikTracerouteHop(
hop_number=hop_number,
ip_address=ip_address,
hostname=None,  # MikroTik doesn't do reverse DNS by default
loss_pct=loss_pct,
sent_count=sent_count,
last_rtt=last_rtt,
avg_rtt=avg_rtt,
best_rtt=best_rtt,
worst_rtt=worst_rtt,
)
hops.append(hop)
hop_number += 1
except (ValueError, IndexError) as e:
_log.debug("Failed to parse traceroute data line", line=line, error=str(e))
continue
result = MikrotikTracerouteTable(target=target, source=source, hops=hops)
_log.info("Parsed cleaned traceroute table", hops=len(hops))
return result
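The per-line rules above can be exercised in isolation. This sketch mirrors `parse_rtt` and the percent-prefix timeout check; `classify` is an illustrative helper (not part of the parser) returning `(ip_address, loss_pct, last_rtt)` for one cleaned row:

```python
import re
from typing import Optional, Tuple

def parse_rtt(rtt_str: str) -> Optional[float]:
    # "timeout", "-", "0ms" and "*" all mean "no measurement"
    if rtt_str in ("timeout", "-", "0ms", "*"):
        return None
    try:
        return float(re.sub(r"ms$", "", rtt_str))
    except ValueError:
        return None

def classify(line: str) -> Tuple[Optional[str], int, Optional[float]]:
    # A row whose first column is a percentage has no ADDRESS: it is a timeout hop
    parts = line.split()
    if parts[0].endswith("%"):
        return (None, int(parts[0].rstrip("%")), None)
    return (parts[0], int(parts[1].rstrip("%")), parse_rtt(parts[3]))

print(classify("197.157.67.233 0% 3 0.4ms 0.2 0.1 0.4 0.1"))  # → ('197.157.67.233', 0, 0.4)
print(classify("100% 3 timeout"))                             # → (None, 100, None)
```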
def traceroute_result(self):


@@ -51,7 +51,11 @@ class MikrotikGarbageOutput(OutputPlugin):
stripped = line.strip()
# Skip empty lines and interactive paging prompts
if (
not stripped
or "-- [Q quit|C-z pause]" in stripped
or "-- [Q quit|D dump|C-z pause]" in stripped
):
continue
# Skip command echo lines
@@ -126,7 +130,9 @@ class MikrotikGarbageOutput(OutputPlugin):
else:
sent = 0
is_timeout = "timeout" in row.lower() or (
"100%" in row and "timeout" in row.lower()
)
# Prefer higher SENT, then prefer non-timeout, then later table (higher ti)
pick = False
@@ -150,7 +156,9 @@ class MikrotikGarbageOutput(OutputPlugin):
# Collapse excessive trailing timeouts into an aggregation line
trailing_timeouts = 0
for line in reversed(processed_lines):
if ("timeout" in line.lower()) or (
sent_re.search(line) and sent_re.search(line).group(1) == "100"
):
trailing_timeouts += 1
else:
break
@@ -158,13 +166,20 @@ class MikrotikGarbageOutput(OutputPlugin):
if trailing_timeouts > 3:
non_trailing = len(processed_lines) - trailing_timeouts
# Keep first 2 of trailing timeouts and aggregate the rest
aggregated = (
processed_lines[:non_trailing] + processed_lines[non_trailing : non_trailing + 2]
)
remaining = trailing_timeouts - 2
aggregated.append(
f" ... ({remaining} more timeout hops)"
)
processed_lines = aggregated
# Prepend header line if we have one
header_to_use = (
header_line
or "ADDRESS LOSS SENT LAST AVG BEST WORST STD-DEV STATUS"
)
cleaned = [header_to_use] + processed_lines
return "\n".join(cleaned)
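The trailing-timeout aggregation can be sketched as a standalone helper. This is a simplified illustration of the cleaner's logic, assuming the same constants as above (keep the first 2 trailing timeouts, aggregate once more than 3 trail):

```python
from typing import List

def collapse_trailing_timeouts(rows: List[str], keep: int = 2, threshold: int = 3) -> List[str]:
    # Count consecutive timeout rows at the end of the table
    trailing = 0
    for row in reversed(rows):
        if "timeout" in row.lower():
            trailing += 1
        else:
            break
    if trailing <= threshold:
        return rows
    cut = len(rows) - trailing
    # Keep the first `keep` trailing timeouts, summarize the rest in one line
    return rows[:cut] + rows[cut : cut + keep] + [f" ... ({trailing - keep} more timeout hops)"]
```

With five trailing timeout rows this keeps two of them and emits a single `... (3 more timeout hops)` summary line, so the cleaned table stays short without hiding that the path stopped responding.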


@@ -8,6 +8,7 @@ from pydantic import PrivateAttr, ValidationError
# Project
from hyperglass.log import log, log as _log
from hyperglass.settings import Settings
from hyperglass.exceptions.private import ParsingError
from hyperglass.models.parsing.mikrotik import MikrotikTracerouteTable
from hyperglass.state import use_state
@@ -27,15 +28,11 @@ def _normalize_output(output: t.Union[str, t.Sequence[str]]) -> t.List[str]:
return [output]
return list(output)
def _clean_traceroute_only(
output: t.Union[str, t.Sequence[str]], query: "Query"
) -> t.Union[str, t.Tuple[str, ...]]:
"""Clean traceroute output using MikrotikGarbageOutput plugin."""
from .mikrotik_garbage_output import MikrotikGarbageOutput
out_list = _normalize_output(output)
@@ -46,7 +43,6 @@ def _clean_traceroute_only(
try:
cleaned_piece = cleaner._clean_traceroute_output(piece)
except Exception:
# If cleaner fails for any piece, fall back to the original piece
cleaned_piece = piece
cleaned_list.append(cleaned_piece)
@@ -58,42 +54,32 @@ def parse_mikrotik_traceroute(
def parse_mikrotik_traceroute(
output: t.Union[str, t.Sequence[str]], target: str, source: str
) -> "OutputDataModel":
"""Parse a cleaned MikroTik traceroute text response."""
out_list = _normalize_output(output)
_log = log.bind(plugin=TraceroutePluginMikrotik.__name__)
combined_output = "\n".join(out_list)
if Settings.debug:
_log.debug(
"Parsing cleaned traceroute input",
target=target,
source=source,
pieces=len(out_list),
combined_len=len(combined_output),
)
try:
# Pass the entire combined output to the parser at once
validated = MikrotikTracerouteTable.parse_text(combined_output, target, source)
result = validated.traceroute_result()
# Store the CLEANED output (after garbage removal) for "Copy Raw" functionality
# This is the processed output from MikrotikGarbageOutput plugin, not the original raw router output
result.raw_output = combined_output
if Settings.debug:
_log.debug(
"Parsed traceroute result",
hops=len(validated.hops),
target=result.target,
source=result.source,
)
except ValidationError as err:
_log.critical(err)
@@ -114,38 +100,31 @@ class TraceroutePluginMikrotik(OutputPlugin):
def process(self, *, output: "OutputType", query: "Query") -> "OutputDataModel":
"""Process the MikroTik traceroute output."""
# Extract target from query
target = getattr(query, "target", "unknown")
source = getattr(query, "source", "unknown")
# Try to get target from query_target which is more reliable
if hasattr(query, "query_target") and query.query_target:
target = str(query.query_target)
if hasattr(query, "device") and query.device:
source = getattr(query.device, "name", source)
_log = log.bind(plugin=TraceroutePluginMikrotik.__name__)
# Log raw router output only when debug is enabled
if Settings.debug:
try:
if isinstance(output, (tuple, list)):
try:
combined_raw = "\n".join(output)
except Exception:
combined_raw = repr(output)
else:
combined_raw = output if isinstance(output, str) else repr(output)
_log.debug("Router raw output:\n{}", combined_raw)
except Exception:
_log.exception("Failed to log router raw output")
try:
params = use_state("params")
@@ -154,14 +133,33 @@ class TraceroutePluginMikrotik(OutputPlugin):
device = getattr(query, "device", None)
# Check if structured output is enabled
if device is None:
if Settings.debug:
_log.debug("No device found, using cleanup-only mode")
return _clean_traceroute_only(output, query)
if params is None:
if Settings.debug:
_log.debug("No params found, using cleanup-only mode")
return _clean_traceroute_only(output, query)
if not getattr(params, "structured", None):
if Settings.debug:
_log.debug("Structured output not configured, using cleanup-only mode")
return _clean_traceroute_only(output, query)
if getattr(params.structured, "enable_for_traceroute", None) is False:
if Settings.debug:
_log.debug("Structured output disabled for traceroute, using cleanup-only mode")
return _clean_traceroute_only(output, query)
if Settings.debug:
_log.debug("Processing traceroute with structured output enabled")
# Clean the output first using garbage cleaner before parsing
cleaned_output = _clean_traceroute_only(output, query)
if Settings.debug:
_log.debug("Applied garbage cleaning before structured parsing")
return parse_mikrotik_traceroute(cleaned_output, target, source)


@@ -36,22 +36,70 @@ class ZTracerouteIpEnrichment(OutputPlugin):
def _reverse_dns_lookup(self, ip: str) -> t.Optional[str]:
"""Perform reverse DNS lookup for an IP address."""
from hyperglass.settings import Settings
try:
hostname = socket.gethostbyaddr(ip)[0]
if Settings.debug:
log.debug(f"Reverse DNS for {ip}: {hostname}")
return hostname
except (socket.herror, socket.gaierror, socket.timeout) as e:
if Settings.debug:
log.debug(f"Reverse DNS lookup failed for {ip}: {e}")
return None
async def _reverse_dns_lookup_async(self, ip: str) -> t.Optional[str]:
"""Async wrapper around synchronous reverse DNS lookup using a thread.
Uses asyncio.to_thread to avoid blocking the event loop and allows
multiple lookups to be scheduled concurrently.
"""
try:
return await asyncio.to_thread(self._reverse_dns_lookup, ip)
except Exception as e:
from hyperglass.settings import Settings
if Settings.debug:
log.debug(f"Reverse DNS async lookup error for {ip}: {e}")
return None
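The to-thread-plus-gather pattern used by `_reverse_dns_lookup_async` can be shown in a self-contained form. This is a sketch, not the plugin's code; `reverse_dns` and `resolve_all` are illustrative names:

```python
import asyncio
import socket
import typing as t

def reverse_dns(ip: str) -> t.Optional[str]:
    """Blocking PTR lookup; returns None on failure."""
    try:
        return socket.gethostbyaddr(ip)[0]
    except (socket.herror, socket.gaierror, OSError):
        return None

async def resolve_all(ips: t.List[str]) -> t.List[t.Optional[str]]:
    # asyncio.to_thread runs each blocking lookup in the default thread pool;
    # gather awaits all of them concurrently instead of one after another.
    return await asyncio.gather(*(asyncio.to_thread(reverse_dns, ip) for ip in ips))

# hostnames = asyncio.run(resolve_all(["1.1.1.1", "8.8.8.8"]))
```

For a multi-hop traceroute the wall-clock cost becomes roughly that of the slowest single lookup rather than the sum of all of them, which is the performance improvement the commit message describes.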
async def _enrich_async(self, output: TracerouteResult) -> None:
"""Async helper to enrich traceroute data.
This performs IP enrichment (ASN lookups), ASN organization lookups,
and then runs reverse DNS lookups concurrently for hops missing hostnames.
"""
# First enrich with IP information (ASN numbers)
await output.enrich_with_ip_enrichment()
# Then enrich ASN numbers with organization names
await output.enrich_asn_organizations()
# Concurrent reverse DNS for hops missing hostnames
ips_to_lookup: list[str] = [
hop.ip_address for hop in output.hops if hop.ip_address and hop.hostname is None
]
if not ips_to_lookup:
return
# Schedule lookups concurrently
tasks = [asyncio.create_task(self._reverse_dns_lookup_async(ip)) for ip in ips_to_lookup]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Apply results back to hops in order
idx = 0
for hop in output.hops:
if hop.ip_address and hop.hostname is None:
res = results[idx]
idx += 1
if isinstance(res, Exception):
from hyperglass.settings import Settings
if Settings.debug:
log.debug(f"Reverse DNS lookup raised for {hop.ip_address}: {res}")
else:
hop.hostname = res
def process(self, *, output: "OutputDataModel", query: "Query") -> "OutputDataModel":
"""Enrich structured traceroute data with IP enrichment and reverse DNS information."""
@@ -59,9 +107,13 @@ class ZTracerouteIpEnrichment(OutputPlugin):
return output
_log = log.bind(plugin=self.__class__.__name__)
# Import Settings for debug gating
from hyperglass.settings import Settings

_log.info(f"Starting IP enrichment for {len(output.hops)} traceroute hops")
# Check if IP enrichment is enabled in config
try:
from hyperglass.state import use_state
@@ -73,14 +125,62 @@ class ZTracerouteIpEnrichment(OutputPlugin):
or not params.structured.ip_enrichment.enrich_traceroute
or getattr(params.structured, "enable_for_traceroute", None) is False
):
if Settings.debug:
_log.debug("IP enrichment for traceroute disabled in configuration")
# Enrichment disabled: still perform concurrent reverse DNS lookups for hops missing hostnames
ips = [
hop.ip_address for hop in output.hops if hop.ip_address and hop.hostname is None
]
if ips:
try:
# asyncio.gather() must be called from a running event loop, so wrap
# the lookups in a coroutine and drive it appropriately.
async def _gather_lookups() -> list:
return await asyncio.gather(
*[self._reverse_dns_lookup_async(ip) for ip in ips],
return_exceptions=True,
)

try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Already inside an event loop; run the lookups in a worker thread
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(lambda: asyncio.run(_gather_lookups()))
results = future.result()
else:
results = loop.run_until_complete(_gather_lookups())
except RuntimeError:
results = asyncio.run(_gather_lookups())
# Apply results
idx = 0
for hop in output.hops:
if hop.ip_address and hop.hostname is None:
res = results[idx]
idx += 1
if not isinstance(res, Exception):
hop.hostname = res
except Exception as e:
if Settings.debug:
_log.debug(
f"Concurrent reverse DNS failed (fallback to sequential): {e}"
)
for hop in output.hops:
if hop.ip_address and hop.hostname is None:
hop.hostname = self._reverse_dns_lookup(hop.ip_address)
return output
except Exception as e:
if Settings.debug:
_log.debug(f"Could not check IP enrichment config: {e}")
# Use the built-in enrichment method from TracerouteResult
try:
@@ -100,14 +200,11 @@ class ZTracerouteIpEnrichment(OutputPlugin):
except RuntimeError:
# No event loop, create one
asyncio.run(self._enrich_async(output))
_log.info("IP enrichment completed successfully")
except Exception as e:
_log.error(f"IP enrichment failed: {e}")
# Reverse DNS lookups already handled in _enrich_async for missing hostnames
_log.info(f"Completed enrichment for traceroute to {output.target}")
return output