Files
morethantext/src/lib.rs
Jeff Baskin 807b9ad456
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled
Removed some dead code.
2026-03-26 13:12:17 -04:00

316 lines
10 KiB
Rust

pub mod action;
mod document;
mod message;
mod mtterror;
pub mod name;
mod queue;
use document::{Clock, CreateDoc, Session};
use isolang::Language;
use message::{Message, MessageAction, MessageID};
use queue::{
data_director::{RegMsg, Register},
router::Queue,
SenderID,
};
use std::{
sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
time::Duration,
};
use uuid::Uuid;
pub use action::*;
pub use document::MissingTranslation;
pub use mtterror::{ErrorID, MTTError};
pub use name::{Name, NameType};
pub use queue::data_director::{Include, Path};
#[cfg(test)]
mod support_tests {
use super::*;
use std::time::Duration;
pub static TIMEOUT: Duration = Duration::from_millis(500);
pub fn random_name() -> Name {
Name::english(Uuid::new_v4().to_string().as_str())
}
}
static TIMEOUT: Duration = Duration::from_secs(10);
pub struct MTTClient {
queue: Queue,
rx: Receiver<Message>,
sender_id: SenderID,
session_id: Uuid,
}
impl MTTClient {
fn new(mut queue: Queue, sess_id: Option<String>, lang: Option<Language>) -> Self {
let sess_name = Session::doc_names()[0].clone();
let (tx, rx) = channel();
let sender_id = queue.add_sender(tx);
let msg_id = MessageID::new();
let paths = [
Path::new(
Include::Just(msg_id.clone()),
Include::Just(sess_name.clone().into()),
Include::Just(Action::Records),
),
Path::new(
Include::Just(msg_id.clone()),
Include::All,
Include::Just(Action::Error),
),
];
let mut add = Addition::new(Session::doc_names()[0].clone());
match lang {
Some(language) => {
let field: Field = language.into();
add.add_field(Session::language_field_names()[0].clone(), field);
}
None => {}
}
let msg = Message::default().set_id(msg_id.clone());
for path in paths.iter().cloned() {
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
queue.send(msg.set_action(reg_msg));
let result = rx.recv().unwrap();
}
match sess_id {
Some(id) => {
let sess_id = match Uuid::try_from(id.as_str()) {
Ok(data) => {
let mut qry = Query::new(Session::doc_names()[0].clone());
let mut calc = Calculation::new(Operand::Equal);
calc.add_value(CalcValue::Existing(FieldType::Uuid))
.unwrap();
calc.add_value(data.clone()).unwrap();
qry.add(Session::id_field_names()[0].clone(), calc);
queue.send(msg.set_action(qry));
}
Err(_) => queue.send(msg.set_action(add.clone())),
};
}
None => queue.send(msg.set_action(add.clone())),
};
let result = rx.recv().unwrap();
let session_id = match result.get_action() {
MsgAction::Records(result) => {
let mut holder = result.clone();
if holder.len() == 0 {
queue.send(msg.set_action(add));
let new_sess = rx.recv().unwrap();
holder = match new_sess.get_action() {
MsgAction::Records(new_holder) => new_holder.clone(),
_ => unreachable!("should only receive session records"),
}
}
let rec = holder.iter().last().unwrap();
match rec.get(Session::id_field_names()[0].clone()).unwrap() {
Field::Uuid(data) => data.clone(),
_ => unreachable!("should always be uuid"),
}
}
_ => unreachable!("should only receive session records"),
};
Self {
queue: queue,
rx: rx,
sender_id: sender_id,
session_id: session_id,
}
}
pub fn session_id(&self) -> String {
self.session_id.to_string()
}
pub fn create_document(&self, docdef: DocDef) -> Result<(), MTTError> {
let msg_id = MessageID::new();
let paths = [
Path::new(
Include::Just(msg_id.clone()),
Include::All,
Include::Just(Action::DocumentCreated),
),
Path::new(
Include::Just(msg_id.clone()),
Include::All,
Include::Just(Action::Error),
),
];
let msg = Message::default()
.set_id(msg_id)
.set_session(self.session_id.clone().into());
for path in paths.iter() {
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
self.queue.send(msg.set_action(reg_msg));
self.rx.recv().unwrap(); // Wait for completion.
}
self.queue.send(msg.set_action(docdef));
match self.rx.recv_timeout(TIMEOUT) {
Ok(data) => match data.get_action() {
MsgAction::DocumentCreated => Ok(()),
MsgAction::Error(err) => Err(err.clone()),
_ => unreachable!("should only receive confirmation or errors"),
},
Err(_) => Err(MTTError::new(ErrorID::TimeOut)),
}
}
pub fn records<UA>(&self, request: UA) -> Result<Records, MTTError>
where
UA: Into<ClientAction>,
{
let req = request.into();
let doc_id = req.doc_name().clone();
let msg_id = MessageID::new();
let paths = [
Path::new(
Include::Just(msg_id.clone()),
Include::Just(doc_id.clone()),
Include::Just(Action::Records),
),
Path::new(
Include::Just(msg_id.clone()),
Include::All,
Include::Just(Action::Error),
),
];
let msg = Message::default()
.set_id(msg_id.clone())
.set_session(self.session_id.clone().into());
for path in paths.iter() {
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
self.queue.send(msg.set_action(reg_msg));
let result = self.rx.recv().unwrap();
let action = result.get_action();
match action {
MsgAction::Register(status) => match status.get_msg() {
RegMsg::Error(err) => {
let mut error = err.clone();
error.add_parent(ErrorID::Document(doc_id.clone()));
return Err(error);
}
_ => {}
},
_ => unreachable!("got {:?} should have been a registry message", action),
}
}
self.queue.send(msg.set_action(req));
match self.rx.recv_timeout(TIMEOUT) {
Ok(data) => match data.get_action() {
MsgAction::Records(data) => Ok(data.clone()),
MsgAction::Error(err) => Err(err.clone()),
_ => unreachable!("should only receive records or errors"),
},
Err(_) => Err(MTTError::new(ErrorID::TimeOut)),
}
}
}
impl Drop for MTTClient {
fn drop(&mut self) {
self.queue.remove_sender(&self.sender_id);
}
}
#[derive(Clone)]
pub struct MoreThanText {
queue: Queue,
}
impl MoreThanText {
pub fn new() -> Self {
let queue = Queue::new();
CreateDoc::start(queue.clone()); // needs to be first.
Clock::start(queue.clone());
Session::start(queue.clone());
Self { queue: queue }
}
pub fn client(&self) -> MTTClient {
MTTClient::new(self.queue.clone(), None, None)
}
pub fn client_with_language(&self, lang: Language) -> MTTClient {
MTTClient::new(self.queue.clone(), None, Some(lang))
}
pub fn client_with_session(&self, id: String, lang: Option<Language>) -> MTTClient {
MTTClient::new(self.queue.clone(), Some(id), lang)
}
pub fn get_document(&self, name: &str, id: &str) -> Result<String, MTTError> {
if name == "page" {
Ok("something".to_string())
} else {
Err(MTTError::new(ErrorID::DocumentNotFound))
}
}
}
pub struct TestMoreThanText {
mtt: MoreThanText,
queue: Queue,
channel: Option<Receiver<Message>>,
}
impl TestMoreThanText {
pub fn new() -> Self {
let mut mtt = MoreThanText::new();
let queue = mtt.queue.clone();
Self {
mtt: mtt,
queue: queue,
channel: None,
}
}
pub fn get_morethantext(&self) -> MoreThanText {
self.mtt.clone()
}
pub fn send_time_pulse(&self) {
let msg = Clock::gen_message();
self.queue.send(msg);
}
pub fn register_channel(&mut self, paths: Vec<Path>) {
let mut queue = self.mtt.queue.clone();
let (tx, rx) = channel();
let sender_id = queue.add_sender(tx);
for path in paths.iter() {
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone()));
queue.send(Message::new(reg_msg));
rx.recv().unwrap(); // Wait for completion.
}
self.channel = Some(rx);
}
pub fn recv(&self) -> Result<Message, RecvTimeoutError> {
match &self.channel {
Some(rx) => rx.recv_timeout(Duration::from_millis(500)),
None => panic!("test environment does not have a channel setup"),
}
}
pub fn get_trigger_records(&self, action: Action) -> Records {
let msg = self.recv().unwrap();
let msg_action = msg.get_action();
if action == msg_action.clone().into() {
match msg_action {
MsgAction::OnAddition(data) => data.clone(),
MsgAction::OnDelete(data) => data.clone(),
MsgAction::OnQuery(data) => data.clone(),
MsgAction::OnUpdate(data) => data.clone(),
_ => panic!("{:?} is not a trigger", action),
}
} else {
panic!("received {:?} instead of {:?} trigger", msg, action);
}
}
}