fix ununsed import

This commit is contained in:
checktheroads 2020-01-16 02:51:10 -07:00
parent 7aed7aff5b
commit b901568997
4 changed files with 457 additions and 413 deletions

View file

@ -16,6 +16,7 @@ from hyperglass.configuration.models import routers as _routers
from hyperglass.constants import LOG_HANDLER
from hyperglass.constants import LOG_HANDLER_FILE
from hyperglass.constants import LOG_LEVELS
from hyperglass.constants import Supported
from hyperglass.exceptions import ConfigError
from hyperglass.exceptions import ConfigInvalid
from hyperglass.exceptions import ConfigMissing
@ -234,7 +235,10 @@ def _build_frontend_devices():
"location": device.location,
"network": device.network.display_name,
"display_name": device.display_name,
"vrfs": [vrf.display_name for vrf in device.vrfs],
"vrfs": [
{"id": vrf.name, "display_name": vrf.display_name}
for vrf in device.vrfs
],
}
)
elif device.name not in frontend_dict:
@ -242,7 +246,10 @@ def _build_frontend_devices():
"location": device.location,
"network": device.network.display_name,
"display_name": device.display_name,
"vrfs": [vrf.display_name for vrf in device.vrfs],
"vrfs": [
{"id": vrf.name, "display_name": vrf.display_name}
for vrf in device.vrfs
],
}
if not frontend_dict:
raise ConfigError(error_msg="Unable to build network to device mapping")
@ -258,37 +265,87 @@ def _build_networks():
Returns:
{dict} -- Networks & devices
"""
networks_dict = {}
for device in devices.routers:
if device.network.display_name in networks_dict:
networks_dict[device.network.display_name].append(
{
"location": device.location,
"hostname": device.name,
"display_name": device.display_name,
"vrfs": [vrf.name for vrf in device.vrfs],
}
)
elif device.network.display_name not in networks_dict:
networks_dict[device.network.display_name] = [
{
"location": device.location,
"hostname": device.name,
"display_name": device.display_name,
"vrfs": [vrf.name for vrf in device.vrfs],
}
]
if not networks_dict:
networks = []
_networks = list(set({device.network.display_name for device in devices.routers}))
for _network in _networks:
network_def = {"display_name": _network, "locations": []}
for device in devices.routers:
if device.network.display_name == _network:
network_def["locations"].append(
{
"name": device.name,
"location": device.location,
"display_name": device.display_name,
"network": device.network.display_name,
"vrfs": [
{"id": vrf.name, "display_name": vrf.display_name}
for vrf in device.vrfs
],
}
)
networks.append(network_def)
if not networks:
raise ConfigError(error_msg="Unable to build network to device mapping")
return networks_dict
return networks
_frontend_fields = {
"general": {"debug", "request_timeout"},
"branding": {"text"},
"messages": ...,
}
frontend_params = params.dict(include=_frontend_fields)
def _build_vrfs():
vrfs = []
for device in devices.routers:
for vrf in device.vrfs:
vrf_dict = {"id": vrf.name, "display_name": vrf.display_name}
if vrf_dict not in vrfs:
vrfs.append(vrf_dict)
return vrfs
def _build_queries():
"""Build a dict of supported query types and their display names.
Returns:
{dict} -- Supported query dict
"""
queries = []
for query in Supported.query_types:
display_name = getattr(params.branding.text, query)
queries.append({"name": query, "display_name": display_name})
return queries
vrfs = _build_vrfs()
queries = _build_queries()
networks = _build_networks()
frontend_networks = _build_frontend_networks()
frontend_devices = _build_frontend_devices()
_frontend_fields = {
"general": {
"debug",
"primary_asn",
"request_timeout",
"org_name",
"google_analytics",
"opengraph",
"site_descriptin",
},
"branding": ...,
"features": {
"bgp_route": {"enable"},
"bgp_community": {"enable"},
"bgp_aspath": {"enable"},
"ping": {"enable"},
"traceroute": {"enable"},
},
"messages": ...,
}
_frontend_params = params.dict(include=_frontend_fields)
_frontend_params.update(
{
"queries": queries,
"devices": frontend_devices,
"networks": networks,
"vrfs": vrfs,
}
)
frontend_params = _frontend_params

View file

@ -25,6 +25,112 @@ LOG_HANDLER = {"sink": sys.stdout, "format": LOG_FMT, "level": "INFO"}
LOG_HANDLER_FILE = {"format": LOG_FMT, "level": "INFO"}
DEFAULT_TERMS = """
---
template: footer
---
By using {{ branding.site_name }}, you agree to be bound by the following terms of \
use: All queries executed on this page are logged for analysis and troubleshooting. \
Users are prohibited from automating queries, or attempting to process queries in \
bulk. This service is provided on a best effort basis, and {{ general.org_name }} \
makes no availability or performance warranties or guarantees whatsoever.
"""
DEFAULT_DETAILS = {
"bgp_aspath": r"""
---
template: bgp_aspath
title: Supported AS Path Patterns
---
{{ branding.site_name }} accepts the following `AS_PATH` regular expression patterns:
| Expression | Match |
| :------------------- | :-------------------------------------------- |
| `_65000$` | Originated by 65000 |
| `^65000_` | Received from 65000 |
| `_65000_` | Via 65000 |
| `_65000_65001_` | Via 65000 and 65001 |
| `_65000(_.+_)65001$` | Anything from 65001 that passed through 65000 |
""",
"bgp_community": """
---
template: bgp_community
title: BGP Communities
---
{{ branding.site_name }} makes use of the following BGP communities:
| Community | Description |
| :-------- | :---------- |
| `65000:1` | Example 1 |
| `65000:2` | Example 2 |
| `65000:3` | Example 3 |
""",
}
DEFAULT_INFO = {
"bgp_route": """
---
template: bgp_route
---
Performs BGP table lookup based on IPv4/IPv6 prefix.
""",
"bgp_community": """
---
template: bgp_community
---
Performs BGP table lookup based on <a href="https://tools.ietf.org/html/rfc4360" target\
="_blank">Extended</a> or <a href="https://tools.ietf.org/html/rfc8195" target=\
"_blank">Large</a> community value.
""",
"bgp_aspath": """
---
template: bgp_aspath
---
Performs BGP table lookup based on `AS_PATH` regular expression.
""",
"ping": """
---
template: ping
---
Sends 5 ICMP echo requests to the target.
""",
"traceroute": """
---
template: traceroute
---
Performs UDP Based traceroute to the target.<br>For information about how to \
interpret traceroute results, <a href="https://hyperglass.readthedocs.io/en/latest/ass\
ets/traceroute_nanog.pdf" target="_blank">click here</a>.
""",
}
DEFAULT_HELP = """
---
template: default_help
---
##### BGP Route
Performs BGP table lookup based on IPv4/IPv6 prefix.
<hr>
##### BGP Community
Performs BGP table lookup based on <a href="https://tools.ietf.org/html/rfc4360" target\
="_blank">Extended</a> or <a href="https://tools.ietf.org/html/rfc8195" target=\
"_blank">Large</a> community value.
<hr>
##### BGP AS Path
Performs BGP table lookup based on `AS_PATH` regular expression.
<hr>
##### Ping
Sends 5 ICMP echo requests to the target.
<hr>
##### Traceroute
Performs UDP Based traceroute to the target.<br>For information about how to \
interpret traceroute results, <a href="https://hyperglass.readthedocs.io/en/latest/ass\
ets/traceroute_nanog.pdf" target="_blank">click here</a>.
"""
class Supported:
"""Define items supported by hyperglass.

