From 0553524ec69e56e609ac23ada306c33bdc3501ca Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Fri, 1 Aug 2025 10:58:40 -0400 Subject: [PATCH] Got the system to error on duplicate documents. --- src/message.rs | 139 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 116 insertions(+), 23 deletions(-) diff --git a/src/message.rs b/src/message.rs index 0565cf9..0ed9ad5 100644 --- a/src/message.rs +++ b/src/message.rs @@ -19,12 +19,12 @@ mod support_test { enum MTTError { DocumentAlreadyExists(String), DocumentNotFound(String), - RouteNoListeners, } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum Action { Create, + Error, Query, Reply, } @@ -33,6 +33,7 @@ impl From for Action { fn from(value: MsgAction) -> Self { match value { MsgAction::Create(_) => Action::Create, + MsgAction::Error(_) => Action::Error, MsgAction::Query(_) => Action::Query, MsgAction::Reply(_) => Action::Reply, } @@ -76,9 +77,10 @@ impl From<&NameID> for NameID { } } -#[derive(Clone)] +#[derive(Clone, Debug)] enum MsgAction { Create(DocDef), + Error(MTTError), Query(Access), Reply(Response), } @@ -121,6 +123,14 @@ impl Message { action: MsgAction::Reply(resp), } } + + fn error(&self, err: MTTError) -> Self { + Self { + msg_id: self.msg_id.clone(), + document_id: self.document_id.clone(), + action: MsgAction::Error(err), + } + } } #[cfg(test)] @@ -200,6 +210,27 @@ mod messages { _ => unreachable!("should have been a reply"), } } + + #[test] + fn Can_make_error_message() { + let name = "testing"; + let msg = Message::new(name, MsgAction::Query(Access::new())); + let err_msg = Uuid::new_v4().to_string(); + let result = msg.error(MTTError::DocumentNotFound(err_msg.clone())); + + assert_eq!(result.get_message_id(), msg.get_message_id()); + match result.get_document_id() { + NameID::Name(data) => assert_eq!(data, name), + _ => unreachable!("should have been a name"), + } + match result.get_action() { + MsgAction::Error(data) => match data { + MTTError::DocumentNotFound(txt) => assert_eq!(txt, &err_msg), + _ => unreachable!("got {:?}, should have received not found", data), + }, + _ => unreachable!("should have been a reply"), + } + } } #[derive(Clone, Debug)] @@ -779,16 +810,12 @@ impl CreateDoc { fn listen(&self) { loop { let msg = self.rx.recv().unwrap(); - match msg.get_document_id() { - NameID::Name(name) => Document::start(self.queue.clone(), name.clone()), - NameID::ID(_) => unreachable!("should be a name"), - } - self.queue.send(msg.reply(Response::new())).unwrap(); + Document::start(self.queue.clone(), msg); } } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct DocDef; impl DocDef { @@ -797,7 +824,7 @@ impl DocDef { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct Access; impl Access { @@ -806,7 +833,7 @@ impl Access { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct Response; impl Response { @@ -828,18 +855,39 @@ impl Document { } } - fn start(mut queue: Queue, name: String) { + fn start(mut queue: Queue, msg: Message) { let (tx, rx) = channel(); - queue.register(tx, name, Vec::new()); - let doc = Document::new(queue, rx); + let name = match msg.get_document_id() { + NameID::Name(name) => name.clone(), + NameID::ID(id) => id.to_string(), + }; + let routes = [RouteRequest::new( + Include::All, + Include::Some(name.clone()), + Include::Some(Action::Query), + )] + .to_vec(); + match queue.register(tx, name, routes) { + Ok(_) => {} + Err(err) => { + let error = msg.error(err); + queue.send(error).unwrap(); + return; + } + } + let doc = Document::new(queue.clone(), rx); spawn(move || { doc.listen(); }); + let reply = msg.reply(Response::new()); + queue.send(reply).unwrap(); } fn listen(&self) { loop { - self.rx.recv().unwrap(); + let msg = self.rx.recv().unwrap(); + let reply = msg.reply(Response::new()); + self.queue.send(reply).unwrap(); } } } @@ -849,23 +897,68 @@ mod createdocs { use super::support_test::TIMEOUT; use super::*; - #[test] - fn create_document_creation() { + fn setup_create_doc(routes: Vec) -> (Queue, Receiver) { let mut queue = Queue::new(); let (tx, rx) = channel(); + queue + .register(tx, Uuid::new_v4().to_string(), routes) + .unwrap(); + CreateDoc::start(queue.clone()); + (queue, rx) + } + + #[test] + fn create_document_creation() { + let name = "project"; let routes = [RouteRequest::new( Include::All, Include::All, Include::Some(Action::Reply), )] .to_vec(); - queue.register(tx, "testing".to_string(), routes).unwrap(); - CreateDoc::start(queue.clone()); - let name = "project"; - let msg = Message::new(name, MsgAction::Create(DocDef::new())); - queue.send(msg).unwrap(); - rx.recv_timeout(TIMEOUT).unwrap(); + let (queue, rx) = setup_create_doc(routes); + let msg1 = Message::new(name, MsgAction::Create(DocDef::new())); + queue.send(msg1.clone()).unwrap(); + let result1 = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result1.get_message_id(), msg1.get_message_id()); + assert_eq!(result1.get_document_id(), msg1.get_document_id()); + match result1.get_action() { + MsgAction::Reply(_) => {} + _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), + } let msg2 = Message::new(name, MsgAction::Query(Access::new())); - queue.send(msg2).unwrap(); + queue.send(msg2.clone()).unwrap(); + let result2 = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result2.get_message_id(), msg2.get_message_id()); + assert_eq!(result2.get_document_id(), msg2.get_document_id()); + match result2.get_action() { + MsgAction::Reply(_) => {} + _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), + } + } + + #[test] + fn does_duplicates_generate_error() { + let name = "duplicate"; + let routes = [ + RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)), + RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)), + ] + .to_vec(); + let (queue, rx) = setup_create_doc(routes); + let msg = Message::new(name, MsgAction::Create(DocDef::new())); + queue.send(msg.clone()).unwrap(); + rx.recv_timeout(TIMEOUT).unwrap(); + queue.send(msg.clone()).unwrap(); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + assert_eq!(result.get_document_id(), msg.get_document_id()); + match result.get_action() { + MsgAction::Error(err) => match err { + MTTError::DocumentAlreadyExists(data) => assert_eq!(data, name), + _ => unreachable!("got {:?}: should have been a reply.", err), + }, + _ => unreachable!("got {:?}: should have been a reply.", result.get_action()), + } } }