1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-05-03 21:26:30 +00:00

Update mikrotik_garbage_output.py

This commit is contained in:
Carlos Santos 2025-09-13 00:16:17 -03:00 committed by GitHub
parent 4432f48f9b
commit 504c0f1463
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -8,6 +8,7 @@ import typing as t
from pydantic import PrivateAttr
# Project
from hyperglass.log import log
from hyperglass.types import Series
# Local
@ -19,10 +20,11 @@ if t.TYPE_CHECKING:
class MikrotikGarbageOutput(OutputPlugin):
"""Parse Mikrotik output to remove garbage."""
"""Parse Mikrotik output to remove garbage before structured parsing."""
_hyperglass_builtin: bool = PrivateAttr(True)
platforms: t.Sequence[str] = ("mikrotik_routeros", "mikrotik_switchos")
platforms: t.Sequence[str] = ("mikrotik_routeros", "mikrotik_switchos", "mikrotik")
# Aplicar a todos os comandos para garantir a limpeza
directives: t.Sequence[str] = (
"__hyperglass_mikrotik_bgp_aspath__",
"__hyperglass_mikrotik_bgp_community__",
@ -32,51 +34,64 @@ class MikrotikGarbageOutput(OutputPlugin):
)
def process(self, *, output: OutputType, query: "Query") -> Series[str]:
"""Parse Mikrotik output to remove garbage."""
"""
Clean raw output from a MikroTik device.
This plugin removes command echoes, prompts, flag legends, and interactive help text.
"""
# O 'output' é uma tupla de strings, onde cada string é a saída de um comando.
# Vamos processar cada uma delas.
cleaned_outputs = []
result = ()
for raw_output in output:
# Se a saída já estiver vazia, não há nada a fazer.
if not raw_output or not raw_output.strip():
cleaned_outputs.append("")
continue
for each_output in output:
if len(each_output) != 0:
if each_output.split()[-1] in ("DISTANCE", "STATUS"):
# Mikrotik shows the columns with no rows if there is no data.
# Rather than send back an empty table, send back an empty
# response which is handled with a warning message.
each_output = ""
else:
remove_lines = ()
all_lines = each_output.splitlines()
# Starting index for rows (after the column row).
start = 1
# Extract the column row.
column_line = " ".join(all_lines[0].split())
# 1. Dividir a saída em linhas para processamento individual.
lines = raw_output.splitlines()
# 2. Filtrar as linhas de "lixo" conhecidas.
filtered_lines = []
in_flags_section = False
for line in lines:
stripped_line = line.strip()
for i, line in enumerate(all_lines[1:]):
# Remove all the newline characters (which differ line to
# line) for comparison purposes.
normalized = " ".join(line.split())
# Ignorar prompts e ecos de comando
if stripped_line.startswith("@") and stripped_line.endswith("] >"):
continue
# Ignorar a linha de ajuda interativa
if "[Q quit|D dump|C-z pause]" in stripped_line:
continue
# Remove ansii characters that aren't caught by Netmiko.
normalized = re.sub(r"\\x1b\[\S{2}\s", "", normalized)
# Iniciar a detecção da seção de Flags
if stripped_line.startswith("Flags:"):
in_flags_section = True
continue # Pula a própria linha "Flags:"
if column_line in normalized:
# Mikrotik often re-inserts the column row in the output,
# effectively 'starting over'. In that case, re-assign
# the column row and starting index to that point.
column_line = re.sub(r"\[\S{2}\s", "", line)
start = i + 2
# Se estivermos na seção de flags, verificar se a linha ainda é parte dela.
# Uma linha de dados de rota real geralmente começa com flags (ex: "Ab") ou é indentada.
# Uma linha da legenda de flags não.
if in_flags_section:
# Se a linha não começar com espaço ou não tiver um "=" (sinal de dado),
# é provável que seja parte da legenda.
# A forma mais segura é procurar pelo fim da legenda.
# A primeira linha de dados real começa com flags ou indentação.
# Vamos assumir que a legenda termina quando encontramos uma linha que contém "=".
if "=" in stripped_line:
in_flags_section = False
else:
continue # Pula as linhas da legenda de flags
if "[Q quit|D dump|C-z pause]" in normalized:
# Remove Mikrotik's unhelpful helpers from the output.
remove_lines += (i + 1,)
filtered_lines.append(line)
# Combine the column row and the data rows from the starting
# index onward.
lines = [column_line, *all_lines[start:]]
# 3. Juntar as linhas limpas de volta em uma única string.
cleaned_output = "\n".join(filtered_lines)
cleaned_outputs.append(cleaned_output)
# Remove any lines marked for removal and re-join with a single
# newline character.
lines = [line for idx, line in enumerate(lines) if idx not in remove_lines]
result += ("\n".join(lines),)
log.debug(f"MikrotikGarbageOutput cleaned {len(output)} output blocks.")
return tuple(cleaned_outputs)
return result