View file

@ -2,7 +2,6 @@
# Standard Library Imports
import asyncio
import operator
import os
import tempfile
import time
@ -16,7 +15,7 @@ from prometheus_client import Counter
from prometheus_client import generate_latest
from prometheus_client import multiprocess
from sanic import Sanic
from sanic import response
from sanic import response as sanic_response
from sanic.exceptions import InvalidUsage
from sanic.exceptions import NotFound
from sanic.exceptions import ServerError
@ -26,10 +25,8 @@ from sanic_limiter import RateLimitExceeded
from sanic_limiter import get_remote_address
# Project Imports
from hyperglass.command.execute import Execute
from hyperglass.configuration import devices
from hyperglass.configuration import frontend_params
from hyperglass.configuration import params
from hyperglass.constants import Supported
from hyperglass.exceptions import AuthError
from hyperglass.exceptions import DeviceTimeout
from hyperglass.exceptions import HyperglassError
@ -38,6 +35,8 @@ from hyperglass.exceptions import InputNotAllowed
from hyperglass.exceptions import ResponseEmpty
from hyperglass.exceptions import RestError
from hyperglass.exceptions import ScrapeError
from hyperglass.execution.execute import Execute
from hyperglass.models.query import Query
from hyperglass.render import render_html
from hyperglass.util import check_python
from hyperglass.util import cpu_count
@ -46,7 +45,7 @@ from hyperglass.util import log
# Verify Python version meets minimum requirement
try:
python_version = check_python()
log.info(f"Python {python_version} detected.")
log.info(f"Python {python_version} detected")
except RuntimeError as r:
raise HyperglassError(str(r), alert="danger") from None
@ -152,6 +151,21 @@ count_notfound = Counter(
)
@app.middleware("request")
async def request_middleware(request):
"""Respond to OPTIONS methods."""
if request.method == "OPTIONS": # noqa: R503
return sanic_response.json({"content": "ok"}, status=204)
@app.middleware("response")
async def response_middleware(request, response):
"""Add CORS headers to responses."""
response.headers.add("Access-Control-Allow-Origin", "*")
response.headers.add("Access-Control-Allow-Headers", "Content-Type")
response.headers.add("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
@app.route("/metrics")
@limiter.exempt
async def metrics(request):
@ -159,7 +173,7 @@ async def metrics(request):
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
latest = generate_latest(registry)
return response.text(
return sanic_response.text(
latest,
headers={
"Content-Type": CONTENT_TYPE_LATEST,
@ -183,7 +197,7 @@ async def handle_frontend_errors(request, exception):
request.json.get("target"),
).inc()
log.error(f'Error: {error["message"]}, Source: {client_addr}')
return response.json(
return sanic_response.json(
{"output": error["message"], "alert": alert, "keywords": error["keywords"]},
status=400,
)
@ -204,7 +218,7 @@ async def handle_backend_errors(request, exception):
request.json.get("target"),
).inc()
log.error(f'Error: {error["message"]}, Source: {client_addr}')
return response.json(
return sanic_response.json(
{"output": error["message"], "alert": alert, "keywords": error["keywords"]},
status=503,
)
@ -218,7 +232,7 @@ async def handle_404(request, exception):
client_addr = get_remote_address(request)
count_notfound.labels(exception, path, client_addr).inc()
log.error(f"Error: {exception}, Path: {path}, Source: {client_addr}")
return response.html(html, status=404)
return sanic_response.html(html, status=404)
@app.exception(RateLimitExceeded)
@ -228,7 +242,7 @@ async def handle_429(request, exception):
client_addr = get_remote_address(request)
count_ratelimit.labels(exception, client_addr).inc()
log.error(f"Error: {exception}, Source: {client_addr}")
return response.html(html, status=429)
return sanic_response.html(html, status=429)
@app.exception(ServerError)
@ -238,7 +252,7 @@ async def handle_500(request, exception):
count_errors.labels(500, exception, client_addr, None, None, None).inc()
log.error(f"Error: {exception}, Source: {client_addr}")
html = render_html("500")
return response.html(html, status=500)
return sanic_response.html(html, status=500)
async def clear_cache():
@ -251,187 +265,25 @@ async def clear_cache():
raise HyperglassError(f"Error clearing cache: {error_exception}")
@app.route("/", methods=["GET"])
@app.route("/", methods=["GET", "OPTIONS"])
@limiter.limit(rate_limit_site, error_message="Site")
async def site(request):
"""Serve main application front end."""
return response.html(render_html("form", primary_asn=params.general.primary_asn))
html = await render_html("form", primary_asn=params.general.primary_asn)
return sanic_response.html(html)
async def validate_input(query_data): # noqa: C901
"""Delete any globally unsupported query parameters.
@app.route("/config", methods=["GET", "OPTIONS"])
async def frontend_config(request):
"""Provide validated user/default config for front end consumption.
Performs validation functions per input type:
- query_target:
- Verifies input is not empty
- Verifies input is a string
- query_location:
- Verfies input is not empty
- Verifies input is a list
- Verifies locations in list are defined
- query_type:
- Verifies input is not empty
- Verifies input is a string
- Verifies query type is enabled and supported
- query_vrf: (if feature enabled)
- Verfies input is a list
- Verifies VRFs in list are defined
Returns:
{dict} -- Filtered configuration
"""
# Delete any globally unsupported parameters
supported_query_data = {
k: v for k, v in query_data.items() if k in Supported.query_parameters
}
# Unpack query data
query_location = supported_query_data.get("query_location", "")
query_type = supported_query_data.get("query_type", "")
query_target = supported_query_data.get("query_target", "")
query_vrf = supported_query_data.get("query_vrf", "")
device = getattr(devices, query_location)
# Verify that query_target is not empty
if not query_target:
log.debug("No input specified")
raise InvalidUsage(
{
"message": params.messages.no_input.format(
field=params.branding.text.query_target
),
"alert": "warning",
"keywords": [params.branding.text.query_target],
}
)
# Verify that query_target is a string
if not isinstance(query_target, str):
log.debug("Target is not a string")
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_target, field=params.branding.text.query_target
),
"alert": "warning",
"keywords": [params.branding.text.query_target, query_target],
}
)
# Verify that query_location is not empty
if not query_location:
log.debug("No selection specified")
raise InvalidUsage(
{
"message": params.messages.no_input.format(
field=params.branding.text.query_location
),
"alert": "warning",
"keywords": [params.branding.text.query_location],
}
)
# Verify that query_location is a string
if not isinstance(query_location, str):
log.debug("Query Location is not a string")
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_location, field=params.branding.text.query_location
),
"alert": "warning",
"keywords": [params.branding.text.query_location, query_location],
}
)
# Verify that locations in query_location are actually defined
if query_location not in devices.hostnames:
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_location, field=params.branding.text.query_location
),
"alert": "warning",
"keywords": [params.branding.text.query_location, query_location],
}
)
# Verify that query_type is not empty
if not query_type:
log.debug("No query specified")
raise InvalidUsage(
{
"message": params.messages.no_input.format(
field=params.branding.text.query_type
),
"alert": "warning",
"keywords": [params.branding.text.query_location],
}
)
if not isinstance(query_type, str):
log.debug("Query Type is not a string")
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_type, field=params.branding.text.query_type
),
"alert": "warning",
"keywords": [params.branding.text.query_type, query_type],
}
)
# Verify that query_type is actually supported
query_is_supported = Supported.is_supported_query(query_type)
if not query_is_supported:
log.debug("Query not supported")
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_type, field=params.branding.text.query_type
),
"alert": "warning",
"keywords": [params.branding.text.query_location, query_type],
}
)
elif query_is_supported:
query_is_enabled = operator.attrgetter(f"{query_type}.enable")(params.features)
if not query_is_enabled:
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_type, field=params.branding.text.query_type
),
"alert": "warning",
"keywords": [params.branding.text.query_location, query_type],
}
)
# Verify that query_vrf is a string
if query_vrf and not isinstance(query_vrf, str):
raise InvalidUsage(
{
"message": params.messages.invalid_field.format(
input=query_vrf, field=params.branding.text.query_vrf
),
"alert": "warning",
"keywords": [params.branding.text.query_vrf, query_vrf],
}
)
# Verify that vrfs in query_vrf are defined
if query_vrf and not any(vrf in query_vrf for vrf in devices.display_vrfs):
raise InvalidUsage(
{
"message": params.messages.vrf_not_associated.format(
vrf_name=query_vrf, device_name=device.display_name
),
"alert": "warning",
"keywords": [query_vrf, query_location],
}
)
# If VRF display name from UI/API matches a configured display name, set the
# query_vrf value to the configured VRF key name
if query_vrf:
for vrf in device.vrfs:
if vrf.display_name == query_vrf:
supported_query_data["query_vrf"] = vrf.name
if not query_vrf:
supported_query_data["query_vrf"] = "default"
log.debug(f"Validated Query: {supported_query_data}")
return supported_query_data
return sanic_response.json(frontend_params)
@app.route("/query", methods=["POST"])
@app.route("/query", methods=["POST", "OPTIONS"])
@limiter.limit(
rate_limit_query,
error_message={
@ -452,7 +304,11 @@ async def hyperglass_main(request):
log.debug(f"Unvalidated input: {raw_query_data}")
# Perform basic input validation
query_data = await validate_input(raw_query_data)
# query_data = await validate_input(raw_query_data)
try:
query_data = Query(**raw_query_data)
except InputInvalid as he:
raise InvalidUsage(he.__dict__())
# Get client IP address for Prometheus logging & rate limiting
client_addr = get_remote_address(request)
@ -460,18 +316,17 @@ async def hyperglass_main(request):
# Increment Prometheus counter
count_data.labels(
client_addr,
query_data.get("query_type"),
query_data.get("query_location"),
query_data.get("query_target"),
query_data.get("query_vrf"),
query_data.query_type,
query_data.query_location,
query_data.query_target,
query_data.query_vrf,
).inc()
log.debug(f"Client Address: {client_addr}")
# Stringify the form response containing serialized JSON for the
# request, use as key for k/v cache store so each command output
# value is unique
cache_key = str(query_data)
# Use hashed query_data string as key for for k/v cache store so
# each command output value is unique.
cache_key = hash(query_data)
# Define cache entry expiry time
cache_timeout = params.features.cache.timeout
@ -479,7 +334,8 @@ async def hyperglass_main(request):
# Check if cached entry exists
if not await r_cache.get(cache_key):
log.debug(f"Sending query {cache_key} to execute module...")
log.debug(f"Created new cache key {cache_key} entry for query {query_data}")
log.debug("Beginning query execution...")
# Pass request to execution module
try:
@ -516,4 +372,4 @@ async def hyperglass_main(request):
log.debug(f"Cache match for: {cache_key}, returning cached entry")
log.debug(f"Cache Output: {response_output}")
return response.json({"output": response_output}, status=200)
return sanic_response.json({"output": response_output}, status=200)

View file

@ -6,196 +6,126 @@ from pathlib import Path
# Third Party Imports
import jinja2
import yaml
from aiofile import AIOFile
from markdown2 import Markdown
# Project Imports
from hyperglass.configuration import devices
from hyperglass.configuration import networks
from hyperglass.configuration import params
from hyperglass.constants import DEFAULT_DETAILS
from hyperglass.constants import DEFAULT_HELP
from hyperglass.constants import DEFAULT_TERMS
from hyperglass.exceptions import ConfigError
from hyperglass.exceptions import HyperglassError
from hyperglass.util import log
# Module Directories
working_directory = Path(__file__).resolve().parent
hyperglass_root = working_directory.parent
file_loader = jinja2.FileSystemLoader(str(working_directory))
env = jinja2.Environment(
loader=file_loader, autoescape=True, extensions=["jinja2.ext.autoescape"]
WORKING_DIR = Path(__file__).resolve().parent
JINJA_LOADER = jinja2.FileSystemLoader(str(WORKING_DIR))
JINJA_ENV = jinja2.Environment(
loader=JINJA_LOADER,
autoescape=True,
extensions=["jinja2.ext.autoescape"],
enable_async=True,
)
default_details = {
"footer": """
---
template: footer
---
By using {{ branding.site_name }}, you agree to be bound by the following terms of \
use: All queries executed on this page are logged for analysis and troubleshooting. \
Users are prohibited from automating queries, or attempting to process queries in \
bulk. This service is provided on a best effort basis, and {{ general.org_name }} \
makes no availability or performance warranties or guarantees whatsoever.
""",
"bgp_aspath": r"""
---
template: bgp_aspath
title: Supported AS Path Patterns
---
{{ branding.site_name }} accepts the following `AS_PATH` regular expression patterns:
| Expression | Match |
| :------------------- | :-------------------------------------------- |
| `_65000$` | Originated by 65000 |
| `^65000_` | Received from 65000 |
| `_65000_` | Via 65000 |
| `_65000_65001_` | Via 65000 and 65001 |
| `_65000(_.+_)65001$` | Anything from 65001 that passed through 65000 |
""",
"bgp_community": """
---
template: bgp_community
title: BGP Communities
---
{{ branding.site_name }} makes use of the following BGP communities:
| Community | Description |
| :-------- | :---------- |
| `65000:1` | Example 1 |
| `65000:2` | Example 2 |
| `65000:3` | Example 3 |
""",
}
default_info = {
"bgp_route": """
---
template: bgp_route
---
Performs BGP table lookup based on IPv4/IPv6 prefix.
""",
"bgp_community": """
---
template: bgp_community
---
Performs BGP table lookup based on <a href="https://tools.ietf.org/html/rfc4360" target\
="_blank">Extended</a> or <a href="https://tools.ietf.org/html/rfc8195" target=\
"_blank">Large</a> community value.
""",
"bgp_aspath": """
---
template: bgp_aspath
---
Performs BGP table lookup based on `AS_PATH` regular expression.
""",
"ping": """
---
template: ping
---
Sends 5 ICMP echo requests to the target.
""",
"traceroute": """
---
template: traceroute
---
Performs UDP Based traceroute to the target.<br>For information about how to \
interpret traceroute results, <a href="https://hyperglass.readthedocs.io/en/latest/ass\
ets/traceroute_nanog.pdf" target="_blank">click here</a>.
""",
_MD_CONFIG = {
"extras": {
"break-on-newline": True,
"code-friendly": True,
"tables": True,
"html-classes": {"table": "table"},
}
}
MARKDOWN = Markdown(**_MD_CONFIG)
default_help = """
---
template: default_help
---
##### BGP Route
Performs BGP table lookup based on IPv4/IPv6 prefix.
<hr>
##### BGP Community
Performs BGP table lookup based on <a href="https://tools.ietf.org/html/rfc4360" target\
="_blank">Extended</a> or <a href="https://tools.ietf.org/html/rfc8195" target=\
"_blank">Large</a> community value.
<hr>
##### BGP AS Path
Performs BGP table lookup based on `AS_PATH` regular expression.
<hr>
##### Ping
Sends 5 ICMP echo requests to the target.
<hr>
##### Traceroute
Performs UDP Based traceroute to the target.<br>For information about how to \
interpret traceroute results, <a href="https://hyperglass.readthedocs.io/en/latest/ass\
ets/traceroute_nanog.pdf" target="_blank">click here</a>.
"""
async def parse_md(raw_file):
file_list = raw_file.split("---", 2)
file_list_len = len(file_list)
if file_list_len == 1:
fm = {}
content = file_list[0]
elif file_list_len == 3 and file_list[1].strip():
try:
fm = yaml.safe_load(file_list[1])
except yaml.YAMLError as ye:
raise ConfigError(str(ye)) from None
content = file_list[2]
else:
fm = {}
content = ""
return (fm, content)
def generate_markdown(section, file_name=None):
"""Render markdown as HTML.
Arguments:
section {str} -- Section name
Keyword Arguments:
file_name {str} -- Markdown file name (default: {None})
Raises:
HyperglassError: Raised if YAML front matter is unreadable
Returns:
{dict} -- Frontmatter dictionary
"""
if section == "help":
file = working_directory.joinpath("templates/info/help.md")
if file.exists():
with file.open(mode="r") as file_raw:
yaml_raw = file_raw.read()
else:
yaml_raw = default_help
elif section == "details":
file = working_directory.joinpath(f"templates/info/details/{file_name}.md")
if file.exists():
with file.open(mode="r") as file_raw:
yaml_raw = file_raw.read()
else:
yaml_raw = default_details[file_name]
_, frontmatter, content = yaml_raw.split("---", 2)
html_classes = {"table": "table"}
markdown = Markdown(
extras={
"break-on-newline": True,
"code-friendly": True,
"tables": True,
"html-classes": html_classes,
}
)
frontmatter_rendered = (
jinja2.Environment(
loader=jinja2.BaseLoader,
autoescape=True,
extensions=["jinja2.ext.autoescape"],
)
.from_string(frontmatter)
.render(params)
)
if frontmatter_rendered:
frontmatter_loaded = yaml.safe_load(frontmatter_rendered)
elif not frontmatter_rendered:
frontmatter_loaded = {"frontmatter": None}
content_rendered = (
jinja2.Environment(
loader=jinja2.BaseLoader,
autoescape=True,
extensions=["jinja2.ext.autoescape"],
)
.from_string(content)
.render(params, info=frontmatter_loaded)
)
help_dict = dict(content=markdown.convert(content_rendered), **frontmatter_loaded)
if not help_dict:
raise HyperglassError(f"Error reading YAML frontmatter for {file_name}")
return help_dict
async def get_file(path_obj):
async with AIOFile(path_obj, "r") as raw_file:
file = await raw_file.read()
return file
def render_html(template_name, **kwargs):
async def render_help():
if params.branding.help_menu.file is not None:
help_file = await get_file(params.branding.help_menu.file)
else:
help_file = DEFAULT_HELP
fm, content = await parse_md(help_file)
content_template = JINJA_ENV.from_string(content)
content_rendered = await content_template.render_async(params, info=fm)
return {"content": MARKDOWN.convert(content_rendered), **fm}
async def render_terms():
if params.branding.terms.file is not None:
terms_file = await get_file(params.branding.terms.file)
else:
terms_file = DEFAULT_TERMS
fm, content = await parse_md(terms_file)
content_template = JINJA_ENV.from_string(content)
content_rendered = await content_template.render_async(params, info=fm)
return {"content": MARKDOWN.convert(content_rendered), **fm}
async def render_details():
details = []
for vrf in devices.vrf_objects:
detail = {"name": vrf.name, "display_name": vrf.display_name}
info_attrs = ("bgp_aspath", "bgp_community")
command_info = []
for attr in info_attrs:
file = getattr(vrf.info, attr)
if file is not None:
raw_content = await get_file(file)
fm, content = await parse_md(raw_content)
else:
fm, content = await parse_md(DEFAULT_DETAILS[attr])
content_template = JINJA_ENV.from_string(content)
content_rendered = await content_template.render_async(params, info=fm)
content_html = MARKDOWN.convert(content_rendered)
command_info.append(
{
"id": f"{vrf.name}-{attr}",
"name": attr,
"frontmatter": fm,
"content": content_html,
}
)
detail.update({"commands": command_info})
details.append(detail)
return details
async def render_html(template_name, **kwargs):
"""Render Jinja2 HTML templates.
Arguments:
@ -207,23 +137,118 @@ def render_html(template_name, **kwargs):
Returns:
{str} -- Rendered template
"""
details_name_list = ["footer", "bgp_aspath", "bgp_community"]
details_dict = {}
for details_name in details_name_list:
details_data = generate_markdown("details", details_name)
details_dict.update({details_name: details_data})
rendered_help = generate_markdown("help")
log.debug(rendered_help)
try:
template_file = f"templates/{template_name}.html.j2"
template = env.get_template(template_file)
return template.render(
params,
rendered_help=rendered_help,
details=details_dict,
networks=networks,
**kwargs,
)
template = JINJA_ENV.get_template(template_file)
except jinja2.TemplateNotFound as template_error:
log.error(f"Error rendering Jinja2 template {Path(template_file).resolve()}.")
log.error(
f"Error rendering Jinja2 template {str(Path(template_file).resolve())}."
)
raise HyperglassError(template_error)
rendered_help = await render_help()
rendered_terms = await render_terms()
rendered_details = await render_details()
sub_templates = {
"details": rendered_details,
"help": rendered_help,
"terms": rendered_terms,
"networks": networks,
**kwargs,
}
return await template.render_async(params, **sub_templates)
# async def generate_markdown(section, file_name=None):
# """Render markdown as HTML.
# Arguments:
# section {str} -- Section name
# Keyword Arguments:
# file_name {str} -- Markdown file name (default: {None})
# Raises:
# HyperglassError: Raised if YAML front matter is unreadable
# Returns:
# {dict} -- Frontmatter dictionary
# """
# if section == "help" and params.branding.help_menu.file is not None:
# info = await get_file(params.branding.help_menu.file)
# elif section == "help" and params.branding.help_menu.file is None:
# info = DEFAULT_HELP
# elif section == "details":
# file = WORKING_DIR.joinpath(f"templates/info/details/{file_name}.md")
# if file.exists():
# with file.open(mode="r") as file_raw:
# yaml_raw = file_raw.read()
# else:
# yaml_raw = DEFAULT_DETAILS[file_name]
# _, frontmatter, content = yaml_raw.split("---", 2)
# md_config = {
# "extras": {
# "break-on-newline": True,
# "code-friendly": True,
# "tables": True,
# "html-classes": {"table": "table"},
# }
# }
# markdown = Markdown(**md_config)
# frontmatter_rendered = JINJA_ENV.from_string(frontmatter).render(params)
# if frontmatter_rendered:
# frontmatter_loaded = yaml.safe_load(frontmatter_rendered)
# elif not frontmatter_rendered:
# frontmatter_loaded = {"frontmatter": None}
# content_rendered = await JINJA_ENV.from_string(content).render_async(
# params, info=frontmatter_loaded
# )
# help_dict = dict(content=markdown.convert(content_rendered), **frontmatter_loaded)
# if not help_dict:
# raise HyperglassError(f"Error reading YAML frontmatter for {file_name}")
# return help_dict
# async def render_html(template_name, **kwargs):
# """Render Jinja2 HTML templates.
# Arguments:
# template_name {str} -- Jinja2 template name
# Raises:
# HyperglassError: Raised if template is not found
# Returns:
# {str} -- Rendered template
# """
# detail_items = ("footer", "bgp_aspath", "bgp_community")
# details = {}
# for details_name in detail_items:
# details_data = await generate_markdown("details", details_name)
# details.update({details_name: details_data})
# rendered_help = await generate_markdown("help")
# try:
# template_file = f"templates/{template_name}.html.j2"
# template = JINJA_ENV.get_template(template_file)
# except jinja2.TemplateNotFound as template_error:
# log.error(f"Error rendering Jinja2 template {Path(template_file).resolve()}.")
# raise HyperglassError(template_error)
# return await template.render_async(
# params,
# rendered_help=rendered_help,
# details=details,
# networks=networks,
# **kwargs,
# )