From 837cea4ce0375dcf50c12f5d8f0301c05b825cd5 Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Thu, 26 Mar 2026 12:18:38 -0400 Subject: [PATCH] Added session id to message. --- src/action/action_type.rs | 2 + src/action/message.rs | 2 + src/document/create.rs | 49 +++++++-------- src/document/session.rs | 5 +- src/lib.rs | 25 +++++--- src/message/wrapper.rs | 120 +++++++++++++++++++++++++++---------- src/queue/data_director.rs | 4 +- 7 files changed, 137 insertions(+), 70 deletions(-) diff --git a/src/action/action_type.rs b/src/action/action_type.rs index b66c080..acdf4c0 100644 --- a/src/action/action_type.rs +++ b/src/action/action_type.rs @@ -7,6 +7,7 @@ pub enum Action { Delete, DocumentCreated, Error, + None, OnAddition, OnDelete, OnQuery, @@ -27,6 +28,7 @@ impl From for Action { MsgAction::Delete(_) => Action::Delete, MsgAction::DocumentCreated => Action::DocumentCreated, MsgAction::Error(_) => Action::Error, + MsgAction::None => Action::None, MsgAction::OnAddition(_) => Action::OnAddition, MsgAction::OnDelete(_) => Action::OnDelete, MsgAction::OnQuery(_) => Action::OnQuery, diff --git a/src/action/message.rs b/src/action/message.rs index cfe1f4e..be31690 100644 --- a/src/action/message.rs +++ b/src/action/message.rs @@ -10,6 +10,7 @@ pub enum MsgAction { Delete(Delete), DocumentCreated, Error(MTTError), + None, OnAddition(Records), OnDelete(Records), OnQuery(Records), @@ -30,6 +31,7 @@ impl MessageAction for MsgAction { Self::Delete(data) => data.doc_name(), Self::DocumentCreated => &NameType::None, Self::Error(data) => data.doc_name(), + Self::None => &NameType::None, Self::OnAddition(data) => data.doc_name(), Self::OnDelete(data) => data.doc_name(), Self::OnQuery(data) => data.doc_name(), diff --git a/src/document/create.rs b/src/document/create.rs index c919d12..9e9d441 100644 --- a/src/document/create.rs +++ b/src/document/create.rs @@ -403,15 +403,14 @@ impl DocumentFile { let names = docdef.get_document_names(); let id = queue.add_sender(tx); let reg_msg = Register::new(id.clone(), RegMsg::AddDocName(names.clone())); - let rmsg = msg.response(reg_msg.clone()); - queue.send(rmsg.clone()); + queue.send(msg.set_action(reg_msg)); let name_result = rx.recv().unwrap(); let name_id = match name_result.get_action() { MsgAction::Register(data) => match data.get_msg() { RegMsg::DocumentNameID(data) => data, RegMsg::Error(err) => { queue.remove_sender(&id); - queue.send(msg.response(err.clone())); + queue.send(msg.set_action(err.clone())); return; } _ => unreachable!("should only return a name id or an error"), @@ -420,16 +419,15 @@ impl DocumentFile { }; let mut route_action: HashMap = HashMap::new(); for path_action in docdef.iter_routes() { - let request = reg_msg.response(RegMsg::AddRoute(path_action.path())); - let add_route = rmsg.response(request); - queue.send(add_route); + let reg_req = Register::new(id.clone(), RegMsg::AddRoute(path_action.path())); + queue.send(msg.set_action(reg_req)); let result = rx.recv().unwrap(); let route_id = match result.get_action() { MsgAction::Register(data) => match data.get_msg() { RegMsg::RouteID(data) => data, RegMsg::Error(err) => { queue.remove_sender(&id); - queue.send(msg.response(err.clone())); + queue.send(msg.set_action(err.clone())); return; } _ => unreachable!("should only return a route id or an error"), @@ -448,7 +446,7 @@ impl DocumentFile { spawn(move || { doc.listen(); }); - let reply = msg.response(MsgAction::DocumentCreated); + let reply = msg.set_action(MsgAction::DocumentCreated); queue.send(reply.clone()); } @@ -467,7 +465,7 @@ impl DocumentFile { DocFuncType::Delete => self.delete(&msg), DocFuncType::Query => self.query(&msg), DocFuncType::Show => self.queue.send( - msg.response(Reply::new(self.docdef.get_document_names()[0].clone())), + msg.set_action(Reply::new(self.docdef.get_document_names()[0].clone())), ), DocFuncType::Update => self.update(&msg), DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), @@ -512,7 +510,7 @@ impl DocumentFile { Err(mut err) => { err.add_parent(ErrorID::Field(name.clone())); err.add_parent(ErrorID::Document(msg.doc_name().clone())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } @@ -522,7 +520,7 @@ impl DocumentFile { Err(mut err) => { err.add_parent(ErrorID::Field(name.clone().into())); err.add_parent(ErrorID::Document(msg.doc_name().clone())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } @@ -536,7 +534,7 @@ impl DocumentFile { Err(mut err) => { err.add_parent(ErrorID::Field(field_id.clone().into())); err.add_parent(ErrorID::Document(msg.doc_name().clone())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } @@ -559,9 +557,9 @@ impl DocumentFile { self.docs.insert(oid.clone(), holder.clone()); records.insert(oid, holder); } - self.queue.send(msg.response(records.clone())); + self.queue.send(msg.set_action(records.clone())); self.queue - .send(msg.response(MsgAction::OnAddition(records))); + .send(msg.set_action(MsgAction::OnAddition(records))); } fn delete(&mut self, msg: &Message) { @@ -573,7 +571,7 @@ impl DocumentFile { Ok(data) => data, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } @@ -589,8 +587,8 @@ impl DocumentFile { self.docdef.get_field_names().clone(), records, ); - self.queue.send(msg.response(rec.clone())); - self.queue.send(msg.response(MsgAction::OnDelete(rec))); + self.queue.send(msg.set_action(rec.clone())); + self.queue.send(msg.set_action(MsgAction::OnDelete(rec))); } fn run_query(&self, query: &Query) -> Result { @@ -665,7 +663,7 @@ impl DocumentFile { Ok(data) => data, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } @@ -675,8 +673,8 @@ impl DocumentFile { self.docdef.get_field_names().clone(), records, ); - self.queue.send(msg.response(recs.clone())); - self.queue.send(msg.response(MsgAction::OnQuery(recs))); + self.queue.send(msg.set_action(recs.clone())); + self.queue.send(msg.set_action(MsgAction::OnQuery(recs))); } fn run_update( @@ -739,7 +737,7 @@ impl DocumentFile { updates, ); self.queue - .send(msg.response(MsgAction::OnUpdate(recs.clone()))); + .send(msg.set_action(MsgAction::OnUpdate(recs.clone()))); Ok(recs) } @@ -752,7 +750,7 @@ impl DocumentFile { Ok(result) => result, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } @@ -761,12 +759,12 @@ impl DocumentFile { Ok(output) => output, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); - let reply = msg.response(err); + let reply = msg.set_action(err); self.queue.send(reply); return; } }; - self.queue.send(msg.response(data)); + self.queue.send(msg.set_action(data)); } fn existing_query(&mut self, msg: &Message, action: &MsgAction) { @@ -783,8 +781,7 @@ impl DocumentFile { } fn trigger(&self, msg: &Message, action: &MsgAction) { - self.queue - .send(msg.forward(self.name_id.clone(), action.clone())); + self.queue.send(msg.set_action(action.clone())); } } diff --git a/src/document/session.rs b/src/document/session.rs index df38eb2..b03f350 100644 --- a/src/document/session.rs +++ b/src/document/session.rs @@ -55,9 +55,10 @@ impl Session { Include::Just(Action::DocumentCreated), ); let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone())); - queue.send(Message::with_id(msg_id.clone(), reg_msg)); + let msg = Message::new(reg_msg).set_id(msg_id); + queue.send(msg.clone()); rx.recv().unwrap(); // Wait for completion. - queue.send(Message::with_id(msg_id, Self::document_definition())); + queue.send(msg.set_action(Self::document_definition())); rx.recv().unwrap(); // Wait for completion. queue.remove_sender(&sender_id); } diff --git a/src/lib.rs b/src/lib.rs index d2e6156..e8b2ca7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,9 +72,10 @@ impl MTTClient { } None => {} } + let msg = Message::default().set_id(msg_id.clone()); for path in paths.iter().cloned() { let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path)); - queue.send(Message::with_id(msg_id.clone(), reg_msg)); + queue.send(msg.set_action(reg_msg)); let result = rx.recv().unwrap(); } match sess_id { @@ -87,19 +88,19 @@ impl MTTClient { .unwrap(); calc.add_value(data.clone()).unwrap(); qry.add(Session::id_field_names()[0].clone(), calc); - queue.send(Message::with_id(msg_id.clone(), qry)); + queue.send(msg.set_action(qry)); } - Err(_) => queue.send(Message::with_id(msg_id.clone(), add.clone())), + Err(_) => queue.send(msg.set_action(add.clone())), }; } - None => queue.send(Message::with_id(msg_id.clone(), add.clone())), + None => queue.send(msg.set_action(add.clone())), }; let result = rx.recv().unwrap(); let session_id = match result.get_action() { MsgAction::Records(result) => { let mut holder = result.clone(); if holder.len() == 0 { - queue.send(Message::with_id(msg_id.clone(), add)); + queue.send(msg.set_action(add)); let new_sess = rx.recv().unwrap(); holder = match new_sess.get_action() { MsgAction::Records(new_holder) => new_holder.clone(), @@ -140,12 +141,15 @@ impl MTTClient { Include::Just(Action::Error), ), ]; + let msg = Message::default() + .set_id(msg_id) + .set_session(self.session_id.clone().into()); for path in paths.iter() { let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone())); - self.queue.send(Message::with_id(msg_id.clone(), reg_msg)); + self.queue.send(msg.set_action(reg_msg)); self.rx.recv().unwrap(); // Wait for completion. } - self.queue.send(Message::with_id(msg_id.clone(), docdef)); + self.queue.send(msg.set_action(docdef)); match self.rx.recv_timeout(TIMEOUT) { Ok(data) => match data.get_action() { MsgAction::DocumentCreated => Ok(()), @@ -175,9 +179,12 @@ impl MTTClient { Include::Just(Action::Error), ), ]; + let msg = Message::default() + .set_id(msg_id.clone()) + .set_session(self.session_id.clone().into()); for path in paths.iter() { let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone())); - self.queue.send(Message::new(reg_msg)); + self.queue.send(msg.set_action(reg_msg)); let result = self.rx.recv().unwrap(); let action = result.get_action(); match action { @@ -192,7 +199,7 @@ impl MTTClient { _ => unreachable!("got {:?} should have been a registry message", action), } } - self.queue.send(Message::with_id(msg_id, req)); + self.queue.send(msg.set_action(req)); match self.rx.recv_timeout(TIMEOUT) { Ok(data) => match data.get_action() { MsgAction::Records(data) => Ok(data.clone()), diff --git a/src/message/wrapper.rs b/src/message/wrapper.rs index 960bb83..da02e66 100644 --- a/src/message/wrapper.rs +++ b/src/message/wrapper.rs @@ -42,7 +42,7 @@ pub struct Message { msg_id: MessageID, action: MsgAction, route: Route, - // session: Option + session: Field, } impl Message { @@ -50,19 +50,33 @@ impl Message { where A: Into, { - let msg_id = MessageID::new(); - Self::with_id(msg_id, action) + Self { + msg_id: MessageID::new(), + action: action.into(), + route: Route::default(), + session: Field::None, + } } - pub fn with_id(msg_id: MessageID, action: A) -> Self + pub fn set_id(&self, msg_id: MessageID) -> Self { + let mut output = self.clone(); + output.msg_id = msg_id; + output + } + + pub fn set_session(&self, session: Field) -> Self { + let mut output = self.clone(); + output.session = session; + output + } + + pub fn set_action(&self, action: A) -> Self where A: Into, { - Self { - msg_id: msg_id, - action: action.into(), - route: Route::default(), - } + let mut output = self.clone(); + output.action = action.into(); + output } pub fn get_message_id(&self) -> &MessageID { @@ -88,27 +102,15 @@ impl Message { pub fn set_route(&mut self, route: Route) { self.route = route; } +} - pub fn response(&self, action: A) -> Self - where - A: Into, - { +impl Default for Message { + fn default() -> Self { Self { - msg_id: self.msg_id.clone(), - action: action.into(), - route: Route::default(), - } - } - - pub fn forward(&self, doc_id: D, action: A) -> Self - where - D: Into, - A: Into, - { - Self { - msg_id: self.msg_id.clone(), - action: action.into(), + msg_id: MessageID::new(), + action: MsgAction::None, route: Route::default(), + session: Field::None, } } } @@ -127,6 +129,62 @@ mod messages { name::{name_id_support::test_name_id, Name}, }; + fn is_there_a_default_message() { + let msg = Message::default(); + match msg.action { + MsgAction::None => {} + _ => panic!("should have been no action"), + } + assert_eq!(msg.session, Field::None); + } + + #[test] + fn can_create_new_messsage() { + let doc_name = Name::english("something"); + let qry = Query::new(doc_name.clone()); + let msg = Message::new(qry); + let expected: NameType = doc_name.into(); + match msg.action { + MsgAction::Query(data) => assert_eq!(data.doc_name(), &expected), + _ => unreachable!("should have been a query"), + } + match msg.session { + Field::None => {} + _ => unreachable!("should have been none"), + } + } + + #[test] + fn can_id_be_set() { + let doc_name = Name::english("identification"); + let qry = Query::new(doc_name.clone()); + let msg_id = MessageID::new(); + let msg = Message::new(qry).set_id(msg_id.clone()); + assert_eq!(msg.msg_id, msg_id); + } + + #[test] + fn can_session_be_set() { + let doc_name = Name::english("identification"); + let qry = Query::new(doc_name.clone()); + let sess_id: Field = Uuid::new_v4().into(); + let msg = Message::new(qry).set_session(sess_id.clone()); + assert_eq!(msg.session, sess_id); + } + + #[test] + fn can_action_be_set() { + let doc_name = Name::english("action"); + let expected: NameType = doc_name.clone().into(); + let docdef = DocDef::new(doc_name.clone()); + let qry = Query::new(doc_name.clone()); + let msg = Message::new(docdef).set_action(qry); + match msg.action { + MsgAction::Query(data) => assert_eq!(data.doc_name(), &expected), + _ => unreachable!("should have been a query"), + } + } + #[test] fn can_the_document_be_a_named_reference() { let dts = [Name::english("one"), Name::english("two")]; @@ -204,7 +262,7 @@ mod messages { let name = Name::english("testing"); let msg = Message::new(Query::new(name.clone())); let responce = Reply::new(Name::english("something")); - let reply = msg.response(responce); + let reply = msg.set_action(responce); assert_eq!(reply.get_message_id(), msg.get_message_id()); match reply.get_action() { MsgAction::Reply(_) => {} @@ -217,7 +275,7 @@ mod messages { let name = Name::english("testing"); let msg = Message::new(Query::new(name.clone())); let err_msg = Uuid::new_v4().to_string(); - let result = msg.response(MTTError::new(ErrorID::DocumentNotFound)); + let result = msg.set_action(MTTError::new(ErrorID::DocumentNotFound)); assert_eq!(result.get_message_id(), msg.get_message_id()); match result.get_action() { MsgAction::Error(data) => match data.get_error_ids().back().unwrap() { @@ -233,8 +291,8 @@ mod messages { let doc_id = test_name_id(); let msg = Message::new(Query::new(doc_id.clone())); let data = Uuid::new_v4().to_string(); - let result1 = msg.response(MTTError::new(ErrorID::DocumentNotFound)); - let result2 = msg.response(Reply::new(NameType::None)); + let result1 = msg.set_action(MTTError::new(ErrorID::DocumentNotFound)); + let result2 = msg.set_action(Reply::new(NameType::None)); assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result2.get_message_id(), msg.get_message_id()); let action1 = result1.get_action(); diff --git a/src/queue/data_director.rs b/src/queue/data_director.rs index ca25a92..76c6d4d 100644 --- a/src/queue/data_director.rs +++ b/src/queue/data_director.rs @@ -474,7 +474,7 @@ impl DocRegistry { match msg.get_action() { MsgAction::Register(data) => { let id = data.get_sender_id(); - let reply = msg.response(self.register_action(data)); + let reply = msg.set_action(self.register_action(data)); self.queue.forward(id, reply); } _ => match self.path_to_route(&msg.get_path()) { @@ -484,7 +484,7 @@ impl DocRegistry { self.queue.forward(sender_id, msg.clone()); } } - Err(err) => self.queue.send(msg.response(MsgAction::Error(err))), + Err(err) => self.queue.send(msg.set_action(MsgAction::Error(err))), }, } }