Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s
57 lines
2.0 KiB
Python
57 lines
2.0 KiB
Python
"""Testing the cluster."""
|
|
|
|
from unittest import IsolatedAsyncioTestCase
|
|
from uuid import uuid4
|
|
from aiohttp import ClientSession
|
|
from release_tests.support import ADDR
|
|
from release_tests.support.cluster import Cluster
|
|
|
|
|
|
class ClusterTC(IsolatedAsyncioTestCase):
    """Tests for the MoreThanText cluster.

    Every test that starts a cluster stops it in a ``finally`` clause so
    that a failing assertion or request error cannot skip ``stop()``.
    """

    async def test_create_default_cluster(self):
        """Create a complete two server cluster."""
        cluster = Cluster()
        await cluster.start()
        try:
            async with ClientSession() as session:
                # The translation endpoint must answer with HTTP 200.
                url = cluster.translate.baseurl
                async with session.get(url) as resp:
                    text = await resp.text()
                    # The body is passed as the failure message for debugging.
                    self.assertEqual(resp.status, 200, text)
                # A cluster built without arguments exposes two servers.
                self.assertEqual(len(cluster.servers), 2)
                # Each server must answer with HTTP 200 on its base URL.
                for server in cluster.servers:
                    url = server.baseurl
                    async with session.get(url) as resp:
                        text = await resp.text()
                        self.assertEqual(resp.status, 200, text)
        finally:
            # Runs on success and on failure alike.
            await cluster.stop()

    async def test_server_numbers(self):
        """Control the number of servers."""
        count = 3
        # The cluster is never started here, so there is nothing to stop.
        cluster = Cluster(num=count)
        self.assertEqual(len(cluster.servers), count)

    async def test_set_translation_mocking(self):
        """Is the translation mocking set up."""
        # Random path and reply so the test cannot pass by coincidence.
        transurl = f"/{uuid4()}"
        reply = str(uuid4())
        cluster = Cluster(transurl=transurl, transreplies=[reply])
        await cluster.start()
        try:
            async with ClientSession() as session:
                url = f"http://{ADDR}:{cluster.translate.port}{transurl}"
                async with session.get(url) as resp:
                    text = await resp.text()
                    # NOTE(review): only the status is checked; the body is
                    # not compared with ``reply`` -- confirm whether it
                    # should be.
                    self.assertEqual(resp.status, 200, text)
        finally:
            await cluster.stop()

    async def test_get_responses(self):
        """Get pulls a response from each server."""
        count = 4
        cluster = Cluster(num=count)
        await cluster.start()
        try:
            resp = await cluster.get("/")
            # One entry is expected per server in the cluster.
            self.assertEqual(len(resp), count)
        finally:
            await cluster.stop()
|