use crate::{ action::{Action, CalcValue, Calculation, MsgAction, Query}, document::{ definition::{DocDef, DocFuncType}, field::Field, }, message::wrapper::{InternalRecord, InternalRecords, Message, Oid, Records, Reply, Update}, mtterror::MTTError, name::NameType, queue::{ data_director::{Include, Path, RegMsg, Register, RouteID}, router::Queue, }, }; use std::{ collections::{HashMap, HashSet}, sync::mpsc::{channel, Receiver}, thread::spawn, }; use uuid::Uuid; pub struct CreateDoc { queue: Queue, rx: Receiver, } impl CreateDoc { fn new(queue: Queue, rx: Receiver) -> Self { Self { queue: queue, rx: rx, } } pub fn start(mut queue: Queue) { let (tx, rx) = channel(); let routes = [Path::new( Include::All, Include::All, Include::Just(Action::Create), )] .to_vec(); let id = queue.add_sender(tx); for route in routes.iter() { let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone())); queue.send(Message::new(NameType::None, regmsg)); rx.recv().unwrap(); } let doc = CreateDoc::new(queue, rx); spawn(move || { doc.listen(); }); } fn listen(&self) { loop { let msg = self.rx.recv().unwrap(); DocumentFile::start(self.queue.clone(), msg); } } } #[cfg(test)] mod createdocs { use super::*; use crate::{name::Name, support_tests::TIMEOUT}; struct TestCreateDoc { queue: Queue, rx: Receiver, rx_id: Uuid, } impl TestCreateDoc { fn new() -> Self { let mut queue = Queue::new(); let (tx, rx) = channel(); let id = queue.add_sender(tx); CreateDoc::start(queue.clone()); Self { queue: queue, rx: rx, rx_id: id, } } fn get_queue(&self) -> Queue { self.queue.clone() } fn get_receiver(&self) -> &Receiver { &self.rx } fn register_paths(&self, paths: Vec) { for path in paths.iter() { let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone())); self.queue.send(Message::new(NameType::None, regmsg)); self.rx.recv_timeout(TIMEOUT).unwrap(); } } } #[test] fn create_document_creation() { let doc_creator = TestCreateDoc::new(); let paths = [ Path::new(Include::All, 
Include::All, Include::Just(Action::Reply)), Path::new(Include::All, Include::All, Include::Just(Action::Records)), ] .to_vec(); doc_creator.register_paths(paths); let queue = doc_creator.get_queue(); let rx = doc_creator.get_receiver(); let name = Name::english("project"); let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); queue.send(msg1.clone()); let result1 = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!( result1.get_message_id(), msg1.get_message_id(), "received {:?} from create message.", rx ); match result1.get_action() { MsgAction::Reply(_) => {} _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), } let msg2 = Message::new(name.clone(), Query::new(name.clone())); queue.send(msg2.clone()); let result2 = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result2.get_message_id(), msg2.get_message_id()); match result2.get_action() { MsgAction::Records(data) => assert_eq!(data.len(), 0), _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), } } #[test] fn does_duplicates_generate_error() { let doc_creator = TestCreateDoc::new(); let paths = [Path::new( Include::All, Include::All, Include::Just(Action::Error), )] .to_vec(); doc_creator.register_paths(paths); let queue = doc_creator.get_queue(); let rx = doc_creator.get_receiver(); let name = Name::english("duplicate"); let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); queue.send(msg1.clone()); queue.send(msg2.clone()); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg2.get_message_id()); match result.get_action() { MsgAction::Error(err) => match err { MTTError::NameDuplicate(data) => assert_eq!(data, &name), _ => unreachable!("got {:?}: should have been a duplicate name", err), }, _ => unreachable!("got {:?}: should have been a reply.", result.get_action()), } } } 
/// Kind of index that can be attached to a document field.
#[derive(Clone, Debug)]
pub enum IndexType {
    /// Any number of records may share a value.
    Index,
    /// At most one record may hold a given value.
    Unique,
}

impl IndexType {
    /// Builds an empty [`Index`] of the matching kind.
    fn create_index(&self) -> Index {
        match self {
            Self::Index => Index::new(),
            Self::Unique => Index::new_unique(),
        }
    }
}

/// Maps a field value to the set of record ids holding that value.
#[derive(Debug)]
struct Index {
    data: HashMap<Field, HashSet<Oid>>,
    /// When set, `add` and `validate` reject values that are already present.
    unique: bool,
}

impl Index {
    /// Creates an empty index that allows duplicate values.
    fn new() -> Self {
        Self {
            data: HashMap::new(),
            unique: false,
        }
    }

    /// Creates an empty index that rejects duplicate values.
    fn new_unique() -> Self {
        Self {
            data: HashMap::new(),
            unique: true,
        }
    }

    /// Records `oid` under `field` without any uniqueness check.
    ///
    /// The key is cloned only when the value is not in the index yet.
    fn internal_add(&mut self, field: &Field, oid: Oid) {
        match self.data.get_mut(field) {
            Some(oids) => {
                oids.insert(oid);
            }
            None => {
                self.data.insert(field.clone(), HashSet::from([oid]));
            }
        }
    }

    /// Records `oid` under `field`.
    ///
    /// # Errors
    ///
    /// Returns `MTTError::FieldDuplicate` when the index is unique and the
    /// value already has an entry, whatever the `oid`.
    fn add(&mut self, field: Field, oid: Oid) -> Result<(), MTTError> {
        let oids = self.data.entry(field).or_default();
        if self.unique && !oids.is_empty() {
            return Err(MTTError::FieldDuplicate);
        }
        oids.insert(oid);
        Ok(())
    }

    /// Collects the ids stored under every value for which `calc` evaluates
    /// to `Field::Boolean(true)`.
    ///
    /// # Errors
    ///
    /// Returns `MTTError::FieldInvalidType` as soon as the calculation yields
    /// anything other than a boolean.
    fn pull(&self, calc: &Calculation) -> Result<HashSet<Oid>, MTTError> {
        let mut output = HashSet::new();
        for (key, oids) in self.data.iter() {
            match calc.calculate(key) {
                // Extend in place rather than rebuilding the set per match.
                Field::Boolean(true) => output.extend(oids.iter().cloned()),
                Field::Boolean(false) => {}
                _ => return Err(MTTError::FieldInvalidType),
            }
        }
        Ok(output)
    }

    /// Removes `oid` from the entry for `field`, dropping the entry once it
    /// is empty. Unknown values are ignored.
    fn remove(&mut self, field: &Field, oid: &Oid) {
        if let Some(oids) = self.data.get_mut(field) {
            oids.remove(oid);
            if oids.is_empty() {
                self.data.remove(field);
            }
        }
    }

    /// Checks whether `field` could be added.
    ///
    /// # Errors
    ///
    /// Returns `MTTError::FieldDuplicate` when the index is unique and the
    /// value is already present.
    fn validate(&self, field: &Field) -> Result<(), MTTError> {
        if self.unique && self.data.contains_key(field) {
            Err(MTTError::FieldDuplicate)
        } else {
            Ok(())
        }
    }
}

/// The indexes of one document, keyed by field id.
struct Indexes {
    data: HashMap<Uuid, Index>,
}

impl Indexes {
    /// Creates one empty index per entry of `settings`.
    fn new(settings: &HashMap<Uuid, IndexType>) -> Self {
        let data = settings
            .iter()
            .map(|(field_id, kind)| (*field_id, kind.create_index()))
            .collect();
        Self { data }
    }

    /// Ids of the fields that have an index.
    fn index_ids(&self) -> HashSet<&Uuid> {
        self.data.keys().collect()
    }

    /// Runs `calc` against the index of `field_id`.
    ///
    /// # Panics
    ///
    /// Panics when `field_id` has no index.
    fn pull(&self, field_id: &Uuid, calc: &Calculation) -> Result<HashSet<Oid>, MTTError> {
        self.data.get(field_id).unwrap().pull(calc)
    }

    /// Adds `oid` under `field` in the index of `field_name`; fields without
    /// an index are accepted silently.
    fn add_to_index(&mut self, field_name: &Uuid, field: Field, oid: Oid) -> Result<(), MTTError> {
        match self.data.get_mut(field_name) {
            Some(index) => index.add(field, oid),
            None => Ok(()),
        }
    }

    /// Validates `value` against the index of `field_name`; fields without
    /// an index always pass.
    fn validate(&self, field_name: &Uuid, value: &Field) -> Result<(), MTTError> {
        match self.data.get(field_name) {
            Some(index) => index.validate(value),
            None => Ok(()),
        }
    }

    /// Iterates over `(field id, index)` pairs with mutable access to each
    /// index.
    fn iter_mut(&mut self) -> impl Iterator<Item = (&Uuid, &mut Index)> {
        self.data.iter_mut()
    }
}

