Added routes to multple senders.

This commit is contained in:
Jeff Baskin 2025-07-20 10:39:15 -04:00
parent f673673dac
commit 2c431d55f9

View File

@ -47,6 +47,12 @@ impl From<Uuid> for NameID {
} }
} }
impl From<&NameID> for NameID {
fn from(value: &NameID) -> Self {
value.clone()
}
}
#[derive(Clone)] #[derive(Clone)]
struct Message { struct Message {
msg_id: Uuid, msg_id: Uuid,
@ -151,7 +157,7 @@ impl Route {
struct QueueData { struct QueueData {
senders: HashMap<Uuid, Sender<Message>>, senders: HashMap<Uuid, Sender<Message>>,
names: HashMap<String, Uuid>, names: HashMap<String, Uuid>,
routes: HashMap<Route, Uuid>, routes: HashMap<Route, Vec<Uuid>>,
} }
impl QueueData { impl QueueData {
@ -163,10 +169,28 @@ impl QueueData {
} }
} }
fn get_doc_id<N>(&self, nameid: N) -> Result<Uuid, MTTError>
where
N: Into<NameID>,
{
let sender_id = match nameid.into() {
NameID::Name(name) => match self.names.get(&name) {
Some(id) => id.clone(),
None => return Err(MTTError::DocumentNotFound(name.clone())),
},
NameID::ID(id) => id.clone(),
};
if self.senders.contains_key(&sender_id) {
Ok(sender_id)
} else {
Err(MTTError::DocumentNotFound(sender_id.to_string()))
}
}
fn register(&mut self, name: String, tx: Sender<Message>) -> Result<Uuid, MTTError> { fn register(&mut self, name: String, tx: Sender<Message>) -> Result<Uuid, MTTError> {
match self.names.get(&name) { match self.get_doc_id(name.as_str()) {
Some(_) => return Err(MTTError::DocumentAlreadyExists(name)), Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)),
None => {} Err(_) => (),
} }
let mut id = Uuid::new_v4(); let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) { while self.senders.contains_key(&id) {
@ -178,35 +202,46 @@ impl QueueData {
} }
fn send(&self, msg: Message) -> Result<(), MTTError> { fn send(&self, msg: Message) -> Result<(), MTTError> {
let doc_id = match msg.get_document_id() { let doc_id = match self.get_doc_id(msg.get_document_id()) {
NameID::Name(name) => match self.names.get(name) { Ok(id) => id.clone(),
Some(id) => Some(id.clone()), Err(err) => return Err(err),
None => return Err(MTTError::DocumentNotFound(name.clone())),
},
NameID::ID(id) => Some(id.clone()),
}; };
let route = Route::new(doc_id, msg.get_action().clone()); let route = Route::new(Some(doc_id), msg.get_action().clone());
let sender_id = match self.routes.get(&route) { match self.routes.get(&route) {
Some(sender_id) => sender_id, Some(senders) => {
None => return Ok(()), for sender_id in senders.iter() {
}; let tx = self.senders.get(sender_id).unwrap();
let tx = self.senders.get(sender_id).unwrap(); tx.send(msg.clone()).unwrap();
tx.send(msg).unwrap(); }
}
None => {}
}
Ok(()) Ok(())
} }
fn add_route( fn add_route<N>(
&mut self, &mut self,
sender_id: &Uuid, sender_id: &Uuid,
doc_type: Option<String>, doc_type: Option<N>,
action: Action, action: Action,
) -> Result<(), MTTError> { ) -> Result<(), MTTError>
where
N: Into<NameID>,
{
let doc_id = match doc_type { let doc_id = match doc_type {
Some(name) => Some(self.names.get(&name).unwrap().clone()), Some(data) => match self.get_doc_id(data) {
Ok(id) => Some(id.clone()),
Err(err) => return Err(err),
},
None => None, None => None,
}; };
let route = Route::new(doc_id, action); let route = Route::new(doc_id, action);
self.routes.insert(route, sender_id.clone()); match self.routes.get_mut(&route) {
Some(mut senders) => senders.push(sender_id.clone()),
None => {
self.routes.insert(route, [sender_id.clone()].to_vec());
}
}
Ok(()) Ok(())
} }
} }
@ -300,27 +335,25 @@ mod queuedatas {
} }
} }
/*
#[test] #[test]
fn can_messages_be_directed() { fn can_more_than_one_document_respond() {
let mut queuedata = QueueData::new(); let mut queuedata = QueueData::new();
let name1 = "task";
let name2 = "work";
let action = Action::Query;
let (tx1, rx1) = channel(); let (tx1, rx1) = channel();
let (tx2, rx2) = channel(); let (tx2, rx2) = channel();
let id1 = queuedata.register("task".to_string(), tx1); let id1 = queuedata.register(name1.to_string(), tx1).unwrap();
let id2 = queuedata.register("work".to_string(), tx2); let id2 = queuedata.register(name2.to_string(), tx2).unwrap();
let msg = Message::new("task".to_string(), Action::Query); queuedata.add_route(&id1, Some(name1.to_string()), action.clone());
queuedata.add_route(&id2, Some(name1.to_string()), action.clone());
let msg = Message::new(name1, action.clone());
queuedata.send(msg.clone()).unwrap(); queuedata.send(msg.clone()).unwrap();
let result = rx1.recv_timeout(TIMEOUT).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id()); let result2 = rx2.recv_timeout(TIMEOUT).unwrap();
match rx2.recv_timeout(TIMEOUT) { assert_eq!(result1.get_message_id(), msg.get_message_id());
Ok(_) => unreachable!("should timeout"), assert_eq!(result1.get_message_id(), result2.get_message_id());
Err(err) => match err {
RecvTimeoutError::Timeout => {},
_ => unreachable!("should timeout"),
}
}
} }
*/
} }
#[derive(Clone)] #[derive(Clone)]