diff --git a/' b/' deleted file mode 100644 index 617c857..0000000 --- a/' +++ /dev/null @@ -1,2426 +0,0 @@ -use crate::{ - action::{Action, CalcValue, Calculation, MsgAction, Query}, - document::{ - definition::{DocDef, DocFuncType}, - field::Field, - }, - message::wrapper::{InternalRecord, InternalRecords, Message, Oid, Records, Reply, Update}, - mtterror::MTTError, - name::NameType, - queue::{ - data_director::{Include, Path, RegMsg, Register, RouteID}, - router::Queue, - }, -}; -use std::{ - collections::{HashMap, HashSet}, - sync::mpsc::{channel, Receiver}, - thread::spawn, -}; -use uuid::Uuid; - -pub struct CreateDoc { - queue: Queue, - rx: Receiver, -} - -impl CreateDoc { - fn new(queue: Queue, rx: Receiver) -> Self { - Self { - queue: queue, - rx: rx, - } - } - - pub fn start(mut queue: Queue) { - let (tx, rx) = channel(); - let routes = [Path::new( - Include::All, - Include::All, - Include::Just(Action::Create), - )] - .to_vec(); - let id = queue.add_sender(tx); - for route in routes.iter() { - let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone())); - queue.send(Message::new(NameType::None, regmsg)); - rx.recv().unwrap(); - } - let doc = CreateDoc::new(queue, rx); - spawn(move || { - doc.listen(); - }); - } - - fn listen(&self) { - loop { - let msg = self.rx.recv().unwrap(); - DocumentFile::start(self.queue.clone(), msg); - } - } -} - -#[cfg(test)] -mod createdocs { - use super::*; - use crate::{name::Name, support_tests::TIMEOUT}; - - struct TestCreateDoc { - queue: Queue, - rx: Receiver, - rx_id: Uuid, - } - - impl TestCreateDoc { - fn new() -> Self { - let mut queue = Queue::new(); - let (tx, rx) = channel(); - let id = queue.add_sender(tx); - CreateDoc::start(queue.clone()); - Self { - queue: queue, - rx: rx, - rx_id: id, - } - } - - fn get_queue(&self) -> Queue { - self.queue.clone() - } - - fn get_receiver(&self) -> &Receiver { - &self.rx - } - - fn register_paths(&self, paths: Vec) { - for path in paths.iter() { - let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone())); - self.queue.send(Message::new(NameType::None, regmsg)); - self.rx.recv_timeout(TIMEOUT).unwrap(); - } - } - } - - #[test] - fn create_document_creation() { - let doc_creator = TestCreateDoc::new(); - let paths = [ - Path::new(Include::All, Include::All, Include::Just(Action::Reply)), - Path::new(Include::All, Include::All, Include::Just(Action::Records)), - ] - .to_vec(); - doc_creator.register_paths(paths); - let queue = doc_creator.get_queue(); - let rx = doc_creator.get_receiver(); - let name = Name::english("project"); - let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); - queue.send(msg1.clone()); - let result1 = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!( - result1.get_message_id(), - msg1.get_message_id(), - "received {:?} from create message.", - rx - ); - match result1.get_action() { - MsgAction::Reply(_) => {} - _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), - } - let msg2 = Message::new(name.clone(), Query::new(name.clone())); - queue.send(msg2.clone()); - let result2 = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result2.get_message_id(), msg2.get_message_id()); - match result2.get_action() { - MsgAction::Records(data) => assert_eq!(data.len(), 0), - _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), - } - } - - #[test] - fn does_duplicates_generate_error() { - let doc_creator = TestCreateDoc::new(); - let paths = [Path::new( - Include::All, - Include::All, - Include::Just(Action::Error), - )] - .to_vec(); - doc_creator.register_paths(paths); - let queue = doc_creator.get_queue(); - let rx = doc_creator.get_receiver(); - let name = Name::english("duplicate"); - let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); - let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); - queue.send(msg1.clone()); - queue.send(msg2.clone()); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg2.get_message_id()); - match result.get_action() { - MsgAction::Error(err) => match err { - MTTError::NameDuplicate(data) => assert_eq!(data, &name), - _ => unreachable!("got {:?}: should have been a duplicate name", err), - }, - _ => unreachable!("got {:?}: should have been a reply.", result.get_action()), - } - } -} - -#[derive(Clone, Debug)] -pub enum IndexType { - Index, - Unique, -} - -impl IndexType { - fn create_index(&self) -> Index { - match self { - Self::Index => Index::new(), - Self::Unique => Index::new_unique(), - } - } -} - -#[derive(Debug)] -struct Index { - data: HashMap>, - unique: bool, -} - -impl Index { - fn new() -> Self { - Self { - data: HashMap::new(), - unique: false, - } - } - - fn new_unique() -> Self { - Self { - data: HashMap::new(), - unique: true, - } - } - - fn internal_add(&mut self, field: &Field, oid: Oid) { - let storage = match self.data.get_mut(field) { - Some(data) => data, - None => { - let data = HashSet::new(); - self.data.insert(field.clone(), data); - self.data.get_mut(field).unwrap() - } - }; - storage.insert(oid); - } - - fn add(&mut self, field: Field, oid: Oid) -> Result<(), MTTError> { - let oids = match self.data.get_mut(&field) { - Some(data) => data, - None => { - self.data.insert(field.clone(), HashSet::new()); - self.data.get_mut(&field).unwrap() - } - }; - if self.unique && oids.len() > 0 { - return Err(MTTError::FieldDuplicate); - } else { - oids.insert(oid); - } - Ok(()) - } - - fn pull(&self, calc: &Calculation) -> Result, MTTError> { - let mut output = HashSet::new(); - for (key, value) in self.data.iter() { - match calc.calculate(key) { - Field::Boolean(data) => { - if data { - output = output.union(&value).cloned().collect(); - } - } - _ => return Err(MTTError::FieldInvalidType), - } - } - Ok(output) - } - - fn remove(&mut self, field: &Field, oid: &Oid) { - match self.data.get_mut(field) { - Some(oids) => { - oids.remove(oid); - if oids.len() == 0 { - self.data.remove(field); - } - } - None => {} - }; - } - - fn validate(&self, field: &Field) -> Result<(), MTTError> { - if self.unique { - match self.data.get(field) { - Some(_) => return Err(MTTError::FieldDuplicate), - None => {} - } - } - Ok(()) - } -} - -struct Indexes { - data: HashMap, -} - -impl Indexes { - fn new(settings: &HashMap) -> Self { - let mut output = HashMap::new(); - for (key, value) in settings.iter() { - output.insert(key.clone(), value.create_index()); - } - Self { data: output } - } - - fn index_ids(&self) -> HashSet<&Uuid> { - self.data.keys().collect::>() - } - - fn pull(&self, field_id: &Uuid, calc: &Calculation) -> Result, MTTError> { - self.data.get(field_id).unwrap().pull(calc) - } - - fn add_to_index(&mut self, field_name: &Uuid, field: Field, oid: Oid) -> Result<(), MTTError> { - let index = match self.data.get_mut(field_name) { - Some(data) => data, - None => return Ok(()), - }; - index.add(field, oid) - } - - fn validate(&self, field_name: &Uuid, value: &Field) -> Result<(), MTTError> { - match self.data.get(field_name) { - Some(index) => match index.validate(value) { - Ok(_) => {} - Err(err) => return Err(err), - }, - None => {} - } - Ok(()) - } - - fn iter_mut(&mut self) -> impl Iterator { - self.data.iter_mut() - } -} - -#[cfg(test)] -mod indexes { - use super::*; - use crate::action::{FieldType, Operand}; - - fn get_fields(count: usize) -> Vec { - let mut output = Vec::new(); - while output.len() < count { - let field: Field = Uuid::new_v4().into(); - if !output.contains(&field) { - output.push(field); - } - } - output - } - - fn get_oids(count: usize) -> Vec { - let mut output = Vec::new(); - while output.len() < count { - let oid = Oid::new(); - if !output.contains(&oid) { - output.push(oid); - } - } - output - } - - #[test] - fn add_to_index() { - let mut index = Index::new(); - let count = 3; - let fields = get_fields(count); - let oids = get_oids(count); - for i in 0..count { - index.add(fields[i].clone(), oids[i].clone()).unwrap(); - } - for i in 0..count { - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(fields[i].clone()).unwrap(); - calc.add_value(CalcValue::Existing(FieldType::Uuid)) - .unwrap(); - let result = index.pull(&calc).unwrap(); - assert_eq!(result.len(), 1); - assert_eq!(result.iter().last().unwrap(), &oids[i]); - } - } - - #[test] - fn index_can_handle_multiple_entries() { - let mut index = Index::new(); - let count = 3; - let fields = get_fields(1); - let oids = get_oids(count); - for i in 0..count { - index.add(fields[0].clone(), oids[i].clone()).unwrap(); - } - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(fields[0].clone()).unwrap(); - calc.add_value(CalcValue::Existing(FieldType::Uuid)) - .unwrap(); - let result = index.pull(&calc).unwrap(); - assert_eq!(result.len(), 3); - for oid in oids { - assert!(result.contains(&oid)); - } - } - - #[test] - fn can_remove_oid() { - let mut index = Index::new(); - let count = 3; - let pos = 1; - let fields = get_fields(1); - let oids = get_oids(count); - for i in 0..count { - index.add(fields[0].clone(), oids[i].clone()).unwrap(); - } - index.remove(&fields[0], &oids[pos]); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(fields[0].clone()).unwrap(); - calc.add_value(CalcValue::Existing(FieldType::Uuid)) - .unwrap(); - let result = index.pull(&calc).unwrap(); - assert!(!result.contains(&oids[pos]), "should have removed oid"); - } - - #[test] - fn are_empty_indexes_removed() { - let mut index = Index::new(); - let field: Field = Uuid::new_v4().into(); - let oid = Oid::new(); - index.add(field.clone(), oid.clone()).unwrap(); - index.remove(&field, &oid); - assert_eq!(index.data.len(), 0); - } - - #[test] - fn do_unique_indexes_error_on_duplicates() { - let mut index = Index::new_unique(); - let field: Field = "fred".into(); - let oids = get_oids(2); - index.add(field.clone(), oids[0].clone()).unwrap(); - match index.add(field.clone(), oids[0].clone()) { - Ok(_) => unreachable!("should have been an error"), - Err(err) => match err { - MTTError::FieldDuplicate => {} - _ => unreachable!("got {:?}: should have been duplicate field", err), - }, - } - } - - #[test] - fn index_returns_validate() { - let mut index = Index::new(); - let field: Field = "stuff".into(); - let oid = Oid::new(); - index.add(field.clone(), oid).unwrap(); - match index.validate(&field) { - Ok(_) => {} - Err(err) => unreachable!("got {:?}: should have returned without issue", err), - } - } - - #[test] - fn unique_return_duplicate_error() { - let mut index = Index::new_unique(); - let field: Field = "fred".into(); - let oid = Oid::new(); - index.add(field.clone(), oid).unwrap(); - match index.validate(&field) { - Ok(_) => unreachable!("should have gotten a duplication error"), - Err(err) => match err { - MTTError::FieldDuplicate => {} - _ => unreachable!("got {:?}: should have been duplicate field", err), - }, - } - } -} - -struct DocumentFile { - docdef: DocDef, - docs: InternalRecords, - indexes: Indexes, - name_id: Uuid, - queue: Queue, - routes: HashMap, - rx: Receiver, -} - -impl DocumentFile { - fn new( - queue: Queue, - rx: Receiver, - docdef: DocDef, - routes: HashMap, - name_id: Uuid, - ) -> Self { - let indexes = Indexes::new(docdef.get_indexes()); - Self { - docdef: docdef.clone(), - docs: InternalRecords::new(), - indexes: indexes, - name_id: name_id, - queue: queue, - routes: routes, - rx: rx, - } - } - - fn start(mut queue: Queue, msg: Message) { - let (tx, rx) = channel(); - let action = msg.get_action(); - let docdef = match action { - MsgAction::Create(data) => data.clone(), - _ => unreachable!("got {:?}: should have been a create message", action), - }; - let names = docdef.get_document_names(); - let id = queue.add_sender(tx); - let reg_msg = Register::new(id, RegMsg::AddDocName(names.clone())); - let rmsg = msg.response(reg_msg.clone()); - queue.send(rmsg.clone()); - let name_result = rx.recv().unwrap(); - let name_id = match name_result.get_action() { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(data) => data, - RegMsg::Error(err) => { - queue.remove_sender(&id); - queue.send(msg.response(err.clone())); - return; - } - _ => unreachable!("should only return a name id or an error"), - }, - _ => unreachable!("should only return a name id or an error"), - }; - let mut route_action: HashMap = HashMap::new(); - for path_action in docdef.iter_routes() { - let request = reg_msg.response(RegMsg::AddRoute(path_action.path())); - let add_route = rmsg.response(request); - queue.send(add_route); - let result = rx.recv().unwrap(); - let route_id = match result.get_action() { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::RouteID(data) => data, - RegMsg::Error(err) => { - queue.remove_sender(&id); - queue.send(msg.response(err.clone())); - return; - } - _ => unreachable!("should only return a route id or an error"), - }, - _ => unreachable!("should only return a route id or an error"), - }; - route_action.insert(route_id.clone(), path_action.doc_function()); - } - let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action, name_id.clone()); - spawn(move || { - doc.listen(); - }); - let reply = msg.response(Reply::new()); - queue.send(reply.clone()); - } - - fn listen(&mut self) { - loop { - let msg = self.rx.recv().unwrap(); - let route = msg.get_route(); - for (route_id, doc_func) in self.routes.clone().iter() { - if route == route_id.into() { - match doc_func { - DocFuncType::Add => self.add_document(&msg), - DocFuncType::Delete => self.delete(&msg), - DocFuncType::Query => self.query(&msg), - DocFuncType::Show => self.queue.send(msg.response(Reply::new())), - DocFuncType::Update => self.update(&msg), - DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), - DocFuncType::Trigger(action) => self.trigger(&msg, action), - } - } - } - } - } - - fn validate(&self, field_name: NT, value: &Field) -> Result - where - NT: Into, - { - let field_id = match self.docdef.get_field_id(field_name) { - Ok(data) => data, - Err(err) => return Err(err), - }; - let output = match self.docdef.validate(field_id.clone(), value) { - Ok(data) => data, - Err(err) => return Err(err), - }; - match self.indexes.validate(&field_id, &output) { - Ok(_) => {} - Err(err) => return Err(err), - } - Ok(output) - } - - fn add_document(&mut self, msg: &Message) { - let addition = match msg.get_action() { - MsgAction::Addition(data) => data, - _ => return, - }; - let mut holder = InternalRecord::new(); - for (name_id, value) in addition.iter() { - let field_id = match self.docdef.get_field_id(name_id) { - Ok(id) => id, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply); - return; - } - }; - holder.insert(field_id.clone(), value.get(&Field::None)); - } - for field_id in self.docdef.field_ids().iter() { - let value = match holder.get(field_id) { - Some(data) => data, - None => &Field::None, - }; - let corrected = match self.validate(field_id, value) { - Ok(data) => data, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply); - return; - } - }; - holder.insert(field_id.clone(), corrected.clone()); - } - let mut records = Records::new(self.docdef.get_field_names().clone()); - if !holder.is_empty() { - let mut oid = Oid::new(); - while self.docs.contains_key(&oid) { - oid = Oid::new(); - } - for (field_id, oids) in self.indexes.iter_mut() { - let value = holder.get(field_id).unwrap(); - oids.internal_add(value, oid.clone()); - } - self.docs.insert(oid.clone(), holder.clone()); - records.insert(oid, holder); - } - self.queue.send(msg.response(records.clone())); - self.queue - .send(msg.response(MsgAction::OnAddition(records))); - } - - fn delete(&mut self, msg: &Message) { - let delete = match msg.get_action() { - MsgAction::Delete(data) => data, - _ => unreachable!("should always be delete action"), - }; - let records = match self.run_query(delete.get_query()) { - Ok(data) => data, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply); - return; - } - }; - for (oid, record) in records.iter() { - for (field_id, index) in self.indexes.iter_mut() { - index.remove(record.get(field_id).unwrap(), oid); - } - self.docs.remove(oid); - } - let rec = Records::with_data(self.docdef.get_field_names().clone(), records); - self.queue.send(msg.response(rec.clone())); - self.queue.send(msg.response(MsgAction::OnDelete(rec))); - } - - fn run_query(&self, query: &Query) -> Result { - let indexed_ids = self.indexes.index_ids(); - let mut indexed: HashMap = HashMap::new(); - let mut unindexed: HashMap = HashMap::new(); - for (field, data) in query.iter() { - let id = match self.docdef.get_field_id(field) { - Ok(fid) => fid, - Err(err) => return Err(err), - }; - if indexed_ids.contains(&id) { - indexed.insert(id, data.clone()); - } else { - unindexed.insert(id, data.clone()); - } - } - let mut oids: HashSet = self.docs.keys().cloned().collect(); - for (field_id, calculation) in indexed.iter() { - let holder = match self.indexes.pull(field_id, calculation) { - Ok(data) => data, - Err(err) => return Err(err), - }; - oids = oids.intersection(&holder).cloned().collect(); - } - let mut records = InternalRecords::new(); - for oid in oids.iter() { - records.insert(oid.clone(), self.docs.get(oid).unwrap().clone()); - } - let holder = oids.clone(); - for (oid, record) in records.iter() { - for (field_id, calc) in unindexed.iter() { - match calc.calculate(record.get(field_id).unwrap()) { - Field::Boolean(data) => { - if !data { - oids.remove(oid); - break; - } - } - _ => { - oids.remove(oid); - break; - } - } - } - } - let removals = holder.difference(&oids); - for oid in removals { - records.remove(oid); - } - Ok(records) - } - - fn query(&self, msg: &Message) { - let query = match msg.get_action() { - MsgAction::Query(data) => data, - _ => unreachable!("should receive a query"), - }; - let records = match self.run_query(query) { - Ok(data) => data, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply); - return; - } - }; - let recs = Records::with_data(self.docdef.get_field_names().clone(), records); - self.queue.send(msg.response(recs.clone())); - self.queue.send(msg.response(MsgAction::OnQuery(recs))); - } - - fn run_update( - &mut self, - original: &InternalRecords, - update: &Update, - msg: &Message, - ) -> Result { - let mut changes: HashMap = HashMap::new(); - for (key, value) in update.get_values().iter() { - let field_id = match self.docdef.get_field_id(key) { - Ok(data) => data, - Err(err) => return Err(err), - }; - changes.insert(field_id, value); - } - let mut indexes = Indexes::new(self.docdef.get_indexes()); - let mut updates = InternalRecords::new(); - for (oid, record) in original.iter() { - let mut holder = record.clone(); - for (field_id, value) in changes.iter() { - let field = value.get(holder.get(field_id).unwrap()); - let correction = match self.validate(field_id, &field) { - Ok(data) => data, - Err(err) => return Err(err), - }; - holder.insert(field_id.clone(), correction.clone()); - match indexes.add_to_index(&field_id, correction, oid.clone()) { - Ok(_) => {} - Err(err) => return Err(err), - } - } - updates.insert(oid.clone(), holder); - } - for (oid, new_rec) in updates.iter() { - let old_rec = original.get(oid).unwrap(); - for (field_id, index) in self.indexes.iter_mut() { - index.remove(old_rec.get(field_id).unwrap(), oid); - index - .add(new_rec.get(field_id).unwrap().clone(), oid.clone()) - .unwrap(); - } - self.docs.insert(oid.clone(), new_rec.clone()); - } - let recs = Records::with_data(self.docdef.get_field_names().clone(), updates); - self.queue - .send(msg.response(MsgAction::OnUpdate(recs.clone()))); - Ok(recs) - } - - fn update(&mut self, msg: &Message) { - let update = match msg.get_action() { - MsgAction::Update(data) => data, - _ => unreachable!("should receive a update"), - }; - let original = match self.run_query(update.get_query()) { - Ok(result) => result, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply); - return; - } - }; - let data = match self.run_update(&original, update, msg) { - Ok(output) => output, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply); - return; - } - }; - self.queue.send(msg.response(data)); - } - - fn existing_query(&mut self, msg: &Message, action: &MsgAction) { - let recs = match msg.get_action() { - MsgAction::OnQuery(data) => data, - _ => unreachable!("should only receive on messages"), - }; - match action { - MsgAction::Update(change) => self - .run_update(recs.get_internal_records(), change, msg) - .unwrap(), - _ => panic!("should not get here"), - }; - } - - fn trigger(&self, msg: &Message, action: &MsgAction) { - self.queue - .send(msg.forward(self.name_id.clone(), action.clone())); - } -} - -#[cfg(test)] -mod document_files { - use super::*; - use crate::{ - action::{Addition, Operand}, - document::field::FieldType, - message::wrapper::Delete, - name::{Name, Names}, - support_tests::TIMEOUT, - }; - use chrono::Utc; - use std::{sync::mpsc::RecvTimeoutError, thread::sleep, time::Duration}; - - fn standard_paths() -> Vec { - [ - Path::new(Include::All, Include::All, Include::Just(Action::Records)), - Path::new(Include::All, Include::All, Include::Just(Action::Reply)), - Path::new(Include::All, Include::All, Include::Just(Action::Error)), - ] - .to_vec() - } - - struct TestDocument { - docdef: DocDef, - queue: Queue, - sender_id: Uuid, - rx: Receiver, - } - - impl TestDocument { - fn new(field_types: Vec) -> Self { - let doc_name = Name::english(Uuid::new_v4().to_string().as_str()); - let mut docdef = DocDef::new(doc_name.clone()); - let mut count = 0; - for field_type in field_types.iter() { - docdef.add_field( - Name::english(format!("field{}", count).as_str()), - field_type.clone(), - ); - count += 1; - } - let (tx, rx) = channel(); - let mut queue = Queue::new(); - let id = queue.add_sender(tx); - Self { - docdef: docdef, - queue: queue, - sender_id: id, - rx: rx, - } - } - - fn doc_name(&self) -> Name { - self.docdef.get_document_names()[0].clone() - } - - fn get_docdef(&self) -> &DocDef { - &self.docdef - } - - fn get_docdef_mut(&mut self) -> &mut DocDef { - &mut self.docdef - } - - fn get_queue(&mut self) -> Queue { - self.queue.clone() - } - - fn get_receiver(&self) -> &Receiver { - &self.rx - } - - fn get_sender_id(&self) -> Uuid { - self.sender_id.clone() - } - - fn send(&self, action: A) - where - A: Into, - { - let msg = Message::new(self.docdef.get_document_names()[0].clone(), action); - self.queue.send(msg); - } - - fn start(&mut self, routes: Vec) { - let msg = Message::new( - self.docdef.get_document_names()[0].clone(), - self.docdef.clone(), - ); - DocumentFile::start(self.queue.clone(), msg); - for route in routes.iter() { - let request = - Register::new(self.sender_id.clone(), RegMsg::AddRoute(route.clone())); - let add_route = Message::new(NameType::None, request); - self.queue.send(add_route); - self.rx.recv().unwrap(); - } - } - - fn populate(&self, data: Vec) { - let mut add = Addition::new(self.doc_name()); - let mut count = 0; - for item in data.iter() { - add.add_field( - Name::english(format!("field{}", count).as_str()), - item.clone(), - ); - count += 1; - } - self.send(add); - match self.rx.recv_timeout(TIMEOUT) { - Ok(_) => {} // eats the addition response. - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("got {}, should have been ok or time out", err), - }, - } - } - } - - impl From for TestDocument { - fn from(value: DocDef) -> Self { - let (tx, rx) = channel(); - let mut queue = Queue::new(); - let id = queue.add_sender(tx); - Self { - docdef: value, - queue: queue, - sender_id: id, - rx: rx, - } - } - } - - #[test] - fn does_not_respond_to_create() { - let name = Name::english("quiet"); - let docdef = DocDef::new(name.clone()); - let mut test_doc: TestDocument = docdef.into(); - let alt = Name::english("alternate"); - test_doc.start(standard_paths()); - let docdef = DocDef::new(alt); - let msg = Message::new(name.clone(), docdef); - test_doc.get_queue().send(msg); - match test_doc.get_receiver().recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!("should not receive: {:?}", msg), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should have timed out"), - }, - } - } - - #[test] - fn does_document_respond_to_requests() { - let name = Name::english("listen"); - let docdef = DocDef::new(name.clone()); - let mut test_doc: TestDocument = docdef.into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let msg_actions = [ - MsgAction::Addition(Addition::new(name.clone())), - MsgAction::Delete(Delete::new(Query::internal())), - MsgAction::Query(Query::new(name.clone())), - MsgAction::Show, - MsgAction::Update(Update::new(Query::internal())), - ]; - for msg_action in msg_actions.iter() { - let msg = Message::new(name.clone(), msg_action.clone()); - queue.send(msg.clone()); - let result = match test_doc.get_receiver().recv_timeout(TIMEOUT) { - Ok(data) => data.clone(), - Err(err) => unreachable!("for {:?} got {:?}", msg_action, err), - }; - assert_eq!( - result.get_message_id(), - msg.get_message_id(), - "for {:?} response and reply ids should equal", - msg_action - ); - match result.get_action() { - MsgAction::Reply(data) => { - assert_eq!(data.len(), 0, "for {:?} got {:?}", msg_action, result) - } - MsgAction::Records(data) => { - assert_eq!(data.len(), 0, "for {:?} got {:?}", msg_action, result) - } - _ => unreachable!( - "for {:?} got {:?}: should have received a reply", - msg_action, - result.get_action() - ), - } - } - } - - #[test] - fn does_not_respond_to_other_document_requests() { - let name = Name::english("quiet"); - let alt = Name::english("alternate"); - let docdef = DocDef::new(name.clone()); - let mut test_doc: TestDocument = docdef.into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let reg_msg = Register::new( - test_doc.get_sender_id(), - RegMsg::AddDocName([alt.clone()].to_vec()), - ); - let setup = Message::new(NameType::None, reg_msg.clone()); - queue.send(setup); - test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let msg_actions = [ - MsgAction::Addition(Addition::new(alt.clone())), - MsgAction::Create(DocDef::new(name.clone())), - MsgAction::Delete(Delete::new(Query::new(alt.clone()))), - MsgAction::Query(Query::internal().into()), - MsgAction::Show, - MsgAction::Update(Update::new(Query::internal())), - ]; - let mut msgs: HashMap = HashMap::new(); - for msg_action in msg_actions.iter() { - let msg = Message::new(alt.clone(), msg_action.clone()); - msgs.insert(msg.get_message_id().clone(), msg_action.clone()); - queue.send(msg); - } - match test_doc.get_receiver().recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!( - "for {:?} should not receive: {:?}", - msgs.get(msg.get_message_id()).unwrap(), - msg - ), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("got {:?}, should have timed out", err), - }, - } - } - - #[test] - fn query_sends_on_query_message() { - let count = 5; - let mut data: HashSet = HashSet::new(); - while data.len() < count { - let field: Field = Uuid::new_v4().into(); - data.insert(field); - } - let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - let queue = test_doc.get_queue(); - let routes = [Path::new( - Include::All, - Include::All, - Include::Just(Action::OnQuery), - )] - .to_vec(); - test_doc.start(routes); - for item in data.iter() { - test_doc.populate([item.clone()].to_vec()); - } - let msg = Message::new(doc_name.clone(), Query::new(doc_name.clone())); - queue.send(msg.clone()); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - assert_eq!( - result.get_message_id(), - msg.get_message_id(), - "message ids should match" - ); - match result.get_action() { - MsgAction::OnQuery(output) => { - assert_eq!( - output.len(), - count, - "wrong number of entries: got {:?}", - output - ); - for rec in output.iter() { - assert!(data.contains(&rec.get(Name::english("field0")).unwrap())); - } - } - _ => unreachable!("should never get here"), - } - } - - #[test] - fn send_on_addition_message() { - let data: Field = Uuid::new_v4().into(); - let field_name = Name::english("field0"); - let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - let queue = test_doc.get_queue(); - let routes = vec![Path::new( - Include::All, - Include::All, - Include::Just(Action::OnAddition), - )]; - test_doc.start(routes); - let mut add = Addition::new(doc_name.clone()); - add.add_field(field_name.clone(), data.clone()); - let msg = Message::new(doc_name.clone(), add); - queue.send(msg.clone()); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - assert_eq!( - result.get_message_id(), - msg.get_message_id(), - "message ids should match" - ); - match result.get_action() { - MsgAction::OnAddition(output) => { - assert_eq!(output.len(), 1, "wrong number of entries: got {:?}", output); - for rec in output.iter() { - assert_eq!(rec.get(Name::english("field0")).unwrap(), data); - } - } - _ => unreachable!("should never get here"), - } - } - - #[test] - fn sends_on_delete_message() { - let count = 5; - let mut data: HashSet = HashSet::new(); - while data.len() < count { - let field: Field = Uuid::new_v4().into(); - data.insert(field); - } - let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - let queue = test_doc.get_queue(); - let routes = [Path::new( - Include::All, - Include::All, - Include::Just(Action::OnDelete), - )] - .to_vec(); - test_doc.start(routes); - for item in data.iter() { - test_doc.populate([item.clone()].to_vec()); - } - let msg = Message::new(doc_name.clone(), Delete::new(Query::internal())); - queue.send(msg.clone()); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - assert_eq!( - result.get_message_id(), - msg.get_message_id(), - "message ids should match" - ); - match result.get_action() { - MsgAction::OnDelete(output) => { - assert_eq!( - output.len(), - count, - "wrong number of entries: got {:?}", - output - ); - for rec in output.iter() { - assert!(data.contains(&rec.get(Name::english("field0")).unwrap())); - } - } - _ => unreachable!("should never get here"), - } - } - - #[test] - fn sends_on_update_message() { - let count = 5; - let field_name = Name::english("field0"); - let mut data: HashSet = HashSet::new(); - while data.len() < count { - let field: Field = Uuid::new_v4().into(); - data.insert(field); - } - let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - let queue = test_doc.get_queue(); - let routes = [Path::new( - Include::All, - Include::All, - Include::Just(Action::OnUpdate), - )] - .to_vec(); - test_doc.start(routes); - for item in data.iter() { - test_doc.populate([item.clone()].to_vec()); - } - let mut update = Update::new(Query::internal()); - update - .get_values_mut() - .add_field(field_name.clone(), Uuid::nil()); - let msg = Message::new(doc_name.clone(), update); - queue.send(msg.clone()); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - assert_eq!( - result.get_message_id(), - msg.get_message_id(), - "message ids should match" - ); - match result.get_action() { - MsgAction::OnUpdate(output) => { - assert_eq!( - output.len(), - count, - "wrong number of entries: got {:?}", - output - ); - for rec in output.iter() { - assert_eq!(rec.get(&field_name).unwrap(), Uuid::nil().into()); - } - } - _ => unreachable!("should never get here"), - } - } - - #[test] - fn can_document_be_added() { - let doc_name = Name::english("document"); - let mut docdef = DocDef::new(doc_name.clone()); - let name = Name::english("field"); - let data = Uuid::new_v4(); - docdef.add_field(name.clone(), FieldType::Uuid); - let mut test_doc: TestDocument = docdef.clone().into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let mut new_doc = Addition::new(doc_name.clone()); - new_doc.add_field(name.clone(), data.clone()); - let testing = |msg: Message| { - queue.send(msg.clone()); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match result.get_action() { - MsgAction::Records(output) => { - assert_eq!(output.len(), 1); - for rec in output.iter() { - let holder = rec.get(&name).unwrap(); - match holder { - Field::Uuid(field_data) => assert_eq!(field_data, data), - _ => unreachable!("got {:?}, should have been uuid", holder), - } - } - } - _ => unreachable!( - "\n\ngot {:?}\n\nfor {:?}\n\nshould have been records", - result, msg - ), - } - }; - testing(Message::new(doc_name.clone(), new_doc)); - testing(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); - } - - #[test] - fn can_add_multiple_documents() { - let doc_name = Name::english("multiple"); - let mut docdef = DocDef::new(doc_name.clone()); - let name = Name::english("count"); - docdef.add_field(name.clone(), FieldType::Integer); - let mut test_doc: TestDocument = docdef.clone().into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let count = 5; - for i in 0..count { - let mut new_doc = Addition::new(doc_name.clone()); - new_doc.add_field(name.clone(), i); - queue.send(Message::new(doc_name.clone(), new_doc)); - test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - } - queue.send(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - let mut entries: HashSet = (0..count).collect(); - match action { - MsgAction::Records(output) => { - let entry_count: usize = count.try_into().unwrap(); - assert_eq!( - output.len(), - entry_count, - "should have the same number of entries" - ); - for record in output.iter() { - let holder = record.get(&name).unwrap(); - let data = match holder { - Field::Integer(item) => item.clone(), - _ => unreachable!("got {:?}, should have been integer", holder), - }; - assert!( - entries.contains(&data), - "did not find {:?} in {:?}", - data, - entries - ); - entries.remove(&data); - } - } - _ => unreachable!("\n\ngot {:?}\n\nshould have been records", action), - } - assert!(entries.is_empty(), "did not use {:?}", entries); - } - - #[test] - fn errors_on_wrong_field_name() { - let mut test_doc = TestDocument::new(Vec::new()); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let name = Name::english("bad"); - let mut addition = Addition::new(test_doc.doc_name()); - addition.add_field(name.clone(), "doesn't matter"); - queue.send(Message::new( - test_doc.get_docdef().get_document_names()[0].clone(), - addition, - )); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - match result.get_action() { - MsgAction::Error(err) => match err { - MTTError::NameNotFound(data) => assert_eq!(data, &name), - _ => unreachable!("got {:?}: should have been document field not found.", err), - }, - _ => unreachable!("got {:?}: should have been an error", result.get_action()), - } - } - - #[test] - fn errors_on_wrong_field_type() { - let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let mut addition = Addition::new(test_doc.doc_name()); - addition.add_field(Name::english("field0"), "string"); - queue.send(Message::new( - test_doc.get_docdef().get_document_names()[0].clone(), - addition, - )); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - match result.get_action() { - MsgAction::Error(err) => match err { - MTTError::DocumentFieldWrongDataType(expected, got) => { - assert_eq!(got, &FieldType::StaticString); - assert_eq!(expected, &FieldType::Uuid); - } - _ => unreachable!( - "got {:?}: should have been document field data mismatch.", - err - ), - }, - _ => unreachable!("got {:?}: should have been an error", result.get_action()), - } - } - - #[test] - fn errors_on_missing_fields() { - let mut test_doc = TestDocument::new([FieldType::Integer, FieldType::Integer].to_vec()); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let mut addition = Addition::new(test_doc.doc_name()); - addition.add_field(Name::english("field0"), 1); - queue.send(Message::new( - test_doc.get_docdef().get_document_names()[0].clone(), - addition, - )); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - match result.get_action() { - MsgAction::Error(err) => match err { - MTTError::InvalidNone => {} - _ => unreachable!("got {:?}: should have been document field missing", err), - }, - _ => unreachable!("got {:?}: should have been an error", result.get_action()), - } - } - - #[test] - fn does_query_return_related_entries() { - let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let count = 5; - let expected = 3; - for i in 0..count { - test_doc.populate([i.into()].to_vec()); - } - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(expected.clone()).unwrap(); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - let mut query = Query::new(doc_name); - query.add(Name::english("field0"), calc); - queue.send(Message::new( - test_doc.get_docdef().get_document_names()[0].clone(), - query, - )); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!( - data.len(), - 1, - "should return one entry containing {:?} got:\n{:?}", - expected, - action - ); - for doc in data.iter() { - assert_eq!(doc.get(&Name::english("field0")).unwrap(), expected.into()); - } - } - _ => unreachable!("got {:?}: should have been a reply", action), - } - } - - #[test] - fn does_query_work_with_greater_than() { - let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - test_doc.populate([1.into()].to_vec()); - test_doc.populate([2.into()].to_vec()); - let mut calc = Calculation::new(Operand::GreaterThan); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(1).unwrap(); - let mut query = Query::new(doc_name); - query.add(Name::english("field0"), calc); - queue.send(Message::new( - test_doc.get_docdef().get_document_names()[0].clone(), - query, - )); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!( - data.len(), - 1, - "should return one entry containing 2 got:\n{:?}", - action - ); - for doc in data.iter() { - assert_eq!(doc.get(&Name::english("field0")).unwrap(), 2.into()); - } - } - _ => unreachable!("got {:?}: should have been a reply", action), - } - } - - #[test] - fn gets_all_documents_in_query() { - let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); - let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let data = 1; - let count = 5; - for i in 0..count { - let holder: i128 = (i + count).try_into().unwrap(); - test_doc.populate([holder.into()].to_vec()); - test_doc.populate([data.into()].to_vec()); - } - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(data.clone()).unwrap(); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - let mut query = Query::new(doc_name); - query.add(Name::english("field0"), calc); - queue.send(Message::new( - test_doc.get_docdef().get_document_names()[0].clone(), - query, - )); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!( - docs.len(), - count, - "should return one entry containing {:?} got:\n{:?}", - data, - action - ); - for doc in docs.iter() { - assert_eq!(doc.get(&Name::english("field0")).unwrap(), data.into()); - } - } - _ => unreachable!("got {:?}: should have been a reply", action), - } - } - - #[test] - fn query_should_work_with_multiple_fields() { - let mut doc = - TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); - doc.start(standard_paths()); - let values = [ - ["a".into(), "a".into()].to_vec(), - ["a".into(), "b".into()].to_vec(), - ["b".into(), "a".into()].to_vec(), - ["b".into(), "b".into()].to_vec(), - ]; - for value in values.iter() { - doc.populate(value.clone()); - } - let mut query = Query::new(doc.doc_name()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("a").unwrap(); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - query.add(Name::english("field0"), calc); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("b").unwrap(); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - query.add(Name::english("field1"), calc); - doc.send(query); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - let afield: Field = "a".into(); - let bfield: Field = "b".into(); - assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); - for doc in data.iter() { - assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield); - assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield); - } - } - _ => unreachable!("got {:?}: should have been a reply", action), - } - } - - #[test] - fn query_should_work_with_multiple_inexed_fields() { - let mut doc = - TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); - let docdef = doc.get_docdef_mut(); - docdef - .add_index(&Name::english("field0"), IndexType::Index) - .unwrap(); - docdef - .add_index(&Name::english("field1"), IndexType::Index) - .unwrap(); - doc.start(standard_paths()); - let values = [ - ["a".into(), "a".into()].to_vec(), - ["a".into(), "b".into()].to_vec(), - ["b".into(), "a".into()].to_vec(), - ["b".into(), "b".into()].to_vec(), - ]; - for value in values.iter() { - doc.populate(value.clone()); - } - let mut query = Query::new(doc.doc_name()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("a").unwrap(); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - query.add(Name::english("field0"), calc); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("b").unwrap(); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - query.add(Name::english("field1"), calc); - doc.send(query); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - let afield: Field = "a".into(); - let bfield: Field = "b".into(); - assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); - for doc in data.iter() { - assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield); - assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield); - } - } - _ => unreachable!("got {:?}: should have been a reply", action), - } - } - - #[test] - fn query_should_work_with_mixed_inexed_fields() { - let mut doc = - TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); - let docdef = doc.get_docdef_mut(); - docdef - .add_index(&Name::english("field0"), IndexType::Index) - .unwrap(); - doc.start(standard_paths()); - let values = [ - ["a".into(), "a".into()].to_vec(), - ["a".into(), "b".into()].to_vec(), - ["b".into(), "a".into()].to_vec(), - ["b".into(), "b".into()].to_vec(), - ]; - for value in values.iter() { - doc.populate(value.clone()); - } - let mut query = Query::new(doc.doc_name()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("a").unwrap(); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - query.add(Name::english("field0"), calc); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("b").unwrap(); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - query.add(Name::english("field1"), calc); - doc.send(query); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - let afield: Field = "a".into(); - let bfield: Field = "b".into(); - assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); - for doc in data.iter() { - assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield); - assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield); - } - } - _ => unreachable!("got {:?}: should have been a reply", action), - } - } - - #[test] - fn errors_on_bad_field_name() { - let mut doc = TestDocument::new(Vec::new()); - doc.start(standard_paths()); - let doc_name = doc.get_docdef().get_document_names()[0].clone(); - let queue = doc.get_queue(); - let rx = doc.get_receiver(); - let field_name = Name::english("wrong"); - let mut query = Query::new(doc.doc_name()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("something").unwrap(); - query.add(field_name.clone(), calc); - let msg = Message::new(doc_name, query); - queue.send(msg); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(data) => match data { - MTTError::NameNotFound(output) => assert_eq!(output, &field_name), - _ => unreachable!("got {:?}: should been field not found", data), - }, - _ => unreachable!("got {:?}: should have been a error", action), - } - } - - #[test] - fn errors_on_bad_field_type_with_index() { - let mut doc = TestDocument::new([FieldType::Uuid].to_vec()); - doc.get_docdef_mut() - .add_index(&Name::english("field0"), IndexType::Index) - .unwrap(); - doc.start(standard_paths()); - doc.populate([Uuid::nil().into()].to_vec()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value("notUUID").unwrap(); - let mut query = Query::new(doc.doc_name()); - query.add(Name::english("field0"), calc); - doc.send(query); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(data) => match data { - MTTError::FieldInvalidType => {} - _ => unreachable!("got {:?}: should been invalid field type", data), - }, - _ => unreachable!("got {:?}: should have been a error", action), - } - } - - #[test] - fn can_use_default_values() { - let doc_name = Name::english("default"); - let mut docdef = DocDef::new(doc_name.clone()); - let field_name = Name::english("holder"); - docdef.add_field(field_name.clone(), FieldType::StaticString); - docdef - .set_default(&field_name, FieldType::StaticString) - .unwrap(); - let mut test_doc: TestDocument = docdef.into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let rx = test_doc.get_receiver(); - let new_doc = Addition::new(); - let msg = Message::new(doc_name, new_doc); - queue.send(msg); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1); - for doc in docs.iter() { - let expected: Field = "".into(); - assert_eq!(doc.get(&field_name).unwrap(), expected); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - } - - #[test] - fn can_a_default_value_be_set() { - let doc_name = Name::english("assigned"); - let mut docdef = DocDef::new(doc_name.clone()); - let field_name = Name::english("id"); - docdef.add_field(field_name.clone(), FieldType::Uuid); - docdef.set_default(&field_name, Uuid::nil()).unwrap(); - let mut test_doc: TestDocument = docdef.into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let rx = test_doc.get_receiver(); - let new_doc = Addition::new(); - let msg = Message::new(doc_name, new_doc); - queue.send(msg); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1); - for doc in docs.iter() { - let expected: Field = Uuid::nil().into(); - assert_eq!(doc.get(&field_name).unwrap(), expected); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - } - - #[test] - fn can_default_values_be_overridden() { - let doc_name = Name::english("assigned"); - let mut docdef = DocDef::new(doc_name.clone()); - let field_name = Name::english("id"); - docdef.add_field(field_name.clone(), FieldType::Uuid); - docdef.set_default(&field_name, FieldType::Uuid).unwrap(); - let mut test_doc: TestDocument = docdef.into(); - test_doc.start(standard_paths()); - let queue = test_doc.get_queue(); - let rx = test_doc.get_receiver(); - let mut new_doc = Addition::new(); - new_doc.add_field(&field_name, Uuid::nil()); - let msg = Message::new(doc_name, new_doc); - queue.send(msg); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1); - for doc in docs.iter() { - let expected: Field = Uuid::nil().into(); - assert_eq!(doc.get(&field_name).unwrap(), expected); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - } - - #[test] - fn empty_update_query_results_in_zero_changes() { - let count = 5; - let mut ids: HashSet = HashSet::new(); - while ids.len() < count { - ids.insert(Uuid::new_v4()); - } - let id = ids.iter().last().unwrap().clone(); - ids.remove(&id); - let mut doc = TestDocument::new([FieldType::Uuid].to_vec()); - doc.start(standard_paths()); - for id in ids.iter() { - doc.populate([id.clone().into()].to_vec()); - } - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Uuid)) - .unwrap(); - calc.add_value(Uuid::nil()).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update - .get_values_mut() - .add_field(Name::english("field0"), Uuid::nil()); - doc.send(update); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => assert_eq!(docs.len(), 0), - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - } - - #[test] - fn changes_information_requested() { - let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); - doc.start(standard_paths()); - let doc_name = doc.get_docdef().get_document_names()[0].clone(); - let old = "old"; - let new = "new"; - let id = Uuid::new_v4(); - doc.populate([id.into(), old.into()].to_vec()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Uuid)) - .unwrap(); - calc.add_value(id.clone()).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update - .get_values_mut() - .add_field(Name::english("field1"), new); - let mut testing = |msg: Message| { - doc.get_queue().send(msg.clone()); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1, "for {:?}, should have one entry", msg); - for doc in docs.iter() { - assert_eq!(doc.get(Name::english("field0")).unwrap(), id.into()); - assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into()); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - }; - testing(Message::new(doc_name.clone(), update)); - testing(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); - } - - #[test] - fn changes_only_the_queried() { - let mut doc = TestDocument::new([FieldType::Integer, FieldType::StaticString].to_vec()); - doc.start(standard_paths()); - let doc_name = doc.get_docdef().get_document_names()[0].clone(); - let old = "old"; - let new = "new"; - let count = 5; - let field_count = count.clone().try_into().unwrap(); - let picked = 3; - for i in 0..field_count { - doc.populate([i.into(), old.into()].to_vec()); - } - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(picked.clone()).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update - .get_values_mut() - .add_field(Name::english("field1"), new); - doc.get_queue().send(Message::new(doc_name.clone(), update)); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1, "should have one entry"); - for doc in docs.iter() { - assert_eq!(doc.get(Name::english("field0")).unwrap(), picked.into()); - assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into()); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - doc.get_queue() - .send(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), count, "should have one entry"); - for doc in docs.iter() { - if doc.get(Name::english("field0")).unwrap() == picked.into() { - assert_eq!( - doc.get(Name::english("field1")).unwrap(), - new.into(), - "{:?}", - docs - ); - } else { - assert_eq!( - doc.get(Name::english("field1")).unwrap(), - old.into(), - "{:?}", - docs - ); - } - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - } - - #[test] - fn can_handle_multiple_updates() { - let mut doc = TestDocument::new([FieldType::Integer, FieldType::StaticString].to_vec()); - doc.start(standard_paths()); - let doc_name = doc.get_docdef().get_document_names()[0].clone(); - let old = "old"; - let new = "new"; - let count = 5; - let picked = 3; - for _ in 0..count { - doc.populate([picked.into(), old.into()].to_vec()); - } - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(picked.clone()).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update - .get_values_mut() - .add_field(Name::english("field1"), new); - let mut testing = |msg: Message| { - doc.get_queue().send(msg); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), count, "should have one entry"); - for doc in docs.iter() { - assert_eq!(doc.get(Name::english("field0")).unwrap(), picked.into()); - assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into()); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - }; - testing(Message::new(doc_name.clone(), update)); - testing(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); - } - - #[test] - fn update_errors_on_bad_field_name() { - let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); - doc.start(standard_paths()); - let id = Uuid::new_v4(); - let old = "old"; - let new = "new"; - let bad_name = Name::english("wrong"); - doc.populate([id.into(), old.into()].to_vec()); - let mut update = Update::new(Query::internal()); - update.get_values_mut().add_field(bad_name.clone(), new); - doc.send(update); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(err) => match err { - MTTError::NameNotFound(data) => assert_eq!(data, &bad_name), - _ => unreachable!("got {:?}: should have gotten an missing field", err), - }, - _ => unreachable!("got {:?}: should have gotten an error", action), - } - } - - #[test] - fn update_errors_on_bad_field_type() { - let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); - doc.start(standard_paths()); - let id = Uuid::new_v4(); - let old = "old"; - let new = Uuid::nil(); - doc.populate([id.into(), old.into()].to_vec()); - - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Uuid)) - .unwrap(); - calc.add_value(id.clone()).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update - .get_values_mut() - .add_field(Name::english("field1"), new); - doc.send(update); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(err) => match err { - MTTError::DocumentFieldWrongDataType(expected, got) => { - assert_eq!(expected, &FieldType::StaticString); - assert_eq!(got, &FieldType::Uuid); - } - _ => unreachable!("got {:?}: should have gotten incorrect file type", err), - }, - _ => unreachable!("got {:?}: should have gotten an error", action), - } - } - - #[test] - fn does_update_maintain_unique_fields() { - let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); - test_doc - .get_docdef_mut() - .add_index(&Name::english("field0"), IndexType::Unique) - .unwrap(); - test_doc.start(standard_paths()); - let fname = Name::english("field0"); - let old = 3; - let new = 5; - test_doc.populate([old.into()].to_vec()); - - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(old.clone()).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update.get_values_mut().add_field(&fname, new); - test_doc.send(update); - test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let mut should_clear = Addition::new(); - should_clear.add_field(fname.clone(), old); - let mut should_error = Addition::new(); - should_error.add_field(fname.clone(), new); - test_doc.send(should_clear); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1, "should have one entry"); - for doc in docs.iter() { - assert_eq!(doc.get(&fname).unwrap(), old.into()); - } - } - _ => unreachable!("got {:?}: should have gotten records", action), - } - test_doc.send(should_error); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(err) => match err { - MTTError::FieldDuplicate => {} - _ => unreachable!("got {:?}: should have gotten incorrect file type", err), - }, - _ => unreachable!("got {:?}: should have gotten an error", action), - } - } - - #[test] - fn unique_value_remains_available_if_failure_occurs() { - let f0name = Name::english("field0"); - let f1name = Name::english("field1"); - let mut test_doc = TestDocument::new([FieldType::Uuid, FieldType::Uuid].to_vec()); - test_doc - .get_docdef_mut() - .add_index(&f0name, IndexType::Unique) - .unwrap(); - test_doc.start(standard_paths()); - let f0data = Uuid::new_v4(); - let f1bad_data = "NotUuid"; - let f1good_data = Uuid::nil(); - let mut bad_addition = Addition::new(); - bad_addition.add_field(&f0name, f0data.clone()); - bad_addition.add_field(&f1name, f1bad_data); - let mut good_addition = Addition::new(); - good_addition.add_field(&f0name, f0data.clone()); - good_addition.add_field(&f1name, f1good_data.clone()); - test_doc.send(bad_addition); - test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - test_doc.send(good_addition); - let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(docs) => { - assert_eq!(docs.len(), 1, "should have one entry"); - for doc in docs.iter() { - assert_eq!(doc.get(&f0name).unwrap(), f0data.into()); - assert_eq!(doc.get(&f1name).unwrap(), f1good_data.into()); - } - } - _ => unreachable!("got {:?}: should have gotten records", action), - } - } - - #[test] - fn updating_unique_updates_index_entries() { - let fname = Name::english("field0"); - let mut doc = TestDocument::new([FieldType::StaticString].to_vec()); - doc.get_docdef_mut() - .add_index(&fname, IndexType::Unique) - .unwrap(); - doc.start(standard_paths()); - let old = "old"; - let new = "new"; - let fold: Field = old.into(); - doc.populate([old.into()].to_vec()); - - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - calc.add_value(old).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let mut update = Update::new(query); - update.get_values_mut().add_field(fname.clone(), new); - doc.send(update); - doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let mut old_addition = Addition::new(); - old_addition.add_field(&fname, old); - doc.send(old_addition); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!(data.len(), 1); - for doc in data.iter() { - assert_eq!(doc.get(&fname).unwrap(), fold); - } - } - _ => unreachable!("got {:?}: should have gotten a reply", action), - } - let mut new_addition = Addition::new(); - new_addition.add_field(fname.clone(), new); - doc.send(new_addition); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(err) => match err { - MTTError::FieldDuplicate => {} - _ => unreachable!("got {:?}: should have gotten an missing field", err), - }, - _ => unreachable!("got {:?}: should have gotten an error", action), - } - } - - #[test] - fn update_does_not_override_unique_index() { - let f0name = Name::english("field0"); - let f1name = Name::english("field1"); - let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); - doc.get_docdef_mut() - .add_index(&f0name, IndexType::Unique) - .unwrap(); - doc.start(standard_paths()); - let count = 5; - let data = "data"; - let mut ids: HashSet = HashSet::new(); - while ids.len() < count { - ids.insert(Uuid::new_v4()); - } - let holder = ids.iter().last().unwrap().clone(); - ids.remove(&holder); - for id in ids.iter() { - doc.populate([id.clone().into(), data.into()].to_vec()); - } - - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::StaticString)) - .unwrap(); - calc.add_value(data).unwrap(); - let mut query = Query::internal(); - query.add(&f1name, calc); - let mut update = Update::new(query); - update.get_values_mut().add_field(&f0name, holder.clone()); - doc.send(update); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(err) => match err { - MTTError::FieldDuplicate => {} - _ => unreachable!("got {:?}: should have gotten field duplicate", err), - }, - _ => unreachable!("got {:?}: should have gotten an error", action), - } - let query = Query::new(doc.doc_name()); - doc.send(query); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!(data.len(), ids.len()); - for doc in data.iter() { - match doc.get(&f0name).unwrap() { - Field::Uuid(id) => { - assert!(ids.contains(&id)); - ids.remove(&id); - } - _ => unreachable!("did not get uuid"), - } - } - } - _ => unreachable!("got {:?}: should have gotten reply", action), - } - assert!(ids.is_empty(), "did not find {:?}", ids); - } - - #[test] - fn can_calculate_field_values() { - let fname = Name::english("field0"); - let mut doc = TestDocument::new([FieldType::DateTime].to_vec()); - doc.start(standard_paths()); - let duration = Duration::from_secs(300); - let mut calc = Calculation::new(Operand::Add); - calc.add_value(FieldType::DateTime).unwrap(); - calc.add_value(duration.clone()).unwrap(); - let mut addition = Addition::new(); - addition.add_field(&fname, calc); - let start = Utc::now() + duration; - doc.send(addition); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let stop = Utc::now() + duration; - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!(data.len(), 1); - for doc in data.iter() { - match doc.get(&fname).unwrap() { - Field::DateTime(datetime) => assert!(datetime > start && datetime < stop), - _ => unreachable!("did not get uuid"), - } - } - } - _ => unreachable!("got {:?}: should have gotten reply", action), - } - } - - #[test] - fn can_delete() { - let fname = Name::english("field0"); - let mut doc = TestDocument::new([FieldType::Integer].to_vec()); - doc.start(standard_paths()); - doc.populate([1.into()].to_vec()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(1).unwrap(); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - let mut query = Query::new(doc.doc_name()); - query.add(&fname, calc); - let delete = Delete::new(query.clone()); - doc.send(delete); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!(data.len(), 1); - for doc in data.iter() { - match doc.get(&fname).unwrap() { - Field::Integer(num) => assert_eq!(num, 1), - _ => unreachable!("did not get uuid"), - } - } - } - _ => unreachable!("got {:?}: should have gotten reply", action), - } - doc.send(query); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => assert_eq!(data.len(), 0), - _ => unreachable!("got {:?}, should have been empty", action), - } - } - - #[test] - fn does_delete_return_query_errors() { - let field_name = Name::english("wrong"); - let mut doc = TestDocument::new([FieldType::Integer].to_vec()); - doc.start(standard_paths()); - let mut calc = Calculation::new(Operand::Equal); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(1).unwrap(); - let mut query = Query::internal(); - query.add(field_name.clone(), calc); - let delete = Delete::new(query); - doc.send(delete); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Error(err) => match err { - MTTError::NameNotFound(data) => assert_eq!(data, &field_name), - _ => unreachable!("got {:?}: should have gotten an missing field", err), - }, - _ => unreachable!("got {:?}: should have gotten an error", action), - } - } - - #[test] - fn does_delete_update_indexes() { - let fname = Name::english("field0"); - let value = 1; - let mut doc = TestDocument::new([FieldType::Integer].to_vec()); - doc.get_docdef_mut() - .add_index(&fname, IndexType::Unique) - .unwrap(); - doc.start(standard_paths()); - doc.populate([value.into()].to_vec()); - doc.send(Delete::new(Query::internal())); - doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let mut addition = Addition::new(); - addition.add_field(&fname, value.clone()); - doc.send(addition); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => assert_eq!(data.len(), 1), - _ => unreachable!("got {:?}, should have added entry", action), - } - } - - #[test] - fn can_query_trigger_reaction() { - let mut doc = TestDocument::new([FieldType::Integer].to_vec()); - let doc_name = doc.get_docdef().get_document_names()[0].clone(); - let path = Path::new( - Include::All, - Include::Just(doc_name.clone().into()), - Include::Just(Action::OnQuery), - ); - let mut update = Update::new(Query::internal()); - let mut calc = Calculation::new(Operand::Add); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(1).unwrap(); - update - .get_values_mut() - .add_field(Name::english("field0"), calc); - let function = DocFuncType::ExistingQuery(update.into()); - doc.get_docdef_mut().add_route(path, function); - let mut paths = standard_paths(); - paths.push(Path::new( - Include::All, - Include::Just(doc_name.clone().into()), - Include::Just(Action::OnUpdate), - )); - doc.start(paths); - doc.populate([0.into()].to_vec()); - for i in 0..5 { - let expected: Field = i.try_into().unwrap(); - doc.send(Query::new(doc_name.clone())); - let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Records(data) => { - assert_eq!(data.len(), 1); - for rec in data.iter() { - assert_eq!(rec.get(&Name::english("field0")).unwrap(), expected); - } - } - _ => unreachable!("got {:?}, should have added entry", action), - } - let on_update = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - match on_update.get_action() { - MsgAction::OnUpdate(recs) => { - let expected: Field = (i + 1).into(); - assert_eq!(recs.len(), 1); - for rec in recs.iter() { - assert_eq!(rec.get(Name::english("field0")).unwrap(), expected); - } - } - _ => unreachable!("should only be on update"), - } - } - } - - #[test] - fn can_an_action_trigger_an_action() { - let mut doc = TestDocument::new([FieldType::Integer].to_vec()); - let queue = doc.get_queue(); - let doc_name = doc.get_docdef().get_document_names()[0].clone(); - crate::document::clock::Clock::start(queue.clone()); - let mut calc = Calculation::new(Operand::GreaterThan); - calc.add_value(CalcValue::Existing(FieldType::Integer)) - .unwrap(); - calc.add_value(1).unwrap(); - let mut query = Query::internal(); - query.add(Name::english("field0"), calc); - let delete = Delete::new(query.clone()); - let path = Path::new( - Include::All, - Include::Just(Name::english("clock").into()), - Include::Just(Action::OnUpdate), - ); - let function = DocFuncType::Trigger(delete.into()); - doc.get_docdef_mut().add_route(path, function); - let mut paths = standard_paths(); - paths.push(Path::new( - Include::All, - Include::Just(doc_name.clone().into()), - Include::Just(Action::Delete), - )); - doc.start(paths); - let queue = doc.get_queue(); - for item in 1..3 { - doc.populate([item.into()].to_vec()); - } - let trigger = Message::new( - Name::english("clock"), - MsgAction::OnUpdate(Records::new(Names::new())), - ); - queue.send(trigger.clone()); - sleep(TIMEOUT); - let msg = Message::new(doc_name.clone(), Query::new(doc_name.clone())); - queue.send(msg.clone()); - let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - while result.get_message_id() != msg.get_message_id() { - result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); - } - let action = result.get_action(); - let expected: Field = 1.into(); - match action { - MsgAction::Records(data) => { - assert_eq!(data.len(), 1, "should have been one record:\n{:?}", data); - for rec in data.iter() { - assert_eq!(rec.get(Name::english("field0")).unwrap(), expected); - } - } - _ => unreachable!("should return records"), - } - } -}