Got the router into a usable state.

This commit is contained in:
Jeff Baskin 2025-07-21 13:31:23 -04:00
parent 2c431d55f9
commit a573c8d523

View File

@ -139,14 +139,50 @@ mod messages {
} }
} }
#[derive(Clone)]
enum Include<T> {
All,
Some(T),
}
impl<T: PartialEq> PartialEq for Include<T> {
fn eq(&self, other: &Self) -> bool {
match self {
Include::All => true,
Include::Some(data) => match other {
Include::All => true,
Include::Some(other_data) => data == other_data,
},
}
}
}
#[derive(Eq, Hash, PartialEq)] #[derive(Eq, Hash, PartialEq)]
struct Route { struct RouteID {
action: Action, action: Action,
doc_type: Option<Uuid>, doc_type: Option<Uuid>,
} }
impl From<Route> for RouteID {
fn from(value: Route) -> Self {
Self {
action: value.action,
doc_type: match value.doc_type {
Include::Some(doc) => Some(doc.clone()),
Include::All => None,
},
}
}
}
#[derive(Clone, PartialEq)]
struct Route {
action: Action,
doc_type: Include<Uuid>,
}
impl Route { impl Route {
fn new(doc_type: Option<Uuid>, action: Action) -> Self { fn new(doc_type: Include<Uuid>, action: Action) -> Self {
Self { Self {
action: action, action: action,
doc_type: doc_type, doc_type: doc_type,
@ -154,10 +190,34 @@ impl Route {
} }
} }
impl From<RouteID> for Route {
fn from(value: RouteID) -> Self {
Self {
action: value.action,
doc_type: match value.doc_type {
Some(doc) => Include::Some(doc.clone()),
None => Include::All,
},
}
}
}
impl From<&RouteID> for Route {
fn from(value: &RouteID) -> Self {
Self {
action: value.action.clone(),
doc_type: match value.doc_type {
Some(doc) => Include::Some(doc.clone()),
None => Include::All,
},
}
}
}
struct QueueData { struct QueueData {
senders: HashMap<Uuid, Sender<Message>>, senders: HashMap<Uuid, Sender<Message>>,
names: HashMap<String, Uuid>, names: HashMap<String, Uuid>,
routes: HashMap<Route, Vec<Uuid>>, routes: HashMap<RouteID, Vec<Uuid>>,
} }
impl QueueData { impl QueueData {
@ -202,19 +262,18 @@ impl QueueData {
} }
fn send(&self, msg: Message) -> Result<(), MTTError> { fn send(&self, msg: Message) -> Result<(), MTTError> {
let doc_id = match self.get_doc_id(msg.get_document_id()) { let doc_id: Include<Uuid> = match self.get_doc_id(msg.get_document_id()) {
Ok(id) => id.clone(), Ok(id) => Include::Some(id.clone()),
Err(err) => return Err(err), Err(err) => return Err(err),
}; };
let route = Route::new(Some(doc_id), msg.get_action().clone()); let route = Route::new(doc_id, msg.get_action().clone());
match self.routes.get(&route) { for (send_route, send_ids) in self.routes.iter() {
Some(senders) => { if route == send_route.into() {
for sender_id in senders.iter() { for send_id in send_ids {
let tx = self.senders.get(sender_id).unwrap(); let tx = self.senders.get(&send_id).unwrap();
tx.send(msg.clone()).unwrap(); tx.send(msg.clone()).unwrap();
} }
} }
None => {}
} }
Ok(()) Ok(())
} }
@ -222,24 +281,26 @@ impl QueueData {
fn add_route<N>( fn add_route<N>(
&mut self, &mut self,
sender_id: &Uuid, sender_id: &Uuid,
doc_type: Option<N>, doc_type: Include<N>,
action: Action, action: Action,
) -> Result<(), MTTError> ) -> Result<(), MTTError>
where where
N: Into<NameID>, N: Into<NameID>,
{ {
let doc_id = match doc_type { let doc_id = match doc_type {
Some(data) => match self.get_doc_id(data) { Include::Some(data) => match self.get_doc_id(data) {
Ok(id) => Some(id.clone()), Ok(id) => Include::Some(id.clone()),
Err(err) => return Err(err), Err(err) => return Err(err),
}, },
None => None, Include::All => Include::All,
}; };
let route = Route::new(doc_id, action); let route = Route::new(doc_id, action);
match self.routes.get_mut(&route) { let route_id = route.into();
match self.routes.get_mut(&route_id) {
Some(mut senders) => senders.push(sender_id.clone()), Some(mut senders) => senders.push(sender_id.clone()),
None => { None => {
self.routes.insert(route, [sender_id.clone()].to_vec()); self.routes
.insert(route_id.into(), [sender_id.clone()].to_vec());
} }
} }
Ok(()) Ok(())
@ -260,7 +321,7 @@ mod queuedatas {
let (tx, rx) = channel(); let (tx, rx) = channel();
let mut queuedata = QueueData::new(); let mut queuedata = QueueData::new();
let id = queuedata.register(name.clone(), tx).unwrap(); let id = queuedata.register(name.clone(), tx).unwrap();
queuedata.add_route(&id, Some(name.clone()), action); queuedata.add_route(&id, Include::Some(name.clone()), action);
let msg = Message::new(name.clone(), Action::Query); let msg = Message::new(name.clone(), Action::Query);
queuedata.send(msg.clone()).unwrap(); queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap();
@ -320,7 +381,7 @@ mod queuedatas {
let doctype = "test"; let doctype = "test";
let (tx, rx) = channel(); let (tx, rx) = channel();
let id = queuedata.register(doctype.to_string(), tx).unwrap(); let id = queuedata.register(doctype.to_string(), tx).unwrap();
queuedata.add_route(&id, Some(doctype.to_string()), Action::Query); queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query);
let msg = Message::new(doctype, Action::Query); let msg = Message::new(doctype, Action::Query);
queuedata.send(msg.clone()).unwrap(); queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap();
@ -345,8 +406,8 @@ mod queuedatas {
let (tx2, rx2) = channel(); let (tx2, rx2) = channel();
let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); let id1 = queuedata.register(name1.to_string(), tx1).unwrap();
let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); let id2 = queuedata.register(name2.to_string(), tx2).unwrap();
queuedata.add_route(&id1, Some(name1.to_string()), action.clone()); queuedata.add_route(&id1, Include::Some(name1.to_string()), action.clone());
queuedata.add_route(&id2, Some(name1.to_string()), action.clone()); queuedata.add_route(&id2, Include::Some(name1.to_string()), action.clone());
let msg = Message::new(name1, action.clone()); let msg = Message::new(name1, action.clone());
queuedata.send(msg.clone()).unwrap(); queuedata.send(msg.clone()).unwrap();
let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap();
@ -354,6 +415,21 @@ mod queuedatas {
assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), msg.get_message_id());
assert_eq!(result1.get_message_id(), result2.get_message_id()); assert_eq!(result1.get_message_id(), result2.get_message_id());
} }
#[test]
fn can_a_route_be_generally_set() {
let mut queuedata = QueueData::new();
let doctype = "something";
let action = Action::Query;
let (tx, rx) = channel();
let id = queuedata.register(doctype.to_string(), tx).unwrap();
let data: Include<String> = Include::All;
queuedata.add_route(&id, data, action.clone());
let msg = Message::new(doctype, action);
queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
} }
#[derive(Clone)] #[derive(Clone)]