From a573c8d523af727ff6a762265c0da7d3b20afb11 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Mon, 21 Jul 2025 13:31:23 -0400 Subject: [PATCH] Got the router into a usable state. --- src/message.rs | 118 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 97 insertions(+), 21 deletions(-) diff --git a/src/message.rs b/src/message.rs index 62684f2..81b66f8 100644 --- a/src/message.rs +++ b/src/message.rs @@ -139,14 +139,50 @@ mod messages { } } +#[derive(Clone)] +enum Include { + All, + Some(T), +} + +impl PartialEq for Include { + fn eq(&self, other: &Self) -> bool { + match self { + Include::All => true, + Include::Some(data) => match other { + Include::All => true, + Include::Some(other_data) => data == other_data, + }, + } + } +} + #[derive(Eq, Hash, PartialEq)] -struct Route { +struct RouteID { action: Action, doc_type: Option, } +impl From for RouteID { + fn from(value: Route) -> Self { + Self { + action: value.action, + doc_type: match value.doc_type { + Include::Some(doc) => Some(doc.clone()), + Include::All => None, + }, + } + } +} + +#[derive(Clone, PartialEq)] +struct Route { + action: Action, + doc_type: Include, +} + impl Route { - fn new(doc_type: Option, action: Action) -> Self { + fn new(doc_type: Include, action: Action) -> Self { Self { action: action, doc_type: doc_type, @@ -154,10 +190,34 @@ impl Route { } } +impl From for Route { + fn from(value: RouteID) -> Self { + Self { + action: value.action, + doc_type: match value.doc_type { + Some(doc) => Include::Some(doc.clone()), + None => Include::All, + }, + } + } +} + +impl From<&RouteID> for Route { + fn from(value: &RouteID) -> Self { + Self { + action: value.action.clone(), + doc_type: match value.doc_type { + Some(doc) => Include::Some(doc.clone()), + None => Include::All, + }, + } + } +} + struct QueueData { senders: HashMap>, names: HashMap, - routes: HashMap>, + routes: HashMap>, } impl QueueData { @@ -202,19 +262,18 @@ impl QueueData { } fn send(&self, msg: Message) -> Result<(), MTTError> { - let doc_id = match self.get_doc_id(msg.get_document_id()) { - Ok(id) => id.clone(), + let doc_id: Include = match self.get_doc_id(msg.get_document_id()) { + Ok(id) => Include::Some(id.clone()), Err(err) => return Err(err), }; - let route = Route::new(Some(doc_id), msg.get_action().clone()); - match self.routes.get(&route) { - Some(senders) => { - for sender_id in senders.iter() { - let tx = self.senders.get(sender_id).unwrap(); + let route = Route::new(doc_id, msg.get_action().clone()); + for (send_route, send_ids) in self.routes.iter() { + if route == send_route.into() { + for send_id in send_ids { + let tx = self.senders.get(&send_id).unwrap(); tx.send(msg.clone()).unwrap(); } } - None => {} } Ok(()) } @@ -222,24 +281,26 @@ impl QueueData { fn add_route( &mut self, sender_id: &Uuid, - doc_type: Option, + doc_type: Include, action: Action, ) -> Result<(), MTTError> where N: Into, { let doc_id = match doc_type { - Some(data) => match self.get_doc_id(data) { - Ok(id) => Some(id.clone()), + Include::Some(data) => match self.get_doc_id(data) { + Ok(id) => Include::Some(id.clone()), Err(err) => return Err(err), }, - None => None, + Include::All => Include::All, }; let route = Route::new(doc_id, action); - match self.routes.get_mut(&route) { + let route_id = route.into(); + match self.routes.get_mut(&route_id) { Some(mut senders) => senders.push(sender_id.clone()), None => { - self.routes.insert(route, [sender_id.clone()].to_vec()); + self.routes + .insert(route_id.into(), [sender_id.clone()].to_vec()); } } Ok(()) @@ -260,7 +321,7 @@ mod queuedatas { let (tx, rx) = channel(); let mut queuedata = QueueData::new(); let id = queuedata.register(name.clone(), tx).unwrap(); - queuedata.add_route(&id, Some(name.clone()), action); + queuedata.add_route(&id, Include::Some(name.clone()), action); let msg = Message::new(name.clone(), Action::Query); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); @@ -320,7 +381,7 @@ mod queuedatas { let doctype = "test"; let (tx, rx) = channel(); let id = queuedata.register(doctype.to_string(), tx).unwrap(); - queuedata.add_route(&id, Some(doctype.to_string()), Action::Query); + queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query); let msg = Message::new(doctype, Action::Query); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); @@ -345,8 +406,8 @@ mod queuedatas { let (tx2, rx2) = channel(); let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); - queuedata.add_route(&id1, Some(name1.to_string()), action.clone()); - queuedata.add_route(&id2, Some(name1.to_string()), action.clone()); + queuedata.add_route(&id1, Include::Some(name1.to_string()), action.clone()); + queuedata.add_route(&id2, Include::Some(name1.to_string()), action.clone()); let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); @@ -354,6 +415,21 @@ mod queuedatas { assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), result2.get_message_id()); } + + #[test] + fn can_a_route_be_generally_set() { + let mut queuedata = QueueData::new(); + let doctype = "something"; + let action = Action::Query; + let (tx, rx) = channel(); + let id = queuedata.register(doctype.to_string(), tx).unwrap(); + let data: Include = Include::All; + queuedata.add_route(&id, data, action.clone()); + let msg = Message::new(doctype, action); + queuedata.send(msg.clone()).unwrap(); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + } } #[derive(Clone)]