From 2c431d55f96843a17ab9132184df31ff16c1e267 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Sun, 20 Jul 2025 10:39:15 -0400 Subject: [PATCH] Added routes to multple senders. --- src/message.rs | 107 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 70 insertions(+), 37 deletions(-) diff --git a/src/message.rs b/src/message.rs index fd1f8a7..62684f2 100644 --- a/src/message.rs +++ b/src/message.rs @@ -47,6 +47,12 @@ impl From for NameID { } } +impl From<&NameID> for NameID { + fn from(value: &NameID) -> Self { + value.clone() + } +} + #[derive(Clone)] struct Message { msg_id: Uuid, @@ -151,7 +157,7 @@ impl Route { struct QueueData { senders: HashMap>, names: HashMap, - routes: HashMap, + routes: HashMap>, } impl QueueData { @@ -163,10 +169,28 @@ impl QueueData { } } + fn get_doc_id(&self, nameid: N) -> Result + where + N: Into, + { + let sender_id = match nameid.into() { + NameID::Name(name) => match self.names.get(&name) { + Some(id) => id.clone(), + None => return Err(MTTError::DocumentNotFound(name.clone())), + }, + NameID::ID(id) => id.clone(), + }; + if self.senders.contains_key(&sender_id) { + Ok(sender_id) + } else { + Err(MTTError::DocumentNotFound(sender_id.to_string())) + } + } + fn register(&mut self, name: String, tx: Sender) -> Result { - match self.names.get(&name) { - Some(_) => return Err(MTTError::DocumentAlreadyExists(name)), - None => {} + match self.get_doc_id(name.as_str()) { + Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), + Err(_) => (), } let mut id = Uuid::new_v4(); while self.senders.contains_key(&id) { @@ -178,35 +202,46 @@ impl QueueData { } fn send(&self, msg: Message) -> Result<(), MTTError> { - let doc_id = match msg.get_document_id() { - NameID::Name(name) => match self.names.get(name) { - Some(id) => Some(id.clone()), - None => return Err(MTTError::DocumentNotFound(name.clone())), - }, - NameID::ID(id) => Some(id.clone()), + let doc_id = match self.get_doc_id(msg.get_document_id()) { + Ok(id) => id.clone(), + Err(err) => return Err(err), }; - let route = Route::new(doc_id, msg.get_action().clone()); - let sender_id = match self.routes.get(&route) { - Some(sender_id) => sender_id, - None => return Ok(()), - }; - let tx = self.senders.get(sender_id).unwrap(); - tx.send(msg).unwrap(); + let route = Route::new(Some(doc_id), msg.get_action().clone()); + match self.routes.get(&route) { + Some(senders) => { + for sender_id in senders.iter() { + let tx = self.senders.get(sender_id).unwrap(); + tx.send(msg.clone()).unwrap(); + } + } + None => {} + } Ok(()) } - fn add_route( + fn add_route( &mut self, sender_id: &Uuid, - doc_type: Option, + doc_type: Option, action: Action, - ) -> Result<(), MTTError> { + ) -> Result<(), MTTError> + where + N: Into, + { let doc_id = match doc_type { - Some(name) => Some(self.names.get(&name).unwrap().clone()), + Some(data) => match self.get_doc_id(data) { + Ok(id) => Some(id.clone()), + Err(err) => return Err(err), + }, None => None, }; let route = Route::new(doc_id, action); - self.routes.insert(route, sender_id.clone()); + match self.routes.get_mut(&route) { + Some(mut senders) => senders.push(sender_id.clone()), + None => { + self.routes.insert(route, [sender_id.clone()].to_vec()); + } + } Ok(()) } } @@ -300,27 +335,25 @@ mod queuedatas { } } - /* #[test] - fn can_messages_be_directed() { + fn can_more_than_one_document_respond() { let mut queuedata = QueueData::new(); + let name1 = "task"; + let name2 = "work"; + let action = Action::Query; let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); - let id1 = queuedata.register("task".to_string(), tx1); - let id2 = queuedata.register("work".to_string(), tx2); - let msg = Message::new("task".to_string(), Action::Query); + let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); + let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); + queuedata.add_route(&id1, Some(name1.to_string()), action.clone()); + queuedata.add_route(&id2, Some(name1.to_string()), action.clone()); + let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); - let result = rx1.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match rx2.recv_timeout(TIMEOUT) { - Ok(_) => unreachable!("should timeout"), - Err(err) => match err { - RecvTimeoutError::Timeout => {}, - _ => unreachable!("should timeout"), - } - } + let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); + let result2 = rx2.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result1.get_message_id(), msg.get_message_id()); + assert_eq!(result1.get_message_id(), result2.get_message_id()); } - */ } #[derive(Clone)]