#[cfg(test)]
mod indexes {
    use super::*;
    use crate::action::{FieldType, Operand};

    /// Produces `count` distinct uuid fields.
    fn get_fields(count: usize) -> Vec<Field> {
        let mut output = Vec::new();
        while output.len() < count {
            let field: Field = Uuid::new_v4().into();
            if !output.contains(&field) {
                output.push(field);
            }
        }
        output
    }

    /// Produces `count` distinct record ids.
    fn get_oids(count: usize) -> Vec<Oid> {
        let mut output = Vec::new();
        while output.len() < count {
            let oid = Oid::new();
            if !output.contains(&oid) {
                output.push(oid);
            }
        }
        output
    }

    #[test]
    fn add_to_index() {
        let mut index = Index::new();
        let count = 3;
        let fields = get_fields(count);
        let oids = get_oids(count);
        for i in 0..count {
            index.add(fields[i].clone(), oids[i].clone()).unwrap();
        }
        for i in 0..count {
            let mut calc = Calculation::new(Operand::Equal);
            calc.add_value(fields[i].clone()).unwrap();
            calc.add_value(CalcValue::Existing(FieldType::Uuid))
                .unwrap();
            let result = index.pull(&calc).unwrap();
            assert_eq!(result.len(), 1);
            assert_eq!(result.iter().last().unwrap(), &oids[i]);
        }
    }

    #[test]
    fn index_can_handle_multiple_entries() {
        let mut index = Index::new();
        let count = 3;
        let fields = get_fields(1);
        let oids = get_oids(count);
        for i in 0..count {
            index.add(fields[0].clone(), oids[i].clone()).unwrap();
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(fields[0].clone()).unwrap();
        calc.add_value(CalcValue::Existing(FieldType::Uuid))
            .unwrap();
        let result = index.pull(&calc).unwrap();
        assert_eq!(result.len(), count);
        for oid in oids {
            assert!(result.contains(&oid));
        }
    }

    #[test]
    fn can_remove_oid() {
        let mut index = Index::new();
        let count = 3;
        let pos = 1;
        let fields = get_fields(1);
        let oids = get_oids(count);
        for i in 0..count {
            index.add(fields[0].clone(), oids[i].clone()).unwrap();
        }
        index.remove(&fields[0], &oids[pos]);
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(fields[0].clone()).unwrap();
        calc.add_value(CalcValue::Existing(FieldType::Uuid))
            .unwrap();
        let result = index.pull(&calc).unwrap();
        assert!(!result.contains(&oids[pos]), "should have removed oid");
    }

    #[test]
    fn are_empty_indexes_removed() {
        let mut index = Index::new();
        let field: Field = Uuid::new_v4().into();
        let oid = Oid::new();
        index.add(field.clone(), oid.clone()).unwrap();
        index.remove(&field, &oid);
        assert_eq!(index.data.len(), 0);
    }

    #[test]
    fn do_unique_indexes_error_on_duplicates() {
        let mut index = Index::new_unique();
        let field: Field = "fred".into();
        let oids = get_oids(2);
        index.add(field.clone(), oids[0].clone()).unwrap();
        // Use the second oid so the failure is clearly about the duplicated
        // value and not about re-adding the same record.
        match index.add(field.clone(), oids[1].clone()) {
            Ok(_) => unreachable!("should have been an error"),
            Err(err) => match err {
                MTTError::FieldDuplicate => {}
                _ => unreachable!("got {:?}: should have been duplicate field", err),
            },
        }
    }

    #[test]
    fn index_returns_validate() {
        let mut index = Index::new();
        let field: Field = "stuff".into();
        let oid = Oid::new();
        index.add(field.clone(), oid).unwrap();
        match index.validate(&field) {
            Ok(_) => {}
            Err(err) => unreachable!("got {:?}: should have returned without issue", err),
        }
    }

    #[test]
    fn unique_return_duplicate_error() {
        let mut index = Index::new_unique();
        let field: Field = "fred".into();
        let oid = Oid::new();
        index.add(field.clone(), oid).unwrap();
        match index.validate(&field) {
            Ok(_) => unreachable!("should have gotten a duplication error"),
            Err(err) => match err {
                MTTError::FieldDuplicate => {}
                _ => unreachable!("got {:?}: should have been duplicate field", err),
            },
        }
    }
}

/// In-memory store for the records of one document type, driven by messages
/// received from the queue.
struct DocumentFile {
    /// Definition used to resolve field names and validate values.
    docdef: DocDef,
    /// Stored records keyed by record id.
    docs: InternalRecords,
    /// Indexes built from the definition's index settings.
    indexes: Indexes,
    /// Id returned by the queue when the document names were registered.
    name_id: Uuid,
    queue: Queue,
    /// Handler to run for each route id registered in `start`.
    routes: HashMap<RouteID, DocFuncType>,
    rx: Receiver<Message>,
}

impl DocumentFile {
    /// Assembles a document with no records and empty indexes.
    fn new(
        queue: Queue,
        rx: Receiver<Message>,
        docdef: DocDef,
        routes: HashMap<RouteID, DocFuncType>,
        name_id: Uuid,
    ) -> Self {
        let indexes = Indexes::new(docdef.get_indexes());
        Self {
            docdef,
            docs: InternalRecords::new(),
            indexes,
            name_id,
            queue,
            routes,
            rx,
        }
    }

    /// Registers the document described by a create message and spawns its
    /// listening thread.
    ///
    /// The document names and then every route of the definition are
    /// registered with the queue. If the queue answers any registration with
    /// an error, the sender is removed again, the error is sent as the
    /// response to `msg`, and nothing is spawned. On success a `Reply` is
    /// sent as the response to `msg`.
    ///
    /// # Panics
    ///
    /// Panics when `msg` is not a create message, when the channel closes
    /// while waiting for a registration answer, or when the answer is neither
    /// the expected id nor an error.
    fn start(mut queue: Queue, msg: Message) {
        let (tx, rx) = channel();
        let action = msg.get_action();
        let docdef = match action {
            MsgAction::Create(data) => data.clone(),
            _ => unreachable!("got {:?}: should have been a create message", action),
        };
        let names = docdef.get_document_names();
        let id = queue.add_sender(tx);
        let reg_msg = Register::new(id, RegMsg::AddDocName(names.clone()));
        let rmsg = msg.response(reg_msg.clone());
        queue.send(rmsg.clone());
        let name_result = rx.recv().unwrap();
        let name_id = match name_result.get_action() {
            MsgAction::Register(data) => match data.get_msg() {
                RegMsg::DocumentNameID(data) => data,
                RegMsg::Error(err) => {
                    queue.remove_sender(&id);
                    queue.send(msg.response(err.clone()));
                    return;
                }
                _ => unreachable!("should only return a name id or an error"),
            },
            _ => unreachable!("should only return a name id or an error"),
        };
        let mut route_action = HashMap::new();
        for path_action in docdef.iter_routes() {
            let request = reg_msg.response(RegMsg::AddRoute(path_action.path()));
            let add_route = rmsg.response(request);
            queue.send(add_route);
            let result = rx.recv().unwrap();
            let route_id = match result.get_action() {
                MsgAction::Register(data) => match data.get_msg() {
                    RegMsg::RouteID(data) => data,
                    RegMsg::Error(err) => {
                        queue.remove_sender(&id);
                        queue.send(msg.response(err.clone()));
                        return;
                    }
                    _ => unreachable!("should only return a route id or an error"),
                },
                _ => unreachable!("should only return a route id or an error"),
            };
            route_action.insert(route_id.clone(), path_action.doc_function());
        }
        let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action, name_id.clone());
        spawn(move || {
            doc.listen();
        });
        queue.send(msg.response(Reply::new()));
    }

