Added session id to message.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Has been cancelled

This commit is contained in:
2026-03-26 12:18:38 -04:00
parent 678e433632
commit 837cea4ce0
7 changed files with 137 additions and 70 deletions

View File

@@ -7,6 +7,7 @@ pub enum Action {
Delete, Delete,
DocumentCreated, DocumentCreated,
Error, Error,
None,
OnAddition, OnAddition,
OnDelete, OnDelete,
OnQuery, OnQuery,
@@ -27,6 +28,7 @@ impl From<MsgAction> for Action {
MsgAction::Delete(_) => Action::Delete, MsgAction::Delete(_) => Action::Delete,
MsgAction::DocumentCreated => Action::DocumentCreated, MsgAction::DocumentCreated => Action::DocumentCreated,
MsgAction::Error(_) => Action::Error, MsgAction::Error(_) => Action::Error,
MsgAction::None => Action::None,
MsgAction::OnAddition(_) => Action::OnAddition, MsgAction::OnAddition(_) => Action::OnAddition,
MsgAction::OnDelete(_) => Action::OnDelete, MsgAction::OnDelete(_) => Action::OnDelete,
MsgAction::OnQuery(_) => Action::OnQuery, MsgAction::OnQuery(_) => Action::OnQuery,

View File

@@ -10,6 +10,7 @@ pub enum MsgAction {
Delete(Delete), Delete(Delete),
DocumentCreated, DocumentCreated,
Error(MTTError), Error(MTTError),
None,
OnAddition(Records), OnAddition(Records),
OnDelete(Records), OnDelete(Records),
OnQuery(Records), OnQuery(Records),
@@ -30,6 +31,7 @@ impl MessageAction for MsgAction {
Self::Delete(data) => data.doc_name(), Self::Delete(data) => data.doc_name(),
Self::DocumentCreated => &NameType::None, Self::DocumentCreated => &NameType::None,
Self::Error(data) => data.doc_name(), Self::Error(data) => data.doc_name(),
Self::None => &NameType::None,
Self::OnAddition(data) => data.doc_name(), Self::OnAddition(data) => data.doc_name(),
Self::OnDelete(data) => data.doc_name(), Self::OnDelete(data) => data.doc_name(),
Self::OnQuery(data) => data.doc_name(), Self::OnQuery(data) => data.doc_name(),

View File

@@ -403,15 +403,14 @@ impl DocumentFile {
let names = docdef.get_document_names(); let names = docdef.get_document_names();
let id = queue.add_sender(tx); let id = queue.add_sender(tx);
let reg_msg = Register::new(id.clone(), RegMsg::AddDocName(names.clone())); let reg_msg = Register::new(id.clone(), RegMsg::AddDocName(names.clone()));
let rmsg = msg.response(reg_msg.clone()); queue.send(msg.set_action(reg_msg));
queue.send(rmsg.clone());
let name_result = rx.recv().unwrap(); let name_result = rx.recv().unwrap();
let name_id = match name_result.get_action() { let name_id = match name_result.get_action() {
MsgAction::Register(data) => match data.get_msg() { MsgAction::Register(data) => match data.get_msg() {
RegMsg::DocumentNameID(data) => data, RegMsg::DocumentNameID(data) => data,
RegMsg::Error(err) => { RegMsg::Error(err) => {
queue.remove_sender(&id); queue.remove_sender(&id);
queue.send(msg.response(err.clone())); queue.send(msg.set_action(err.clone()));
return; return;
} }
_ => unreachable!("should only return a name id or an error"), _ => unreachable!("should only return a name id or an error"),
@@ -420,16 +419,15 @@ impl DocumentFile {
}; };
let mut route_action: HashMap<RouteID, DocFuncType> = HashMap::new(); let mut route_action: HashMap<RouteID, DocFuncType> = HashMap::new();
for path_action in docdef.iter_routes() { for path_action in docdef.iter_routes() {
let request = reg_msg.response(RegMsg::AddRoute(path_action.path())); let reg_req = Register::new(id.clone(), RegMsg::AddRoute(path_action.path()));
let add_route = rmsg.response(request); queue.send(msg.set_action(reg_req));
queue.send(add_route);
let result = rx.recv().unwrap(); let result = rx.recv().unwrap();
let route_id = match result.get_action() { let route_id = match result.get_action() {
MsgAction::Register(data) => match data.get_msg() { MsgAction::Register(data) => match data.get_msg() {
RegMsg::RouteID(data) => data, RegMsg::RouteID(data) => data,
RegMsg::Error(err) => { RegMsg::Error(err) => {
queue.remove_sender(&id); queue.remove_sender(&id);
queue.send(msg.response(err.clone())); queue.send(msg.set_action(err.clone()));
return; return;
} }
_ => unreachable!("should only return a route id or an error"), _ => unreachable!("should only return a route id or an error"),
@@ -448,7 +446,7 @@ impl DocumentFile {
spawn(move || { spawn(move || {
doc.listen(); doc.listen();
}); });
let reply = msg.response(MsgAction::DocumentCreated); let reply = msg.set_action(MsgAction::DocumentCreated);
queue.send(reply.clone()); queue.send(reply.clone());
} }
@@ -467,7 +465,7 @@ impl DocumentFile {
DocFuncType::Delete => self.delete(&msg), DocFuncType::Delete => self.delete(&msg),
DocFuncType::Query => self.query(&msg), DocFuncType::Query => self.query(&msg),
DocFuncType::Show => self.queue.send( DocFuncType::Show => self.queue.send(
msg.response(Reply::new(self.docdef.get_document_names()[0].clone())), msg.set_action(Reply::new(self.docdef.get_document_names()[0].clone())),
), ),
DocFuncType::Update => self.update(&msg), DocFuncType::Update => self.update(&msg),
DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action),
@@ -512,7 +510,7 @@ impl DocumentFile {
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Field(name.clone())); err.add_parent(ErrorID::Field(name.clone()));
err.add_parent(ErrorID::Document(msg.doc_name().clone())); err.add_parent(ErrorID::Document(msg.doc_name().clone()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
@@ -522,7 +520,7 @@ impl DocumentFile {
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Field(name.clone().into())); err.add_parent(ErrorID::Field(name.clone().into()));
err.add_parent(ErrorID::Document(msg.doc_name().clone())); err.add_parent(ErrorID::Document(msg.doc_name().clone()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
@@ -536,7 +534,7 @@ impl DocumentFile {
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Field(field_id.clone().into())); err.add_parent(ErrorID::Field(field_id.clone().into()));
err.add_parent(ErrorID::Document(msg.doc_name().clone())); err.add_parent(ErrorID::Document(msg.doc_name().clone()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
@@ -559,9 +557,9 @@ impl DocumentFile {
self.docs.insert(oid.clone(), holder.clone()); self.docs.insert(oid.clone(), holder.clone());
records.insert(oid, holder); records.insert(oid, holder);
} }
self.queue.send(msg.response(records.clone())); self.queue.send(msg.set_action(records.clone()));
self.queue self.queue
.send(msg.response(MsgAction::OnAddition(records))); .send(msg.set_action(MsgAction::OnAddition(records)));
} }
fn delete(&mut self, msg: &Message) { fn delete(&mut self, msg: &Message) {
@@ -573,7 +571,7 @@ impl DocumentFile {
Ok(data) => data, Ok(data) => data,
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Document(msg.doc_name().into())); err.add_parent(ErrorID::Document(msg.doc_name().into()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
@@ -589,8 +587,8 @@ impl DocumentFile {
self.docdef.get_field_names().clone(), self.docdef.get_field_names().clone(),
records, records,
); );
self.queue.send(msg.response(rec.clone())); self.queue.send(msg.set_action(rec.clone()));
self.queue.send(msg.response(MsgAction::OnDelete(rec))); self.queue.send(msg.set_action(MsgAction::OnDelete(rec)));
} }
fn run_query(&self, query: &Query) -> Result<InternalRecords, MTTError> { fn run_query(&self, query: &Query) -> Result<InternalRecords, MTTError> {
@@ -665,7 +663,7 @@ impl DocumentFile {
Ok(data) => data, Ok(data) => data,
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Document(msg.doc_name().into())); err.add_parent(ErrorID::Document(msg.doc_name().into()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
@@ -675,8 +673,8 @@ impl DocumentFile {
self.docdef.get_field_names().clone(), self.docdef.get_field_names().clone(),
records, records,
); );
self.queue.send(msg.response(recs.clone())); self.queue.send(msg.set_action(recs.clone()));
self.queue.send(msg.response(MsgAction::OnQuery(recs))); self.queue.send(msg.set_action(MsgAction::OnQuery(recs)));
} }
fn run_update( fn run_update(
@@ -739,7 +737,7 @@ impl DocumentFile {
updates, updates,
); );
self.queue self.queue
.send(msg.response(MsgAction::OnUpdate(recs.clone()))); .send(msg.set_action(MsgAction::OnUpdate(recs.clone())));
Ok(recs) Ok(recs)
} }
@@ -752,7 +750,7 @@ impl DocumentFile {
Ok(result) => result, Ok(result) => result,
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Document(msg.doc_name().into())); err.add_parent(ErrorID::Document(msg.doc_name().into()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
@@ -761,12 +759,12 @@ impl DocumentFile {
Ok(output) => output, Ok(output) => output,
Err(mut err) => { Err(mut err) => {
err.add_parent(ErrorID::Document(msg.doc_name().into())); err.add_parent(ErrorID::Document(msg.doc_name().into()));
let reply = msg.response(err); let reply = msg.set_action(err);
self.queue.send(reply); self.queue.send(reply);
return; return;
} }
}; };
self.queue.send(msg.response(data)); self.queue.send(msg.set_action(data));
} }
fn existing_query(&mut self, msg: &Message, action: &MsgAction) { fn existing_query(&mut self, msg: &Message, action: &MsgAction) {
@@ -783,8 +781,7 @@ impl DocumentFile {
} }
fn trigger(&self, msg: &Message, action: &MsgAction) { fn trigger(&self, msg: &Message, action: &MsgAction) {
self.queue self.queue.send(msg.set_action(action.clone()));
.send(msg.forward(self.name_id.clone(), action.clone()));
} }
} }

View File

@@ -55,9 +55,10 @@ impl Session {
Include::Just(Action::DocumentCreated), Include::Just(Action::DocumentCreated),
); );
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone())); let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path.clone()));
queue.send(Message::with_id(msg_id.clone(), reg_msg)); let msg = Message::new(reg_msg).set_id(msg_id);
queue.send(msg.clone());
rx.recv().unwrap(); // Wait for completion. rx.recv().unwrap(); // Wait for completion.
queue.send(Message::with_id(msg_id, Self::document_definition())); queue.send(msg.set_action(Self::document_definition()));
rx.recv().unwrap(); // Wait for completion. rx.recv().unwrap(); // Wait for completion.
queue.remove_sender(&sender_id); queue.remove_sender(&sender_id);
} }

View File

@@ -72,9 +72,10 @@ impl MTTClient {
} }
None => {} None => {}
} }
let msg = Message::default().set_id(msg_id.clone());
for path in paths.iter().cloned() { for path in paths.iter().cloned() {
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path)); let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
queue.send(Message::with_id(msg_id.clone(), reg_msg)); queue.send(msg.set_action(reg_msg));
let result = rx.recv().unwrap(); let result = rx.recv().unwrap();
} }
match sess_id { match sess_id {
@@ -87,19 +88,19 @@ impl MTTClient {
.unwrap(); .unwrap();
calc.add_value(data.clone()).unwrap(); calc.add_value(data.clone()).unwrap();
qry.add(Session::id_field_names()[0].clone(), calc); qry.add(Session::id_field_names()[0].clone(), calc);
queue.send(Message::with_id(msg_id.clone(), qry)); queue.send(msg.set_action(qry));
} }
Err(_) => queue.send(Message::with_id(msg_id.clone(), add.clone())), Err(_) => queue.send(msg.set_action(add.clone())),
}; };
} }
None => queue.send(Message::with_id(msg_id.clone(), add.clone())), None => queue.send(msg.set_action(add.clone())),
}; };
let result = rx.recv().unwrap(); let result = rx.recv().unwrap();
let session_id = match result.get_action() { let session_id = match result.get_action() {
MsgAction::Records(result) => { MsgAction::Records(result) => {
let mut holder = result.clone(); let mut holder = result.clone();
if holder.len() == 0 { if holder.len() == 0 {
queue.send(Message::with_id(msg_id.clone(), add)); queue.send(msg.set_action(add));
let new_sess = rx.recv().unwrap(); let new_sess = rx.recv().unwrap();
holder = match new_sess.get_action() { holder = match new_sess.get_action() {
MsgAction::Records(new_holder) => new_holder.clone(), MsgAction::Records(new_holder) => new_holder.clone(),
@@ -140,12 +141,15 @@ impl MTTClient {
Include::Just(Action::Error), Include::Just(Action::Error),
), ),
]; ];
let msg = Message::default()
.set_id(msg_id)
.set_session(self.session_id.clone().into());
for path in paths.iter() { for path in paths.iter() {
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone())); let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
self.queue.send(Message::with_id(msg_id.clone(), reg_msg)); self.queue.send(msg.set_action(reg_msg));
self.rx.recv().unwrap(); // Wait for completion. self.rx.recv().unwrap(); // Wait for completion.
} }
self.queue.send(Message::with_id(msg_id.clone(), docdef)); self.queue.send(msg.set_action(docdef));
match self.rx.recv_timeout(TIMEOUT) { match self.rx.recv_timeout(TIMEOUT) {
Ok(data) => match data.get_action() { Ok(data) => match data.get_action() {
MsgAction::DocumentCreated => Ok(()), MsgAction::DocumentCreated => Ok(()),
@@ -175,9 +179,12 @@ impl MTTClient {
Include::Just(Action::Error), Include::Just(Action::Error),
), ),
]; ];
let msg = Message::default()
.set_id(msg_id.clone())
.set_session(self.session_id.clone().into());
for path in paths.iter() { for path in paths.iter() {
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone())); let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
self.queue.send(Message::new(reg_msg)); self.queue.send(msg.set_action(reg_msg));
let result = self.rx.recv().unwrap(); let result = self.rx.recv().unwrap();
let action = result.get_action(); let action = result.get_action();
match action { match action {
@@ -192,7 +199,7 @@ impl MTTClient {
_ => unreachable!("got {:?} should have been a registry message", action), _ => unreachable!("got {:?} should have been a registry message", action),
} }
} }
self.queue.send(Message::with_id(msg_id, req)); self.queue.send(msg.set_action(req));
match self.rx.recv_timeout(TIMEOUT) { match self.rx.recv_timeout(TIMEOUT) {
Ok(data) => match data.get_action() { Ok(data) => match data.get_action() {
MsgAction::Records(data) => Ok(data.clone()), MsgAction::Records(data) => Ok(data.clone()),

View File

@@ -42,7 +42,7 @@ pub struct Message {
msg_id: MessageID, msg_id: MessageID,
action: MsgAction, action: MsgAction,
route: Route, route: Route,
// session: Option<?> session: Field,
} }
impl Message { impl Message {
@@ -50,19 +50,33 @@ impl Message {
where where
A: Into<MsgAction>, A: Into<MsgAction>,
{ {
let msg_id = MessageID::new(); Self {
Self::with_id(msg_id, action) msg_id: MessageID::new(),
action: action.into(),
route: Route::default(),
session: Field::None,
}
} }
pub fn with_id<A>(msg_id: MessageID, action: A) -> Self pub fn set_id(&self, msg_id: MessageID) -> Self {
let mut output = self.clone();
output.msg_id = msg_id;
output
}
pub fn set_session(&self, session: Field) -> Self {
let mut output = self.clone();
output.session = session;
output
}
pub fn set_action<A>(&self, action: A) -> Self
where where
A: Into<MsgAction>, A: Into<MsgAction>,
{ {
Self { let mut output = self.clone();
msg_id: msg_id, output.action = action.into();
action: action.into(), output
route: Route::default(),
}
} }
pub fn get_message_id(&self) -> &MessageID { pub fn get_message_id(&self) -> &MessageID {
@@ -88,27 +102,15 @@ impl Message {
pub fn set_route(&mut self, route: Route) { pub fn set_route(&mut self, route: Route) {
self.route = route; self.route = route;
} }
}
pub fn response<A>(&self, action: A) -> Self impl Default for Message {
where fn default() -> Self {
A: Into<MsgAction>,
{
Self { Self {
msg_id: self.msg_id.clone(), msg_id: MessageID::new(),
action: action.into(), action: MsgAction::None,
route: Route::default(),
}
}
pub fn forward<D, A>(&self, doc_id: D, action: A) -> Self
where
D: Into<NameType>,
A: Into<MsgAction>,
{
Self {
msg_id: self.msg_id.clone(),
action: action.into(),
route: Route::default(), route: Route::default(),
session: Field::None,
} }
} }
} }
@@ -127,6 +129,62 @@ mod messages {
name::{name_id_support::test_name_id, Name}, name::{name_id_support::test_name_id, Name},
}; };
fn is_there_a_default_message() {
let msg = Message::default();
match msg.action {
MsgAction::None => {}
_ => panic!("should have been no action"),
}
assert_eq!(msg.session, Field::None);
}
#[test]
fn can_create_new_messsage() {
let doc_name = Name::english("something");
let qry = Query::new(doc_name.clone());
let msg = Message::new(qry);
let expected: NameType = doc_name.into();
match msg.action {
MsgAction::Query(data) => assert_eq!(data.doc_name(), &expected),
_ => unreachable!("should have been a query"),
}
match msg.session {
Field::None => {}
_ => unreachable!("should have been none"),
}
}
#[test]
fn can_id_be_set() {
let doc_name = Name::english("identification");
let qry = Query::new(doc_name.clone());
let msg_id = MessageID::new();
let msg = Message::new(qry).set_id(msg_id.clone());
assert_eq!(msg.msg_id, msg_id);
}
#[test]
fn can_session_be_set() {
let doc_name = Name::english("identification");
let qry = Query::new(doc_name.clone());
let sess_id: Field = Uuid::new_v4().into();
let msg = Message::new(qry).set_session(sess_id.clone());
assert_eq!(msg.session, sess_id);
}
#[test]
fn can_action_be_set() {
let doc_name = Name::english("action");
let expected: NameType = doc_name.clone().into();
let docdef = DocDef::new(doc_name.clone());
let qry = Query::new(doc_name.clone());
let msg = Message::new(docdef).set_action(qry);
match msg.action {
MsgAction::Query(data) => assert_eq!(data.doc_name(), &expected),
_ => unreachable!("should have been a query"),
}
}
#[test] #[test]
fn can_the_document_be_a_named_reference() { fn can_the_document_be_a_named_reference() {
let dts = [Name::english("one"), Name::english("two")]; let dts = [Name::english("one"), Name::english("two")];
@@ -204,7 +262,7 @@ mod messages {
let name = Name::english("testing"); let name = Name::english("testing");
let msg = Message::new(Query::new(name.clone())); let msg = Message::new(Query::new(name.clone()));
let responce = Reply::new(Name::english("something")); let responce = Reply::new(Name::english("something"));
let reply = msg.response(responce); let reply = msg.set_action(responce);
assert_eq!(reply.get_message_id(), msg.get_message_id()); assert_eq!(reply.get_message_id(), msg.get_message_id());
match reply.get_action() { match reply.get_action() {
MsgAction::Reply(_) => {} MsgAction::Reply(_) => {}
@@ -217,7 +275,7 @@ mod messages {
let name = Name::english("testing"); let name = Name::english("testing");
let msg = Message::new(Query::new(name.clone())); let msg = Message::new(Query::new(name.clone()));
let err_msg = Uuid::new_v4().to_string(); let err_msg = Uuid::new_v4().to_string();
let result = msg.response(MTTError::new(ErrorID::DocumentNotFound)); let result = msg.set_action(MTTError::new(ErrorID::DocumentNotFound));
assert_eq!(result.get_message_id(), msg.get_message_id()); assert_eq!(result.get_message_id(), msg.get_message_id());
match result.get_action() { match result.get_action() {
MsgAction::Error(data) => match data.get_error_ids().back().unwrap() { MsgAction::Error(data) => match data.get_error_ids().back().unwrap() {
@@ -233,8 +291,8 @@ mod messages {
let doc_id = test_name_id(); let doc_id = test_name_id();
let msg = Message::new(Query::new(doc_id.clone())); let msg = Message::new(Query::new(doc_id.clone()));
let data = Uuid::new_v4().to_string(); let data = Uuid::new_v4().to_string();
let result1 = msg.response(MTTError::new(ErrorID::DocumentNotFound)); let result1 = msg.set_action(MTTError::new(ErrorID::DocumentNotFound));
let result2 = msg.response(Reply::new(NameType::None)); let result2 = msg.set_action(Reply::new(NameType::None));
assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), msg.get_message_id());
assert_eq!(result2.get_message_id(), msg.get_message_id()); assert_eq!(result2.get_message_id(), msg.get_message_id());
let action1 = result1.get_action(); let action1 = result1.get_action();

View File

@@ -474,7 +474,7 @@ impl DocRegistry {
match msg.get_action() { match msg.get_action() {
MsgAction::Register(data) => { MsgAction::Register(data) => {
let id = data.get_sender_id(); let id = data.get_sender_id();
let reply = msg.response(self.register_action(data)); let reply = msg.set_action(self.register_action(data));
self.queue.forward(id, reply); self.queue.forward(id, reply);
} }
_ => match self.path_to_route(&msg.get_path()) { _ => match self.path_to_route(&msg.get_path()) {
@@ -484,7 +484,7 @@ impl DocRegistry {
self.queue.forward(sender_id, msg.clone()); self.queue.forward(sender_id, msg.clone());
} }
} }
Err(err) => self.queue.send(msg.response(MsgAction::Error(err))), Err(err) => self.queue.send(msg.set_action(MsgAction::Error(err))),
}, },
} }
} }