From 47bbf65907cd56fc7cbb97f5f724694dee2774a3 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Tue, 11 Nov 2025 21:28:46 -0500 Subject: [PATCH] Queue replaces message name with id. --- src/message.rs | 220 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 171 insertions(+), 49 deletions(-) diff --git a/src/message.rs b/src/message.rs index 3a6007e..c27dc02 100644 --- a/src/message.rs +++ b/src/message.rs @@ -38,6 +38,7 @@ enum MTTError { NameMissingTranslation(Language), NameNotFound(Name), QueryCannotChangeData, + RouteRequiresDocumentID, } #[derive(Clone, Debug, Eq, Hash, PartialEq)] @@ -291,7 +292,10 @@ impl Message { ) } - fn reset_name_id(&mut self, name: NT) where NT: Into { + fn reset_name_id(&mut self, name: NT) + where + NT: Into, + { self.document_id = name.into(); } @@ -663,12 +667,16 @@ impl Names { if self.ids.contains_key(&data) { Ok(data) } else { - Err(MTTError::NameNotFound(Name::english( - data.to_string().as_str(), - ))) + if data == Uuid::nil() { + Ok(data) + } else { + Err(MTTError::NameNotFound(Name::english( + data.to_string().as_str(), + ))) + } } } - NameType::None => Err(MTTError::NameNotFound(Name::english("none"))), + NameType::None => Ok(Uuid::nil()), } } @@ -1072,6 +1080,25 @@ impl From<&RouteID> for Route { } } +impl TryFrom for Route { + type Error = MTTError; + + fn try_from(value: Path) -> Result { + let doc = match value.doc { + Include::Some(data) => match data { + NameType::ID(id) => Include::Some(id.clone()), + _ => return Err(MTTError::RouteRequiresDocumentID), + }, + Include::All => Include::All, + }; + Ok(Self { + action: value.action, + doc_type: doc, + msg_id: value.msg_id, + }) + } +} + #[cfg(test)] mod routes { use super::*; @@ -1194,33 +1221,32 @@ impl DocRegistry { fn listen(&mut self) { loop { - let msg = self.receiver.recv().unwrap(); + let mut msg = self.receiver.recv().unwrap(); match msg.get_action() { MsgAction::Register(data) => { let id = data.get_sender_id(); let reply = msg.response(self.register_action(data)); self.queue.forward(id, reply); } - _ => { - let path = msg.get_path(); - match self.doc_names.path_to_route(&path) { - Ok(route) => { - let mut send_to: HashSet = HashSet::new(); - for (route_id, senders) in self.routes.iter() { - if route == route_id.into() { - send_to = send_to.union(senders).cloned().collect(); - } - } - for send_id in send_to.iter() { - self.queue.forward(send_id, msg.clone()); + _ => match self.doc_names.get_id(msg.get_document_id()) { + Ok(doc_id) => { + msg.reset_name_id(doc_id); + let route: Route = msg.get_path().try_into().unwrap(); + let mut send_to: HashSet = HashSet::new(); + for (route_id, senders) in self.routes.iter() { + if route == route_id.into() { + send_to = send_to.union(senders).cloned().collect(); } } - Err(err) => self - .queue - .send(msg.response(MsgAction::Error(err))) - .unwrap(), + for send_id in send_to.iter() { + self.queue.forward(send_id, msg.clone()); + } } - } + Err(err) => self + .queue + .send(msg.response(MsgAction::Error(err))) + .unwrap(), + }, } } } @@ -1247,7 +1273,7 @@ impl DocRegistry { RegMsg::GetNameID(name) => match self.doc_names.get_id(name) { Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), Err(err) => reg.response(RegMsg::Error(err)), - } + }, _ => reg.response(RegMsg::Ok), } } @@ -1516,6 +1542,41 @@ mod queues { } } + #[test] + fn does_name_id_get_updated() { + let mut tester = TestQueue::new(); + let mut queue = tester.get_queue(); + let doc_name = Name::english("test"); + let reg_msg = Register::new( + tester.get_preset_id().clone(), + RegMsg::AddDocName([doc_name.clone()].to_vec()), + ); + let msg = Message::new(NameType::None, reg_msg); + queue.send(msg.clone()).unwrap(); + let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + let id = match action { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::DocumentNameID(data) => data.clone(), + _ => unreachable!("got {:?}, should have been register ok", action), + }, + _ => unreachable!("got {:?}, should have been register ok", action), + }; + let reg_msg = Register::new( + tester.get_preset_id().clone(), + RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), + ); + let msg = Message::new(NameType::None, reg_msg); + queue.send(msg.clone()).unwrap(); + tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); + let msg = Message::new(doc_name.clone(), Query::new()); + queue.send(msg.clone()).unwrap(); + let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + let name_id: NameType = id.into(); + assert_eq!(result.get_document_id(), &name_id); + } + #[test] fn can_register_multiple_names_at_once() { let mut tester = TestQueue::new(); @@ -1533,7 +1594,7 @@ mod queues { MsgAction::Register(data) => match data.get_msg() { RegMsg::DocumentNameID(data) => data, _ => unreachable!("got {:?}, should have returned id", data), - } + }, _ => unreachable!("got {:?}, should have returned id", action), }; for name in names.iter() { @@ -1549,7 +1610,7 @@ mod queues { MsgAction::Register(data) => match data.get_msg() { RegMsg::DocumentNameID(data) => data, _ => unreachable!("got {:?}, should have returned id", data), - } + }, _ => unreachable!("got {:?}, should have returned id", action), }; assert_eq!(result, id); @@ -4355,7 +4416,7 @@ impl DocumentFile { doc.listen(); }); let reply = msg.response(Reply::new()); - queue.send(reply).unwrap(); + queue.send(reply.clone()).unwrap(); } fn listen(&mut self) { @@ -5857,29 +5918,69 @@ mod createdocs { use super::support_test::TIMEOUT; use super::*; - fn setup_create_doc() -> (Queue, Receiver) { - let routes = [ - Path::new(Include::All, Include::All, Include::Some(Action::Reply)), - Path::new(Include::All, Include::All, Include::Some(Action::Records)), - Path::new(Include::All, Include::All, Include::Some(Action::Error)), - ] - .to_vec(); - let mut queue = Queue::new(); - let (tx, rx) = channel(); - let id = queue.add_sender(tx); - for route in routes.iter() { - let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone())); - queue.send(Message::new(NameType::None, regmsg)); - rx.recv_timeout(TIMEOUT).unwrap(); + struct TestCreateDoc { + queue: Queue, + rx: Receiver, + rx_id: Uuid, + } + + impl TestCreateDoc { + fn new() -> Self { + let mut queue = Queue::new(); + let (tx, rx) = channel(); + let id = queue.add_sender(tx); + CreateDoc::start(queue.clone()); + Self { + queue: queue, + rx: rx, + rx_id: id, + } + } + + fn get_queue(&self) -> Queue { + self.queue.clone() + } + + fn get_receiver(&self) -> &Receiver { + &self.rx + } + + fn get_document_id(&self, name: &Name) -> Uuid { + let reg_request = Register::new(self.rx_id.clone(), RegMsg::GetNameID(name.clone())); + self.queue + .send(Message::new(NameType::None, reg_request)) + .unwrap(); + let info = self.rx.recv_timeout(TIMEOUT).unwrap(); + match info.get_action() { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::DocumentNameID(ident) => ident.clone(), + _ => unreachable!("should not get here"), + }, + _ => unreachable!("should not get here"), + } + } + + fn register_paths(&self, paths: Vec) { + for path in paths.iter() { + let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone())); + self.queue.send(Message::new(NameType::None, regmsg)); + self.rx.recv_timeout(TIMEOUT).unwrap(); + } } - CreateDoc::start(queue.clone()); - (queue, rx) } #[test] fn create_document_creation() { + let doc_creator = TestCreateDoc::new(); + let paths = [ + Path::new(Include::All, Include::All, Include::Some(Action::Reply)), + Path::new(Include::All, Include::All, Include::Some(Action::Records)), + ] + .to_vec(); + doc_creator.register_paths(paths); + let mut queue = doc_creator.get_queue(); + let rx = doc_creator.get_receiver(); let name = Name::english("project"); - let (queue, rx) = setup_create_doc(); let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); queue.send(msg1.clone()).unwrap(); let result1 = rx.recv_timeout(TIMEOUT).unwrap(); @@ -5893,11 +5994,12 @@ mod createdocs { MsgAction::Reply(_) => {} _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), } + let doc_id: NameType = doc_creator.get_document_id(&name).into(); let msg2 = Message::new(name, Query::new()); queue.send(msg2.clone()).unwrap(); let result2 = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result2.get_message_id(), msg2.get_message_id()); - assert_eq!(result2.get_document_id(), msg2.get_document_id()); + assert_eq!(result2.get_document_id(), &doc_id); match result2.get_action() { MsgAction::Records(data) => assert_eq!(data.len(), 0), _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), @@ -5906,12 +6008,20 @@ mod createdocs { #[test] fn does_duplicates_generate_error() { + let doc_creator = TestCreateDoc::new(); + let paths = [Path::new( + Include::All, + Include::All, + Include::Some(Action::Error), + )] + .to_vec(); + doc_creator.register_paths(paths); + let mut queue = doc_creator.get_queue(); + let rx = doc_creator.get_receiver(); let name = Name::english("duplicate"); - let (queue, rx) = setup_create_doc(); let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone()))); queue.send(msg1.clone()).unwrap(); - rx.recv_timeout(TIMEOUT).unwrap(); queue.send(msg2.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg2.get_message_id()); @@ -5976,7 +6086,7 @@ mod clocks { let (tx, rx) = channel(); let id = queue.add_sender(tx); let request = Register::new( - id, + id.clone(), RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), ); queue.send(Message::new(NameType::None, request)).unwrap(); @@ -5990,10 +6100,22 @@ mod clocks { let end = Utc::now(); assert!((end - start) > TimeDelta::seconds(1)); assert!((end - start) < TimeDelta::seconds(2)); + let reg_request = Register::new(id, RegMsg::GetNameID(Name::english("clock"))); + queue + .send(Message::new(NameType::None, reg_request)) + .unwrap(); + let info = rx.recv_timeout(TIMEOUT).unwrap(); + let doc_id = match info.get_action() { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::DocumentNameID(ident) => ident.clone(), + _ => unreachable!("should not get here"), + }, + _ => unreachable!("should not get here"), + }; for msg in holder.iter() { let name_id = msg.get_document_id(); match name_id { - NameType::Name(data) => assert_eq!(data, &Name::english("clock")), + NameType::ID(data) => assert_eq!(data, &doc_id), _ => unreachable!("got {:?}, should have been clock", name_id), } let action = msg.get_action();