use crate::field::Field; use std::{ collections::HashMap, sync::{ mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::spawn, }; use uuid::Uuid; #[derive(Clone, Debug)] enum MTTError { DocumentAlreadyExists(String), DocumentNotFound(String), RouteNoListeners, } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum Action { Create, // NewDocumentType, Query, Reply, Update, } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum NameID { ID(Uuid), Name(String), } impl From<&str> for NameID { fn from(value: &str) -> Self { Self::Name(value.to_string()) } } impl From for NameID { fn from(value: String) -> Self { Self::Name(value) } } impl From for NameID { fn from(value: Uuid) -> Self { Self::ID(value) } } impl From<&NameID> for NameID { fn from(value: &NameID) -> Self { value.clone() } } #[derive(Clone)] struct Message { msg_id: Uuid, document_id: NameID, action: Action, //instructions: ?, } impl Message { fn new(doc_id: D, action: Action) -> Self where D: Into, { Self { msg_id: Uuid::new_v4(), document_id: doc_id.into(), action: action, } } fn get_message_id(&self) -> &Uuid { &self.msg_id } fn get_document_id(&self) -> &NameID { &self.document_id } fn get_action(&self) -> &Action { &self.action } } #[cfg(test)] mod messages { use super::*; #[test] fn can_the_document_be_a_stringi_reference() { let dts = ["one", "two"]; for document in dts.into_iter() { let msg = Message::new(document, Action::NewDocumentType); match msg.get_document_id() { NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, document), } assert_eq!(msg.get_action(), &Action::NewDocumentType); } } #[test] fn can_the_document_be_a_string() { let dts = ["one".to_string(), "two".to_string()]; for document in dts.into_iter() { let msg = Message::new(document.clone(), Action::Update); match msg.get_document_id() { NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, &document), } assert_eq!(msg.get_action(), &Action::Update); } } #[test] fn can_the_document_be_an_id() { let document = Uuid::new_v4(); let msg = Message::new(document.clone(), Action::Query); match msg.get_document_id() { NameID::ID(data) => assert_eq!(data, &document), NameID::Name(_) => unreachable!("should have been an id"), } assert_eq!(msg.action, Action::Query); } #[test] fn is_the_message_id_random() { let mut ids: Vec = Vec::new(); for _ in 0..5 { let msg = Message::new("tester", Action::NewDocumentType); let id = msg.get_message_id().clone(); assert!(!ids.contains(&id), "{:?} containts {}", ids, id); ids.push(id); } } } #[derive(Clone, Debug)] enum Include { All, Some(T), } impl PartialEq for Include { fn eq(&self, other: &Self) -> bool { match self { Include::All => true, Include::Some(data) => match other { Include::All => true, Include::Some(other_data) => data == other_data, }, } } } #[cfg(test)] mod includes { use super::*; #[test] fn does_all_equal_evberything() { let a: Include = Include::All; let b: Include = Include::Some(5); let c: Include = Include::Some(7); assert!(a == a, "all should equal all"); assert!(a == b, "all should equal some"); assert!(b == a, "some should equal all"); assert!(b == b, "same some should equal"); assert!(b != c, "different somes do not equal"); } } #[derive(Eq, Hash, PartialEq)] struct RouteID { action: Option, doc_type: Option, msg_id: Option, } impl From for RouteID { fn from(value: Route) -> Self { Self { action: match value.action { Include::All => None, Include::Some(action) => Some(action.clone()), }, doc_type: match value.doc_type { Include::All => None, Include::Some(doc) => Some(doc.clone()), }, msg_id: match value.msg_id{ Include::All => None, Include::Some(id) => Some(id.clone()), }, } } } #[derive(Clone, Debug, PartialEq)] struct Route { action: Include, doc_type: Include, msg_id: Include, } impl Route { fn new(msg_id: Include, doc: Include, action: Include) -> Self { Self { action: action, doc_type: doc, msg_id: msg_id, } } } impl From for Route { fn from(value: RouteID) -> Self { Self { action: match value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, doc_type: match value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, msg_id: match value.msg_id { Some(msg) => Include::Some(msg.clone()), None => Include::All, }, } } } impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { action: match &value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, doc_type: match &value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, msg_id: match &value.msg_id { Some(msg) => Include::Some(msg.clone()), None => Include::All, }, } } } #[cfg(test)] mod roiutes { use super::*; #[test] fn can_a_route_set_action() { let actions = [Action::Query, Action::Reply]; for action in actions.into_iter() { let route = Route::new(Include::All, Include::All, Include::Some(action.clone())); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.action { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, action), } } } #[test] fn can_route_set_document_by_name() { let doc_id = Uuid::new_v4(); let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, doc_id), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } #[test] fn can_route_set_document_by_id() { let id = Uuid::new_v4(); let route = Route::new(Include::All, Include::Some(id.clone()), Include::All); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, id), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } #[test] fn can_route_be_set_by_message_id() { let id = Uuid::new_v4(); let route = Route::new(Include::Some(id.clone()), Include::All, Include::All); match route.msg_id { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, id), } match route.doc_type { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } } struct QueueData { senders: HashMap>, names: HashMap, routes: HashMap>, } impl QueueData { fn new() -> Self { Self { senders: HashMap::new(), names: HashMap::new(), routes: HashMap::new(), } } fn get_doc_id(&self, nameid: N) -> Result where N: Into, { let sender_id = match nameid.into() { NameID::Name(name) => match self.names.get(&name) { Some(id) => id.clone(), None => return Err(MTTError::DocumentNotFound(name.clone())), }, NameID::ID(id) => id.clone(), }; if self.senders.contains_key(&sender_id) { Ok(sender_id) } else { Err(MTTError::DocumentNotFound(sender_id.to_string())) } } fn register(&mut self, name: String, tx: Sender) -> Result { match self.get_doc_id(name.as_str()) { Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), Err(_) => (), } let mut id = Uuid::new_v4(); while self.senders.contains_key(&id) { id = Uuid::new_v4(); } self.senders.insert(id.clone(), tx); self.names.insert(name, id.clone()); Ok(id) } fn send(&self, msg: Message) -> Result<(), MTTError> { let doc_id: Include = match self.get_doc_id(msg.get_document_id()) { Ok(id) => Include::Some(id.clone()), Err(err) => { if msg.get_action() == &Action::Create { Include::Some(Uuid::nil()) } else { return Err(err) } }, }; let route = Route::new(Include::Some(msg.get_message_id().clone()), doc_id, Include::Some(msg.get_action().clone())); for (send_route, send_ids) in self.routes.iter() { if route == send_route.into() { for send_id in send_ids { let tx = self.senders.get(&send_id).unwrap(); tx.send(msg.clone()).unwrap(); } } } Ok(()) } fn add_route( &mut self, sender_id: &Uuid, doc_type: Include, action: Action, ) -> Result<(), MTTError> where N: Into, { let doc_id = match doc_type { Include::Some(data) => match self.get_doc_id(data) { Ok(id) => Include::Some(id.clone()), Err(err) => return Err(err), }, Include::All => Include::All, }; let route = Route::new(Include::All, doc_id, Include::Some(action)); let route_id = route.into(); match self.routes.get_mut(&route_id) { Some(mut senders) => senders.push(sender_id.clone()), None => { self.routes .insert(route_id.into(), [sender_id.clone()].to_vec()); } } Ok(()) } } #[cfg(test)] mod queuedatas { use super::*; use std::{sync::mpsc::RecvTimeoutError, time::Duration}; static TIMEOUT: Duration = Duration::from_millis(500); #[test] fn can_a_new_document_type_be_rgistered() { let name = Uuid::new_v4().to_string(); let action = Action::Query; let (tx, rx) = channel(); let mut queuedata = QueueData::new(); let id = queuedata.register(name.clone(), tx).unwrap(); queuedata.add_route(&id, Include::Some(name.clone()), action); let msg = Message::new(name.clone(), Action::Query); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); let msg = Message::new(id.clone(), Action::Query); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); } #[test] fn does_a_bad_document_name_fail() { let docname = Uuid::new_v4().to_string(); let queuedata = QueueData::new(); let msg = Message::new(docname.clone(), Action::Query); match queuedata.send(msg) { Ok(_) => unreachable!("should have been an error"), Err(data) => match data { MTTError::DocumentNotFound(doc) => assert_eq!(doc, docname), _ => unreachable!("should have been a not found error"), }, } } #[test] fn should_error_on_duplicate_name_registration() { let name = Uuid::new_v4().to_string(); let (tx1, _) = channel(); let (tx2, _) = channel(); let mut queuedata = QueueData::new(); queuedata.register(name.clone(), tx1).unwrap(); match queuedata.register(name.clone(), tx2) { Ok(_) => unreachable!("should have been an weeoe"), Err(data) => match data { MTTError::DocumentAlreadyExists(output) => assert_eq!(output, name), _ => unreachable!("should have been an already exists errorr"), }, } } #[test] fn is_send_okay_if_no_one_is_listening() { let mut queuedata = QueueData::new(); let name = "something"; let (tx, _) = channel(); queuedata.register(name.to_string(), tx).unwrap(); let msg = Message::new("something", Action::NewDocumentType); match queuedata.send(msg) { Ok(_) => {} Err(err) => unreachable!("got {:?}: should not error", err), } } #[test] fn can_certain_messages_be_ignored() { let mut queuedata = QueueData::new(); let doctype = "test"; let (tx, rx) = channel(); let id = queuedata.register(doctype.to_string(), tx).unwrap(); queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query); let msg = Message::new(doctype, Action::Query); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); let msg = Message::new(doctype, Action::Reply); match rx.recv_timeout(TIMEOUT) { Ok(_) => unreachable!("should timeout"), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should timeout"), }, } } #[test] fn can_more_than_one_document_respond() { let mut queuedata = QueueData::new(); let name1 = "task"; let name2 = "work"; let action = Action::Query; let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); queuedata.add_route(&id1, Include::Some(name1.to_string()), action.clone()); queuedata.add_route(&id2, Include::Some(name1.to_string()), action.clone()); let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); let result2 = rx2.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), result2.get_message_id()); } #[test] fn can_a_route_be_generally_set() { let mut queuedata = QueueData::new(); let doctype = "something"; let action = Action::Query; let (tx, rx) = channel(); let id = queuedata.register(doctype.to_string(), tx).unwrap(); let data: Include = Include::All; queuedata.add_route(&id, data, action.clone()); let msg = Message::new(doctype, action); queuedata.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); } } #[derive(Clone)] struct Queue { queue_data: Arc>, } impl Queue { fn new() -> Self { Self { queue_data: Arc::new(RwLock::new(QueueData::new())), } } fn register(&mut self, name: String, tx: Sender) -> Result { let mut queuedata = self.queue_data.write().unwrap(); queuedata.register(name, tx) } fn send(&self, msg: Message) -> Result<(), MTTError> { let queuedata = self.queue_data.read().unwrap(); queuedata.send(msg) } fn add_route( &mut self, sender_id: &Uuid, doc_type: Include, action: Action, ) -> Result<(), MTTError> where N: Into, { let mut queuedata = self.queue_data.write().unwrap(); queuedata.add_route(sender_id, doc_type, action) } } #[cfg(test)] mod queues { use super::*; #[test] fn create_a_queue() { Queue::new(); } } struct Document { queue: Queue, rx: Receiver, } impl Document { fn new(queue: Queue, rx: Receiver) -> Self { Self { queue: queue, rx: rx, } } fn start(mut queue: Queue) { let (tx, rx) = channel(); queue.register("document".to_string(), tx); let doc = Document::new(queue, rx); spawn(move || { doc.listen(); }); } fn listen(&self) { loop { self.rx.recv().unwrap(); } } } #[cfg(test)] mod documents { use super::*; //#[test] fn create_document_creation() { let queue = Queue::new(); Document::start(queue.clone()); let name = "project"; let msg = Message::new(name, Action::Create); queue.send(msg).unwrap(); let msg2 = Message::new(name, Action::Query); queue.send(msg2).unwrap(); } } // Create a double hash map. posswible names that leads to an id that is int eh ids // \and the second is the id and the sender to be used.and a third for who wants to // listen to what. // // The queue has a read write lock on the abbove strucutee. A clone of this is given to // every process.