// NOTE(review): several generic argument lists appear to be missing in this
// copy of the file (bare `Receiver`, `Option`, `Result`, `Vec`, `Into`, and the
// dangling `Option>`). The code below is kept token-for-token; confirm the
// intended type arguments against the canonical source before building.

pub mod action;
mod document;
mod message;
mod mtterror;
pub mod name;
mod queue;

use document::{Clock, CreateDoc, Session};
use isolang::Language;
use message::{wrapper::Message, MessageAction, MessageID};
use queue::{
    data_director::{RegMsg, Register},
    router::Queue,
    SenderID,
};
use std::{
    sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
    time::Duration,
};
use uuid::Uuid;

pub use action::*;
pub use document::MissingTranslation;
pub use mtterror::{ErrorID, MTTError};
pub use name::{Name, NameType};
pub use queue::data_director::{Include, Path};

/// Helpers shared by the crate's unit tests (compiled only under `cfg(test)`).
#[cfg(test)]
mod support_tests {
    use super::*;
    use std::time::Duration;

    /// Receive timeout for tests: 500 milliseconds.
    pub static TIMEOUT: Duration = Duration::from_millis(500);

    /// Builds an English [`Name`] whose text is a freshly generated v4 UUID.
    pub fn random_name() -> Name {
        Name::english(Uuid::new_v4().to_string().as_str())
    }
}

/// How long `create_document` and `records` wait for a reply (10 seconds)
/// before reporting `ErrorID::TimeOut`.
static TIMEOUT: Duration = Duration::from_secs(10);

/// A client attached to the message queue.
///
/// Each client registers its own channel with the queue on construction and
/// unregisters it again when dropped.
pub struct MTTClient {
    /// Queue used to send requests.
    queue: Queue,
    /// Receiving end of the channel registered with `queue`.
    rx: Receiver,
    /// Identifier returned by `Queue::add_sender` for this client's channel.
    sender_id: SenderID,
    /// Id of the session record this client resolved or created in `new`.
    session_id: Uuid,
}

impl MTTClient {
    /// Registers a new channel with `queue` and resolves the client's session.
    ///
    /// * `sess_id` - optional session id text. When it parses as a UUID the
    ///   session document is queried for it; when it is absent or does not
    ///   parse, a new session record is added instead.
    /// * `lang` - optional language; when present it is stored in the first
    ///   session language field of the record that gets added.
    ///
    /// # Panics
    ///
    /// Panics if a channel receive fails, if the reply to the session request
    /// is not a `MsgAction::Records`, if no record is returned, or if the
    /// record's id field is not a `Field::Uuid`.
    ///
    /// NOTE(review): an `Action::Error` route is registered below, but an
    /// error reply would fall into the `unreachable!` arms - confirm that is
    /// intended. The receives here use blocking `recv()` with no timeout.
    fn new(mut queue: Queue, sess_id: Option, lang: Option) -> Self {
        let sess_name = Session::doc_names()[0].clone();
        let (tx, rx) = channel();
        let sender_id = queue.add_sender(tx);
        let msg_id = MessageID::new();
        // Two routes tied to this message id: `Records` for the session
        // document name, and `Error` with the middle component left open.
        let paths = [
            Path::new(
                Include::Just(msg_id.clone()),
                Include::Just(sess_name.clone().into()),
                Include::Just(Action::Records),
            ),
            Path::new(
                Include::Just(msg_id.clone()),
                Include::All,
                Include::Just(Action::Error),
            ),
        ];
        // Prepared up front; sent either immediately or as a fallback when
        // the query for an existing session comes back empty.
        let mut add = Addition::new(Session::doc_names()[0].clone());
        match lang {
            Some(language) => {
                let field: Field = language.into();
                add.add_field(Session::language_field_names()[0].clone(), field);
            }
            None => {}
        }
        for path in paths.iter().cloned() {
            let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
            queue.send(Message::with_id(msg_id.clone(), reg_msg));
            // Blocks until a reply to the registration arrives; the reply
            // itself is not inspected.
            let result = rx.recv().unwrap();
        }
        match sess_id {
            Some(id) => {
                let sess_id = match Uuid::try_from(id.as_str()) {
                    Ok(data) => {
                        // Query on the first session id field using an
                        // `Operand::Equal` calculation over the existing
                        // value and the parsed id.
                        let mut qry = Query::new(Session::doc_names()[0].clone());
                        let mut calc = Calculation::new(Operand::Equal);
                        calc.add_value(CalcValue::Existing(FieldType::Uuid))
                            .unwrap();
                        calc.add_value(data.clone()).unwrap();
                        qry.add(Session::id_field_names()[0].clone(), calc);
                        queue.send(Message::with_id(msg_id.clone(), qry));
                    }
                    // Unparseable id: fall back to creating a new session.
                    Err(_) => queue.send(Message::with_id(msg_id.clone(), add.clone())),
                };
            }
            None => queue.send(Message::with_id(msg_id.clone(), add.clone())),
        };
        let result = rx.recv().unwrap();
        let session_id = match result.get_action() {
            MsgAction::Records(result) => {
                let mut holder = result.clone();
                // Nothing came back (e.g. the queried id matched no record):
                // add a session and use the records from that reply instead.
                if holder.len() == 0 {
                    queue.send(Message::with_id(msg_id.clone(), add));
                    let new_sess = rx.recv().unwrap();
                    holder = match new_sess.get_action() {
                        MsgAction::Records(new_holder) => new_holder.clone(),
                        _ => unreachable!("should only receive session records"),
                    }
                }
                // The last record in the set supplies the session id.
                let rec = holder.iter().last().unwrap();
                match rec.get(Session::id_field_names()[0].clone()).unwrap() {
                    Field::Uuid(data) => data.clone(),
                    _ => unreachable!("should always be uuid"),
                }
            }
            _ => unreachable!("should only receive session records"),
        };
        Self {
            queue: queue,
            rx: rx,
            sender_id: sender_id,
            session_id: session_id,
        }
    }