    /// Dispatches every received message to the handlers whose route id
    /// matches the message's route.
    ///
    /// Returns once every sender for the channel has been dropped instead of
    /// panicking inside the worker thread.
    fn listen(&mut self) {
        while let Ok(msg) = self.rx.recv() {
            let route = msg.get_route();
            // Copy out only the matching handlers so `self` can be borrowed
            // mutably below without cloning the whole route table.
            let mut handlers = Vec::new();
            for (route_id, doc_func) in self.routes.iter() {
                if route == route_id.into() {
                    handlers.push(doc_func.clone());
                }
            }
            for doc_func in handlers.iter() {
                match doc_func {
                    DocFuncType::Add => self.add_document(&msg),
                    DocFuncType::Delete => self.delete(&msg),
                    DocFuncType::Query => self.query(&msg),
                    DocFuncType::Show => self.queue.send(msg.response(Reply::new())),
                    DocFuncType::Update => self.update(&msg),
                    DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action),
                    DocFuncType::Trigger(action) => self.trigger(&msg, action),
                }
            }
        }
    }

    /// Validates `value` for the named field against the definition and then
    /// against the field's index, returning the value produced by the
    /// definition.
    ///
    /// # Errors
    ///
    /// Propagates the error of the field lookup, of the definition's
    /// validation, or of the index validation.
    fn validate<NT>(&self, field_name: NT, value: &Field) -> Result<Field, MTTError>
    where
        NT: Into<NameType>,
    {
        let field_id = self.docdef.get_field_id(field_name)?;
        let output = self.docdef.validate(field_id.clone(), value)?;
        self.indexes.validate(&field_id, &output)?;
        Ok(output)
    }

    /// Handles an addition message: builds a record, validates every field of
    /// the definition, stores the record and announces it.
    ///
    /// On success the new record is sent twice as a response to `msg`: once
    /// as records and once wrapped in `MsgAction::OnAddition`. The first
    /// lookup or validation error is sent as the response instead. Messages
    /// that are not additions are ignored.
    fn add_document(&mut self, msg: &Message) {
        let addition = match msg.get_action() {
            MsgAction::Addition(data) => data,
            _ => return,
        };
        let mut holder = InternalRecord::new();
        for (name_id, value) in addition.iter() {
            let field_id = match self.docdef.get_field_id(name_id) {
                Ok(id) => id,
                Err(err) => {
                    self.queue.send(msg.response(err));
                    return;
                }
            };
            holder.insert(field_id.clone(), value.get(&Field::None));
        }
        // Every field of the definition is validated; fields missing from
        // the request are validated as `Field::None`.
        for field_id in self.docdef.field_ids().iter() {
            let value = match holder.get(field_id) {
                Some(data) => data,
                None => &Field::None,
            };
            let corrected = match self.validate(field_id, value) {
                Ok(data) => data,
                Err(err) => {
                    self.queue.send(msg.response(err));
                    return;
                }
            };
            holder.insert(field_id.clone(), corrected);
        }
        let mut records = Records::new(self.docdef.get_field_names().clone());
        if !holder.is_empty() {
            // Draw a fresh id until it does not collide with a stored record.
            let mut oid = Oid::new();
            while self.docs.contains_key(&oid) {
                oid = Oid::new();
            }
            for (field_id, index) in self.indexes.iter_mut() {
                let value = holder.get(field_id).unwrap();
                index.internal_add(value, oid.clone());
            }
            self.docs.insert(oid.clone(), holder.clone());
            records.insert(oid, holder);
        }
        self.queue.send(msg.response(records.clone()));
        self.queue
            .send(msg.response(MsgAction::OnAddition(records)));
    }

    /// Handles a delete message: removes every record matched by its query
    /// from the store and the indexes.
    ///
    /// The removed records are sent twice as a response to `msg`: once as
    /// records and once wrapped in `MsgAction::OnDelete`. A query error is
    /// sent as the response instead.
    ///
    /// # Panics
    ///
    /// Panics when `msg` is not a delete message.
    fn delete(&mut self, msg: &Message) {
        let delete = match msg.get_action() {
            MsgAction::Delete(data) => data,
            _ => unreachable!("should always be delete action"),
        };
        let records = match self.run_query(delete.get_query()) {
            Ok(data) => data,
            Err(err) => {
                self.queue.send(msg.response(err));
                return;
            }
        };
        for (oid, record) in records.iter() {
            for (field_id, index) in self.indexes.iter_mut() {
                index.remove(record.get(field_id).unwrap(), oid);
            }
            self.docs.remove(oid);
        }
        let rec = Records::with_data(self.docdef.get_field_names().clone(), records);
        self.queue.send(msg.response(rec.clone()));
        self.queue.send(msg.response(MsgAction::OnDelete(rec)));
    }

    /// Returns a copy of every stored record that satisfies `query`.
    ///
    /// Conditions on indexed fields are resolved through the indexes first.
    /// The remaining conditions are then evaluated per record, and a record
    /// is kept only when each of them yields `Field::Boolean(true)`.
    ///
    /// # Errors
    ///
    /// Propagates field lookup errors and index evaluation errors.
    fn run_query(&self, query: &Query) -> Result<InternalRecords, MTTError> {
        let indexed_ids = self.indexes.index_ids();
        let mut indexed = HashMap::new();
        let mut unindexed = HashMap::new();
        for (field, data) in query.iter() {
            let id = self.docdef.get_field_id(field)?;
            if indexed_ids.contains(&id) {
                indexed.insert(id, data.clone());
            } else {
                unindexed.insert(id, data.clone());
            }
        }
        let mut oids: HashSet<Oid> = self.docs.keys().cloned().collect();
        for (field_id, calculation) in indexed.iter() {
            let hits = self.indexes.pull(field_id, calculation)?;
            oids.retain(|oid| hits.contains(oid));
        }
        // Only records that pass every remaining condition are cloned.
        let mut records = InternalRecords::new();
        for oid in oids {
            let record = self.docs.get(&oid).unwrap();
            let keep = unindexed.iter().all(|(field_id, calc)| {
                matches!(
                    calc.calculate(record.get(field_id).unwrap()),
                    Field::Boolean(true)
                )
            });
            if keep {
                records.insert(oid, record.clone());
            }
        }
        Ok(records)
    }

    /// Handles a query message.
    ///
    /// The matching records are sent twice as a response to `msg`: once as
    /// records and once wrapped in `MsgAction::OnQuery`. A query error is
    /// sent as the response instead.
    ///
    /// # Panics
    ///
    /// Panics when `msg` is not a query message.
    fn query(&self, msg: &Message) {
        let query = match msg.get_action() {
            MsgAction::Query(data) => data,
            _ => unreachable!("should receive a query"),
        };
        let records = match self.run_query(query) {
            Ok(data) => data,
            Err(err) => {
                self.queue.send(msg.response(err));
                return;
            }
        };
        let recs = Records::with_data(self.docdef.get_field_names().clone(), records);
        self.queue.send(msg.response(recs.clone()));
        self.queue.send(msg.response(MsgAction::OnQuery(recs)));
    }

    /// Applies the values of `update` to every record in `original`.
    ///
    /// All new values are validated before anything is stored, so an error
    /// leaves the store and the indexes untouched. A scratch set of indexes
    /// catches duplicates created within the batch itself. On success the
    /// updated records are stored, announced through `MsgAction::OnUpdate`
    /// as a response to `msg`, and returned.
    ///
    /// # Errors
    ///
    /// Propagates field lookup, validation and duplicate errors.
    ///
    /// # Panics
    ///
    /// Panics if an updated value cannot be added to a live index.
    fn run_update(
        &mut self,
        original: &InternalRecords,
        update: &Update,
        msg: &Message,
    ) -> Result<Records, MTTError> {
        let mut changes = HashMap::new();
        for (key, value) in update.get_values().iter() {
            let field_id = self.docdef.get_field_id(key)?;
            changes.insert(field_id, value);
        }
        let mut indexes = Indexes::new(self.docdef.get_indexes());
        let mut updates = InternalRecords::new();
        for (oid, record) in original.iter() {
            let mut holder = record.clone();
            for (field_id, value) in changes.iter() {
                let field = value.get(holder.get(field_id).unwrap());
                let correction = self.validate(field_id, &field)?;
                holder.insert(field_id.clone(), correction.clone());
                indexes.add_to_index(field_id, correction, oid.clone())?;
            }
            updates.insert(oid.clone(), holder);
        }
        for (oid, new_rec) in updates.iter() {
            let old_rec = original.get(oid).unwrap();
            for (field_id, index) in self.indexes.iter_mut() {
                index.remove(old_rec.get(field_id).unwrap(), oid);
                index
                    .add(new_rec.get(field_id).unwrap().clone(), oid.clone())
                    .unwrap();
            }
            self.docs.insert(oid.clone(), new_rec.clone());
        }
        let recs = Records::with_data(self.docdef.get_field_names().clone(), updates);
        self.queue
            .send(msg.response(MsgAction::OnUpdate(recs.clone())));
        Ok(recs)
    }

    /// Handles an update message: runs its query, applies the update and
    /// sends the updated records, or the first error, as the response.
    ///
    /// # Panics
    ///
    /// Panics when `msg` is not an update message.
    fn update(&mut self, msg: &Message) {
        let update = match msg.get_action() {
            MsgAction::Update(data) => data,
            _ => unreachable!("should receive a update"),
        };
        let original = match self.run_query(update.get_query()) {
            Ok(result) => result,
            Err(err) => {
                self.queue.send(msg.response(err));
                return;
            }
        };
        let data = match self.run_update(&original, update, msg) {
            Ok(output) => output,
            Err(err) => {
                self.queue.send(msg.response(err));
                return;
            }
        };
        self.queue.send(msg.response(data));
    }

    /// Applies `action` to the records carried by an `OnQuery` message.
    ///
    /// A failed update is sent as an error response to `msg`, so a rejected
    /// value no longer takes the document's thread down.
    ///
    /// # Panics
    ///
    /// Panics when `msg` is not an `OnQuery` message or `action` is not an
    /// update.
    fn existing_query(&mut self, msg: &Message, action: &MsgAction) {
        let recs = match msg.get_action() {
            MsgAction::OnQuery(data) => data,
            _ => unreachable!("should only receive on messages"),
        };
        match action {
            MsgAction::Update(change) => {
                if let Err(err) = self.run_update(recs.get_internal_records(), change, msg) {
                    self.queue.send(msg.response(err));
                }
            }
            _ => panic!("should not get here"),
        }
    }

    /// Forwards `action` under this document's name id in reaction to `msg`.
    fn trigger(&self, msg: &Message, action: &MsgAction) {
        self.queue
            .send(msg.forward(self.name_id.clone(), action.clone()));
    }
}

