From 404c6e9c34dccf75c5375839888e7748273c0a9a Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Fri, 13 Feb 2026 12:00:39 -0500 Subject: [PATCH] Moved Records into the document module. --- ' | 2426 ++++++++++++++++++++++++++++++++++++ src/action.rs | 5 +- src/action/message.rs | 5 +- src/action/user.rs | 3 +- src/document.rs | 18 +- src/document/clock.rs | 4 +- src/document/create.rs | 4 +- src/document/record.rs | 67 + src/document/session.rs | 3 +- src/lib.rs | 2 +- src/message/wrapper.rs | 13 +- src/mtterror.rs | 2 +- src/queue/data_director.rs | 2 +- 13 files changed, 2526 insertions(+), 28 deletions(-) create mode 100644 ' create mode 100644 src/document/record.rs diff --git a/' b/' new file mode 100644 index 0000000..617c857 --- /dev/null +++ b/' @@ -0,0 +1,2426 @@ +use crate::{ + action::{Action, CalcValue, Calculation, MsgAction, Query}, + document::{ + definition::{DocDef, DocFuncType}, + field::Field, + }, + message::wrapper::{InternalRecord, InternalRecords, Message, Oid, Records, Reply, Update}, + mtterror::MTTError, + name::NameType, + queue::{ + data_director::{Include, Path, RegMsg, Register, RouteID}, + router::Queue, + }, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::mpsc::{channel, Receiver}, + thread::spawn, +}; +use uuid::Uuid; + +pub struct CreateDoc { + queue: Queue, + rx: Receiver, +} + +impl CreateDoc { + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + queue: queue, + rx: rx, + } + } + + pub fn start(mut queue: Queue) { + let (tx, rx) = channel(); + let routes = [Path::new( + Include::All, + Include::All, + Include::Just(Action::Create), + )] + .to_vec(); + let id = queue.add_sender(tx); + for route in routes.iter() { + let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone())); + queue.send(Message::new(NameType::None, regmsg)); + rx.recv().unwrap(); + } + let doc = CreateDoc::new(queue, rx); + spawn(move || { + doc.listen(); + }); + } + + fn listen(&self) { + loop { + let msg = self.rx.recv().unwrap(); + DocumentFile::start(self.queue.clone(), msg); + } + } +} + +#[cfg(test)] +mod createdocs { + use super::*; + use crate::{name::Name, support_tests::TIMEOUT}; + + struct TestCreateDoc { + queue: Queue, + rx: Receiver, + rx_id: Uuid, + } + + impl TestCreateDoc { + fn new() -> Self { + let mut queue = Queue::new(); + let (tx, rx) = channel(); + let id = queue.add_sender(tx); + CreateDoc::start(queue.clone()); + Self { + queue: queue, + rx: rx, + rx_id: id, + } + } + + fn get_queue(&self) -> Queue { + self.queue.clone() + } + + fn get_receiver(&self) -> &Receiver { + &self.rx + } + + fn register_paths(&self, paths: Vec) { + for path in paths.iter() { + let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone())); + self.queue.send(Message::new(NameType::None, regmsg)); + self.rx.recv_timeout(TIMEOUT).unwrap(); + } + } + } + + #[test] + fn create_document_creation() { + let doc_creator = TestCreateDoc::new(); + let paths = [ + Path::new(Include::All, Include::All, Include::Just(Action::Reply)), + Path::new(Include::All, Include::All, Include::Just(Action::Records)), + ] + .to_vec(); + doc_creator.register_paths(paths); + let queue = doc_creator.get_queue(); + let rx = doc_creator.get_receiver(); + let name = Name::english("project"); + let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); + queue.send(msg1.clone()); + let result1 = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!( + result1.get_message_id(), + msg1.get_message_id(), + "received {:?} from create message.", + rx + ); + match result1.get_action() { + MsgAction::Reply(_) => {} + _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), + } + let msg2 = Message::new(name.clone(), Query::new(name.clone())); + queue.send(msg2.clone()); + let result2 = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result2.get_message_id(), msg2.get_message_id()); + match result2.get_action() { + MsgAction::Records(data) => assert_eq!(data.len(), 0), + _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), + } + } + + #[test] + fn does_duplicates_generate_error() { + let doc_creator = TestCreateDoc::new(); + let paths = [Path::new( + Include::All, + Include::All, + Include::Just(Action::Error), + )] + .to_vec(); + doc_creator.register_paths(paths); + let queue = doc_creator.get_queue(); + let rx = doc_creator.get_receiver(); + let name = Name::english("duplicate"); + let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); + let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); + queue.send(msg1.clone()); + queue.send(msg2.clone()); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg2.get_message_id()); + match result.get_action() { + MsgAction::Error(err) => match err { + MTTError::NameDuplicate(data) => assert_eq!(data, &name), + _ => unreachable!("got {:?}: should have been a duplicate name", err), + }, + _ => unreachable!("got {:?}: should have been a reply.", result.get_action()), + } + } +} + +#[derive(Clone, Debug)] +pub enum IndexType { + Index, + Unique, +} + +impl IndexType { + fn create_index(&self) -> Index { + match self { + Self::Index => Index::new(), + Self::Unique => Index::new_unique(), + } + } +} + +#[derive(Debug)] +struct Index { + data: HashMap>, + unique: bool, +} + +impl Index { + fn new() -> Self { + Self { + data: HashMap::new(), + unique: false, + } + } + + fn new_unique() -> Self { + Self { + data: HashMap::new(), + unique: true, + } + } + + fn internal_add(&mut self, field: &Field, oid: Oid) { + let storage = match self.data.get_mut(field) { + Some(data) => data, + None => { + let data = HashSet::new(); + self.data.insert(field.clone(), data); + self.data.get_mut(field).unwrap() + } + }; + storage.insert(oid); + } + + fn add(&mut self, field: Field, oid: Oid) -> Result<(), MTTError> { + let oids = match self.data.get_mut(&field) { + Some(data) => data, + None => { + self.data.insert(field.clone(), HashSet::new()); + self.data.get_mut(&field).unwrap() + } + }; + if self.unique && oids.len() > 0 { + return Err(MTTError::FieldDuplicate); + } else { + oids.insert(oid); + } + Ok(()) + } + + fn pull(&self, calc: &Calculation) -> Result, MTTError> { + let mut output = HashSet::new(); + for (key, value) in self.data.iter() { + match calc.calculate(key) { + Field::Boolean(data) => { + if data { + output = output.union(&value).cloned().collect(); + } + } + _ => return Err(MTTError::FieldInvalidType), + } + } + Ok(output) + } + + fn remove(&mut self, field: &Field, oid: &Oid) { + match self.data.get_mut(field) { + Some(oids) => { + oids.remove(oid); + if oids.len() == 0 { + self.data.remove(field); + } + } + None => {} + }; + } + + fn validate(&self, field: &Field) -> Result<(), MTTError> { + if self.unique { + match self.data.get(field) { + Some(_) => return Err(MTTError::FieldDuplicate), + None => {} + } + } + Ok(()) + } +} + +struct Indexes { + data: HashMap, +} + +impl Indexes { + fn new(settings: &HashMap) -> Self { + let mut output = HashMap::new(); + for (key, value) in settings.iter() { + output.insert(key.clone(), value.create_index()); + } + Self { data: output } + } + + fn index_ids(&self) -> HashSet<&Uuid> { + self.data.keys().collect::>() + } + + fn pull(&self, field_id: &Uuid, calc: &Calculation) -> Result, MTTError> { + self.data.get(field_id).unwrap().pull(calc) + } + + fn add_to_index(&mut self, field_name: &Uuid, field: Field, oid: Oid) -> Result<(), MTTError> { + let index = match self.data.get_mut(field_name) { + Some(data) => data, + None => return Ok(()), + }; + index.add(field, oid) + } + + fn validate(&self, field_name: &Uuid, value: &Field) -> Result<(), MTTError> { + match self.data.get(field_name) { + Some(index) => match index.validate(value) { + Ok(_) => {} + Err(err) => return Err(err), + }, + None => {} + } + Ok(()) + } + + fn iter_mut(&mut self) -> impl Iterator { + self.data.iter_mut() + } +} + +#[cfg(test)] +mod indexes { + use super::*; + use crate::action::{FieldType, Operand}; + + fn get_fields(count: usize) -> Vec { + let mut output = Vec::new(); + while output.len() < count { + let field: Field = Uuid::new_v4().into(); + if !output.contains(&field) { + output.push(field); + } + } + output + } + + fn get_oids(count: usize) -> Vec { + let mut output = Vec::new(); + while output.len() < count { + let oid = Oid::new(); + if !output.contains(&oid) { + output.push(oid); + } + } + output + } + + #[test] + fn add_to_index() { + let mut index = Index::new(); + let count = 3; + let fields = get_fields(count); + let oids = get_oids(count); + for i in 0..count { + index.add(fields[i].clone(), oids[i].clone()).unwrap(); + } + for i in 0..count { + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(fields[i].clone()).unwrap(); + calc.add_value(CalcValue::Existing(FieldType::Uuid)) + .unwrap(); + let result = index.pull(&calc).unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result.iter().last().unwrap(), &oids[i]); + } + } + + #[test] + fn index_can_handle_multiple_entries() { + let mut index = Index::new(); + let count = 3; + let fields = get_fields(1); + let oids = get_oids(count); + for i in 0..count { + index.add(fields[0].clone(), oids[i].clone()).unwrap(); + } + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(fields[0].clone()).unwrap(); + calc.add_value(CalcValue::Existing(FieldType::Uuid)) + .unwrap(); + let result = index.pull(&calc).unwrap(); + assert_eq!(result.len(), 3); + for oid in oids { + assert!(result.contains(&oid)); + } + } + + #[test] + fn can_remove_oid() { + let mut index = Index::new(); + let count = 3; + let pos = 1; + let fields = get_fields(1); + let oids = get_oids(count); + for i in 0..count { + index.add(fields[0].clone(), oids[i].clone()).unwrap(); + } + index.remove(&fields[0], &oids[pos]); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(fields[0].clone()).unwrap(); + calc.add_value(CalcValue::Existing(FieldType::Uuid)) + .unwrap(); + let result = index.pull(&calc).unwrap(); + assert!(!result.contains(&oids[pos]), "should have removed oid"); + } + + #[test] + fn are_empty_indexes_removed() { + let mut index = Index::new(); + let field: Field = Uuid::new_v4().into(); + let oid = Oid::new(); + index.add(field.clone(), oid.clone()).unwrap(); + index.remove(&field, &oid); + assert_eq!(index.data.len(), 0); + } + + #[test] + fn do_unique_indexes_error_on_duplicates() { + let mut index = Index::new_unique(); + let field: Field = "fred".into(); + let oids = get_oids(2); + index.add(field.clone(), oids[0].clone()).unwrap(); + match index.add(field.clone(), oids[0].clone()) { + Ok(_) => unreachable!("should have been an error"), + Err(err) => match err { + MTTError::FieldDuplicate => {} + _ => unreachable!("got {:?}: should have been duplicate field", err), + }, + } + } + + #[test] + fn index_returns_validate() { + let mut index = Index::new(); + let field: Field = "stuff".into(); + let oid = Oid::new(); + index.add(field.clone(), oid).unwrap(); + match index.validate(&field) { + Ok(_) => {} + Err(err) => unreachable!("got {:?}: should have returned without issue", err), + } + } + + #[test] + fn unique_return_duplicate_error() { + let mut index = Index::new_unique(); + let field: Field = "fred".into(); + let oid = Oid::new(); + index.add(field.clone(), oid).unwrap(); + match index.validate(&field) { + Ok(_) => unreachable!("should have gotten a duplication error"), + Err(err) => match err { + MTTError::FieldDuplicate => {} + _ => unreachable!("got {:?}: should have been duplicate field", err), + }, + } + } +} + +struct DocumentFile { + docdef: DocDef, + docs: InternalRecords, + indexes: Indexes, + name_id: Uuid, + queue: Queue, + routes: HashMap, + rx: Receiver, +} + +impl DocumentFile { + fn new( + queue: Queue, + rx: Receiver, + docdef: DocDef, + routes: HashMap, + name_id: Uuid, + ) -> Self { + let indexes = Indexes::new(docdef.get_indexes()); + Self { + docdef: docdef.clone(), + docs: InternalRecords::new(), + indexes: indexes, + name_id: name_id, + queue: queue, + routes: routes, + rx: rx, + } + } + + fn start(mut queue: Queue, msg: Message) { + let (tx, rx) = channel(); + let action = msg.get_action(); + let docdef = match action { + MsgAction::Create(data) => data.clone(), + _ => unreachable!("got {:?}: should have been a create message", action), + }; + let names = docdef.get_document_names(); + let id = queue.add_sender(tx); + let reg_msg = Register::new(id, RegMsg::AddDocName(names.clone())); + let rmsg = msg.response(reg_msg.clone()); + queue.send(rmsg.clone()); + let name_result = rx.recv().unwrap(); + let name_id = match name_result.get_action() { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::DocumentNameID(data) => data, + RegMsg::Error(err) => { + queue.remove_sender(&id); + queue.send(msg.response(err.clone())); + return; + } + _ => unreachable!("should only return a name id or an error"), + }, + _ => unreachable!("should only return a name id or an error"), + }; + let mut route_action: HashMap = HashMap::new(); + for path_action in docdef.iter_routes() { + let request = reg_msg.response(RegMsg::AddRoute(path_action.path())); + let add_route = rmsg.response(request); + queue.send(add_route); + let result = rx.recv().unwrap(); + let route_id = match result.get_action() { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::RouteID(data) => data, + RegMsg::Error(err) => { + queue.remove_sender(&id); + queue.send(msg.response(err.clone())); + return; + } + _ => unreachable!("should only return a route id or an error"), + }, + _ => unreachable!("should only return a route id or an error"), + }; + route_action.insert(route_id.clone(), path_action.doc_function()); + } + let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action, name_id.clone()); + spawn(move || { + doc.listen(); + }); + let reply = msg.response(Reply::new()); + queue.send(reply.clone()); + } + + fn listen(&mut self) { + loop { + let msg = self.rx.recv().unwrap(); + let route = msg.get_route(); + for (route_id, doc_func) in self.routes.clone().iter() { + if route == route_id.into() { + match doc_func { + DocFuncType::Add => self.add_document(&msg), + DocFuncType::Delete => self.delete(&msg), + DocFuncType::Query => self.query(&msg), + DocFuncType::Show => self.queue.send(msg.response(Reply::new())), + DocFuncType::Update => self.update(&msg), + DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), + DocFuncType::Trigger(action) => self.trigger(&msg, action), + } + } + } + } + } + + fn validate(&self, field_name: NT, value: &Field) -> Result + where + NT: Into, + { + let field_id = match self.docdef.get_field_id(field_name) { + Ok(data) => data, + Err(err) => return Err(err), + }; + let output = match self.docdef.validate(field_id.clone(), value) { + Ok(data) => data, + Err(err) => return Err(err), + }; + match self.indexes.validate(&field_id, &output) { + Ok(_) => {} + Err(err) => return Err(err), + } + Ok(output) + } + + fn add_document(&mut self, msg: &Message) { + let addition = match msg.get_action() { + MsgAction::Addition(data) => data, + _ => return, + }; + let mut holder = InternalRecord::new(); + for (name_id, value) in addition.iter() { + let field_id = match self.docdef.get_field_id(name_id) { + Ok(id) => id, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply); + return; + } + }; + holder.insert(field_id.clone(), value.get(&Field::None)); + } + for field_id in self.docdef.field_ids().iter() { + let value = match holder.get(field_id) { + Some(data) => data, + None => &Field::None, + }; + let corrected = match self.validate(field_id, value) { + Ok(data) => data, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply); + return; + } + }; + holder.insert(field_id.clone(), corrected.clone()); + } + let mut records = Records::new(self.docdef.get_field_names().clone()); + if !holder.is_empty() { + let mut oid = Oid::new(); + while self.docs.contains_key(&oid) { + oid = Oid::new(); + } + for (field_id, oids) in self.indexes.iter_mut() { + let value = holder.get(field_id).unwrap(); + oids.internal_add(value, oid.clone()); + } + self.docs.insert(oid.clone(), holder.clone()); + records.insert(oid, holder); + } + self.queue.send(msg.response(records.clone())); + self.queue + .send(msg.response(MsgAction::OnAddition(records))); + } + + fn delete(&mut self, msg: &Message) { + let delete = match msg.get_action() { + MsgAction::Delete(data) => data, + _ => unreachable!("should always be delete action"), + }; + let records = match self.run_query(delete.get_query()) { + Ok(data) => data, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply); + return; + } + }; + for (oid, record) in records.iter() { + for (field_id, index) in self.indexes.iter_mut() { + index.remove(record.get(field_id).unwrap(), oid); + } + self.docs.remove(oid); + } + let rec = Records::with_data(self.docdef.get_field_names().clone(), records); + self.queue.send(msg.response(rec.clone())); + self.queue.send(msg.response(MsgAction::OnDelete(rec))); + } + + fn run_query(&self, query: &Query) -> Result { + let indexed_ids = self.indexes.index_ids(); + let mut indexed: HashMap = HashMap::new(); + let mut unindexed: HashMap = HashMap::new(); + for (field, data) in query.iter() { + let id = match self.docdef.get_field_id(field) { + Ok(fid) => fid, + Err(err) => return Err(err), + }; + if indexed_ids.contains(&id) { + indexed.insert(id, data.clone()); + } else { + unindexed.insert(id, data.clone()); + } + } + let mut oids: HashSet = self.docs.keys().cloned().collect(); + for (field_id, calculation) in indexed.iter() { + let holder = match self.indexes.pull(field_id, calculation) { + Ok(data) => data, + Err(err) => return Err(err), + }; + oids = oids.intersection(&holder).cloned().collect(); + } + let mut records = InternalRecords::new(); + for oid in oids.iter() { + records.insert(oid.clone(), self.docs.get(oid).unwrap().clone()); + } + let holder = oids.clone(); + for (oid, record) in records.iter() { + for (field_id, calc) in unindexed.iter() { + match calc.calculate(record.get(field_id).unwrap()) { + Field::Boolean(data) => { + if !data { + oids.remove(oid); + break; + } + } + _ => { + oids.remove(oid); + break; + } + } + } + } + let removals = holder.difference(&oids); + for oid in removals { + records.remove(oid); + } + Ok(records) + } + + fn query(&self, msg: &Message) { + let query = match msg.get_action() { + MsgAction::Query(data) => data, + _ => unreachable!("should receive a query"), + }; + let records = match self.run_query(query) { + Ok(data) => data, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply); + return; + } + }; + let recs = Records::with_data(self.docdef.get_field_names().clone(), records); + self.queue.send(msg.response(recs.clone())); + self.queue.send(msg.response(MsgAction::OnQuery(recs))); + } + + fn run_update( + &mut self, + original: &InternalRecords, + update: &Update, + msg: &Message, + ) -> Result { + let mut changes: HashMap = HashMap::new(); + for (key, value) in update.get_values().iter() { + let field_id = match self.docdef.get_field_id(key) { + Ok(data) => data, + Err(err) => return Err(err), + }; + changes.insert(field_id, value); + } + let mut indexes = Indexes::new(self.docdef.get_indexes()); + let mut updates = InternalRecords::new(); + for (oid, record) in original.iter() { + let mut holder = record.clone(); + for (field_id, value) in changes.iter() { + let field = value.get(holder.get(field_id).unwrap()); + let correction = match self.validate(field_id, &field) { + Ok(data) => data, + Err(err) => return Err(err), + }; + holder.insert(field_id.clone(), correction.clone()); + match indexes.add_to_index(&field_id, correction, oid.clone()) { + Ok(_) => {} + Err(err) => return Err(err), + } + } + updates.insert(oid.clone(), holder); + } + for (oid, new_rec) in updates.iter() { + let old_rec = original.get(oid).unwrap(); + for (field_id, index) in self.indexes.iter_mut() { + index.remove(old_rec.get(field_id).unwrap(), oid); + index + .add(new_rec.get(field_id).unwrap().clone(), oid.clone()) + .unwrap(); + } + self.docs.insert(oid.clone(), new_rec.clone()); + } + let recs = Records::with_data(self.docdef.get_field_names().clone(), updates); + self.queue + .send(msg.response(MsgAction::OnUpdate(recs.clone()))); + Ok(recs) + } + + fn update(&mut self, msg: &Message) { + let update = match msg.get_action() { + MsgAction::Update(data) => data, + _ => unreachable!("should receive a update"), + }; + let original = match self.run_query(update.get_query()) { + Ok(result) => result, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply); + return; + } + }; + let data = match self.run_update(&original, update, msg) { + Ok(output) => output, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply); + return; + } + }; + self.queue.send(msg.response(data)); + } + + fn existing_query(&mut self, msg: &Message, action: &MsgAction) { + let recs = match msg.get_action() { + MsgAction::OnQuery(data) => data, + _ => unreachable!("should only receive on messages"), + }; + match action { + MsgAction::Update(change) => self + .run_update(recs.get_internal_records(), change, msg) + .unwrap(), + _ => panic!("should not get here"), + }; + } + + fn trigger(&self, msg: &Message, action: &MsgAction) { + self.queue + .send(msg.forward(self.name_id.clone(), action.clone())); + } +} + +#[cfg(test)] +mod document_files { + use super::*; + use crate::{ + action::{Addition, Operand}, + document::field::FieldType, + message::wrapper::Delete, + name::{Name, Names}, + support_tests::TIMEOUT, + }; + use chrono::Utc; + use std::{sync::mpsc::RecvTimeoutError, thread::sleep, time::Duration}; + + fn standard_paths() -> Vec { + [ + Path::new(Include::All, Include::All, Include::Just(Action::Records)), + Path::new(Include::All, Include::All, Include::Just(Action::Reply)), + Path::new(Include::All, Include::All, Include::Just(Action::Error)), + ] + .to_vec() + } + + struct TestDocument { + docdef: DocDef, + queue: Queue, + sender_id: Uuid, + rx: Receiver, + } + + impl TestDocument { + fn new(field_types: Vec) -> Self { + let doc_name = Name::english(Uuid::new_v4().to_string().as_str()); + let mut docdef = DocDef::new(doc_name.clone()); + let mut count = 0; + for field_type in field_types.iter() { + docdef.add_field( + Name::english(format!("field{}", count).as_str()), + field_type.clone(), + ); + count += 1; + } + let (tx, rx) = channel(); + let mut queue = Queue::new(); + let id = queue.add_sender(tx); + Self { + docdef: docdef, + queue: queue, + sender_id: id, + rx: rx, + } + } + + fn doc_name(&self) -> Name { + self.docdef.get_document_names()[0].clone() + } + + fn get_docdef(&self) -> &DocDef { + &self.docdef + } + + fn get_docdef_mut(&mut self) -> &mut DocDef { + &mut self.docdef + } + + fn get_queue(&mut self) -> Queue { + self.queue.clone() + } + + fn get_receiver(&self) -> &Receiver { + &self.rx + } + + fn get_sender_id(&self) -> Uuid { + self.sender_id.clone() + } + + fn send(&self, action: A) + where + A: Into, + { + let msg = Message::new(self.docdef.get_document_names()[0].clone(), action); + self.queue.send(msg); + } + + fn start(&mut self, routes: Vec) { + let msg = Message::new( + self.docdef.get_document_names()[0].clone(), + self.docdef.clone(), + ); + DocumentFile::start(self.queue.clone(), msg); + for route in routes.iter() { + let request = + Register::new(self.sender_id.clone(), RegMsg::AddRoute(route.clone())); + let add_route = Message::new(NameType::None, request); + self.queue.send(add_route); + self.rx.recv().unwrap(); + } + } + + fn populate(&self, data: Vec) { + let mut add = Addition::new(self.doc_name()); + let mut count = 0; + for item in data.iter() { + add.add_field( + Name::english(format!("field{}", count).as_str()), + item.clone(), + ); + count += 1; + } + self.send(add); + match self.rx.recv_timeout(TIMEOUT) { + Ok(_) => {} // eats the addition response. + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("got {}, should have been ok or time out", err), + }, + } + } + } + + impl From for TestDocument { + fn from(value: DocDef) -> Self { + let (tx, rx) = channel(); + let mut queue = Queue::new(); + let id = queue.add_sender(tx); + Self { + docdef: value, + queue: queue, + sender_id: id, + rx: rx, + } + } + } + + #[test] + fn does_not_respond_to_create() { + let name = Name::english("quiet"); + let docdef = DocDef::new(name.clone()); + let mut test_doc: TestDocument = docdef.into(); + let alt = Name::english("alternate"); + test_doc.start(standard_paths()); + let docdef = DocDef::new(alt); + let msg = Message::new(name.clone(), docdef); + test_doc.get_queue().send(msg); + match test_doc.get_receiver().recv_timeout(TIMEOUT) { + Ok(msg) => unreachable!("should not receive: {:?}", msg), + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("should have timed out"), + }, + } + } + + #[test] + fn does_document_respond_to_requests() { + let name = Name::english("listen"); + let docdef = DocDef::new(name.clone()); + let mut test_doc: TestDocument = docdef.into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let msg_actions = [ + MsgAction::Addition(Addition::new(name.clone())), + MsgAction::Delete(Delete::new(Query::internal())), + MsgAction::Query(Query::new(name.clone())), + MsgAction::Show, + MsgAction::Update(Update::new(Query::internal())), + ]; + for msg_action in msg_actions.iter() { + let msg = Message::new(name.clone(), msg_action.clone()); + queue.send(msg.clone()); + let result = match test_doc.get_receiver().recv_timeout(TIMEOUT) { + Ok(data) => data.clone(), + Err(err) => unreachable!("for {:?} got {:?}", msg_action, err), + }; + assert_eq!( + result.get_message_id(), + msg.get_message_id(), + "for {:?} response and reply ids should equal", + msg_action + ); + match result.get_action() { + MsgAction::Reply(data) => { + assert_eq!(data.len(), 0, "for {:?} got {:?}", msg_action, result) + } + MsgAction::Records(data) => { + assert_eq!(data.len(), 0, "for {:?} got {:?}", msg_action, result) + } + _ => unreachable!( + "for {:?} got {:?}: should have received a reply", + msg_action, + result.get_action() + ), + } + } + } + + #[test] + fn does_not_respond_to_other_document_requests() { + let name = Name::english("quiet"); + let alt = Name::english("alternate"); + let docdef = DocDef::new(name.clone()); + let mut test_doc: TestDocument = docdef.into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let reg_msg = Register::new( + test_doc.get_sender_id(), + RegMsg::AddDocName([alt.clone()].to_vec()), + ); + let setup = Message::new(NameType::None, reg_msg.clone()); + queue.send(setup); + test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let msg_actions = [ + MsgAction::Addition(Addition::new(alt.clone())), + MsgAction::Create(DocDef::new(name.clone())), + MsgAction::Delete(Delete::new(Query::new(alt.clone()))), + MsgAction::Query(Query::internal().into()), + MsgAction::Show, + MsgAction::Update(Update::new(Query::internal())), + ]; + let mut msgs: HashMap = HashMap::new(); + for msg_action in msg_actions.iter() { + let msg = Message::new(alt.clone(), msg_action.clone()); + msgs.insert(msg.get_message_id().clone(), msg_action.clone()); + queue.send(msg); + } + match test_doc.get_receiver().recv_timeout(TIMEOUT) { + Ok(msg) => unreachable!( + "for {:?} should not receive: {:?}", + msgs.get(msg.get_message_id()).unwrap(), + msg + ), + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("got {:?}, should have timed out", err), + }, + } + } + + #[test] + fn query_sends_on_query_message() { + let count = 5; + let mut data: HashSet = HashSet::new(); + while data.len() < count { + let field: Field = Uuid::new_v4().into(); + data.insert(field); + } + let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + let queue = test_doc.get_queue(); + let routes = [Path::new( + Include::All, + Include::All, + Include::Just(Action::OnQuery), + )] + .to_vec(); + test_doc.start(routes); + for item in data.iter() { + test_doc.populate([item.clone()].to_vec()); + } + let msg = Message::new(doc_name.clone(), Query::new(doc_name.clone())); + queue.send(msg.clone()); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!( + result.get_message_id(), + msg.get_message_id(), + "message ids should match" + ); + match result.get_action() { + MsgAction::OnQuery(output) => { + assert_eq!( + output.len(), + count, + "wrong number of entries: got {:?}", + output + ); + for rec in output.iter() { + assert!(data.contains(&rec.get(Name::english("field0")).unwrap())); + } + } + _ => unreachable!("should never get here"), + } + } + + #[test] + fn send_on_addition_message() { + let data: Field = Uuid::new_v4().into(); + let field_name = Name::english("field0"); + let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + let queue = test_doc.get_queue(); + let routes = vec![Path::new( + Include::All, + Include::All, + Include::Just(Action::OnAddition), + )]; + test_doc.start(routes); + let mut add = Addition::new(doc_name.clone()); + add.add_field(field_name.clone(), data.clone()); + let msg = Message::new(doc_name.clone(), add); + queue.send(msg.clone()); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!( + result.get_message_id(), + msg.get_message_id(), + "message ids should match" + ); + match result.get_action() { + MsgAction::OnAddition(output) => { + assert_eq!(output.len(), 1, "wrong number of entries: got {:?}", output); + for rec in output.iter() { + assert_eq!(rec.get(Name::english("field0")).unwrap(), data); + } + } + _ => unreachable!("should never get here"), + } + } + + #[test] + fn sends_on_delete_message() { + let count = 5; + let mut data: HashSet = HashSet::new(); + while data.len() < count { + let field: Field = Uuid::new_v4().into(); + data.insert(field); + } + let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + let queue = test_doc.get_queue(); + let routes = [Path::new( + Include::All, + Include::All, + Include::Just(Action::OnDelete), + )] + .to_vec(); + test_doc.start(routes); + for item in data.iter() { + test_doc.populate([item.clone()].to_vec()); + } + let msg = Message::new(doc_name.clone(), Delete::new(Query::internal())); + queue.send(msg.clone()); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!( + result.get_message_id(), + msg.get_message_id(), + "message ids should match" + ); + match result.get_action() { + MsgAction::OnDelete(output) => { + assert_eq!( + output.len(), + count, + "wrong number of entries: got {:?}", + output + ); + for rec in output.iter() { + assert!(data.contains(&rec.get(Name::english("field0")).unwrap())); + } + } + _ => unreachable!("should never get here"), + } + } + + #[test] + fn sends_on_update_message() { + let count = 5; + let field_name = Name::english("field0"); + let mut data: HashSet = HashSet::new(); + while data.len() < count { + let field: Field = Uuid::new_v4().into(); + data.insert(field); + } + let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + let queue = test_doc.get_queue(); + let routes = [Path::new( + Include::All, + Include::All, + Include::Just(Action::OnUpdate), + )] + .to_vec(); + test_doc.start(routes); + for item in data.iter() { + test_doc.populate([item.clone()].to_vec()); + } + let mut update = Update::new(Query::internal()); + update + .get_values_mut() + .add_field(field_name.clone(), Uuid::nil()); + let msg = Message::new(doc_name.clone(), update); + queue.send(msg.clone()); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!( + result.get_message_id(), + msg.get_message_id(), + "message ids should match" + ); + match result.get_action() { + MsgAction::OnUpdate(output) => { + assert_eq!( + output.len(), + count, + "wrong number of entries: got {:?}", + output + ); + for rec in output.iter() { + assert_eq!(rec.get(&field_name).unwrap(), Uuid::nil().into()); + } + } + _ => unreachable!("should never get here"), + } + } + + #[test] + fn can_document_be_added() { + let doc_name = Name::english("document"); + let mut docdef = DocDef::new(doc_name.clone()); + let name = Name::english("field"); + let data = Uuid::new_v4(); + docdef.add_field(name.clone(), FieldType::Uuid); + let mut test_doc: TestDocument = docdef.clone().into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let mut new_doc = Addition::new(doc_name.clone()); + new_doc.add_field(name.clone(), data.clone()); + let testing = |msg: Message| { + queue.send(msg.clone()); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + match result.get_action() { + MsgAction::Records(output) => { + assert_eq!(output.len(), 1); + for rec in output.iter() { + let holder = rec.get(&name).unwrap(); + match holder { + Field::Uuid(field_data) => assert_eq!(field_data, data), + _ => unreachable!("got {:?}, should have been uuid", holder), + } + } + } + _ => unreachable!( + "\n\ngot {:?}\n\nfor {:?}\n\nshould have been records", + result, msg + ), + } + }; + testing(Message::new(doc_name.clone(), new_doc)); + testing(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); + } + + #[test] + fn can_add_multiple_documents() { + let doc_name = Name::english("multiple"); + let mut docdef = DocDef::new(doc_name.clone()); + let name = Name::english("count"); + docdef.add_field(name.clone(), FieldType::Integer); + let mut test_doc: TestDocument = docdef.clone().into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let count = 5; + for i in 0..count { + let mut new_doc = Addition::new(doc_name.clone()); + new_doc.add_field(name.clone(), i); + queue.send(Message::new(doc_name.clone(), new_doc)); + test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + } + queue.send(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + let mut entries: HashSet = (0..count).collect(); + match action { + MsgAction::Records(output) => { + let entry_count: usize = count.try_into().unwrap(); + assert_eq!( + output.len(), + entry_count, + "should have the same number of entries" + ); + for record in output.iter() { + let holder = record.get(&name).unwrap(); + let data = match holder { + Field::Integer(item) => item.clone(), + _ => unreachable!("got {:?}, should have been integer", holder), + }; + assert!( + entries.contains(&data), + "did not find {:?} in {:?}", + data, + entries + ); + entries.remove(&data); + } + } + _ => unreachable!("\n\ngot {:?}\n\nshould have been records", action), + } + assert!(entries.is_empty(), "did not use {:?}", entries); + } + + #[test] + fn errors_on_wrong_field_name() { + let mut test_doc = TestDocument::new(Vec::new()); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let name = Name::english("bad"); + let mut addition = Addition::new(test_doc.doc_name()); + addition.add_field(name.clone(), "doesn't matter"); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + addition, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + match result.get_action() { + MsgAction::Error(err) => match err { + MTTError::NameNotFound(data) => assert_eq!(data, &name), + _ => unreachable!("got {:?}: should have been document field not found.", err), + }, + _ => unreachable!("got {:?}: should have been an error", result.get_action()), + } + } + + #[test] + fn errors_on_wrong_field_type() { + let mut test_doc = TestDocument::new([FieldType::Uuid].to_vec()); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let mut addition = Addition::new(test_doc.doc_name()); + addition.add_field(Name::english("field0"), "string"); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + addition, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + match result.get_action() { + MsgAction::Error(err) => match err { + MTTError::DocumentFieldWrongDataType(expected, got) => { + assert_eq!(got, &FieldType::StaticString); + assert_eq!(expected, &FieldType::Uuid); + } + _ => unreachable!( + "got {:?}: should have been document field data mismatch.", + err + ), + }, + _ => unreachable!("got {:?}: should have been an error", result.get_action()), + } + } + + #[test] + fn errors_on_missing_fields() { + let mut test_doc = TestDocument::new([FieldType::Integer, FieldType::Integer].to_vec()); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let mut addition = Addition::new(test_doc.doc_name()); + addition.add_field(Name::english("field0"), 1); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + addition, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + match result.get_action() { + MsgAction::Error(err) => match err { + MTTError::InvalidNone => {} + _ => unreachable!("got {:?}: should have been document field missing", err), + }, + _ => unreachable!("got {:?}: should have been an error", result.get_action()), + } + } + + #[test] + fn does_query_return_related_entries() { + let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let count = 5; + let expected = 3; + for i in 0..count { + test_doc.populate([i.into()].to_vec()); + } + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(expected.clone()).unwrap(); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + let mut query = Query::new(doc_name); + query.add(Name::english("field0"), calc); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + query, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!( + data.len(), + 1, + "should return one entry containing {:?} got:\n{:?}", + expected, + action + ); + for doc in data.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), expected.into()); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + + #[test] + fn does_query_work_with_greater_than() { + let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + test_doc.populate([1.into()].to_vec()); + test_doc.populate([2.into()].to_vec()); + let mut calc = Calculation::new(Operand::GreaterThan); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(1).unwrap(); + let mut query = Query::new(doc_name); + query.add(Name::english("field0"), calc); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + query, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!( + data.len(), + 1, + "should return one entry containing 2 got:\n{:?}", + action + ); + for doc in data.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), 2.into()); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + + #[test] + fn gets_all_documents_in_query() { + let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); + let doc_name = test_doc.get_docdef().get_document_names()[0].clone(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let data = 1; + let count = 5; + for i in 0..count { + let holder: i128 = (i + count).try_into().unwrap(); + test_doc.populate([holder.into()].to_vec()); + test_doc.populate([data.into()].to_vec()); + } + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(data.clone()).unwrap(); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + let mut query = Query::new(doc_name); + query.add(Name::english("field0"), calc); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + query, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!( + docs.len(), + count, + "should return one entry containing {:?} got:\n{:?}", + data, + action + ); + for doc in docs.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), data.into()); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + + #[test] + fn query_should_work_with_multiple_fields() { + let mut doc = + TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); + doc.start(standard_paths()); + let values = [ + ["a".into(), "a".into()].to_vec(), + ["a".into(), "b".into()].to_vec(), + ["b".into(), "a".into()].to_vec(), + ["b".into(), "b".into()].to_vec(), + ]; + for value in values.iter() { + doc.populate(value.clone()); + } + let mut query = Query::new(doc.doc_name()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("a").unwrap(); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + query.add(Name::english("field0"), calc); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("b").unwrap(); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + query.add(Name::english("field1"), calc); + doc.send(query); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + let afield: Field = "a".into(); + let bfield: Field = "b".into(); + assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); + for doc in data.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield); + assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + + #[test] + fn query_should_work_with_multiple_inexed_fields() { + let mut doc = + TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); + let docdef = doc.get_docdef_mut(); + docdef + .add_index(&Name::english("field0"), IndexType::Index) + .unwrap(); + docdef + .add_index(&Name::english("field1"), IndexType::Index) + .unwrap(); + doc.start(standard_paths()); + let values = [ + ["a".into(), "a".into()].to_vec(), + ["a".into(), "b".into()].to_vec(), + ["b".into(), "a".into()].to_vec(), + ["b".into(), "b".into()].to_vec(), + ]; + for value in values.iter() { + doc.populate(value.clone()); + } + let mut query = Query::new(doc.doc_name()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("a").unwrap(); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + query.add(Name::english("field0"), calc); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("b").unwrap(); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + query.add(Name::english("field1"), calc); + doc.send(query); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + let afield: Field = "a".into(); + let bfield: Field = "b".into(); + assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); + for doc in data.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield); + assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + + #[test] + fn query_should_work_with_mixed_inexed_fields() { + let mut doc = + TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); + let docdef = doc.get_docdef_mut(); + docdef + .add_index(&Name::english("field0"), IndexType::Index) + .unwrap(); + doc.start(standard_paths()); + let values = [ + ["a".into(), "a".into()].to_vec(), + ["a".into(), "b".into()].to_vec(), + ["b".into(), "a".into()].to_vec(), + ["b".into(), "b".into()].to_vec(), + ]; + for value in values.iter() { + doc.populate(value.clone()); + } + let mut query = Query::new(doc.doc_name()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("a").unwrap(); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + query.add(Name::english("field0"), calc); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("b").unwrap(); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + query.add(Name::english("field1"), calc); + doc.send(query); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + let afield: Field = "a".into(); + let bfield: Field = "b".into(); + assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); + for doc in data.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), afield); + assert_eq!(doc.get(&Name::english("field1")).unwrap(), bfield); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + + #[test] + fn errors_on_bad_field_name() { + let mut doc = TestDocument::new(Vec::new()); + doc.start(standard_paths()); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + let queue = doc.get_queue(); + let rx = doc.get_receiver(); + let field_name = Name::english("wrong"); + let mut query = Query::new(doc.doc_name()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("something").unwrap(); + query.add(field_name.clone(), calc); + let msg = Message::new(doc_name, query); + queue.send(msg); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(data) => match data { + MTTError::NameNotFound(output) => assert_eq!(output, &field_name), + _ => unreachable!("got {:?}: should been field not found", data), + }, + _ => unreachable!("got {:?}: should have been a error", action), + } + } + + #[test] + fn errors_on_bad_field_type_with_index() { + let mut doc = TestDocument::new([FieldType::Uuid].to_vec()); + doc.get_docdef_mut() + .add_index(&Name::english("field0"), IndexType::Index) + .unwrap(); + doc.start(standard_paths()); + doc.populate([Uuid::nil().into()].to_vec()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value("notUUID").unwrap(); + let mut query = Query::new(doc.doc_name()); + query.add(Name::english("field0"), calc); + doc.send(query); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(data) => match data { + MTTError::FieldInvalidType => {} + _ => unreachable!("got {:?}: should been invalid field type", data), + }, + _ => unreachable!("got {:?}: should have been a error", action), + } + } + + #[test] + fn can_use_default_values() { + let doc_name = Name::english("default"); + let mut docdef = DocDef::new(doc_name.clone()); + let field_name = Name::english("holder"); + docdef.add_field(field_name.clone(), FieldType::StaticString); + docdef + .set_default(&field_name, FieldType::StaticString) + .unwrap(); + let mut test_doc: TestDocument = docdef.into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let rx = test_doc.get_receiver(); + let new_doc = Addition::new(); + let msg = Message::new(doc_name, new_doc); + queue.send(msg); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1); + for doc in docs.iter() { + let expected: Field = "".into(); + assert_eq!(doc.get(&field_name).unwrap(), expected); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + } + + #[test] + fn can_a_default_value_be_set() { + let doc_name = Name::english("assigned"); + let mut docdef = DocDef::new(doc_name.clone()); + let field_name = Name::english("id"); + docdef.add_field(field_name.clone(), FieldType::Uuid); + docdef.set_default(&field_name, Uuid::nil()).unwrap(); + let mut test_doc: TestDocument = docdef.into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let rx = test_doc.get_receiver(); + let new_doc = Addition::new(); + let msg = Message::new(doc_name, new_doc); + queue.send(msg); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1); + for doc in docs.iter() { + let expected: Field = Uuid::nil().into(); + assert_eq!(doc.get(&field_name).unwrap(), expected); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + } + + #[test] + fn can_default_values_be_overridden() { + let doc_name = Name::english("assigned"); + let mut docdef = DocDef::new(doc_name.clone()); + let field_name = Name::english("id"); + docdef.add_field(field_name.clone(), FieldType::Uuid); + docdef.set_default(&field_name, FieldType::Uuid).unwrap(); + let mut test_doc: TestDocument = docdef.into(); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + let rx = test_doc.get_receiver(); + let mut new_doc = Addition::new(); + new_doc.add_field(&field_name, Uuid::nil()); + let msg = Message::new(doc_name, new_doc); + queue.send(msg); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1); + for doc in docs.iter() { + let expected: Field = Uuid::nil().into(); + assert_eq!(doc.get(&field_name).unwrap(), expected); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + } + + #[test] + fn empty_update_query_results_in_zero_changes() { + let count = 5; + let mut ids: HashSet = HashSet::new(); + while ids.len() < count { + ids.insert(Uuid::new_v4()); + } + let id = ids.iter().last().unwrap().clone(); + ids.remove(&id); + let mut doc = TestDocument::new([FieldType::Uuid].to_vec()); + doc.start(standard_paths()); + for id in ids.iter() { + doc.populate([id.clone().into()].to_vec()); + } + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Uuid)) + .unwrap(); + calc.add_value(Uuid::nil()).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update + .get_values_mut() + .add_field(Name::english("field0"), Uuid::nil()); + doc.send(update); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => assert_eq!(docs.len(), 0), + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + } + + #[test] + fn changes_information_requested() { + let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); + doc.start(standard_paths()); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + let old = "old"; + let new = "new"; + let id = Uuid::new_v4(); + doc.populate([id.into(), old.into()].to_vec()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Uuid)) + .unwrap(); + calc.add_value(id.clone()).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update + .get_values_mut() + .add_field(Name::english("field1"), new); + let mut testing = |msg: Message| { + doc.get_queue().send(msg.clone()); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1, "for {:?}, should have one entry", msg); + for doc in docs.iter() { + assert_eq!(doc.get(Name::english("field0")).unwrap(), id.into()); + assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into()); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + }; + testing(Message::new(doc_name.clone(), update)); + testing(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); + } + + #[test] + fn changes_only_the_queried() { + let mut doc = TestDocument::new([FieldType::Integer, FieldType::StaticString].to_vec()); + doc.start(standard_paths()); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + let old = "old"; + let new = "new"; + let count = 5; + let field_count = count.clone().try_into().unwrap(); + let picked = 3; + for i in 0..field_count { + doc.populate([i.into(), old.into()].to_vec()); + } + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(picked.clone()).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update + .get_values_mut() + .add_field(Name::english("field1"), new); + doc.get_queue().send(Message::new(doc_name.clone(), update)); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1, "should have one entry"); + for doc in docs.iter() { + assert_eq!(doc.get(Name::english("field0")).unwrap(), picked.into()); + assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into()); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + doc.get_queue() + .send(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), count, "should have one entry"); + for doc in docs.iter() { + if doc.get(Name::english("field0")).unwrap() == picked.into() { + assert_eq!( + doc.get(Name::english("field1")).unwrap(), + new.into(), + "{:?}", + docs + ); + } else { + assert_eq!( + doc.get(Name::english("field1")).unwrap(), + old.into(), + "{:?}", + docs + ); + } + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + } + + #[test] + fn can_handle_multiple_updates() { + let mut doc = TestDocument::new([FieldType::Integer, FieldType::StaticString].to_vec()); + doc.start(standard_paths()); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + let old = "old"; + let new = "new"; + let count = 5; + let picked = 3; + for _ in 0..count { + doc.populate([picked.into(), old.into()].to_vec()); + } + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(picked.clone()).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update + .get_values_mut() + .add_field(Name::english("field1"), new); + let mut testing = |msg: Message| { + doc.get_queue().send(msg); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), count, "should have one entry"); + for doc in docs.iter() { + assert_eq!(doc.get(Name::english("field0")).unwrap(), picked.into()); + assert_eq!(doc.get(Name::english("field1")).unwrap(), new.into()); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + }; + testing(Message::new(doc_name.clone(), update)); + testing(Message::new(doc_name.clone(), Query::new(doc_name.clone()))); + } + + #[test] + fn update_errors_on_bad_field_name() { + let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); + doc.start(standard_paths()); + let id = Uuid::new_v4(); + let old = "old"; + let new = "new"; + let bad_name = Name::english("wrong"); + doc.populate([id.into(), old.into()].to_vec()); + let mut update = Update::new(Query::internal()); + update.get_values_mut().add_field(bad_name.clone(), new); + doc.send(update); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(err) => match err { + MTTError::NameNotFound(data) => assert_eq!(data, &bad_name), + _ => unreachable!("got {:?}: should have gotten an missing field", err), + }, + _ => unreachable!("got {:?}: should have gotten an error", action), + } + } + + #[test] + fn update_errors_on_bad_field_type() { + let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); + doc.start(standard_paths()); + let id = Uuid::new_v4(); + let old = "old"; + let new = Uuid::nil(); + doc.populate([id.into(), old.into()].to_vec()); + + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Uuid)) + .unwrap(); + calc.add_value(id.clone()).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update + .get_values_mut() + .add_field(Name::english("field1"), new); + doc.send(update); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(err) => match err { + MTTError::DocumentFieldWrongDataType(expected, got) => { + assert_eq!(expected, &FieldType::StaticString); + assert_eq!(got, &FieldType::Uuid); + } + _ => unreachable!("got {:?}: should have gotten incorrect file type", err), + }, + _ => unreachable!("got {:?}: should have gotten an error", action), + } + } + + #[test] + fn does_update_maintain_unique_fields() { + let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); + test_doc + .get_docdef_mut() + .add_index(&Name::english("field0"), IndexType::Unique) + .unwrap(); + test_doc.start(standard_paths()); + let fname = Name::english("field0"); + let old = 3; + let new = 5; + test_doc.populate([old.into()].to_vec()); + + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(old.clone()).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update.get_values_mut().add_field(&fname, new); + test_doc.send(update); + test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let mut should_clear = Addition::new(); + should_clear.add_field(fname.clone(), old); + let mut should_error = Addition::new(); + should_error.add_field(fname.clone(), new); + test_doc.send(should_clear); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1, "should have one entry"); + for doc in docs.iter() { + assert_eq!(doc.get(&fname).unwrap(), old.into()); + } + } + _ => unreachable!("got {:?}: should have gotten records", action), + } + test_doc.send(should_error); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(err) => match err { + MTTError::FieldDuplicate => {} + _ => unreachable!("got {:?}: should have gotten incorrect file type", err), + }, + _ => unreachable!("got {:?}: should have gotten an error", action), + } + } + + #[test] + fn unique_value_remains_available_if_failure_occurs() { + let f0name = Name::english("field0"); + let f1name = Name::english("field1"); + let mut test_doc = TestDocument::new([FieldType::Uuid, FieldType::Uuid].to_vec()); + test_doc + .get_docdef_mut() + .add_index(&f0name, IndexType::Unique) + .unwrap(); + test_doc.start(standard_paths()); + let f0data = Uuid::new_v4(); + let f1bad_data = "NotUuid"; + let f1good_data = Uuid::nil(); + let mut bad_addition = Addition::new(); + bad_addition.add_field(&f0name, f0data.clone()); + bad_addition.add_field(&f1name, f1bad_data); + let mut good_addition = Addition::new(); + good_addition.add_field(&f0name, f0data.clone()); + good_addition.add_field(&f1name, f1good_data.clone()); + test_doc.send(bad_addition); + test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + test_doc.send(good_addition); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(docs) => { + assert_eq!(docs.len(), 1, "should have one entry"); + for doc in docs.iter() { + assert_eq!(doc.get(&f0name).unwrap(), f0data.into()); + assert_eq!(doc.get(&f1name).unwrap(), f1good_data.into()); + } + } + _ => unreachable!("got {:?}: should have gotten records", action), + } + } + + #[test] + fn updating_unique_updates_index_entries() { + let fname = Name::english("field0"); + let mut doc = TestDocument::new([FieldType::StaticString].to_vec()); + doc.get_docdef_mut() + .add_index(&fname, IndexType::Unique) + .unwrap(); + doc.start(standard_paths()); + let old = "old"; + let new = "new"; + let fold: Field = old.into(); + doc.populate([old.into()].to_vec()); + + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + calc.add_value(old).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let mut update = Update::new(query); + update.get_values_mut().add_field(fname.clone(), new); + doc.send(update); + doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let mut old_addition = Addition::new(); + old_addition.add_field(&fname, old); + doc.send(old_addition); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), 1); + for doc in data.iter() { + assert_eq!(doc.get(&fname).unwrap(), fold); + } + } + _ => unreachable!("got {:?}: should have gotten a reply", action), + } + let mut new_addition = Addition::new(); + new_addition.add_field(fname.clone(), new); + doc.send(new_addition); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(err) => match err { + MTTError::FieldDuplicate => {} + _ => unreachable!("got {:?}: should have gotten an missing field", err), + }, + _ => unreachable!("got {:?}: should have gotten an error", action), + } + } + + #[test] + fn update_does_not_override_unique_index() { + let f0name = Name::english("field0"); + let f1name = Name::english("field1"); + let mut doc = TestDocument::new([FieldType::Uuid, FieldType::StaticString].to_vec()); + doc.get_docdef_mut() + .add_index(&f0name, IndexType::Unique) + .unwrap(); + doc.start(standard_paths()); + let count = 5; + let data = "data"; + let mut ids: HashSet = HashSet::new(); + while ids.len() < count { + ids.insert(Uuid::new_v4()); + } + let holder = ids.iter().last().unwrap().clone(); + ids.remove(&holder); + for id in ids.iter() { + doc.populate([id.clone().into(), data.into()].to_vec()); + } + + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::StaticString)) + .unwrap(); + calc.add_value(data).unwrap(); + let mut query = Query::internal(); + query.add(&f1name, calc); + let mut update = Update::new(query); + update.get_values_mut().add_field(&f0name, holder.clone()); + doc.send(update); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(err) => match err { + MTTError::FieldDuplicate => {} + _ => unreachable!("got {:?}: should have gotten field duplicate", err), + }, + _ => unreachable!("got {:?}: should have gotten an error", action), + } + let query = Query::new(doc.doc_name()); + doc.send(query); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), ids.len()); + for doc in data.iter() { + match doc.get(&f0name).unwrap() { + Field::Uuid(id) => { + assert!(ids.contains(&id)); + ids.remove(&id); + } + _ => unreachable!("did not get uuid"), + } + } + } + _ => unreachable!("got {:?}: should have gotten reply", action), + } + assert!(ids.is_empty(), "did not find {:?}", ids); + } + + #[test] + fn can_calculate_field_values() { + let fname = Name::english("field0"); + let mut doc = TestDocument::new([FieldType::DateTime].to_vec()); + doc.start(standard_paths()); + let duration = Duration::from_secs(300); + let mut calc = Calculation::new(Operand::Add); + calc.add_value(FieldType::DateTime).unwrap(); + calc.add_value(duration.clone()).unwrap(); + let mut addition = Addition::new(); + addition.add_field(&fname, calc); + let start = Utc::now() + duration; + doc.send(addition); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let stop = Utc::now() + duration; + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), 1); + for doc in data.iter() { + match doc.get(&fname).unwrap() { + Field::DateTime(datetime) => assert!(datetime > start && datetime < stop), + _ => unreachable!("did not get uuid"), + } + } + } + _ => unreachable!("got {:?}: should have gotten reply", action), + } + } + + #[test] + fn can_delete() { + let fname = Name::english("field0"); + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + doc.start(standard_paths()); + doc.populate([1.into()].to_vec()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(1).unwrap(); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + let mut query = Query::new(doc.doc_name()); + query.add(&fname, calc); + let delete = Delete::new(query.clone()); + doc.send(delete); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), 1); + for doc in data.iter() { + match doc.get(&fname).unwrap() { + Field::Integer(num) => assert_eq!(num, 1), + _ => unreachable!("did not get uuid"), + } + } + } + _ => unreachable!("got {:?}: should have gotten reply", action), + } + doc.send(query); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => assert_eq!(data.len(), 0), + _ => unreachable!("got {:?}, should have been empty", action), + } + } + + #[test] + fn does_delete_return_query_errors() { + let field_name = Name::english("wrong"); + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + doc.start(standard_paths()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(1).unwrap(); + let mut query = Query::internal(); + query.add(field_name.clone(), calc); + let delete = Delete::new(query); + doc.send(delete); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Error(err) => match err { + MTTError::NameNotFound(data) => assert_eq!(data, &field_name), + _ => unreachable!("got {:?}: should have gotten an missing field", err), + }, + _ => unreachable!("got {:?}: should have gotten an error", action), + } + } + + #[test] + fn does_delete_update_indexes() { + let fname = Name::english("field0"); + let value = 1; + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + doc.get_docdef_mut() + .add_index(&fname, IndexType::Unique) + .unwrap(); + doc.start(standard_paths()); + doc.populate([value.into()].to_vec()); + doc.send(Delete::new(Query::internal())); + doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let mut addition = Addition::new(); + addition.add_field(&fname, value.clone()); + doc.send(addition); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => assert_eq!(data.len(), 1), + _ => unreachable!("got {:?}, should have added entry", action), + } + } + + #[test] + fn can_query_trigger_reaction() { + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + let path = Path::new( + Include::All, + Include::Just(doc_name.clone().into()), + Include::Just(Action::OnQuery), + ); + let mut update = Update::new(Query::internal()); + let mut calc = Calculation::new(Operand::Add); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(1).unwrap(); + update + .get_values_mut() + .add_field(Name::english("field0"), calc); + let function = DocFuncType::ExistingQuery(update.into()); + doc.get_docdef_mut().add_route(path, function); + let mut paths = standard_paths(); + paths.push(Path::new( + Include::All, + Include::Just(doc_name.clone().into()), + Include::Just(Action::OnUpdate), + )); + doc.start(paths); + doc.populate([0.into()].to_vec()); + for i in 0..5 { + let expected: Field = i.try_into().unwrap(); + doc.send(Query::new(doc_name.clone())); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), 1); + for rec in data.iter() { + assert_eq!(rec.get(&Name::english("field0")).unwrap(), expected); + } + } + _ => unreachable!("got {:?}, should have added entry", action), + } + let on_update = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + match on_update.get_action() { + MsgAction::OnUpdate(recs) => { + let expected: Field = (i + 1).into(); + assert_eq!(recs.len(), 1); + for rec in recs.iter() { + assert_eq!(rec.get(Name::english("field0")).unwrap(), expected); + } + } + _ => unreachable!("should only be on update"), + } + } + } + + #[test] + fn can_an_action_trigger_an_action() { + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + let queue = doc.get_queue(); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + crate::document::clock::Clock::start(queue.clone()); + let mut calc = Calculation::new(Operand::GreaterThan); + calc.add_value(CalcValue::Existing(FieldType::Integer)) + .unwrap(); + calc.add_value(1).unwrap(); + let mut query = Query::internal(); + query.add(Name::english("field0"), calc); + let delete = Delete::new(query.clone()); + let path = Path::new( + Include::All, + Include::Just(Name::english("clock").into()), + Include::Just(Action::OnUpdate), + ); + let function = DocFuncType::Trigger(delete.into()); + doc.get_docdef_mut().add_route(path, function); + let mut paths = standard_paths(); + paths.push(Path::new( + Include::All, + Include::Just(doc_name.clone().into()), + Include::Just(Action::Delete), + )); + doc.start(paths); + let queue = doc.get_queue(); + for item in 1..3 { + doc.populate([item.into()].to_vec()); + } + let trigger = Message::new( + Name::english("clock"), + MsgAction::OnUpdate(Records::new(Names::new())), + ); + queue.send(trigger.clone()); + sleep(TIMEOUT); + let msg = Message::new(doc_name.clone(), Query::new(doc_name.clone())); + queue.send(msg.clone()); + let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + while result.get_message_id() != msg.get_message_id() { + result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + } + let action = result.get_action(); + let expected: Field = 1.into(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), 1, "should have been one record:\n{:?}", data); + for rec in data.iter() { + assert_eq!(rec.get(Name::english("field0")).unwrap(), expected); + } + } + _ => unreachable!("should return records"), + } + } +} diff --git a/src/action.rs b/src/action.rs index 9d06931..5b3932a 100644 --- a/src/action.rs +++ b/src/action.rs @@ -6,10 +6,7 @@ mod query; mod request_data; mod user; -pub use crate::document::{ - definition::DocDef, - field::{Field, FieldType}, -}; +pub use crate::document::{DocDef, Field, FieldType, Records}; pub use action_type::Action; pub use addition::Addition; pub use calculation::{CalcValue, Calculation, Operand}; diff --git a/src/action/message.rs b/src/action/message.rs index 3c5aff2..e4c1221 100644 --- a/src/action/message.rs +++ b/src/action/message.rs @@ -1,7 +1,7 @@ -use super::{Addition, DocDef, Query, UserAction}; +use super::{Addition, DocDef, Query, Records, UserAction}; use crate::{ message::{ - wrapper::{Delete, Records, Reply, Update}, + wrapper::{Delete, Reply, Update}, MessageAction, }, mtterror::MTTError, @@ -33,6 +33,7 @@ impl MessageAction for MsgAction { Self::Addition(data) => data.doc_name(), Self::Query(data) => data.doc_name(), Self::Create(data) => data.doc_name(), + Self::Error(data) => data.doc_name(), _ => &NameType::None, } } diff --git a/src/action/user.rs b/src/action/user.rs index ab9111d..d7282dd 100644 --- a/src/action/user.rs +++ b/src/action/user.rs @@ -1,5 +1,4 @@ -use super::Query; -use crate::document::{definition::DocDef, field::FieldType}; +use super::{DocDef, FieldType, Query}; pub enum UserAction { CreateDocument(DocDef), diff --git a/src/document.rs b/src/document.rs index 253157f..e8fb005 100644 --- a/src/document.rs +++ b/src/document.rs @@ -1,5 +1,13 @@ -pub mod clock; -pub mod create; -pub mod definition; -pub mod field; -pub mod session; +mod clock; +mod create; +mod definition; +mod field; +mod record; +mod session; + +pub use clock::Clock; +pub use create::CreateDoc; +pub use definition::DocDef; +pub use field::{Field, FieldType}; +pub use record::Records; +pub use session::Session; diff --git a/src/document/clock.rs b/src/document/clock.rs index 5590486..efb4ed3 100644 --- a/src/document/clock.rs +++ b/src/document/clock.rs @@ -1,6 +1,6 @@ use crate::{ - action::{Action, MsgAction}, - message::wrapper::{Message, Records}, + action::{Action, MsgAction, Records}, + message::wrapper::Message, name::{Name, NameType, Names}, queue::{ data_director::{Include, Path, RegMsg, Register}, diff --git a/src/document/create.rs b/src/document/create.rs index 3e97f28..11dc263 100644 --- a/src/document/create.rs +++ b/src/document/create.rs @@ -1,10 +1,10 @@ use crate::{ - action::{Action, CalcValue, Calculation, MsgAction, Query}, + action::{Action, CalcValue, Calculation, MsgAction, Query, Records}, document::{ definition::{DocDef, DocFuncType}, field::Field, }, - message::wrapper::{InternalRecord, InternalRecords, Message, Oid, Records, Reply, Update}, + message::wrapper::{InternalRecord, InternalRecords, Message, Oid, Reply, Update}, mtterror::{ErrorID, MTTError}, name::NameType, queue::{ diff --git a/src/document/record.rs b/src/document/record.rs new file mode 100644 index 0000000..91676ba --- /dev/null +++ b/src/document/record.rs @@ -0,0 +1,67 @@ +use crate::{ + message::wrapper::{InternalRecord, InternalRecords, Oid, Record}, + name::{Name, Names}, +}; + +#[derive(Clone, Debug)] +pub struct Records { + names: Names, + data: InternalRecords, +} + +impl Records { + pub fn new(names: Names) -> Self { + Self { + names: names, + data: InternalRecords::new(), + } + } + + pub fn with_data(names: Names, records: InternalRecords) -> Self { + Self { + names: names, + data: records, + } + } + + pub fn insert(&mut self, oid: Oid, record: InternalRecord) -> Option { + self.data.insert(oid, record) + } + + pub fn len(&self) -> usize { + self.data.len() + } + + pub fn iter(&self) -> impl Iterator { + RecordIter::new(self) + } + + pub fn get_internal_records(&self) -> &InternalRecords { + &self.data + } +} + +struct RecordIter { + names: Names, + recs: Vec, +} + +impl RecordIter { + fn new(records: &Records) -> Self { + Self { + names: records.names.clone(), + recs: records.data.values().cloned().collect(), + } + } +} + +impl Iterator for RecordIter { + type Item = Record; + + fn next(&mut self) -> Option { + match self.recs.pop() { + Some(rec) => Some(Record::with_data(self.names.clone(), rec.clone())), + None => None, + } + } +} diff --git a/src/document/session.rs b/src/document/session.rs index e9b8b38..2f77db5 100644 --- a/src/document/session.rs +++ b/src/document/session.rs @@ -85,13 +85,12 @@ impl Session { mod sessions { use super::*; use crate::{ - action::{Addition, MsgAction, Query}, + action::{Addition, MsgAction, Query, Records}, document::{ clock::{clock_test_support::gen_clock_message, Clock}, create::CreateDoc, field::Field, }, - message::wrapper::Records, mtterror::{ErrorID, MTTError}, name::{Name, NameType, Names}, queue::data_director::{Include, Path, RegMsg, Register}, diff --git a/src/lib.rs b/src/lib.rs index 398438a..dc5969d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ use action::{ Action, Addition, CalcValue, Calculation, DocDef, Field, FieldType, MsgAction, Operand, Query, UserAction, }; -use document::{clock::Clock, create::CreateDoc, session::Session}; +use document::{Clock, CreateDoc, Session}; use message::{wrapper::Message, MessageAction}; use queue::{ data_director::{Include, Path, RegMsg, Register}, diff --git a/src/message/wrapper.rs b/src/message/wrapper.rs index 615d516..ddeda2e 100644 --- a/src/message/wrapper.rs +++ b/src/message/wrapper.rs @@ -1,6 +1,5 @@ use crate::{ - action::{CalcValue, MsgAction, Operand, Query}, - document::field::{Field, FieldType}, + action::{CalcValue, Field, FieldType, MsgAction, Operand, Query}, mtterror::{ErrorID, MTTError}, name::{NameType, Names}, queue::data_director::{Include, Path, Route}, @@ -88,7 +87,7 @@ impl Message { #[cfg(test)] mod messages { use super::*; - use crate::{document::definition::DocDef, name::Name}; + use crate::{action::DocDef, name::Name}; #[test] fn can_the_document_be_a_named_reference() { @@ -429,7 +428,7 @@ impl InternalRecords { self.data.keys() } - fn values(&self) -> impl Iterator { + pub fn values(&self) -> impl Iterator { self.data.values() } @@ -437,7 +436,7 @@ impl InternalRecords { self.data.contains_key(oid) } - fn len(&self) -> usize { + pub fn len(&self) -> usize { self.data.len() } } @@ -449,7 +448,7 @@ pub struct Record { } impl Record { - fn with_data(names: Names, rec: InternalRecord) -> Self { + pub fn with_data(names: Names, rec: InternalRecord) -> Self { Self { names: names, data: rec, @@ -471,6 +470,7 @@ impl Record { } } +/* #[derive(Clone, Debug)] pub struct Records { names: Names, @@ -533,6 +533,7 @@ impl Iterator for RecordIter { } } } +*/ #[derive(Clone, Debug)] pub struct Document { diff --git a/src/mtterror.rs b/src/mtterror.rs index 699ef0c..7921169 100644 --- a/src/mtterror.rs +++ b/src/mtterror.rs @@ -1,5 +1,5 @@ use crate::{ - document::field::{Field, FieldType}, + action::{Field, FieldType}, message::MessageAction, name::{Name, NameType}, }; diff --git a/src/queue/data_director.rs b/src/queue/data_director.rs index e8dc801..fd5caea 100644 --- a/src/queue/data_director.rs +++ b/src/queue/data_director.rs @@ -121,7 +121,7 @@ impl Path { mod paths { use super::*; use crate::{ - message::wrapper::Records, + action::Records, name::{Name, Names}, };