    /// Returns the session id formatted as a string.
    pub fn session_id(&self) -> String {
        self.session_id.to_string()
    }

    /// Sends `docdef` to the queue and waits for the outcome.
    ///
    /// # Errors
    ///
    /// Returns the error carried by a `MsgAction::Error` reply, or an
    /// `ErrorID::TimeOut` error when `recv_timeout(TIMEOUT)` fails for any
    /// reason (the failure kind is not distinguished).
    ///
    /// # Panics
    ///
    /// Panics if a registration reply cannot be received, or if the reply is
    /// neither `DocumentCreated` nor `Error`.
    ///
    /// NOTE(review): unlike `records`, the registration replies are not
    /// checked for `RegMsg::Error` here - confirm whether that is deliberate.
    pub fn create_document(&self, docdef: DocDef) -> Result<(), MTTError> {
        let msg_id = MessageID::new();
        // Routes for this message id: `DocumentCreated` and `Error`, both
        // with the middle component left open.
        let paths = [
            Path::new(
                Include::Just(msg_id.clone()),
                Include::All,
                Include::Just(Action::DocumentCreated),
            ),
            Path::new(
                Include::Just(msg_id.clone()),
                Include::All,
                Include::Just(Action::Error),
            ),
        ];
        for path in paths.iter() {
            let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
            self.queue.send(Message::with_id(msg_id.clone(), reg_msg));
            self.rx.recv().unwrap(); // Wait for completion.
        }
        self.queue.send(Message::with_id(msg_id.clone(), docdef));
        match self.rx.recv_timeout(TIMEOUT) {
            Ok(data) => match data.get_action() {
                MsgAction::DocumentCreated => Ok(()),
                MsgAction::Error(err) => Err(err.clone()),
                _ => unreachable!("should only receive confirmation or errors"),
            },
            Err(_) => Err(MTTError::new(ErrorID::TimeOut)),
        }
    }

    /// Converts `request` with `into()`, sends it to the queue and returns
    /// the records from the reply.
    ///
    /// # Errors
    ///
    /// * A `RegMsg::Error` received while registering routes is returned
    ///   after `ErrorID::Document(<document name>)` is added as its parent.
    /// * A `MsgAction::Error` reply to the request is returned as-is (cloned).
    /// * Any `recv_timeout(TIMEOUT)` failure becomes `ErrorID::TimeOut`.
    ///
    /// # Panics
    ///
    /// Panics if a registration reply cannot be received or is not a
    /// `MsgAction::Register`, or if the request reply is neither `Records`
    /// nor `Error`.
    pub fn records(&self, request: UA) -> Result
    where
        UA: Into,
    {
        let req = request.into();
        let doc_id = req.doc_name().clone();
        let msg_id = MessageID::new();
        // Routes for this message id: `Records` for the requested document
        // name, and `Error` with the middle component left open.
        let paths = [
            Path::new(
                Include::Just(msg_id.clone()),
                Include::Just(doc_id.clone()),
                Include::Just(Action::Records),
            ),
            Path::new(
                Include::Just(msg_id.clone()),
                Include::All,
                Include::Just(Action::Error),
            ),
        ];
        for path in paths.iter() {
            let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
            // Registration is sent with `Message::new`, not with `msg_id`.
            self.queue.send(Message::new(reg_msg));
            let result = self.rx.recv().unwrap();
            let action = result.get_action();
            match action {
                MsgAction::Register(status) => match status.get_msg() {
                    RegMsg::Error(err) => {
                        // Tag the registration failure with the document name
                        // before handing it back to the caller.
                        let mut error = err.clone();
                        error.add_parent(ErrorID::Document(doc_id.clone()));
                        return Err(error);
                    }
                    _ => {}
                },
                _ => unreachable!("got {:?} should have been a registry message", action),
            }
        }
        self.queue.send(Message::with_id(msg_id, req));
        match self.rx.recv_timeout(TIMEOUT) {
            Ok(data) => match data.get_action() {
                MsgAction::Records(data) => Ok(data.clone()),
                MsgAction::Error(err) => Err(err.clone()),
                _ => unreachable!("should only receive records or errors"),
            },
            Err(_) => Err(MTTError::new(ErrorID::TimeOut)),
        }
    }
}