#[cfg(test)]
mod document_files {
    use super::*;
    use crate::{
        action::{Addition, Operand},
        document::field::FieldType,
        message::wrapper::Delete,
        name::{Name, Names},
        support_tests::TIMEOUT,
    };
    use chrono::Utc;
    use std::{sync::mpsc::RecvTimeoutError, thread::sleep, time::Duration};

    fn standard_paths() -> Vec<Path> {
        [
            Path::new(Include::All, Include::All, Include::Just(Action::Records)),
            Path::new(Include::All, Include::All, Include::Just(Action::Reply)),
            Path::new(Include::All, Include::All, Include::Just(Action::Error)),
        ]
        .to_vec()
    }

    struct TestDocument {
        docdef: DocDef,
        queue: Queue,
        sender_id: Uuid,
        rx: Receiver<Message>,
    }

    impl TestDocument {
        fn new(field_types: Vec<FieldType>) -> Self {
            let doc_name =
Name::english(Uuid::new_v4().to_string().as_str()); let mut docdef = DocDef::new(doc_name.clone()); let mut count = 0; for field_type in field_types.iter() { docdef.add_field( Name::english(format!("field{}", count).as_str()), field_type.clone(), ); count += 1; } let (tx, rx) = channel(); let mut queue = Queue::new(); let id = queue.add_sender(tx); Self { docdef: docdef, queue: queue, sender_id: id, rx: rx, } } fn doc_name(&self) -> Name { self.docdef.get_document_names()[0].clone() } fn get_docdef(&self) -> &DocDef { &self.docdef } fn get_docdef_mut(&mut self) -> &mut DocDef { &mut self.docdef } fn get_queue(&mut self) -> Queue { self.queue.clone() } fn get_receiver(&self) -> &Receiver { &self.rx } fn get_sender_id(&self) -> Uuid { self.sender_id.clone() } fn send(&self, action: A) where A: Into, { let msg = Message::new(self.docdef.get_document_names()[0].clone(), action); self.queue.send(msg); } fn start(&mut self, routes: Vec) { let msg = Message::new( self.docdef.get_document_names()[0].clone(), self.docdef.clone(), ); DocumentFile::start(self.queue.clone(), msg); for route in routes.iter() { let request = Register::new(self.sender_id.clone(), RegMsg::AddRoute(route.clone())); let add_route = Message::new(NameType::None, request); self.queue.send(add_route); self.rx.recv().unwrap(); } } fn populate(&self, data: Vec) { let mut add = Addition::new(self.doc_name()); let mut count = 0; for item in data.iter() { add.add_field( Name::english(format!("field{}", count).as_str()), item.clone(), ); count += 1; } self.send(add); match self.rx.recv_timeout(TIMEOUT) { Ok(_) => {} // eats the addition response. 
Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("got {}, should have been ok or time out", err), }, } } } impl From for TestDocument { fn from(value: DocDef) -> Self { let (tx, rx) = channel(); let mut queue = Queue::new(); let id = queue.add_sender(tx); Self { docdef: value, queue: queue, sender_id: id, rx: rx, } } } #[test] fn does_not_respond_to_create() { let name = Name::english("quiet"); let docdef = DocDef::new(name.clone()); let mut test_doc: TestDocument = docdef.into(); let alt = Name::english("alternate"); test_doc.start(standard_paths()); let docdef = DocDef::new(alt); let msg = Message::new(name.clone(), docdef); test_doc.get_queue().send(msg); match test_doc.get_receiver().recv_timeout(TIMEOUT) { Ok(msg) => unreachable!("should not receive: {:?}", msg), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } } #[test] fn does_document_respond_to_requests() { let name = Name::english("listen"); let docdef = DocDef::new(name.clone()); let mut test_doc: TestDocument = docdef.into(); test_doc.start(standard_paths()); let queue = test_doc.get_queue(); let msg_actions = [ MsgAction::Addition(Addition::new(name.clone())), MsgAction::Delete(Delete::new(Query::internal())), MsgAction::Query(Query::new(name.clone())), MsgAction::Show, MsgAction::Update(Update::new(Query::internal())), ]; for msg_action in msg_actions.iter() { let msg = Message::new(name.clone(), msg_action.clone()); queue.send(msg.clone()); let result = match test_doc.get_receiver().recv_timeout(TIMEOUT) { Ok(data) => data.clone(), Err(err) => unreachable!("for {:?} got {:?}", msg_action, err), }; assert_eq!( result.get_message_id(), msg.get_message_id(), "for {:?} response and reply ids should equal", msg_action ); match result.get_action() { MsgAction::Reply(data) => { assert_eq!(data.len(), 0, "for {:?} got {:?}", msg_action, result) } MsgAction::Records(data) => { assert_eq!(data.len(), 0, "for {:?} got {:?}", 
msg_action, result) } _ => unreachable!( "for {:?} got {:?}: should have received a reply", msg_action, result.get_action() ), } } } #[test] fn does_not_respond_to_other_document_requests() { let name = Name::english("quiet"); let alt = Name::english("alternate"); let docdef = DocDef::new(name.clone()); let mut test_doc: TestDocument = docdef.into(); test_doc.start(standard_paths()); let queue = test_doc.get_queue(); let reg_msg = Register::new( test_doc.get_sender_id(), RegMsg::AddDocName([alt.clone()].to_vec()), ); let setup = Message::new(NameType::None, reg_msg.clone()); queue.send(setup); test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let msg_actions = [ MsgAction::Addition(Addition::new(alt.clone())), MsgAction::Create(DocDef::new(name.clone())), MsgAction::Delete(Delete::new(Query::new(alt.clone()))), MsgAction::Query(Query::internal().into()), MsgAction::Show, MsgAction::Update(Update::new(Query::internal())), ]; let mut msgs: HashMap = HashMap::new(); for msg_action in msg_actions.iter() { let msg = Message::new(alt.clone(), msg_action.clone()); msgs.insert(msg.get_message_id().clone(), msg_action.clone()); queue.send(msg); } match test_doc.get_receiver().recv_timeout(TIMEOUT) { Ok(msg) => unreachable!( "for {:?} should not receive: {:?}", msgs.get(msg.get_message_id()).unwrap(), msg ), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("got {:?}, should have timed out", err), }, } } #[test] fn query_sends_on_query_message() { let count = 5; let mut data: HashSet = HashSet::new(); while data.len() < count { let field: Field = Uuid::new_v4().into(); data.insert(field); } let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); let queue = test_doc.get_queue(); let routes = [Path::new( Include::All, Include::All, Include::Just(Action::OnQuery), )] .to_vec(); test_doc.start(routes); for item in data.iter() { 
test_doc.populate([item.clone()].to_vec()); } let msg = Message::new(doc_name.clone(), Query::new(doc_name.clone())); queue.send(msg.clone()); let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); assert_eq!( result.get_message_id(), msg.get_message_id(), "message ids should match" ); match result.get_action() { MsgAction::OnQuery(output) => { assert_eq!( output.len(), count, "wrong number of entries: got {:?}", output ); for rec in output.iter() { assert!(data.contains(&rec.get(Name::english("field0")).unwrap())); } } _ => unreachable!("should never get here"), } } #[test] fn send_on_addition_message() { let data: Field = Uuid::new_v4().into(); let field_name = Name::english("field0"); let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); let queue = test_doc.get_queue(); let routes = vec![Path::new( Include::All, Include::All, Include::Just(Action::OnAddition), )]; test_doc.start(routes); let mut add = Addition::new(doc_name.clone()); add.add_field(field_name.clone(), data.clone()); let msg = Message::new(doc_name.clone(), add); queue.send(msg.clone()); let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); assert_eq!( result.get_message_id(), msg.get_message_id(), "message ids should match" ); match result.get_action() { MsgAction::OnAddition(output) => { assert_eq!(output.len(), 1, "wrong number of entries: got {:?}", output); for rec in output.iter() { assert_eq!(rec.get(Name::english("field0")).unwrap(), data); } } _ => unreachable!("should never get here"), } } #[test] fn sends_on_delete_message() { let count = 5; let mut data: HashSet = HashSet::new(); while data.len() < count { let field: Field = Uuid::new_v4().into(); data.insert(field); } let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); let queue = test_doc.get_queue(); let routes = [Path::new( Include::All, 
Include::All, Include::Just(Action::OnDelete), )] .to_vec(); test_doc.start(routes); for item in data.iter() { test_doc.populate([item.clone()].to_vec()); } let msg = Message::new(doc_name.clone(), Delete::new(Query::internal())); queue.send(msg.clone()); let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); assert_eq!( result.get_message_id(), msg.get_message_id(), "message ids should match" ); match result.get_action() { MsgAction::OnDelete(output) => { assert_eq!( output.len(), count, "wrong number of entries: got {:?}", output ); for rec in output.iter() { assert!(data.contains(&rec.get(Name::english("field0")).unwrap())); } } _ => unreachable!("should never get here"), } } #[test] fn sends_on_update_message() { let count = 5; let field_name = Name::english("field0"); let mut data: HashSet = HashSet::new(); while data.len() < count { let field: Field = Uuid::new_v4().into(); data.insert(field); } let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); let queue = test_doc.get_queue(); let routes = [Path::new( Include::All, Include::All, Include::Just(Action::OnUpdate), )] .to_vec(); test_doc.start(routes); for item in data.iter() { test_doc.populate([item.clone()].to_vec()); } let mut update = Update::new(Query::internal()); update .get_values_mut() .add_field(field_name.clone(), Uuid::nil()); let msg = Message::new(doc_name.clone(), update); queue.send(msg.clone()); let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); assert_eq!( result.get_message_id(), msg.get_message_id(), "message ids should match" ); match result.get_action() { MsgAction::OnUpdate(output) => { assert_eq!( output.len(), count, "wrong number of entries: got {:?}", output ); for rec in output.iter() { assert_eq!(rec.get(&field_name).unwrap(), Uuid::nil().into()); } } _ => unreachable!("should never get here"), } } #[test] fn can_document_be_added() { let doc_name = 
Name::english("document");
        let mut docdef = DocDef::new(doc_name.clone());
        let name = Name::english("field");
        let data = Uuid::new_v4();
        docdef.add_field(name.clone(), FieldType::Uuid);
        let mut test_doc: TestDocument = docdef.clone().into();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let mut new_doc = Addition::new(doc_name.clone());
        new_doc.add_field(name.clone(), data.clone());
        let testing = |msg: Message| {
            queue.send(msg.clone());
            let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
            assert_eq!(result.get_message_id(), msg.get_message_id());
            match result.get_action() {
                MsgAction::Records(output) => {
                    assert_eq!(output.len(), 1);
                    for rec in output.iter() {
                        let holder = rec.get(&name).unwrap();
                        match holder {
                            Field::Uuid(field_data) => assert_eq!(field_data, data),
                            _ => unreachable!("got {:?}, should have been uuid", holder),
                        }
                    }
                }
                _ => unreachable!(
                    "\n\ngot {:?}\n\nfor {:?}\n\nshould have been records",
                    result, msg
                ),
            }
        };
        testing(Message::new(doc_name.clone(), new_doc));
        testing(Message::new(doc_name.clone(), Query::new(doc_name.clone())));
    }

    /// Adds several documents and checks that an unfiltered query returns every one exactly once.
    #[test]
    fn can_add_multiple_documents() {
        let doc_name = Name::english("multiple");
        let mut docdef = DocDef::new(doc_name.clone());
        let name = Name::english("count");
        docdef.add_field(name.clone(), FieldType::Integer);
        let mut test_doc: TestDocument = docdef.clone().into();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let count = 5;
        for i in 0..count {
            let mut new_doc = Addition::new(doc_name.clone());
            new_doc.add_field(name.clone(), i);
            queue.send(Message::new(doc_name.clone(), new_doc));
            test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        }
        queue.send(Message::new(doc_name.clone(), Query::new(doc_name.clone())));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        // Every value is removed from this set as it is seen, so leftovers mean missing records.
        let mut entries: HashSet<i128> = (0..count).collect();
        match action {
            MsgAction::Records(output) => {
                let entry_count: usize = count.try_into().unwrap();
                assert_eq!(
                    output.len(),
                    entry_count,
                    "should have the same number of entries"
                );
                for record in output.iter() {
                    let holder = record.get(&name).unwrap();
                    let data = match holder {
                        Field::Integer(item) => item.clone(),
                        _ => unreachable!("got {:?}, should have been integer", holder),
                    };
                    assert!(
                        entries.contains(&data),
                        "did not find {:?} in {:?}",
                        data,
                        entries
                    );
                    entries.remove(&data);
                }
            }
            _ => unreachable!("\n\ngot {:?}\n\nshould have been records", action),
        }
        assert!(entries.is_empty(), "did not use {:?}", entries);
    }

    /// An addition naming a field the document does not define must yield `NameNotFound`.
    #[test]
    fn errors_on_wrong_field_name() {
        let mut test_doc = TestDocument::new(Vec::new());
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let name = Name::english("bad");
        let mut addition = Addition::new(test_doc.doc_name());
        addition.add_field(name.clone(), "doesn't matter");
        queue.send(Message::new(
            test_doc.get_docdef().get_document_names()[0].clone(),
            addition,
        ));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::NameNotFound(data) => assert_eq!(data, &name),
                _ => unreachable!("got {:?}: should have been document field not found.", err),
            },
            _ => unreachable!("got {:?}: should have been an error", result.get_action()),
        }
    }

    /// An addition supplying a string for a Uuid field must yield `DocumentFieldWrongDataType`.
    #[test]
    fn errors_on_wrong_field_type() {
        let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec());
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let mut addition = Addition::new(test_doc.doc_name());
        addition.add_field(Name::english("field0"), "string");
        queue.send(Message::new(
            test_doc.get_docdef().get_document_names()[0].clone(),
            addition,
        ));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(got, &FieldType::StaticString);
                    assert_eq!(expected, &FieldType::Uuid);
                }
                _ => unreachable!(
                    "got {:?}: should have been document field data mismatch.",
                    err
                ),
            },
            _ => unreachable!("got {:?}: should have been an error", result.get_action()),
        }
    }

    /// An addition that omits one of two defined fields must yield `InvalidNone`.
    #[test]
    fn errors_on_missing_fields() {
        let mut test_doc = TestDocument::new([FieldType::Integer, FieldType::Integer].to_vec());
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let mut addition = Addition::new(test_doc.doc_name());
        addition.add_field(Name::english("field0"), 1);
        queue.send(Message::new(
            test_doc.get_docdef().get_document_names()[0].clone(),
            addition,
        ));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::InvalidNone => {}
                _ => unreachable!("got {:?}: should have been document field missing", err),
            },
            _ => unreachable!("got {:?}: should have been an error", result.get_action()),
        }
    }

    /// An equality query over five distinct integers returns only the single matching record.
    #[test]
    fn does_query_return_related_entries() {
        let mut test_doc = TestDocument::new([FieldType::Integer].to_vec());
        let doc_name = test_doc.get_docdef().get_document_names()[0].clone();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let count = 5;
        let expected = 3;
        for i in 0..count {
            test_doc.populate([i.into()].to_vec());
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(expected).unwrap();
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        let mut query = Query::new(doc_name);
        query.add(Name::english("field0"), calc);
        queue.send(Message::new(
            test_doc.get_docdef().get_document_names()[0].clone(),
            query,
        ));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(
                    data.len(),
                    1,
                    "should return one entry containing {:?} got:\n{:?}",
                    expected,
                    action
                );
                for doc in data.iter() {
                    assert_eq!(doc.get(&Name::english("field0")).unwrap(), expected.into());
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// A greater-than query against the stored values 1 and 2 returns only the record holding 2.
    #[test]
    fn does_query_work_with_greater_than() {
        let mut test_doc = TestDocument::new([FieldType::Integer].to_vec());
        let doc_name = test_doc.get_docdef().get_document_names()[0].clone();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        test_doc.populate([1.into()].to_vec());
        test_doc.populate([2.into()].to_vec());
        let mut calc = Calculation::new(Operand::GreaterThan);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(1).unwrap();
        let mut query = Query::new(doc_name);
        query.add(Name::english("field0"), calc);
        queue.send(Message::new(
            test_doc.get_docdef().get_document_names()[0].clone(),
            query,
        ));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(
                    data.len(),
                    1,
                    "should return one entry containing 2 got:\n{:?}",
                    action
                );
                for doc in data.iter() {
                    assert_eq!(doc.get(&Name::english("field0")).unwrap(), 2.into());
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// An equality query must return every matching record, not just the first one found.
    #[test]
    fn gets_all_documents_in_query() {
        let mut test_doc = TestDocument::new([FieldType::Integer].to_vec());
        let doc_name = test_doc.get_docdef().get_document_names()[0].clone();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let data = 1;
        let count = 5;
        // Interleave non-matching values (count..2*count) with `count` copies of `data`.
        for i in 0..count {
            let holder: i128 = (i + count).try_into().unwrap();
            test_doc.populate([holder.into()].to_vec());
            test_doc.populate([data.into()].to_vec());
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(data).unwrap();
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        let mut query = Query::new(doc_name);
        query.add(Name::english("field0"), calc);
        queue.send(Message::new(
            test_doc.get_docdef().get_document_names()[0].clone(),
            query,
        ));
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(
                    docs.len(),
                    count,
                    "should return {:?} entries containing {:?} got:\n{:?}",
                    count,
                    data,
                    action
                );
                for doc in docs.iter() {
                    assert_eq!(doc.get(&Name::english("field0")).unwrap(), data.into());
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// A query with conditions on two unindexed fields returns only the record matching both.
    #[test]
    fn query_should_work_with_multiple_fields() {
        let mut doc =
            TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec());
        doc.start(standard_paths());
        let values = [
            ["a".into(), "a".into()].to_vec(),
            ["a".into(), "b".into()].to_vec(),
            ["b".into(), "a".into()].to_vec(),
            ["b".into(), "b".into()].to_vec(),
        ];
        for value in values.iter() {
            doc.populate(value.clone());
        }
        let mut query = Query::new(doc.doc_name());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("a").unwrap();
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        query.add(Name::english("field0"), calc);
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("b").unwrap();
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        query.add(Name::english("field1"), calc);
        doc.send(query);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                let afield: Field = "a".into();
                let bfield: Field = "b".into();
                assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action);
                for doc in data.iter() {
                    assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield);
                    assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield);
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// Same two-field query as above, with an index registered on both fields.
    #[test]
    fn query_should_work_with_multiple_inexed_fields() {
        let mut doc =
            TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec());
        let docdef = doc.get_docdef_mut();
        docdef
            .add_index(&Name::english("field0"), IndexType::Index)
            .unwrap();
        docdef
            .add_index(&Name::english("field1"), IndexType::Index)
            .unwrap();
        doc.start(standard_paths());
        let values = [
            ["a".into(), "a".into()].to_vec(),
            ["a".into(), "b".into()].to_vec(),
            ["b".into(), "a".into()].to_vec(),
            ["b".into(), "b".into()].to_vec(),
        ];
        for value in values.iter() {
            doc.populate(value.clone());
        }
        let mut query = Query::new(doc.doc_name());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("a").unwrap();
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        query.add(Name::english("field0"), calc);
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("b").unwrap();
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        query.add(Name::english("field1"), calc);
        doc.send(query);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                let afield: Field = "a".into();
                let bfield: Field = "b".into();
                assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action);
                for doc in data.iter() {
                    assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield);
                    assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield);
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// Same two-field query, with an index on the first field only.
    #[test]
    fn query_should_work_with_mixed_inexed_fields() {
        let mut doc =
            TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec());
        let docdef = doc.get_docdef_mut();
        docdef
            .add_index(&Name::english("field0"), IndexType::Index)
            .unwrap();
        doc.start(standard_paths());
        let values = [
            ["a".into(), "a".into()].to_vec(),
            ["a".into(), "b".into()].to_vec(),
            ["b".into(), "a".into()].to_vec(),
            ["b".into(), "b".into()].to_vec(),
        ];
        for value in values.iter() {
            doc.populate(value.clone());
        }
        let mut query = Query::new(doc.doc_name());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("a").unwrap();
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        query.add(Name::english("field0"), calc);
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("b").unwrap();
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        query.add(Name::english("field1"), calc);
        doc.send(query);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                let afield: Field = "a".into();
                let bfield: Field = "b".into();
                assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action);
                for doc in data.iter() {
                    assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield);
                    assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield);
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// A query on a field name the document does not define must yield `NameNotFound`.
    #[test]
    fn errors_on_bad_field_name() {
        let mut doc = TestDocument::new(Vec::new());
        doc.start(standard_paths());
        let doc_name = doc.get_docdef().get_document_names()[0].clone();
        let queue = doc.get_queue();
        let rx = doc.get_receiver();
        let field_name = Name::english("wrong");
        let mut query = Query::new(doc.doc_name());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("something").unwrap();
        query.add(field_name.clone(), calc);
        let msg = Message::new(doc_name, query);
        queue.send(msg);
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(data) => match data {
                MTTError::NameNotFound(output) => assert_eq!(output, &field_name),
                _ => unreachable!("got {:?}: should have been field not found", data),
            },
            _ => unreachable!("got {:?}: should have been an error", action),
        }
    }

    /// Querying an indexed Uuid field with a string value must yield `FieldInvalidType`.
    #[test]
    fn errors_on_bad_field_type_with_index() {
        let mut doc = TestDocument::new([FieldType::Uuid].to_vec());
        doc.get_docdef_mut()
            .add_index(&Name::english("field0"), IndexType::Index)
            .unwrap();
        doc.start(standard_paths());
        doc.populate([Uuid::nil().into()].to_vec());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value("notUUID").unwrap();
        let mut query = Query::new(doc.doc_name());
        query.add(Name::english("field0"), calc);
        doc.send(query);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(data) => match data {
                MTTError::FieldInvalidType => {}
                _ => unreachable!("got {:?}: should have been invalid field type", data),
            },
            _ => unreachable!("got {:?}: should have been an error", action),
        }
    }

    /// A field whose default is given as a type receives an empty string when omitted.
    #[test]
    fn can_use_default_values() {
        let doc_name = Name::english("default");
        let mut docdef = DocDef::new(doc_name.clone());
        let field_name = Name::english("holder");
        docdef.add_field(field_name.clone(), FieldType::StaticString);
        docdef
            .set_default(&field_name, FieldType::StaticString)
            .unwrap();
        let mut test_doc: TestDocument = docdef.into();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let rx = test_doc.get_receiver();
        let new_doc = Addition::new(doc_name.clone());
        let msg = Message::new(doc_name, new_doc);
        queue.send(msg);
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), 1);
                for doc in docs.iter() {
                    let expected: Field = "".into();
                    assert_eq!(doc.get(&field_name).unwrap(), expected);
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// A field with an explicit default value receives that value when omitted.
    #[test]
    fn can_a_default_value_be_set() {
        let doc_name = Name::english("assigned");
        let mut docdef = DocDef::new(doc_name.clone());
        let field_name = Name::english("id");
        docdef.add_field(field_name.clone(), FieldType::Uuid);
        docdef.set_default(&field_name, Uuid::nil()).unwrap();
        let mut test_doc: TestDocument = docdef.into();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let rx = test_doc.get_receiver();
        let new_doc = Addition::new(doc_name.clone());
        let msg = Message::new(doc_name, new_doc);
        queue.send(msg);
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), 1);
                for doc in docs.iter() {
                    let expected: Field = Uuid::nil().into();
                    assert_eq!(doc.get(&field_name).unwrap(), expected);
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// A value supplied in the addition takes precedence over the field's default.
    #[test]
    fn can_default_values_be_overridden() {
        let doc_name = Name::english("assigned");
        let mut docdef = DocDef::new(doc_name.clone());
        let field_name = Name::english("id");
        docdef.add_field(field_name.clone(), FieldType::Uuid);
        docdef.set_default(&field_name, FieldType::Uuid).unwrap();
        let mut test_doc: TestDocument = docdef.into();
        test_doc.start(standard_paths());
        let queue = test_doc.get_queue();
        let rx = test_doc.get_receiver();
        let mut new_doc = Addition::new(doc_name.clone());
        new_doc.add_field(&field_name, Uuid::nil());
        let msg = Message::new(doc_name, new_doc);
        queue.send(msg);
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), 1);
                for doc in docs.iter() {
                    let expected: Field = Uuid::nil().into();
                    assert_eq!(doc.get(&field_name).unwrap(), expected);
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// An update whose query matches nothing returns an empty record set.
    #[test]
    fn empty_update_query_results_in_zero_changes() {
        let count = 5;
        let mut ids: HashSet<Uuid> = HashSet::new();
        while ids.len() < count {
            ids.insert(Uuid::new_v4());
        }
        let id = ids.iter().last().unwrap().clone();
        ids.remove(&id);
        let mut doc = TestDocument::new([FieldType::Uuid].to_vec());
        doc.start(standard_paths());
        for id in ids.iter() {
            doc.populate([id.clone().into()].to_vec());
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Uuid))
            .unwrap();
        calc.add_value(Uuid::nil()).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update
            .get_values_mut()
            .add_field(Name::english("field0"), Uuid::nil());
        doc.send(update);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => assert_eq!(docs.len(), 0),
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// An update returns the changed record, and a follow-up query sees the same new value.
    #[test]
    fn changes_information_requested() {
        let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec());
        doc.start(standard_paths());
        let doc_name = doc.get_docdef().get_document_names()[0].clone();
        let old = "old";
        let new = "new";
        let id = Uuid::new_v4();
        doc.populate([id.into(), old.into()].to_vec());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Uuid))
            .unwrap();
        calc.add_value(id).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update
            .get_values_mut()
            .add_field(Name::english("field1"), new);
        // The same expectations apply to the update reply and to the follow-up query.
        let mut testing = |msg: Message| {
            doc.get_queue().send(msg.clone());
            let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
            assert_eq!(result.get_message_id(), msg.get_message_id());
            let action = result.get_action();
            match action {
                MsgAction::Records(docs) => {
                    assert_eq!(docs.len(), 1, "for {:?}, should have one entry", msg);
                    for doc in docs.iter() {
                        assert_eq!(doc.get(Name::english("field0")).unwrap(), id.into());
                        assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into());
                    }
                }
                _ => unreachable!("got {:?}: should have gotten a reply", action),
            }
        };
        testing(Message::new(doc_name.clone(), update));
        testing(Message::new(doc_name.clone(), Query::new(doc_name.clone())));
    }

    /// An update touches only the record selected by its query and leaves the others unchanged.
    #[test]
    fn changes_only_the_queried() {
        let mut doc = TestDocument::new([FieldType::Integer, FieldType::StaticString].to_vec());
        doc.start(standard_paths());
        let doc_name = doc.get_docdef().get_document_names()[0].clone();
        let old = "old";
        let new = "new";
        let count = 5;
        let field_count = count.try_into().unwrap();
        let picked = 3;
        for i in 0..field_count {
            doc.populate([i.into(), old.into()].to_vec());
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(picked).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update
            .get_values_mut()
            .add_field(Name::english("field1"), new);
        doc.get_queue().send(Message::new(doc_name.clone(), update));
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), 1, "should have one entry");
                for doc in docs.iter() {
                    assert_eq!(doc.get(Name::english("field0")).unwrap(), picked.into());
                    assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into());
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
        doc.get_queue()
            .send(Message::new(doc_name.clone(), Query::new(doc_name.clone())));
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), count, "should have every entry");
                for doc in docs.iter() {
                    if doc.get(Name::english("field0")).unwrap() == picked.into() {
                        assert_eq!(
                            doc.get(Name::english("field1")).unwrap(),
                            new.into(),
                            "{:?}",
                            docs
                        );
                    } else {
                        assert_eq!(
                            doc.get(Name::english("field1")).unwrap(),
                            old.into(),
                            "{:?}",
                            docs
                        );
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// An update whose query matches several records changes all of them.
    #[test]
    fn can_handle_multiple_updates() {
        let mut doc = TestDocument::new([FieldType::Integer, FieldType::StaticString].to_vec());
        doc.start(standard_paths());
        let doc_name = doc.get_docdef().get_document_names()[0].clone();
        let old = "old";
        let new = "new";
        let count = 5;
        let picked = 3;
        for _ in 0..count {
            doc.populate([picked.into(), old.into()].to_vec());
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(picked).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update
            .get_values_mut()
            .add_field(Name::english("field1"), new);
        // The same expectations apply to the update reply and to the follow-up query.
        let mut testing = |msg: Message| {
            doc.get_queue().send(msg);
            let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
            let action = result.get_action();
            match action {
                MsgAction::Records(docs) => {
                    assert_eq!(docs.len(), count, "should have every entry");
                    for doc in docs.iter() {
                        assert_eq!(doc.get(Name::english("field0")).unwrap(), picked.into());
                        assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into());
                    }
                }
                _ => unreachable!("got {:?}: should have gotten a reply", action),
            }
        };
        testing(Message::new(doc_name.clone(), update));
        testing(Message::new(doc_name.clone(), Query::new(doc_name.clone())));
    }

    /// An update that assigns to an undefined field name must yield `NameNotFound`.
    #[test]
    fn update_errors_on_bad_field_name() {
        let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec());
        doc.start(standard_paths());
        let id = Uuid::new_v4();
        let old = "old";
        let new = "new";
        let bad_name = Name::english("wrong");
        doc.populate([id.into(), old.into()].to_vec());
        let mut update = Update::new(Query::internal());
        update.get_values_mut().add_field(bad_name.clone(), new);
        doc.send(update);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::NameNotFound(data) => assert_eq!(data, &bad_name),
                _ => unreachable!("got {:?}: should have gotten a missing field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// An update that assigns a Uuid to a string field must yield `DocumentFieldWrongDataType`.
    #[test]
    fn update_errors_on_bad_field_type() {
        let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec());
        doc.start(standard_paths());
        let id = Uuid::new_v4();
        let old = "old";
        let new = Uuid::nil();
        doc.populate([id.into(), old.into()].to_vec());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Uuid))
            .unwrap();
        calc.add_value(id).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update
            .get_values_mut()
            .add_field(Name::english("field1"), new);
        doc.send(update);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(expected, &FieldType::StaticString);
                    assert_eq!(got, &FieldType::Uuid);
                }
                _ => unreachable!("got {:?}: should have gotten incorrect field type", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// After a unique field is updated, the old value can be added again and the new one cannot.
    #[test]
    fn does_update_maintain_unique_fields() {
        let mut test_doc = TestDocument::new([FieldType::Integer].to_vec());
        test_doc
            .get_docdef_mut()
            .add_index(&Name::english("field0"), IndexType::Unique)
            .unwrap();
        test_doc.start(standard_paths());
        let fname = Name::english("field0");
        let old = 3;
        let new = 5;
        test_doc.populate([old.into()].to_vec());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(old).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update.get_values_mut().add_field(&fname, new);
        test_doc.send(update);
        test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let mut should_clear = Addition::new(test_doc.doc_name());
        should_clear.add_field(fname.clone(), old);
        let mut should_error = Addition::new(test_doc.doc_name());
        should_error.add_field(fname.clone(), new);
        test_doc.send(should_clear);
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), 1, "should have one entry");
                for doc in docs.iter() {
                    assert_eq!(doc.get(&fname).unwrap(), old.into());
                }
            }
            _ => unreachable!("got {:?}: should have gotten records", action),
        }
        test_doc.send(should_error);
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::FieldDuplicate => {}
                _ => unreachable!("got {:?}: should have gotten a duplicate field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// A failed addition must not reserve its unique value; a later valid addition can use it.
    #[test]
    fn unique_value_remains_available_if_failure_occurs() {
        let f0name = Name::english("field0");
        let f1name = Name::english("field1");
        let mut test_doc = TestDocument::new([FieldType::Uuid, FieldType::Uuid].to_vec());
        test_doc
            .get_docdef_mut()
            .add_index(&f0name, IndexType::Unique)
            .unwrap();
        test_doc.start(standard_paths());
        let f0data = Uuid::new_v4();
        let f1bad_data = "NotUuid";
        let f1good_data = Uuid::nil();
        let mut bad_addition = Addition::new(test_doc.doc_name());
        bad_addition.add_field(&f0name, f0data);
        bad_addition.add_field(&f1name, f1bad_data);
        let mut good_addition = Addition::new(test_doc.doc_name());
        good_addition.add_field(&f0name, f0data);
        good_addition.add_field(&f1name, f1good_data);
        test_doc.send(bad_addition);
        test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        test_doc.send(good_addition);
        let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(docs) => {
                assert_eq!(docs.len(), 1, "should have one entry");
                for doc in docs.iter() {
                    assert_eq!(doc.get(&f0name).unwrap(), f0data.into());
                    assert_eq!(doc.get(&f1name).unwrap(), f1good_data.into());
                }
            }
            _ => unreachable!("got {:?}: should have gotten records", action),
        }
    }

    /// Updating a unique string field frees the old value and reserves the new one.
    #[test]
    fn updating_unique_updates_index_entries() {
        let fname = Name::english("field0");
        let mut doc = TestDocument::new([FieldType::StaticString].to_vec());
        doc.get_docdef_mut()
            .add_index(&fname, IndexType::Unique)
            .unwrap();
        doc.start(standard_paths());
        let old = "old";
        let new = "new";
        let fold: Field = old.into();
        doc.populate([old.into()].to_vec());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        calc.add_value(old).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let mut update = Update::new(query);
        update.get_values_mut().add_field(fname.clone(), new);
        doc.send(update);
        doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let mut old_addition = Addition::new(doc.doc_name());
        old_addition.add_field(&fname, old);
        doc.send(old_addition);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(data.len(), 1);
                for doc in data.iter() {
                    assert_eq!(doc.get(&fname).unwrap(), fold);
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
        let mut new_addition = Addition::new(doc.doc_name());
        new_addition.add_field(fname.clone(), new);
        doc.send(new_addition);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::FieldDuplicate => {}
                _ => unreachable!("got {:?}: should have gotten a duplicate field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// A multi-record update that would duplicate a unique value fails and changes nothing.
    #[test]
    fn update_does_not_override_unique_index() {
        let f0name = Name::english("field0");
        let f1name = Name::english("field1");
        let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec());
        doc.get_docdef_mut()
            .add_index(&f0name, IndexType::Unique)
            .unwrap();
        doc.start(standard_paths());
        let count = 5;
        let data = "data";
        let mut ids: HashSet<Uuid> = HashSet::new();
        while ids.len() < count {
            ids.insert(Uuid::new_v4());
        }
        // Hold one id back: it is the value the update will try to assign to every record.
        let holder = ids.iter().last().unwrap().clone();
        ids.remove(&holder);
        for id in ids.iter() {
            doc.populate([id.clone().into(), data.into()].to_vec());
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::StaticString))
            .unwrap();
        calc.add_value(data).unwrap();
        let mut query = Query::internal();
        query.add(&f1name, calc);
        let mut update = Update::new(query);
        update.get_values_mut().add_field(&f0name, holder);
        doc.send(update);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::FieldDuplicate => {}
                _ => unreachable!("got {:?}: should have gotten field duplicate", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
        let query = Query::new(doc.doc_name());
        doc.send(query);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(data.len(), ids.len());
                for doc in data.iter() {
                    match doc.get(&f0name).unwrap() {
                        Field::Uuid(id) => {
                            assert!(ids.contains(&id));
                            ids.remove(&id);
                        }
                        _ => unreachable!("did not get uuid"),
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
        assert!(ids.is_empty(), "did not find {:?}", ids);
    }

    /// A calculation given as a field value is evaluated at insert time (DateTime plus duration).
    #[test]
    fn can_calculate_field_values() {
        let fname = Name::english("field0");
        let mut doc = TestDocument::new([FieldType::DateTime].to_vec());
        doc.start(standard_paths());
        let duration = Duration::from_secs(300);
        let mut calc = Calculation::new(Operand::Add);
        calc.add_value(FieldType::DateTime).unwrap();
        calc.add_value(duration).unwrap();
        let mut addition = Addition::new(doc.doc_name());
        addition.add_field(&fname, calc);
        // Bracket the send/receive so the stored timestamp can be range-checked.
        let start = Utc::now() + duration;
        doc.send(addition);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let stop = Utc::now() + duration;
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(data.len(), 1);
                for doc in data.iter() {
                    match doc.get(&fname).unwrap() {
                        Field::DateTime(datetime) => assert!(datetime > start && datetime < stop),
                        _ => unreachable!("did not get datetime"),
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
    }

    /// A delete returns the removed record, and the same query afterwards returns nothing.
    #[test]
    fn can_delete() {
        let fname = Name::english("field0");
        let mut doc = TestDocument::new([FieldType::Integer].to_vec());
        doc.start(standard_paths());
        doc.populate([1.into()].to_vec());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(1).unwrap();
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        let mut query = Query::new(doc.doc_name());
        query.add(&fname, calc);
        let delete = Delete::new(query.clone());
        doc.send(delete);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(data.len(), 1);
                for doc in data.iter() {
                    match doc.get(&fname).unwrap() {
                        Field::Integer(num) => assert_eq!(num, 1),
                        _ => unreachable!("did not get integer"),
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
        doc.send(query);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => assert_eq!(data.len(), 0),
            _ => unreachable!("got {:?}, should have been empty", action),
        }
    }

    /// A delete whose query names an undefined field must yield `NameNotFound`.
    #[test]
    fn does_delete_return_query_errors() {
        let field_name = Name::english("wrong");
        let mut doc = TestDocument::new([FieldType::Integer].to_vec());
        doc.start(standard_paths());
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(1).unwrap();
        let mut query = Query::internal();
        query.add(field_name.clone(), calc);
        let delete = Delete::new(query);
        doc.send(delete);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::NameNotFound(data) => assert_eq!(data, &field_name),
                _ => unreachable!("got {:?}: should have gotten a missing field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// Deleting a record frees its unique value so the same value can be added again.
    #[test]
    fn does_delete_update_indexes() {
        let fname = Name::english("field0");
        let value = 1;
        let mut doc = TestDocument::new([FieldType::Integer].to_vec());
        doc.get_docdef_mut()
            .add_index(&fname, IndexType::Unique)
            .unwrap();
        doc.start(standard_paths());
        doc.populate([value.into()].to_vec());
        doc.send(Delete::new(Query::internal()));
        doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let mut addition = Addition::new(doc.doc_name());
        addition.add_field(&fname, value);
        doc.send(addition);
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Records(data) => assert_eq!(data.len(), 1),
            _ => unreachable!("got {:?}, should have added entry", action),
        }
    }

    /// An `OnQuery` route with an increment update fires after each query and emits `OnUpdate`.
    #[test]
    fn can_query_trigger_reaction() {
        let mut doc = TestDocument::new([FieldType::Integer].to_vec());
        let doc_name = doc.get_docdef().get_document_names()[0].clone();
        let path = Path::new(
            Include::All,
            Include::Just(doc_name.clone().into()),
            Include::Just(Action::OnQuery),
        );
        let mut update = Update::new(Query::internal());
        let mut calc = Calculation::new(Operand::Add);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(1).unwrap();
        update
            .get_values_mut()
            .add_field(Name::english("field0"), calc);
        let function = DocFuncType::ExistingQuery(update.into());
        doc.get_docdef_mut().add_route(path, function);
        let mut paths = standard_paths();
        paths.push(Path::new(
            Include::All,
            Include::Just(doc_name.clone().into()),
            Include::Just(Action::OnUpdate),
        ));
        doc.start(paths);
        doc.populate([0.into()].to_vec());
        for i in 0..5 {
            // The query reply carries the pre-increment value; the OnUpdate carries i + 1.
            let expected: Field = i.try_into().unwrap();
            doc.send(Query::new(doc_name.clone()));
            let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
            let action = result.get_action();
            match action {
                MsgAction::Records(data) => {
                    assert_eq!(data.len(), 1);
                    for rec in data.iter() {
                        assert_eq!(rec.get(&Name::english("field0")).unwrap(), expected);
                    }
                }
                _ => unreachable!("got {:?}, should have been records", action),
            }
            let on_update = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
            match on_update.get_action() {
                MsgAction::OnUpdate(recs) => {
                    let expected: Field = (i + 1).into();
                    assert_eq!(recs.len(), 1);
                    for rec in recs.iter() {
                        assert_eq!(rec.get(Name::english("field0")).unwrap(), expected);
                    }
                }
                _ => unreachable!("should only be on update"),
            }
        }
    }

    /// A clock `OnUpdate` message routed to a trigger runs the configured delete on this document.
    #[test]
    fn can_an_action_trigger_an_action() {
        let mut doc = TestDocument::new([FieldType::Integer].to_vec());
        let queue = doc.get_queue();
        let doc_name = doc.get_docdef().get_document_names()[0].clone();
        crate::document::clock::Clock::start(queue.clone());
        let mut calc = Calculation::new(Operand::GreaterThan);
        calc.add_value(CalcValue::Existing(FieldType::Integer))
            .unwrap();
        calc.add_value(1).unwrap();
        let mut query = Query::internal();
        query.add(Name::english("field0"), calc);
        let delete = Delete::new(query.clone());
        let path = Path::new(
            Include::All,
            Include::Just(Name::english("clock").into()),
            Include::Just(Action::OnUpdate),
        );
        let function = DocFuncType::Trigger(delete.into());
        doc.get_docdef_mut().add_route(path, function);
        let mut paths = standard_paths();
        paths.push(Path::new(
            Include::All,
            Include::Just(doc_name.clone().into()),
            Include::Just(Action::Delete),
        ));
        doc.start(paths);
        let queue = doc.get_queue();
        for item in 1..3 {
            doc.populate([item.into()].to_vec());
        }
        let trigger = Message::new(
            Name::english("clock"),
            MsgAction::OnUpdate(Records::new(Names::new())),
        );
        queue.send(trigger);
        // Pause before querying so the triggered delete can be processed first.
        sleep(TIMEOUT);
        let msg = Message::new(doc_name.clone(), Query::new(doc_name.clone()));
        queue.send(msg.clone());
        // Skip any other messages until the reply to our query arrives.
        let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        while result.get_message_id() != msg.get_message_id() {
            result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        }
        let action = result.get_action();
        let expected: Field = 1.into();
        match action {
            MsgAction::Records(data) => {
                assert_eq!(data.len(), 1, "should have been one record:\n{:?}", data);
                for rec in data.iter() {
                    assert_eq!(rec.get(Name::english("field0")).unwrap(), expected);
                }
            }
            _ => unreachable!("should return records"),
        }
    }
}