diff --git a/src/message.rs b/src/message.rs index 9c86fc2..4dd77f0 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,3 @@ -use crate::field::Field; use std::{ collections::HashMap, sync::{ @@ -19,13 +18,29 @@ enum MTTError { #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum Action { Create, + Query, // NewDocumentType, - Query, Reply, Update, } +impl From for Action { + fn from(value: MsgAction) -> Self { + match value { + MsgAction::Create(_) => Action::Create, + MsgAction::Query(_) => Action::Query, + } + } +} + +impl From<&MsgAction> for Action { + fn from(value: &MsgAction) -> Self { + let action = value.clone(); + Self::from(action) + } +} + #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum NameID { ID(Uuid), @@ -56,16 +71,21 @@ impl From<&NameID> for NameID { } } +#[derive(Clone)] +enum MsgAction { + Create(DocDef), + Query(Access), +} + #[derive(Clone)] struct Message { msg_id: Uuid, document_id: NameID, - action: Action, - //instructions: ?, + action: MsgAction, } impl Message { - fn new(doc_id: D, action: Action) -> Self + fn new(doc_id: D, action: MsgAction) -> Self where D: Into, { @@ -84,7 +104,7 @@ impl Message { &self.document_id } - fn get_action(&self) -> &Action { + fn get_action(&self) -> &MsgAction { &self.action } } @@ -97,12 +117,15 @@ mod messages { fn can_the_document_be_a_stringi_reference() { let dts = ["one", "two"]; for document in dts.into_iter() { - let msg = Message::new(document, Action::NewDocumentType); + let msg = Message::new(document, MsgAction::Create(DocDef::new())); match msg.get_document_id() { NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, document), } - assert_eq!(msg.get_action(), &Action::NewDocumentType); + match msg.get_action() { + MsgAction::Create(_) => {} + _ => unreachable!("should have been a create document"), + } } } @@ -110,31 +133,37 @@ mod messages { fn can_the_document_be_a_string() { let dts = ["one".to_string(), "two".to_string()]; for document in dts.into_iter() { - let msg = Message::new(document.clone(), Action::Update); + let msg = Message::new(document.clone(), MsgAction::Query(Access::new())); match msg.get_document_id() { NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, &document), } - assert_eq!(msg.get_action(), &Action::Update); + match msg.get_action() { + MsgAction::Query(_) => {} + _ => unreachable!("should have been an access query"), + } } } #[test] fn can_the_document_be_an_id() { let document = Uuid::new_v4(); - let msg = Message::new(document.clone(), Action::Query); + let msg = Message::new(document.clone(), MsgAction::Query(Access::new())); match msg.get_document_id() { NameID::ID(data) => assert_eq!(data, &document), NameID::Name(_) => unreachable!("should have been an id"), } - assert_eq!(msg.action, Action::Query); + match msg.get_action() { + MsgAction::Query(_) => {} + _ => unreachable!("should have been an access query"), + } } #[test] fn is_the_message_id_random() { let mut ids: Vec = Vec::new(); for _ in 0..5 { - let msg = Message::new("tester", Action::NewDocumentType); + let msg = Message::new("tester", MsgAction::Create(DocDef::new())); let id = msg.get_message_id().clone(); assert!(!ids.contains(&id), "{:?} containts {}", ids, id); ids.push(id); @@ -185,23 +214,23 @@ struct RouteID { } impl From for RouteID { -fn from(value: Route) -> Self { - Self { - action: match value.action { - Include::All => None, - Include::Some(action) => Some(action.clone()), - }, - doc_type: match value.doc_type { - Include::All => None, - Include::Some(doc) => Some(doc.clone()), - }, - msg_id: match value.msg_id{ - Include::All => None, - Include::Some(id) => Some(id.clone()), - }, + fn from(value: Route) -> Self { + Self { + action: match value.action { + Include::All => None, + Include::Some(action) => Some(action.clone()), + }, + doc_type: match value.doc_type { + Include::All => None, + Include::Some(doc) => Some(doc.clone()), + }, + msg_id: match value.msg_id { + Include::All => None, + Include::Some(id) => Some(id.clone()), + }, + } } } -} #[derive(Clone, Debug, PartialEq)] struct Route { @@ -211,8 +240,7 @@ struct Route { } impl Route { - fn new(msg_id: Include, doc: Include, action: Include) -> Self - { + fn new(msg_id: Include, doc: Include, action: Include) -> Self { Self { action: action, doc_type: doc, @@ -243,7 +271,7 @@ impl From for Route { impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { - action: match &value.action { + action: match &value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, @@ -286,19 +314,19 @@ mod roiutes { #[test] fn can_route_set_document_by_name() { let doc_id = Uuid::new_v4(); - let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All); - match route.msg_id { - Include::All => {} - Include::Some(_) => unreachable!("should have been all"), - } - match route.doc_type { - Include::All => unreachable!("should be a specific value"), - Include::Some(result) => assert_eq!(result, doc_id), - } - match route.action { - Include::All => {} - Include::Some(_) => unreachable!("should have been all"), - } + let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All); + match route.msg_id { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + match route.doc_type { + Include::All => unreachable!("should be a specific value"), + Include::Some(result) => assert_eq!(result, doc_id), + } + match route.action { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } } #[test] @@ -389,14 +417,19 @@ impl QueueData { let doc_id: Include = match self.get_doc_id(msg.get_document_id()) { Ok(id) => Include::Some(id.clone()), Err(err) => { - if msg.get_action() == &Action::Create { + let action: Action = msg.get_action().into(); + if action == Action::Create { Include::Some(Uuid::nil()) } else { - return Err(err) + return Err(err); } - }, + } }; - let route = Route::new(Include::Some(msg.get_message_id().clone()), doc_id, Include::Some(msg.get_action().clone())); + let route = Route::new( + Include::Some(msg.get_message_id().clone()), + doc_id, + Include::Some(msg.get_action().into()), + ); for (send_route, send_ids) in self.routes.iter() { if route == send_route.into() { for send_id in send_ids { @@ -452,11 +485,11 @@ mod queuedatas { let mut queuedata = QueueData::new(); let id = queuedata.register(name.clone(), tx).unwrap(); queuedata.add_route(&id, Include::Some(name.clone()), action); - let msg = Message::new(name.clone(), Action::Query); + let msg = Message::new(name.clone(), MsgAction::Query(Access::new())); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); - let msg = Message::new(id.clone(), Action::Query); + let msg = Message::new(id.clone(), MsgAction::Query(Access::new())); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); @@ -466,7 +499,7 @@ mod queuedatas { fn does_a_bad_document_name_fail() { let docname = Uuid::new_v4().to_string(); let queuedata = QueueData::new(); - let msg = Message::new(docname.clone(), Action::Query); + let msg = Message::new(docname.clone(), MsgAction::Query(Access::new())); match queuedata.send(msg) { Ok(_) => unreachable!("should have been an error"), Err(data) => match data { @@ -498,7 +531,7 @@ mod queuedatas { let name = "something"; let (tx, _) = channel(); queuedata.register(name.to_string(), tx).unwrap(); - let msg = Message::new("something", Action::NewDocumentType); + let msg = Message::new("something", MsgAction::Create(DocDef::new())); match queuedata.send(msg) { Ok(_) => {} Err(err) => unreachable!("got {:?}: should not error", err), @@ -512,11 +545,11 @@ mod queuedatas { let (tx, rx) = channel(); let id = queuedata.register(doctype.to_string(), tx).unwrap(); queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query); - let msg = Message::new(doctype, Action::Query); + let msg = Message::new(doctype, MsgAction::Query(Access::new())); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); - let msg = Message::new(doctype, Action::Reply); + let msg = Message::new(doctype, MsgAction::Query(Access::new())); match rx.recv_timeout(TIMEOUT) { Ok(_) => unreachable!("should timeout"), Err(err) => match err { @@ -531,13 +564,21 @@ mod queuedatas { let mut queuedata = QueueData::new(); let name1 = "task"; let name2 = "work"; - let action = Action::Query; + let action = MsgAction::Query(Access::new()); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); - queuedata.add_route(&id1, Include::Some(name1.to_string()), action.clone()); - queuedata.add_route(&id2, Include::Some(name1.to_string()), action.clone()); + queuedata.add_route( + &id1, + Include::Some(name1.to_string()), + action.clone().into(), + ); + queuedata.add_route( + &id2, + Include::Some(name1.to_string()), + action.clone().into(), + ); let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); @@ -550,11 +591,11 @@ mod queuedatas { fn can_a_route_be_generally_set() { let mut queuedata = QueueData::new(); let doctype = "something"; - let action = Action::Query; + let action = MsgAction::Query(Access::new()); let (tx, rx) = channel(); let id = queuedata.register(doctype.to_string(), tx).unwrap(); let data: Include = Include::All; - queuedata.add_route(&id, data, action.clone()); + queuedata.add_route(&id, data, action.clone().into()); let msg = Message::new(doctype, action); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); @@ -608,6 +649,59 @@ mod queues { } } +struct CreateDoc { + queue: Queue, + rx: Receiver, +} + +impl CreateDoc { + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + queue: queue, + rx: rx, + } + } + + fn start(mut queue: Queue) { + let (tx, rx) = channel(); + let id = queue.register("document".to_string(), tx).unwrap(); + let nameid: Include = Include::All; + queue.add_route(&id, nameid, Action::Create); + let doc = CreateDoc::new(queue, rx); + spawn(move || { + doc.listen(); + }); + } + + fn listen(&self) { + loop { + let msg = self.rx.recv().unwrap(); + match msg.get_document_id() { + NameID::Name(name) => Document::start(self.queue.clone(), name.clone()), + NameID::ID(_) => unreachable!("should be a name"), + } + } + } +} + +#[derive(Clone)] +struct DocDef; + +impl DocDef { + fn new() -> Self { + Self {} + } +} + +#[derive(Clone)] +struct Access; + +impl Access { + fn new() -> Self { + Self {} + } +} + struct Document { queue: Queue, rx: Receiver, @@ -621,9 +715,9 @@ impl Document { } } - fn start(mut queue: Queue) { + fn start(mut queue: Queue, name: String) { let (tx, rx) = channel(); - queue.register("document".to_string(), tx); + queue.register(name, tx); let doc = Document::new(queue, rx); spawn(move || { doc.listen(); @@ -638,24 +732,19 @@ impl Document { } #[cfg(test)] -mod documents { +mod createdocs { use super::*; + use std::{thread::sleep, time::Duration}; - //#[test] + #[test] fn create_document_creation() { let queue = Queue::new(); - Document::start(queue.clone()); + CreateDoc::start(queue.clone()); let name = "project"; - let msg = Message::new(name, Action::Create); + let msg = Message::new(name, MsgAction::Create(DocDef::new())); queue.send(msg).unwrap(); - let msg2 = Message::new(name, Action::Query); + sleep(Duration::from_secs(1)); + let msg2 = Message::new(name, MsgAction::Query(Access::new())); queue.send(msg2).unwrap(); } } - -// Create a double hash map. posswible names that leads to an id that is int eh ids -// \and the second is the id and the sender to be used.and a third for who wants to -// listen to what. -// -// The queue has a read write lock on the abbove strucutee. A clone of this is given to -// every process.