forked from mirrors/thatmattlove-hyperglass
235 lines
13 KiB
Python
235 lines
13 KiB
Python
import sys
|
|
import logging
|
|
import toml
|
|
import re
|
|
from netaddr import *
|
|
|
|
# Local imports
|
|
import vars
|
|
|
|
log = logging.getLogger(__name__)
|
|
# Load TOML config file
|
|
devices = toml.load(open("./config/devices.toml"))
|
|
|
|
# Load TOML commands file
|
|
commands = toml.load(open("./config/commands.toml"))
|
|
|
|
# Filter config to router list
|
|
routers_list = devices["router"]
|
|
|
|
# Receives JSON from Flask, constucts the command that will be passed to the router
|
|
# Also handles input validation & error handling
|
|
def cmd_construct(router, cmd, ipprefix):
|
|
inputParams = router, cmd, ipprefix
|
|
log.warning(*inputParams)
|
|
try:
|
|
# Loop through routers config file, match input router with configured routers, set variables
|
|
for r in routers_list:
|
|
try:
|
|
if router == r["address"]:
|
|
type = r["type"]
|
|
src_addr_ipv4 = r["src_addr_ipv4"]
|
|
src_addr_ipv6 = r["src_addr_ipv6"]
|
|
# Loop through commands config file, set variables for matched commands
|
|
for nos in commands:
|
|
if type == nos:
|
|
nos = commands[type]
|
|
nos_commands = nos[0]
|
|
# Dual stack commands (agnostic of IPv4/IPv6)
|
|
dual_commands = nos_commands["dual"]
|
|
# IPv4 Specific Commands
|
|
ipv4_commands = nos_commands["ipv4"]
|
|
# IPv6 Specific Commands
|
|
ipv6_commands = nos_commands["ipv6"]
|
|
if cmd == "Query Type":
|
|
msg = "You must select a query type."
|
|
code = 415
|
|
log.error(msg, code, *inputParams)
|
|
return (msg, code, *inputParams)
|
|
# BGP Community Query
|
|
elif cmd in ["bgp_community"]:
|
|
# Extended Communities, new-format
|
|
if re.match("^([0-9]{0,5})\:([0-9]{1,5})$", ipprefix):
|
|
for a, c in dual_commands.items():
|
|
if a == cmd:
|
|
command = c.format(target=ipprefix)
|
|
msg = "{i} matched new-format community.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (msg, code, router, type, command)
|
|
# Extended Communities, 32 bit format
|
|
elif re.match("^[0-9]{1,10}$", ipprefix):
|
|
for a, c in dual_commands.items():
|
|
if a == cmd:
|
|
command = c.format(target=ipprefix)
|
|
msg = "{i} matched 32 bit community.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (msg, code, router, type, command)
|
|
# RFC 8092 Large Community Support
|
|
elif re.match(
|
|
"^([0-9]{1,10})\:([0-9]{1,10})\:[0-9]{1,10}$",
|
|
ipprefix,
|
|
):
|
|
for a, c in dual_commands.items():
|
|
if a == cmd:
|
|
command = c.format(target=ipprefix)
|
|
msg = "{i} matched large community.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (msg, code, router, type, command)
|
|
else:
|
|
msg = "{i} is an invalid BGP Community Format.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 415
|
|
log.error(msg, code, *inputParams)
|
|
return (msg, code, *inputParams)
|
|
# BGP AS_PATH Query
|
|
elif cmd in ["bgp_aspath"]:
|
|
if re.match(".*", ipprefix):
|
|
for a, c in dual_commands.items():
|
|
if a == cmd:
|
|
command = c.format(target=ipprefix)
|
|
msg = "{i} matched AS_PATH regex.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (msg, code, router, type, command)
|
|
else:
|
|
msg = "{i} is an invalid AS_PATH regex.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 415
|
|
log.error(msg, code, *inputParams)
|
|
return (msg, code, *inputParams)
|
|
# BGP Route Query
|
|
elif cmd in ["bgp_route"]:
|
|
try:
|
|
# Use netaddr library to verify if input is a valid IPv4 address or prefix
|
|
if IPNetwork(ipprefix).ip.version == 4:
|
|
for a, c in ipv4_commands.items():
|
|
if a == cmd:
|
|
command = c.format(target=ipprefix)
|
|
msg = "{i} is a valid IPv4 Adddress.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (
|
|
msg,
|
|
code,
|
|
router,
|
|
type,
|
|
command,
|
|
)
|
|
# Use netaddr library to verify if input is a valid IPv6 address or prefix
|
|
elif IPNetwork(ipprefix).ip.version == 6:
|
|
for a, c in ipv6_commands.items():
|
|
if a == cmd:
|
|
command = c.format(target=ipprefix)
|
|
msg = "{i} is a valid IPv6 Adddress.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (
|
|
msg,
|
|
code,
|
|
router,
|
|
type,
|
|
command,
|
|
)
|
|
# Exception from netaddr library will return a user-facing error
|
|
except:
|
|
msg = "{i} is an invalid IP Address.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 415
|
|
log.error(msg, code, *inputParams)
|
|
return (msg, code, *inputParams)
|
|
# Ping/Traceroute
|
|
elif cmd in ["ping", "traceroute"]:
|
|
try:
|
|
if IPNetwork(ipprefix).ip.version == 4:
|
|
for a, c in ipv4_commands.items():
|
|
if a == cmd:
|
|
command = c.format(
|
|
target=ipprefix,
|
|
src_addr_ipv4=src_addr_ipv4,
|
|
)
|
|
msg = "{i} is a valid IPv4 Adddress.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (
|
|
msg,
|
|
code,
|
|
router,
|
|
type,
|
|
command,
|
|
)
|
|
elif IPNetwork(ipprefix).ip.version == 6:
|
|
for a, c in ipv6_commands.items():
|
|
if a == cmd:
|
|
command = c.format(
|
|
target=ipprefix,
|
|
src_addr_ipv6=src_addr_ipv6,
|
|
)
|
|
msg = "{i} is a valid IPv6 Adddress.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 200
|
|
log.warning(
|
|
msg, code, router, type, command
|
|
)
|
|
return (
|
|
msg,
|
|
code,
|
|
router,
|
|
type,
|
|
command,
|
|
)
|
|
except:
|
|
msg = "{i} is an invalid IP Address.".format(
|
|
i=ipprefix
|
|
)
|
|
code = 415
|
|
log.error(msg, code, *inputParams)
|
|
return (msg, code, *inputParams)
|
|
else:
|
|
msg = "Command {i} not found.".format(i=cmd)
|
|
code = 415
|
|
log.error(msg, code, *inputParams)
|
|
return (msg, code, *inputParams)
|
|
except:
|
|
error_msg = log.error(
|
|
"Input router IP {router} does not match the configured router IP of {ip}".format(
|
|
router=router, ip=r["address"]
|
|
)
|
|
)
|
|
raise ValueError(error_msg)
|
|
except:
|
|
raise
|