1
0
Fork 1
mirror of https://github.com/thatmattlove/hyperglass.git synced 2026-01-17 08:48:05 +00:00
thatmattlove-hyperglass/hyperglass/external/tests/test_base.py
Jason Hall 830f300822 Upgraded tooling and testing
Because the tooling has changed from what was originally used, some file formats have changed.

pnpm	10.10.0
rye	0.44.0
ruff	0.11.8

CI is now testing on a matrix of pnpm, node, and python versions. This
will hopefully cover edge cases where users are running various versions.
Still needs an update to use the python version in the matrix with `rye`.

Installs OS deps in workflow

Adds 'packages' key in workspace form pnpm 9

Makes testing for BaseExternal configurable

Adds redis and httpbin as service containers

ruff lint changed dictionary comprehensions

adds environment variables for httpbin

Fixes runner to docker communications
2025-05-13 17:55:56 -04:00

67 lines
2.3 KiB
Python

"""Test external http client."""
# Standard Library
import asyncio
import os
# Third Party
import pytest
# Project
from hyperglass.exceptions.private import ExternalError
from hyperglass.models.config.logging import Http
# Local
from .._base import BaseExternal
@pytest.fixture
def httpbin_url() -> str:
    """Build the base URL of the httpbin instance used by these tests.

    The target is read from environment variables so the tests can be pointed
    at an instance other than the public one:

    - ``HYPERGLASS_HTTPBIN_HOST`` (default ``httpbin.org``)
    - ``HYPERGLASS_HTTPBIN_PORT`` (default ``443``)
    - ``HYPERGLASS_HTTPBIN_PROTOCOL`` (default ``https``)

    Returns:
        ``<protocol>://<host>``, with ``:<port>`` appended unless the port is
        443 or 80.

    Raises:
        ValueError: If ``HYPERGLASS_HTTPBIN_PORT`` is set to a non-integer.
    """
    httpbin_host: str = os.environ.get("HYPERGLASS_HTTPBIN_HOST", "httpbin.org")
    # Environment values are always strings. Convert to int so the comparison
    # against the integer ports below works for env-supplied values too; an
    # unset or empty variable falls back to 443.
    httpbin_port: int = int(os.environ.get("HYPERGLASS_HTTPBIN_PORT") or 443)
    httpbin_protocol: str = os.environ.get("HYPERGLASS_HTTPBIN_PROTOCOL", "https")

    url = f"{httpbin_protocol}://{httpbin_host}"
    # Leave the well-known ports implicit so the URL matches what httpbin
    # echoes back in its responses.
    if httpbin_port not in (443, 80):
        url = f"{url}:{httpbin_port}"
    return url
@pytest.fixture
def httpbin_config(httpbin_url):
    """Provide an ``Http`` config with the ``generic`` provider and the httpbin URL as host."""
    config = Http(provider="generic", host=httpbin_url)
    return config
def test_base_external_sync(httpbin_url, httpbin_config):
    """Check the blocking ``_get``/``_post`` helpers and the timeout error path."""
    with BaseExternal(base_url=httpbin_url, config=httpbin_config) as client:
        plain_get = client._get("/get")
        query_get = client._get("/get", params={"key": "value"})
        posted = client._post("/post", data={"strkey": "value", "intkey": 1})

        # The responses echo the request back; verify each part round-tripped.
        assert plain_get["url"] == f"{httpbin_url}/get"
        assert query_get["args"].get("key") == "value"
        assert posted["json"].get("strkey") == "value"
        assert posted["json"].get("intkey") == 1

    # Requesting ``/delay/4`` from a client created with ``timeout=2`` is
    # expected to surface as an ExternalError.
    with pytest.raises(ExternalError), BaseExternal(
        base_url=httpbin_url, config=httpbin_config, timeout=2
    ) as slow_client:
        slow_client._get("/delay/4")
async def _run_test_base_external_async(httpbin_url, httpbin_config):
    """Exercise the asynchronous request helpers of ``BaseExternal``.

    Runs inside an event loop started by ``test_base_external_async``.

    Args:
        httpbin_url: Base URL of the httpbin instance to query.
        httpbin_config: ``Http`` configuration passed to ``BaseExternal``.

    Raises:
        AssertionError: If an echoed response does not match the request, or
            if the delayed request does not raise ``ExternalError``.
    """
    async with BaseExternal(base_url=httpbin_url, config=httpbin_config) as client:
        res1 = await client._aget("/get")
        res2 = await client._aget("/get", params={"key": "value"})
        res3 = await client._apost("/post", data={"strkey": "value", "intkey": 1})
        assert res1["url"] == f"{httpbin_url}/get"
        assert res2["args"].get("key") == "value"
        assert res3["json"].get("strkey") == "value"
        assert res3["json"].get("intkey") == 1

    # Requesting ``/delay/4`` from a client created with ``timeout=2`` is
    # expected to surface as an ExternalError.
    with pytest.raises(ExternalError):
        async with BaseExternal(base_url=httpbin_url, config=httpbin_config, timeout=2) as client:
            # Use the async helper here: awaiting the synchronous ``_get``
            # would only test the blocking code path (and would fail with a
            # TypeError on ``await`` if the request ever succeeded).
            await client._aget("/delay/4")
def test_base_external_async(httpbin_url, httpbin_config):
    """Run the async client checks to completion from a synchronous test."""
    scenario = _run_test_base_external_async(httpbin_url, httpbin_config)
    asyncio.run(scenario)