Jeff Baskin e555acfdb4
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s
Replaced start up tests.
2025-06-22 17:04:37 -04:00

60 lines
1.7 KiB
Python

"""Create a cluster."""
from aiohttp import ClientSession
from release_tests.support import get_port
from release_tests.support.mttserver import MTTServer
from release_tests.support.translate import Translate
# Address (in the 127.0.0.0/8 loopback range) handed to every server that
# Cluster.start() launches, via the "-a" command-line option.
ADDR = "127.56.0.1"
class Cluster:
    """Cluster of MoreThanText servers used by the release tests.

    Holds a ``Translate`` helper, the list of ``MTTServer`` instances that
    have been started, and a single ``ClientSession`` shared by all requests
    made through :meth:`get`.
    """

    def __init__(self, num=2, transurl=None, transreplies=None):
        """Record the cluster size and build the translation helper.

        Args:
            num: Number of servers that :meth:`start` launches.
            transurl: Passed to ``Translate`` as ``url``.
            transreplies: Passed to ``Translate`` as ``replies``.
        """
        self.num = num
        self.translate = Translate(url=transurl, replies=transreplies)
        self.session = None
        self.servers = []

    def _ensure_session(self):
        """Open the shared client session unless one is already held."""
        # Re-creating the session on every start call would drop the previous
        # one without closing it, so only create it when none exists.
        if self.session is None:
            self.session = ClientSession()

    async def start(self):
        """Start the translation helper and ``self.num`` servers.

        Each server is launched on ``ADDR`` with a port obtained from
        ``get_port()`` and is recorded in ``self.servers`` once started.
        """
        await self.translate.start()
        for _ in range(self.num):
            port = get_port()
            server = MTTServer("-a", ADDR, "-p", port)
            await server.start()
            self.servers.append(server)
        self._ensure_session()

    async def start_a_server(self, *args):
        """Start one server built from ``args`` and add it to the cluster.

        Args:
            *args: Forwarded unchanged to ``MTTServer``.
        """
        # NOTE(review): num is forced to 1 even when other servers are
        # already registered -- confirm this is intended.
        self.num = 1
        server = MTTServer(*args)
        await server.start()
        self.servers.append(server)
        self._ensure_session()

    async def stop(self):
        """Close the client session, then stop the translator and servers.

        Safe to call when no session was ever opened. The session attribute
        is reset to ``None`` so a later start call opens a fresh one and a
        repeated stop does not close the same session twice.
        """
        if self.session is not None:
            # Detach before awaiting so the attribute never points at a
            # session that is in the middle of closing.
            session, self.session = self.session, None
            await session.close()
        await self.translate.stop()
        for server in self.servers:
            await server.stop()

    async def cleanup(self):
        """Stop the cluster, then call ``cleanup()`` on every server."""
        await self.stop()
        for server in self.servers:
            await server.cleanup()

    async def get(self, url):
        """Issue a GET for ``url`` against every server in the cluster.

        Args:
            url: Text appended to each server's ``baseurl``.

        Returns:
            A list with one response object per server, in ``self.servers``
            order; empty when the cluster has no servers.
        """
        replies = []
        for server in self.servers:
            async with self.session.get(f"{server.baseurl}{url}") as resp:
                # NOTE(review): the response is handed back after its
                # ``async with`` block has exited; a body not read here may
                # no longer be readable by the caller -- confirm callers
                # only rely on status/headers.
                replies.append(resp)
        return replies