diff --git a/src/message.rs b/src/message.rs index 38d7265..7bd6f33 100644 --- a/src/message.rs +++ b/src/message.rs @@ -337,6 +337,18 @@ impl Message { action: action.into(), } } + + fn forward(&self, doc_id: D, action: A) -> Self + where + D: Into, + A: Into, + { + Self { + msg_id: self.msg_id.clone(), + document_id: doc_id.into(), + action: action.into(), + } + } } #[cfg(test)] @@ -3791,17 +3803,11 @@ impl Query { } } - fn add(&mut self, name: NT, operation: Calculation) -> Result<(), MTTError> + fn add(&mut self, name: NT, operation: Calculation) where NT: Into, { - match operation.operation() { - Operand::Equal => { - self.data.insert(name.into(), operation); - Ok(()) - } - _ => Err(MTTError::QueryCannotChangeData), - } + self.data.insert(name.into(), operation); } fn get(&self, name: NT) -> Option @@ -4606,6 +4612,7 @@ struct DocumentFile { docdef: DocDef, docs: InternalRecords, indexes: Indexes, + name_id: Uuid, queue: Queue, routes: HashMap, rx: Receiver, @@ -4617,11 +4624,13 @@ impl DocumentFile { rx: Receiver, docdef: DocDef, routes: HashMap, + name_id: Uuid, ) -> Self { Self { docdef: docdef.clone(), docs: InternalRecords::new(), indexes: docdef.create_indexes(), + name_id: name_id, queue: queue, routes: routes, rx: rx, @@ -4673,7 +4682,7 @@ impl DocumentFile { }; route_action.insert(route_id.clone(), path_action.doc_function()); } - let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action); + let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action, name_id.clone()); spawn(move || { doc.listen(); }); @@ -4694,6 +4703,7 @@ impl DocumentFile { DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(), DocFuncType::Update => self.update(&msg), DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), + DocFuncType::Trigger(action) => self.trigger(&msg, action), _ => {} } } @@ -4978,12 +4988,18 @@ impl DocumentFile { _ => panic!("should not get here"), }; } + + fn trigger(&self, msg: &Message, action: &MsgAction) { + self.queue + .send(msg.forward(self.name_id.clone(), action.clone())) + .unwrap(); + } } #[cfg(test)] mod document_files { use super::{support_test::TIMEOUT, *}; - use std::sync::mpsc::RecvTimeoutError; + use std::{sync::mpsc::RecvTimeoutError, thread::sleep}; fn standard_paths() -> Vec { [ @@ -5586,7 +5602,6 @@ mod document_files { for i in 0..count { test_doc.populate([i.into()].to_vec()); } - let mut query = Query::new(); let mut calc = Calculation::new(Operand::Equal); calc.add_value(expected.clone()); calc.add_value(CalcValue::Existing(FieldType::Integer)); @@ -5615,6 +5630,41 @@ mod document_files { } } + #[test] + fn does_query_work_with_greater_than() { + let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); + test_doc.start(standard_paths()); + let queue = test_doc.get_queue(); + test_doc.populate([1.into()].to_vec()); + test_doc.populate([2.into()].to_vec()); + let mut query = Query::new(); + let mut calc = Calculation::new(Operand::GreaterThan); + calc.add_value(CalcValue::Existing(FieldType::Integer)); + calc.add_value(1); + let mut query = Query::new(); + query.add(Name::english("field0"), calc); + queue.send(Message::new( + test_doc.get_docdef().get_document_names()[0].clone(), + query, + )); + let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Records(data) => { + assert_eq!( + data.len(), + 1, + "should return one entry containing 2 got:\n{:?}", + action + ); + for doc in data.iter() { + assert_eq!(doc.get(&Name::english("field0")).unwrap(), 2.into()); + } + } + _ => unreachable!("got {:?}: should have been a reply", action), + } + } + #[test] fn gets_all_documents_in_query() { let mut test_doc = TestDocument::new([FieldType::Integer].to_vec()); @@ -6530,6 +6580,68 @@ mod document_files { } } } + + #[test] + fn can_an_action_trigger_an_action() { + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + let queue = doc.get_queue(); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + Clock::start(queue.clone()); + let mut calc = Calculation::new(Operand::GreaterThan); + calc.add_value(CalcValue::Existing(FieldType::Integer)); + calc.add_value(1); + let mut query = Query::new(); + query.add(Name::english("field0"), calc); + let delete = Delete::new(query.clone()); + let path = Path::new( + Include::All, + Include::Some(Name::english("clock").into()), + Include::Some(Action::OnUpdate), + ); + let function = DocFuncType::Trigger(delete.into()); + doc.get_docdef_mut().add_route(path, function); + let mut paths = standard_paths(); + paths.push(Path::new( + Include::All, + Include::Some(doc_name.clone().into()), + Include::Some(Action::Delete), + )); + doc.start(paths); + let queue = doc.get_queue(); + for item in 1..3 { + doc.populate([item.into()].to_vec()); + } + let trigger = Message::new( + Name::english("clock"), + MsgAction::OnUpdate(Records::new(Names::new())), + ); + queue.send(trigger.clone()).unwrap(); + let trigger_reply = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + assert_eq!( + trigger_reply.get_message_id(), + trigger.get_message_id(), + "msg {:?} had incorrect message id", + trigger_reply + ); + sleep(TIMEOUT); + let msg = Message::new(doc_name, Query::new()); + queue.send(msg.clone()).unwrap(); + let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + while result.get_message_id() != msg.get_message_id() { + result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + } + let action = result.get_action(); + let expected: Field = 1.into(); + match action { + MsgAction::Records(data) => { + assert_eq!(data.len(), 1, "should have been one record:\n{:?}", data); + for rec in data.iter() { + assert_eq!(rec.get(Name::english("field0")).unwrap(), expected); + } + } + _ => unreachable!("should return records"), + } + } } #[cfg(test)]