diff --git a/src/message.rs b/src/message.rs index ec58198..7c6c683 100644 --- a/src/message.rs +++ b/src/message.rs @@ -48,6 +48,8 @@ enum Action { Create, Delete, Error, + GetLog, + Log, OnAddition, OnDelete, OnQuery, @@ -67,6 +69,8 @@ impl From for Action { MsgAction::Create(_) => Action::Create, MsgAction::Delete(_) => Action::Delete, MsgAction::Error(_) => Action::Error, + MsgAction::GetLog(_) => Action::GetLog, + MsgAction::Log(_) => Action::Log, MsgAction::OnAddition(_) => Action::OnAddition, MsgAction::OnDelete(_) => Action::OnDelete, MsgAction::OnQuery(_) => Action::OnQuery, @@ -135,6 +139,8 @@ enum MsgAction { // Alter // Remove Error(MTTError), + GetLog(Uuid), + Log(Vec), OnAddition(Records), OnDelete(Records), OnQuery(Records), @@ -202,6 +208,18 @@ impl From for MsgAction { } } +impl From for MsgAction { + fn from(value: Uuid) -> Self { + MsgAction::GetLog(value) + } +} + +impl From<&Uuid> for MsgAction { + fn from(value: &Uuid) -> Self { + Self::from(value.clone()) + } +} + #[cfg(test)] mod msgactions { use super::*; @@ -2690,13 +2708,14 @@ impl IndexType { } } -#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[derive(Clone, Debug)] enum DocFuncType { - Action, Add, Delete, + ExistingQuery(MsgAction), Query, Show, + Trigger(MsgAction), Update, } @@ -2888,6 +2907,10 @@ impl DocDef { fn iter_routes(&self) -> impl Iterator { self.routes.iter() } + + fn add_route(&mut self, path: Path, action: DocFuncType) { + self.routes.push(PathAction::new(path, action)); + } } #[cfg(test)] @@ -2997,7 +3020,7 @@ mod docdefs { docdef.routes ); let mut actions: HashSet = HashSet::new(); - let mut doc_funcs: HashSet = HashSet::new(); + let mut doc_funcs: HashSet = HashSet::new(); for path_action in docdef.iter_routes() { let path = path_action.path(); match &path.msg_id { @@ -3024,11 +3047,11 @@ mod docdefs { }; let file_func = path_action.doc_function(); match file_func { - DocFuncType::Add => doc_funcs.insert(file_func), - DocFuncType::Delete => doc_funcs.insert(file_func), - DocFuncType::Query => doc_funcs.insert(file_func), - DocFuncType::Show => doc_funcs.insert(file_func), - DocFuncType::Update => doc_funcs.insert(file_func), + DocFuncType::Add => doc_funcs.insert("Add".to_string()), + DocFuncType::Delete => doc_funcs.insert("Delete".to_string()), + DocFuncType::Query => doc_funcs.insert("Query".to_string()), + DocFuncType::Show => doc_funcs.insert("Show".to_string()), + DocFuncType::Update => doc_funcs.insert("Update".to_string()), _ => unreachable!("got {:?}, which is not a default function", file_func), }; } @@ -3045,6 +3068,45 @@ mod docdefs { doc_funcs ); } + + #[test] + fn add_route_function() { + let docname = Name::english(Uuid::new_v4().to_string().as_str()); + let mut docdef = DocDef::new(docname.clone()); + docdef.add_route( + Path::new( + Include::All, + Include::Some(docname.clone().into()), + Include::Some(Action::OnQuery), + ), + DocFuncType::Trigger(Update::new(Query::new()).into()), + ); + let path_action = docdef.iter_routes().last().unwrap(); + let path = path_action.path(); + match &path.msg_id { + Include::All => {} + _ => unreachable!("got {:?}, message id should include all", path.msg_id), + }; + match &path.doc { + Include::Some(output) => match output { + NameType::Name(data) => assert_eq!(data, &docname), + _ => unreachable!("got {:?}, name type should be {:?}", path.doc, docname), + }, + _ => unreachable!("got {:?}, name type should be {:?}", path.doc, docname), + }; + match &path.action { + Include::Some(output) => match output { + Action::OnQuery => {} + _ => unreachable!("got {:?} which is not a additional action", output), + }, + _ => unreachable!("got {:?}, which is not on query action", path.action), + } + let file_func = path_action.doc_function(); + match file_func { + DocFuncType::Trigger(_) => {} + _ => unreachable!("got {:?}, which is not a default function", file_func), + } + } } #[derive(Clone, Debug)] @@ -3888,6 +3950,10 @@ impl Records { fn iter(&self) -> impl Iterator { RecordIter::new(self) } + + fn get_internal_records(&self) -> &InternalRecords { + &self.data + } } struct RecordIter { @@ -4495,6 +4561,7 @@ impl DocumentFile { DocFuncType::Query => self.query(&msg), DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(), DocFuncType::Update => self.update(&msg), + DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), _ => {} } } @@ -4696,6 +4763,48 @@ impl DocumentFile { .unwrap(); } + fn run_update( + &mut self, + original: &InternalRecords, + update: &Update, + ) -> Result { + let mut changes: HashMap = HashMap::new(); + for (key, value) in update.get_values().iter() { + let field_id = match self.docdef.get_field_id(key) { + Ok(data) => data, + Err(err) => return Err(err), + }; + changes.insert(field_id, value); + } + let mut indexes = self.docdef.create_indexes(); + let mut updates = InternalRecords::new(); + for (oid, record) in original.iter() { + let mut holder = record.clone(); + for (field_id, value) in changes.iter() { + let field = value.get(holder.get(field_id).unwrap()); + let correction = match self.validate(field_id, &field) { + Ok(data) => data, + Err(err) => return Err(err), + }; + holder.insert(field_id.clone(), correction.clone()); + match indexes.add_to_index(&field_id, correction, oid.clone()) { + Ok(_) => {} + Err(err) => return Err(err), + } + } + updates.insert(oid.clone(), holder); + } + for (oid, new_rec) in updates.iter() { + let old_rec = original.get(oid).unwrap(); + for (field_id, index) in self.indexes.iter_mut() { + index.remove(old_rec.get(field_id).unwrap(), oid); + index.add(new_rec.get(field_id).unwrap().clone(), oid.clone()); + } + self.docs.insert(oid.clone(), new_rec.clone()); + } + Ok(updates.clone()) + } + fn update(&mut self, msg: &Message) { let update = match msg.get_action() { MsgAction::Update(data) => data, @@ -4709,58 +4818,33 @@ impl DocumentFile { return; } }; - let mut changes: HashMap = HashMap::new(); - for (key, value) in update.get_values().iter() { - let field_id = match self.docdef.get_field_id(key) { - Ok(data) => data, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply).unwrap(); - return; - } - }; - changes.insert(field_id, value); - } - let mut indexes = self.docdef.create_indexes(); - let mut updates = InternalRecords::new(); - for (oid, record) in original.iter() { - let mut holder = record.clone(); - for (field_id, value) in changes.iter() { - let field = value.get(holder.get(field_id).unwrap()); - let correction = match self.validate(field_id, &field) { - Ok(data) => data, - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply).unwrap(); - return; - } - }; - holder.insert(field_id.clone(), correction.clone()); - match indexes.add_to_index(&field_id, correction, oid.clone()) { - Ok(_) => {} - Err(err) => { - let reply = msg.response(err); - self.queue.send(reply).unwrap(); - return; - } - } + let data = match self.run_update(&original, update) { + Ok(output) => output, + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; } - updates.insert(oid.clone(), holder); - } - for (oid, new_rec) in updates.iter() { - let old_rec = original.get(oid).unwrap(); - for (field_id, index) in self.indexes.iter_mut() { - index.remove(old_rec.get(field_id).unwrap(), oid); - index.add(new_rec.get(field_id).unwrap().clone(), oid.clone()); - } - self.docs.insert(oid.clone(), new_rec.clone()); - } - let recs = Records::with_data(self.docdef.get_field_names().clone(), updates); + }; + let recs = Records::with_data(self.docdef.get_field_names().clone(), data); self.queue.send(msg.response(recs.clone())).unwrap(); self.queue .send(msg.response(MsgAction::OnUpdate(recs))) .unwrap(); } + + fn existing_query(&mut self, msg: &Message, action: &MsgAction) { + let recs = match msg.get_action() { + MsgAction::OnQuery(data) => data, + _ => unreachable!("should only receive on messages"), + }; + match action { + MsgAction::Update(change) => self + .run_update(recs.get_internal_records(), change) + .unwrap(), + _ => panic!("should not get here"), + }; + } } #[cfg(test)] @@ -6264,6 +6348,20 @@ mod document_files { #[ignore] fn can_query_trigger_reaction() { let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + let doc_name = doc.get_docdef().get_document_names()[0].clone(); + let path = Path::new( + Include::All, + Include::Some(doc_name.into()), + Include::Some(Action::OnQuery), + ); + let mut update = Update::new(Query::new()); + let mut calc = Calculation::new(Operand::Add); + calc.add_value(CalcValue::Existing(FieldType::Integer)); + calc.add_value(1); + update + .get_values_mut() + .add_field(Name::english("field0"), calc); + let function = DocFuncType::ExistingQuery(update.into()); doc.start(standard_paths()); doc.populate([0.into()].to_vec()); for i in 0..5 { @@ -6499,3 +6597,232 @@ mod clocks { } } } + +#[derive(Clone, Debug)] +struct MsgEntry { + timestamp: DateTime, + message: Message, +} + +impl MsgEntry { + fn new(msg: Message) -> Self { + Self { + timestamp: Utc::now(), + message: msg, + } + } + + fn get_timestamp(&self) -> &DateTime { + &self.timestamp + } + + fn get_message(&self) -> &Message { + &self.message + } + + fn get_message_id(&self) -> &Uuid { + self.message.get_message_id() + } +} + +#[cfg(test)] +mod msg_entries { + use super::*; + + #[test] + fn creates_message_entry() { + let msg = Message::new(Name::english("holder"), Query::new()); + let start = Utc::now(); + let entry = MsgEntry::new(msg.clone()); + let end = Utc::now(); + assert!( + entry.get_timestamp() > &start, + "timestamp should be between start and end times" + ); + assert!( + entry.get_timestamp() < &end, + "timestamp should be between start and end times" + ); + assert_eq!(entry.get_message_id(), msg.get_message_id()); + } +} + +#[derive(Clone, Debug)] +struct MsgLogs { + data: HashMap>, +} + +impl MsgLogs { + fn new() -> Self { + Self { + data: HashMap::new(), + } + } + + fn add(&mut self, msg: Message) { + let entry = MsgEntry::new(msg); + let id = entry.get_message_id(); + let entries = match self.data.get_mut(id) { + Some(data) => data, + None => { + self.data.insert(id.clone(), Vec::new()); + self.data.get_mut(id).unwrap() + } + }; + entries.push(entry); + } + + fn get(&self, msg_id: &Uuid) -> Option<&Vec> { + self.data.get(msg_id) + } +} + +#[cfg(test)] +mod msg_logs { + use super::*; + + #[test] + fn can_add_message_to_log() { + let mut logs = MsgLogs::new(); + let msg = Message::new(Name::english("something"), Query::new()); + logs.add(msg.clone()); + let result = logs.get(msg.get_message_id()).unwrap(); + assert_eq!(result.len(), 1, "should be one entry"); + } + + #[test] + fn returns_none_when_no_logs_found() { + let logs = MsgLogs::new(); + match logs.get(&Uuid::nil()) { + Some(data) => unreachable!("got {:?}, should return none", data), + None => (), + } + } + + #[test] + fn stores_messages_with_responses() { + let mut logs = MsgLogs::new(); + let msg1 = Message::new(Name::english("something"), Query::new()); + let msg2 = msg1.response(Records::new(Names::new())); + logs.add(msg1.clone()); + logs.add(msg2.clone()); + let result = logs.get(msg1.get_message_id()).unwrap(); + assert_eq!(result.len(), 2, "should be two entry"); + let action1: Action = result[0].get_message().get_action().clone().into(); + let action2: Action = result[1].get_message().get_action().clone().into(); + assert_eq!(action1, Action::Query); + assert_eq!(action2, Action::Records); + } + + #[test] + fn messages_are_stored_by_ids() { + let mut logs = MsgLogs::new(); + let msg1 = Message::new(Name::english("something"), Query::new()); + let msg2 = Message::new(Name::english("something"), Query::new()); + logs.add(msg1.clone()); + logs.add(msg2.clone()); + let result1 = logs.get(msg1.get_message_id()).unwrap(); + let result2 = logs.get(msg2.get_message_id()).unwrap(); + assert_eq!(result1.len(), 1, "should be one entry"); + assert_eq!(result2.len(), 1, "should be one entry"); + assert_eq!(result1[0].get_message_id(), msg1.get_message_id()); + assert_eq!(result2[0].get_message_id(), msg2.get_message_id()); + } +} + +struct MessageLog { + data: MsgLogs, + queue: Queue, + rx: Receiver, +} + +impl MessageLog { + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + data: MsgLogs::new(), + queue: queue, + rx: rx, + } + } + + fn start(mut queue: Queue) { + let (tx, rx) = channel(); + let mut logs = MessageLog::new(queue.clone(), rx); + let id = queue.add_sender(tx); + let reg_msg = Register::new( + id, + RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), + ); + let rmsg = Message::new(NameType::None, reg_msg); + queue.send(rmsg.clone()).unwrap(); + spawn(move || { + logs.listen(); + }); + } + + fn listen(&mut self) { + loop { + let msg = self.rx.recv().unwrap(); + match msg.get_action() { + MsgAction::GetLog(id) => match self.data.get(id) { + Some(data) => self + .queue + .send(msg.response(MsgAction::Log(data.clone()))) + .unwrap(), + None => self + .queue + .send(msg.response(MsgAction::Log(Vec::new()))) + .unwrap(), + }, + _ => self.data.add(msg), + } + } + } +} + +#[cfg(test)] +mod message_logs { + use super::{support_test::TIMEOUT, *}; + + #[test] + fn does_log_store_messages() { + let doc_name = Name::english("unimportant"); + let mut queue = Queue::new(); + MessageLog::start(queue.clone()); + let (tx, rx) = channel(); + let id = queue.add_sender(tx); + let reg_msg = Register::new(id, RegMsg::AddDocName(vec![doc_name.clone()])); + let rmsg = Message::new(NameType::None, reg_msg); + queue.send(rmsg.clone()).unwrap(); + let name_result = rx.recv().unwrap(); + let name_id = match name_result.get_action() { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::DocumentNameID(data) => data, + RegMsg::Error(err) => unreachable!("got {:?}, should have gotten data", err), + _ => unreachable!("should only return a name id or an error"), + }, + _ => unreachable!("should only return a name id or an error"), + }; + let request = Register::new( + id.clone(), + RegMsg::AddRoute(Path::new( + Include::All, + Include::All, + Include::Some(Action::Log), + )), + ); + queue.send(Message::new(NameType::None, request)).unwrap(); + rx.recv_timeout(TIMEOUT).unwrap(); + let msg = Message::new(doc_name.clone(), Query::new()); + let start = Utc::now(); + queue.send(msg.clone()).unwrap(); + let log_msg = Message::new(NameType::None, msg.get_message_id()); + queue.send(log_msg.clone()).unwrap(); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), log_msg.get_message_id()); + match result.get_action() { + MsgAction::Log(output) => assert_eq!(output.len(), 1), + _ => unreachable!("got {:?}, should have been log", result.get_action()), + } + } +}