From 466cc7db64d93c19e164ddff7c92d110795c59f1 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Wed, 30 Jul 2025 08:37:58 -0400 Subject: [PATCH] Reworked queuedata to not leave artifacts on failure. --- src/message.rs | 362 ++++++++++++++++++++++++++++++------------------- 1 file changed, 219 insertions(+), 143 deletions(-) diff --git a/src/message.rs b/src/message.rs index 4dd77f0..c874642 100644 --- a/src/message.rs +++ b/src/message.rs @@ -8,6 +8,13 @@ use std::{ }; use uuid::Uuid; +#[cfg(test)] +mod support_test { + use std::time::Duration; + + pub static TIMEOUT: Duration = Duration::from_millis(500); +} + #[derive(Clone, Debug)] enum MTTError { DocumentAlreadyExists(String), @@ -19,10 +26,7 @@ enum MTTError { enum Action { Create, Query, - // - NewDocumentType, Reply, - Update, } impl From for Action { @@ -30,6 +34,7 @@ impl From for Action { match value { MsgAction::Create(_) => Action::Create, MsgAction::Query(_) => Action::Query, + MsgAction::Reply(_) => Action::Reply, } } } @@ -75,6 +80,7 @@ impl From<&NameID> for NameID { enum MsgAction { Create(DocDef), Query(Access), + Reply(Response), } #[derive(Clone)] @@ -206,7 +212,7 @@ mod includes { } } -#[derive(Eq, Hash, PartialEq)] +#[derive(Clone, Eq, Hash, PartialEq)] struct RouteID { action: Option, doc_type: Option, @@ -366,6 +372,23 @@ mod roiutes { } } +#[derive(Clone)] +struct RouteRequest { + msg_id: Include, + doc_name: Include, + action: Include, +} + +impl RouteRequest { + fn new(msg_id: Include, doc_name: Include, action: Include) -> Self { + Self { + msg_id: msg_id, + doc_name: doc_name, + action: action, + } + } +} + struct QueueData { senders: HashMap>, names: HashMap, @@ -399,18 +422,55 @@ impl QueueData { } } - fn register(&mut self, name: String, tx: Sender) -> Result { - match self.get_doc_id(name.as_str()) { - Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), - Err(_) => (), - } + fn register( + &mut self, + tx: Sender, + name: String, + routes: Vec, + ) -> Result<(), MTTError> { let mut id = Uuid::new_v4(); while self.senders.contains_key(&id) { id = Uuid::new_v4(); } + match self.get_doc_id(name.clone()) { + Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), + Err(_) => {} + } + let mut holder: HashMap> = HashMap::new(); + for route in routes.iter() { + let doc_type = match &route.doc_name { + Include::Some(doc_name) => { + if doc_name == &name { + Include::Some(id.clone()) + } else { + match self.get_doc_id(doc_name.to_string()) { + Ok(doc_id) => Include::Some(doc_id), + Err(err) => return Err(err), + } + } + } + Include::All => Include::All, + }; + let route_id: RouteID = + Route::new(route.msg_id.clone(), doc_type, route.action.clone()).into(); + match self.routes.get(&route_id) { + Some(senders) => { + let mut addition = senders.clone(); + addition.push(id.clone()); + holder.insert(route_id, addition); + } + None => { + let senders = [id.clone()].to_vec(); + holder.insert(route_id, senders); + } + } + } self.senders.insert(id.clone(), tx); - self.names.insert(name, id.clone()); - Ok(id) + self.names.insert(name.clone(), id.clone()); + for (route_id, senders) in holder.iter() { + self.routes.insert(route_id.clone(), senders.clone()); + } + Ok(()) } fn send(&self, msg: Message) -> Result<(), MTTError> { @@ -440,59 +500,133 @@ impl QueueData { } Ok(()) } - - fn add_route( - &mut self, - sender_id: &Uuid, - doc_type: Include, - action: Action, - ) -> Result<(), MTTError> - where - N: Into, - { - let doc_id = match doc_type { - Include::Some(data) => match self.get_doc_id(data) { - Ok(id) => Include::Some(id.clone()), - Err(err) => return Err(err), - }, - Include::All => Include::All, - }; - let route = Route::new(Include::All, doc_id, Include::Some(action)); - let route_id = route.into(); - match self.routes.get_mut(&route_id) { - Some(mut senders) => senders.push(sender_id.clone()), - None => { - self.routes - .insert(route_id.into(), [sender_id.clone()].to_vec()); - } - } - Ok(()) - } } #[cfg(test)] mod queuedatas { + use super::support_test::TIMEOUT; use super::*; - use std::{sync::mpsc::RecvTimeoutError, time::Duration}; - - static TIMEOUT: Duration = Duration::from_millis(500); + use std::sync::mpsc::RecvTimeoutError; #[test] - fn can_a_new_document_type_be_rgistered() { - let name = Uuid::new_v4().to_string(); - let action = Action::Query; - let (tx, rx) = channel(); + fn can_document_be_registered() { let mut queuedata = QueueData::new(); - let id = queuedata.register(name.clone(), tx).unwrap(); - queuedata.add_route(&id, Include::Some(name.clone()), action); - let msg = Message::new(name.clone(), MsgAction::Query(Access::new())); - queuedata.send(msg.clone()).unwrap(); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - let msg = Message::new(id.clone(), MsgAction::Query(Access::new())); - queuedata.send(msg.clone()).unwrap(); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); + let (tx, rx) = channel(); + let name = Uuid::new_v4().to_string(); + let routes = [ + RouteRequest::new( + Include::All, + Include::Some(name.clone()), + Include::Some(Action::Query), + ), + RouteRequest::new( + Include::All, + Include::Some(name.clone()), + Include::Some(Action::Reply), + ), + ] + .to_vec(); + queuedata.register(tx, name.clone(), routes).unwrap(); + let msg1 = Message::new(name.clone(), MsgAction::Query(Access::new())); + let msg2 = Message::new(name.clone(), MsgAction::Reply(Response::new())); + let msg3 = Message::new(name.clone(), MsgAction::Create(DocDef::new())); + queuedata.send(msg1.clone()).unwrap(); + queuedata.send(msg2.clone()).unwrap(); + queuedata.send(msg3.clone()).unwrap(); + let result1 = rx.recv_timeout(TIMEOUT).unwrap(); + let result2 = rx.recv_timeout(TIMEOUT).unwrap(); + match rx.recv_timeout(TIMEOUT) { + Ok(_) => unreachable!("should have timed out"), + Err(err) => match err { + RecvTimeoutError::Timeout => {}, + _ => unreachable!("should have timed out"), + }, + } + assert_eq!(result1.get_message_id(), msg1.get_message_id()); + assert_eq!(result2.get_message_id(), msg2.get_message_id()); + match result1.get_action() { + MsgAction::Query(_) => {} + _ => unreachable!("should have been a query"), + } + match result2.get_action() { + MsgAction::Reply(_) => {} + _ => unreachable!("should have been a query"), + } + } + + #[test] + fn does_register_fail_on_duplicate_documents() { + let mut queuedata = QueueData::new(); + let (tx1, _) = channel(); + let (tx2, _) = channel(); + let name = Uuid::new_v4().to_string(); + queuedata + .register(tx1, name.to_string(), Vec::new()) + .unwrap(); + match queuedata.register(tx2, name.to_string(), Vec::new()) { + Ok(_) => unreachable!("duplicates should create an error"), + Err(err) => match err { + MTTError::DocumentAlreadyExists(result) => assert_eq!(result, name), + _ => unreachable!("should have been document does not exists"), + }, + } + } + + #[test] + fn does_bad_route_prevent_registration() { + let mut queuedata = QueueData::new(); + let (tx, rx) = channel(); + let good = "good"; + let bad = Uuid::new_v4().to_string(); + let routes = [ + RouteRequest::new( + Include::All, + Include::Some(good.to_string()), + Include::Some(Action::Query), + ), + RouteRequest::new( + Include::All, + Include::Some(bad.clone()), + Include::Some(Action::Reply), + ), + ] + .to_vec(); + match queuedata.register(tx, good.to_string(), routes) { + Ok(_) => unreachable!("should produce an error"), + Err(err) => match err { + MTTError::DocumentNotFound(result) => assert_eq!(result, bad), + _ => unreachable!("Shouuld be document not found"), + }, + } + assert_eq!(queuedata.senders.len(), 0, "should not add to senders"); + assert_eq!(queuedata.names.len(), 0, "should not add to names"); + assert_eq!(queuedata.routes.len(), 0, "should nor add to routes"); + } + + #[test] + fn is_sender_only_added_once_to_routes() { + let mut queuedata = QueueData::new(); + let (tx, rx) = channel(); + let name = "project"; + let routes = [ + RouteRequest::new( + Include::All, + Include::Some(name.to_string()), + Include::Some(Action::Query), + ), + RouteRequest::new( + Include::All, + Include::Some(name.to_string()), + Include::Some(Action::Query), + ), + ] + .to_vec(); + queuedata + .register(tx, name.to_string(), routes) + .unwrap(); + for senders in queuedata.routes.values() { + assert_eq!(senders.len(), 1, "should be no double entries"); + } } #[test] @@ -509,28 +643,12 @@ mod queuedatas { } } - #[test] - fn should_error_on_duplicate_name_registration() { - let name = Uuid::new_v4().to_string(); - let (tx1, _) = channel(); - let (tx2, _) = channel(); - let mut queuedata = QueueData::new(); - queuedata.register(name.clone(), tx1).unwrap(); - match queuedata.register(name.clone(), tx2) { - Ok(_) => unreachable!("should have been an weeoe"), - Err(data) => match data { - MTTError::DocumentAlreadyExists(output) => assert_eq!(output, name), - _ => unreachable!("should have been an already exists errorr"), - }, - } - } - #[test] fn is_send_okay_if_no_one_is_listening() { let mut queuedata = QueueData::new(); let name = "something"; let (tx, _) = channel(); - queuedata.register(name.to_string(), tx).unwrap(); + queuedata.register(tx, name.to_string(), Vec::new()).unwrap(); let msg = Message::new("something", MsgAction::Create(DocDef::new())); match queuedata.send(msg) { Ok(_) => {} @@ -538,27 +656,6 @@ mod queuedatas { } } - #[test] - fn can_certain_messages_be_ignored() { - let mut queuedata = QueueData::new(); - let doctype = "test"; - let (tx, rx) = channel(); - let id = queuedata.register(doctype.to_string(), tx).unwrap(); - queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query); - let msg = Message::new(doctype, MsgAction::Query(Access::new())); - queuedata.send(msg.clone()).unwrap(); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - let msg = Message::new(doctype, MsgAction::Query(Access::new())); - match rx.recv_timeout(TIMEOUT) { - Ok(_) => unreachable!("should timeout"), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should timeout"), - }, - } - } - #[test] fn can_more_than_one_document_respond() { let mut queuedata = QueueData::new(); @@ -567,18 +664,11 @@ mod queuedatas { let action = MsgAction::Query(Access::new()); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); - let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); - let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); - queuedata.add_route( - &id1, - Include::Some(name1.to_string()), - action.clone().into(), - ); - queuedata.add_route( - &id2, - Include::Some(name1.to_string()), - action.clone().into(), - ); + let routes = [ + RouteRequest::new(Include::All, Include::Some(name1.to_string()), Include::All) + ].to_vec(); + queuedata.register(tx1, name1.to_string(), routes.clone()).unwrap(); + queuedata.register(tx2, name2.to_string(), routes.clone()).unwrap(); let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); @@ -586,21 +676,6 @@ mod queuedatas { assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), result2.get_message_id()); } - - #[test] - fn can_a_route_be_generally_set() { - let mut queuedata = QueueData::new(); - let doctype = "something"; - let action = MsgAction::Query(Access::new()); - let (tx, rx) = channel(); - let id = queuedata.register(doctype.to_string(), tx).unwrap(); - let data: Include = Include::All; - queuedata.add_route(&id, data, action.clone().into()); - let msg = Message::new(doctype, action); - queuedata.send(msg.clone()).unwrap(); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - } } #[derive(Clone)] @@ -615,28 +690,20 @@ impl Queue { } } - fn register(&mut self, name: String, tx: Sender) -> Result { + fn register( + &mut self, + tx: Sender, + name: String, + routes: Vec, + ) -> Result<(), MTTError> { let mut queuedata = self.queue_data.write().unwrap(); - queuedata.register(name, tx) + queuedata.register(tx, name, routes) } fn send(&self, msg: Message) -> Result<(), MTTError> { let queuedata = self.queue_data.read().unwrap(); queuedata.send(msg) } - - fn add_route( - &mut self, - sender_id: &Uuid, - doc_type: Include, - action: Action, - ) -> Result<(), MTTError> - where - N: Into, - { - let mut queuedata = self.queue_data.write().unwrap(); - queuedata.add_route(sender_id, doc_type, action) - } } #[cfg(test)] @@ -664,9 +731,8 @@ impl CreateDoc { fn start(mut queue: Queue) { let (tx, rx) = channel(); - let id = queue.register("document".to_string(), tx).unwrap(); - let nameid: Include = Include::All; - queue.add_route(&id, nameid, Action::Create); + let routes = [RouteRequest::new(Include::All, Include::All, Include::Some(Action::Create))].to_vec(); + let id = queue.register(tx, "document".to_string(), routes).unwrap(); let doc = CreateDoc::new(queue, rx); spawn(move || { doc.listen(); @@ -702,6 +768,15 @@ impl Access { } } +#[derive(Clone)] +struct Response; + +impl Response { + fn new() -> Self { + Self {} + } +} + struct Document { queue: Queue, rx: Receiver, @@ -717,7 +792,7 @@ impl Document { fn start(mut queue: Queue, name: String) { let (tx, rx) = channel(); - queue.register(name, tx); + queue.register(tx, name, Vec::new()); let doc = Document::new(queue, rx); spawn(move || { doc.listen(); @@ -733,17 +808,18 @@ impl Document { #[cfg(test)] mod createdocs { + use super::support_test::TIMEOUT; use super::*; - use std::{thread::sleep, time::Duration}; - #[test] + //#[test] fn create_document_creation() { let queue = Queue::new(); + //let (tx, rx) = channel(); + //queue.register2 CreateDoc::start(queue.clone()); let name = "project"; let msg = Message::new(name, MsgAction::Create(DocDef::new())); queue.send(msg).unwrap(); - sleep(Duration::from_secs(1)); let msg2 = Message::new(name, MsgAction::Query(Access::new())); queue.send(msg2).unwrap(); }