Queue replaces message name with id.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s

This commit is contained in:
Jeff Baskin 2025-11-11 21:28:46 -05:00
parent 8b269836c2
commit 47bbf65907

View File

@ -38,6 +38,7 @@ enum MTTError {
NameMissingTranslation(Language),
NameNotFound(Name),
QueryCannotChangeData,
RouteRequiresDocumentID,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
@ -291,7 +292,10 @@ impl Message {
)
}
fn reset_name_id<NT>(&mut self, name: NT) where NT: Into<NameType> {
fn reset_name_id<NT>(&mut self, name: NT)
where
NT: Into<NameType>,
{
self.document_id = name.into();
}
@ -663,12 +667,16 @@ impl Names {
if self.ids.contains_key(&data) {
Ok(data)
} else {
Err(MTTError::NameNotFound(Name::english(
data.to_string().as_str(),
)))
if data == Uuid::nil() {
Ok(data)
} else {
Err(MTTError::NameNotFound(Name::english(
data.to_string().as_str(),
)))
}
}
}
NameType::None => Err(MTTError::NameNotFound(Name::english("none"))),
NameType::None => Ok(Uuid::nil()),
}
}
@ -1072,6 +1080,25 @@ impl From<&RouteID> for Route {
}
}
impl TryFrom<Path> for Route {
type Error = MTTError;
fn try_from(value: Path) -> Result<Self, Self::Error> {
let doc = match value.doc {
Include::Some(data) => match data {
NameType::ID(id) => Include::Some(id.clone()),
_ => return Err(MTTError::RouteRequiresDocumentID),
},
Include::All => Include::All,
};
Ok(Self {
action: value.action,
doc_type: doc,
msg_id: value.msg_id,
})
}
}
#[cfg(test)]
mod routes {
use super::*;
@ -1194,33 +1221,32 @@ impl DocRegistry {
fn listen(&mut self) {
loop {
let msg = self.receiver.recv().unwrap();
let mut msg = self.receiver.recv().unwrap();
match msg.get_action() {
MsgAction::Register(data) => {
let id = data.get_sender_id();
let reply = msg.response(self.register_action(data));
self.queue.forward(id, reply);
}
_ => {
let path = msg.get_path();
match self.doc_names.path_to_route(&path) {
Ok(route) => {
let mut send_to: HashSet<Uuid> = HashSet::new();
for (route_id, senders) in self.routes.iter() {
if route == route_id.into() {
send_to = send_to.union(senders).cloned().collect();
}
}
for send_id in send_to.iter() {
self.queue.forward(send_id, msg.clone());
_ => match self.doc_names.get_id(msg.get_document_id()) {
Ok(doc_id) => {
msg.reset_name_id(doc_id);
let route: Route = msg.get_path().try_into().unwrap();
let mut send_to: HashSet<Uuid> = HashSet::new();
for (route_id, senders) in self.routes.iter() {
if route == route_id.into() {
send_to = send_to.union(senders).cloned().collect();
}
}
Err(err) => self
.queue
.send(msg.response(MsgAction::Error(err)))
.unwrap(),
for send_id in send_to.iter() {
self.queue.forward(send_id, msg.clone());
}
}
}
Err(err) => self
.queue
.send(msg.response(MsgAction::Error(err)))
.unwrap(),
},
}
}
}
@ -1247,7 +1273,7 @@ impl DocRegistry {
RegMsg::GetNameID(name) => match self.doc_names.get_id(name) {
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
Err(err) => reg.response(RegMsg::Error(err)),
}
},
_ => reg.response(RegMsg::Ok),
}
}
@ -1516,6 +1542,41 @@ mod queues {
}
}
#[test]
fn does_name_id_get_updated() {
let mut tester = TestQueue::new();
let mut queue = tester.get_queue();
let doc_name = Name::english("test");
let reg_msg = Register::new(
tester.get_preset_id().clone(),
RegMsg::AddDocName([doc_name.clone()].to_vec()),
);
let msg = Message::new(NameType::None, reg_msg);
queue.send(msg.clone()).unwrap();
let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
let id = match action {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::DocumentNameID(data) => data.clone(),
_ => unreachable!("got {:?}, should have been register ok", action),
},
_ => unreachable!("got {:?}, should have been register ok", action),
};
let reg_msg = Register::new(
tester.get_preset_id().clone(),
RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)),
);
let msg = Message::new(NameType::None, reg_msg);
queue.send(msg.clone()).unwrap();
tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap();
let msg = Message::new(doc_name.clone(), Query::new());
queue.send(msg.clone()).unwrap();
let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
let name_id: NameType = id.into();
assert_eq!(result.get_document_id(), &name_id);
}
#[test]
fn can_register_multiple_names_at_once() {
let mut tester = TestQueue::new();
@ -1533,7 +1594,7 @@ mod queues {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::DocumentNameID(data) => data,
_ => unreachable!("got {:?}, should have returned id", data),
}
},
_ => unreachable!("got {:?}, should have returned id", action),
};
for name in names.iter() {
@ -1549,7 +1610,7 @@ mod queues {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::DocumentNameID(data) => data,
_ => unreachable!("got {:?}, should have returned id", data),
}
},
_ => unreachable!("got {:?}, should have returned id", action),
};
assert_eq!(result, id);
@ -4355,7 +4416,7 @@ impl DocumentFile {
doc.listen();
});
let reply = msg.response(Reply::new());
queue.send(reply).unwrap();
queue.send(reply.clone()).unwrap();
}
fn listen(&mut self) {
@ -5857,29 +5918,69 @@ mod createdocs {
use super::support_test::TIMEOUT;
use super::*;
fn setup_create_doc() -> (Queue, Receiver<Message>) {
let routes = [
Path::new(Include::All, Include::All, Include::Some(Action::Reply)),
Path::new(Include::All, Include::All, Include::Some(Action::Records)),
Path::new(Include::All, Include::All, Include::Some(Action::Error)),
]
.to_vec();
let mut queue = Queue::new();
let (tx, rx) = channel();
let id = queue.add_sender(tx);
for route in routes.iter() {
let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone()));
queue.send(Message::new(NameType::None, regmsg));
rx.recv_timeout(TIMEOUT).unwrap();
struct TestCreateDoc {
queue: Queue,
rx: Receiver<Message>,
rx_id: Uuid,
}
impl TestCreateDoc {
fn new() -> Self {
let mut queue = Queue::new();
let (tx, rx) = channel();
let id = queue.add_sender(tx);
CreateDoc::start(queue.clone());
Self {
queue: queue,
rx: rx,
rx_id: id,
}
}
fn get_queue(&self) -> Queue {
self.queue.clone()
}
fn get_receiver(&self) -> &Receiver<Message> {
&self.rx
}
fn get_document_id(&self, name: &Name) -> Uuid {
let reg_request = Register::new(self.rx_id.clone(), RegMsg::GetNameID(name.clone()));
self.queue
.send(Message::new(NameType::None, reg_request))
.unwrap();
let info = self.rx.recv_timeout(TIMEOUT).unwrap();
match info.get_action() {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::DocumentNameID(ident) => ident.clone(),
_ => unreachable!("should not get here"),
},
_ => unreachable!("should not get here"),
}
}
fn register_paths(&self, paths: Vec<Path>) {
for path in paths.iter() {
let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone()));
self.queue.send(Message::new(NameType::None, regmsg));
self.rx.recv_timeout(TIMEOUT).unwrap();
}
}
CreateDoc::start(queue.clone());
(queue, rx)
}
#[test]
fn create_document_creation() {
let doc_creator = TestCreateDoc::new();
let paths = [
Path::new(Include::All, Include::All, Include::Some(Action::Reply)),
Path::new(Include::All, Include::All, Include::Some(Action::Records)),
]
.to_vec();
doc_creator.register_paths(paths);
let mut queue = doc_creator.get_queue();
let rx = doc_creator.get_receiver();
let name = Name::english("project");
let (queue, rx) = setup_create_doc();
let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
queue.send(msg1.clone()).unwrap();
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
@ -5893,11 +5994,12 @@ mod createdocs {
MsgAction::Reply(_) => {}
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
}
let doc_id: NameType = doc_creator.get_document_id(&name).into();
let msg2 = Message::new(name, Query::new());
queue.send(msg2.clone()).unwrap();
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result2.get_message_id(), msg2.get_message_id());
assert_eq!(result2.get_document_id(), msg2.get_document_id());
assert_eq!(result2.get_document_id(), &doc_id);
match result2.get_action() {
MsgAction::Records(data) => assert_eq!(data.len(), 0),
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
@ -5906,12 +6008,20 @@ mod createdocs {
#[test]
fn does_duplicates_generate_error() {
let doc_creator = TestCreateDoc::new();
let paths = [Path::new(
Include::All,
Include::All,
Include::Some(Action::Error),
)]
.to_vec();
doc_creator.register_paths(paths);
let mut queue = doc_creator.get_queue();
let rx = doc_creator.get_receiver();
let name = Name::english("duplicate");
let (queue, rx) = setup_create_doc();
let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
queue.send(msg1.clone()).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
queue.send(msg2.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg2.get_message_id());
@ -5976,7 +6086,7 @@ mod clocks {
let (tx, rx) = channel();
let id = queue.add_sender(tx);
let request = Register::new(
id,
id.clone(),
RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)),
);
queue.send(Message::new(NameType::None, request)).unwrap();
@ -5990,10 +6100,22 @@ mod clocks {
let end = Utc::now();
assert!((end - start) > TimeDelta::seconds(1));
assert!((end - start) < TimeDelta::seconds(2));
let reg_request = Register::new(id, RegMsg::GetNameID(Name::english("clock")));
queue
.send(Message::new(NameType::None, reg_request))
.unwrap();
let info = rx.recv_timeout(TIMEOUT).unwrap();
let doc_id = match info.get_action() {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::DocumentNameID(ident) => ident.clone(),
_ => unreachable!("should not get here"),
},
_ => unreachable!("should not get here"),
};
for msg in holder.iter() {
let name_id = msg.get_document_id();
match name_id {
NameType::Name(data) => assert_eq!(data, &Name::english("clock")),
NameType::ID(data) => assert_eq!(data, &doc_id),
_ => unreachable!("got {:?}, should have been clock", name_id),
}
let action = msg.get_action();