impl Drop for MTTClient {
    /// Removes this client's sender from the queue.
    fn drop(&mut self) {
        self.queue.remove_sender(&self.sender_id);
    }
}

/// Entry point of the crate: owns the queue and hands out [`MTTClient`]s.
#[derive(Clone)]
pub struct MoreThanText {
    /// Queue cloned into every client created from this instance.
    queue: Queue,
}

impl MoreThanText {
    /// Creates a queue and starts `CreateDoc`, `Clock` and `Session` on it,
    /// in that order.
    pub fn new() -> Self {
        let queue = Queue::new();
        CreateDoc::start(queue.clone()); // needs to be first.
        Clock::start(queue.clone());
        Session::start(queue.clone());
        Self { queue: queue }
    }

    /// Creates a client with no session id and no language.
    pub fn client(&self) -> MTTClient {
        MTTClient::new(self.queue.clone(), None, None)
    }

    /// Creates a client with no session id, using `lang` as its language.
    pub fn client_with_language(&self, lang: Language) -> MTTClient {
        MTTClient::new(self.queue.clone(), None, Some(lang))
    }

    /// Creates a client for the session id text `id`, with an optional
    /// language. See `MTTClient::new` for how `id` is resolved.
    pub fn client_with_session(&self, id: String, lang: Option) -> MTTClient {
        MTTClient::new(self.queue.clone(), Some(id), lang)
    }

    /// Returns the fixed string `"something"` when `name` is `"page"`;
    /// otherwise returns an `ErrorID::DocumentNotFound` error.
    ///
    /// NOTE(review): `id` is never read - this looks like a placeholder
    /// implementation; confirm.
    pub fn get_document(&self, name: &str, id: &str) -> Result {
        if name == "page" {
            Ok("something".to_string())
        } else {
            Err(MTTError::new(ErrorID::DocumentNotFound))
        }
    }
}

/// Wrapper around [`MoreThanText`] that additionally exposes the queue:
/// it can send clock messages and listen on arbitrary routes.
/// Presumably intended as test support, going by the name - confirm.
pub struct TestMoreThanText {
    /// The wrapped instance.
    mtt: MoreThanText,
    /// Clone of `mtt`'s queue, used by `send_time_pulse`.
    queue: Queue,
    /// Receiver installed by `register_channel`; `None` until then.
    channel: Option>,
}

impl TestMoreThanText {
    /// Creates a fresh [`MoreThanText`] and keeps a clone of its queue.
    /// No channel is registered yet.
    pub fn new() -> Self {
        let mut mtt = MoreThanText::new();
        let queue = mtt.queue.clone();
        Self {
            mtt: mtt,
            queue: queue,
            channel: None,
        }
    }

    /// Returns a clone of the wrapped [`MoreThanText`].
    pub fn get_morethantext(&self) -> MoreThanText {
        self.mtt.clone()
    }

    /// Sends the message produced by `Clock::gen_message()` to the queue.
    pub fn send_time_pulse(&self) {
        let msg = Clock::gen_message();
        self.queue.send(msg);
    }

    /// Registers a new channel with the queue, adds a route for every entry
    /// in `paths`, and stores the receiver for later `recv` calls.
    ///
    /// # Panics
    ///
    /// Panics if a registration reply cannot be received.
    ///
    /// NOTE(review): a previously stored receiver is replaced and no
    /// `remove_sender` call is made for it here - confirm that is acceptable.
    pub fn register_channel(&mut self, paths: Vec) {
        let mut queue = self.mtt.queue.clone();
        let (tx, rx) = channel();
        let sender_id = queue.add_sender(tx);
        for path in paths.iter() {
            let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone()));
            queue.send(Message::new(reg_msg));
            rx.recv().unwrap(); // Wait for completion.
        }
        self.channel = Some(rx);
    }

    /// Waits up to 500 milliseconds for the next message on the registered
    /// channel and returns the result of `recv_timeout`.
    ///
    /// # Panics
    ///
    /// Panics if `register_channel` has not been called.
    pub fn recv(&self) -> Result {
        match &self.channel {
            Some(rx) => rx.recv_timeout(Duration::from_millis(500)),
            None => panic!("test environment does not have a channel setup"),
        }
    }

    /// Receives the next message and returns the records it carries, provided
    /// its action converts to `action` and is one of `OnAddition`, `OnDelete`,
    /// `OnQuery` or `OnUpdate`.
    ///
    /// # Panics
    ///
    /// Panics if `recv` fails, if the received action does not match
    /// `action`, or if the action is not one of the four variants above.
    pub fn get_trigger_records(&self, action: Action) -> Records {
        let msg = self.recv().unwrap();
        let msg_action = msg.get_action();
        if action == msg_action.clone().into() {
            match msg_action {
                MsgAction::OnAddition(data) => data.clone(),
                MsgAction::OnDelete(data) => data.clone(),
                MsgAction::OnQuery(data) => data.clone(),
                MsgAction::OnUpdate(data) => data.clone(),
                _ => panic!("{:?} is not a trigger", action),
            }
        } else {
            panic!("received {:?} instead of {:?} trigger", msg, action);
        }
    }
}