diff --git a/src/data_director.rs b/src/data_director.rs index 5231cf6..b79a3d6 100644 --- a/src/data_director.rs +++ b/src/data_director.rs @@ -1,6 +1,13 @@ use crate::{ - message::{Action, MsgAction}, - name::NameType, + message::{Action, Message, MsgAction}, + mtterror::MTTError, + name::{Name, NameType, Names}, + router::Queue, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::mpsc::Receiver, + thread::spawn, }; use uuid::Uuid; @@ -39,6 +46,48 @@ mod includes { } } +#[derive(Clone, Debug)] +pub enum RegMsg { + AddRoute(Path), + AddDocName(Vec), + DocumentNameID(Uuid), + Error(MTTError), + GetNameID(Name), + Ok, + RemoveSender(Uuid), + RouteID(RouteID), +} + +#[derive(Clone, Debug)] +pub struct Register { + msg: RegMsg, + sender_id: Uuid, +} + +impl Register { + pub fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self { + Self { + msg: reg_msg, + sender_id: sender_id, + } + } + + pub fn get_msg(&self) -> &RegMsg { + &self.msg + } + + pub fn get_sender_id(&self) -> &Uuid { + &self.sender_id + } + + pub fn response(&self, reg_msg: RegMsg) -> Self { + Self { + msg: reg_msg, + sender_id: self.sender_id.clone(), + } + } +} + #[derive(Clone, Debug)] pub struct Path { pub msg_id: Include, @@ -112,3 +161,333 @@ mod paths { } } } + +#[derive(Clone, Debug, PartialEq)] +pub struct Route { + pub action: Include, + pub doc_id: Include, + pub msg_id: Include, +} + +impl Route { + pub fn new(msg_id: Include, doc: Include, action: Include) -> Self { + Self { + action: action, + doc_id: doc, + msg_id: msg_id, + } + } +} + +impl Default for Route { + fn default() -> Self { + Self { + action: Include::All, + doc_id: Include::All, + msg_id: Include::All, + } + } +} + +impl From for Route { + fn from(value: RouteID) -> Self { + Self::from(&value) + } +} + +impl From<&RouteID> for Route { + fn from(value: &RouteID) -> Self { + Self { + action: match &value.action { + Some(data) => Include::Just(data.clone()), + None => Include::All, + }, + doc_id: match &value.doc_id { + Some(doc) => Include::Just(doc.clone()), + None => Include::All, + }, + msg_id: match &value.msg_id { + Some(msg) => Include::Just(msg.clone()), + None => Include::All, + }, + } + } +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct RouteID { + action: Option, + doc_id: Option, + msg_id: Option, +} + +impl From for RouteID { + fn from(value: Route) -> Self { + Self { + action: match value.action { + Include::All => None, + Include::Just(action) => Some(action.clone()), + }, + doc_id: match value.doc_id { + Include::All => None, + Include::Just(doc) => Some(doc.clone()), + }, + msg_id: match value.msg_id { + Include::All => None, + Include::Just(id) => Some(id.clone()), + }, + } + } +} + +struct RouteStorage { + data: HashMap>, +} + +impl RouteStorage { + fn new() -> Self { + Self { + data: HashMap::new(), + } + } + + fn add(&mut self, route: Route, sender_id: Uuid) -> RouteID { + let route_id: RouteID = route.into(); + let set = match self.data.get_mut(&route_id) { + Some(result) => result, + None => { + let holder = HashSet::new(); + self.data.insert(route_id.clone(), holder); + self.data.get_mut(&route_id).unwrap() + } + }; + set.insert(sender_id); + route_id + } + + fn remove_sender_id(&mut self, sender_id: &Uuid) { + let mut removal: Vec = Vec::new(); + for (route_id, set) in self.data.iter_mut() { + set.remove(sender_id); + if set.is_empty() { + removal.push(route_id.clone()); + } + } + for route_id in removal.iter() { + self.data.remove(route_id); + } + } + + fn get(&self, route: Route) -> HashSet { + let mut output = HashSet::new(); + for (route_id, set) in self.data.iter() { + if route == route_id.into() { + output = output.union(set).cloned().collect(); + } + } + output + } +} + +#[cfg(test)] +mod route_storeage { + use super::*; + + #[test] + fn can_add_routes() { + let mut routes = RouteStorage::new(); + let id1 = Uuid::new_v4(); + let id2 = Uuid::new_v4(); + let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + let route_id1 = routes.add(route1.clone(), id1.clone()); + let route_id2 = routes.add(route2.clone(), id2.clone()); + let result1 = routes.get(route1.clone()); + assert_eq!(result1.len(), 1); + assert!( + result1.contains(&id1), + "{:?} not found in {:?}", + id1, + result1 + ); + assert_eq!(route_id1, route1.into()); + let result2 = routes.get(route2.clone()); + assert_eq!(result2.len(), 1); + assert!( + result2.contains(&id2), + "{:?} not found in {:?}", + id2, + result2 + ); + assert_eq!(route_id2, route2.into()); + } + + #[test] + fn returns_empty_set_when_nothing_is_available() { + let routes = RouteStorage::new(); + let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + let result = routes.get(route); + assert_eq!(result.len(), 0); + } + + #[test] + fn returns_all_entries_using_the_same_route() { + let count = 5; + let mut routes = RouteStorage::new(); + let mut ids: HashSet = HashSet::new(); + while ids.len() < count { + ids.insert(Uuid::new_v4()); + } + let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + for id in ids.iter() { + routes.add(route.clone(), id.clone()); + } + let result = routes.get(route); + assert_eq!(result, ids); + } + + #[test] + fn routes_are_not_duplicated() { + let count = 5; + let mut routes = RouteStorage::new(); + let id = Uuid::new_v4(); + let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + for _ in 0..count { + routes.add(route.clone(), id.clone()); + } + let result = routes.get(route); + assert_eq!(result.len(), 1); + assert!(result.contains(&id), "{:?} not found in {:?}", id, result); + } + + #[test] + fn overlapping_routes_are_combined() { + let mut routes = RouteStorage::new(); + let id1 = Uuid::new_v4(); + let id2 = Uuid::new_v4(); + let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + routes.add(route1.clone(), id1.clone()); + routes.add(route2.clone(), id2.clone()); + let retrieve = Route::new(Include::All, Include::All, Include::All); + let result = routes.get(retrieve); + assert_eq!(result.len(), 2); + assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result); + assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result); + } + + #[test] + fn can_remove_sender_id() { + let mut routes = RouteStorage::new(); + let count = 5; + let mut ids: HashSet = HashSet::new(); + while ids.len() < count { + ids.insert(Uuid::new_v4()); + } + let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + for id in ids.iter() { + routes.add(route.clone(), id.clone()); + } + let removed = ids.iter().last().unwrap().clone(); + ids.remove(&removed); + routes.remove_sender_id(&removed); + let result = routes.get(route); + assert_eq!(result, ids); + } + + #[test] + fn empty_routes_are_release_memory() { + let mut routes = RouteStorage::new(); + let id = Uuid::new_v4(); + let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); + routes.add(route.clone(), id.clone()); + routes.remove_sender_id(&id); + assert_eq!(routes.data.len(), 0); + } +} + +pub struct DocRegistry { + doc_names: Names, + queue: Queue, + receiver: Receiver, + routes: RouteStorage, +} + +impl DocRegistry { + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + doc_names: Names::new(), + queue: queue, + receiver: rx, + routes: RouteStorage::new(), + } + } + + pub fn start(queue: Queue, rx: Receiver) { + let mut doc_names = DocRegistry::new(queue, rx); + spawn(move || { + doc_names.listen(); + }); + } + + fn listen(&mut self) { + loop { + let mut msg = self.receiver.recv().unwrap(); + match msg.get_action() { + MsgAction::Register(data) => { + let id = data.get_sender_id(); + let reply = msg.response(self.register_action(data)); + self.queue.forward(id, reply); + } + _ => match self.path_to_route(&msg.get_path()) { + Ok(route) => { + msg.set_route(route.clone()); + for sender_id in self.routes.get(route).iter() { + self.queue.forward(sender_id, msg.clone()); + } + } + Err(err) => self + .queue + .send(msg.response(MsgAction::Error(err))) + .unwrap(), + }, + } + } + } + + fn path_to_route(&self, path: &Path) -> Result { + let doc_id = match &path.doc { + Include::Just(name) => match self.doc_names.get_id(name) { + Ok(id) => Include::Just(id), + Err(err) => return Err(err), + }, + Include::All => Include::All, + }; + Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone())) + } + + fn register_action(&mut self, reg: &Register) -> Register { + match reg.get_msg() { + RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) { + Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), + Err(err) => reg.response(RegMsg::Error(err)), + }, + RegMsg::AddRoute(path) => { + // let route = self.doc_names.path_to_route(path).unwrap(); + let route = self.path_to_route(path).unwrap(); + reg.response(RegMsg::RouteID( + self.routes.add(route, reg.get_sender_id().clone()), + )) + } + RegMsg::GetNameID(name) => match self.doc_names.get_id(name) { + Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), + Err(err) => reg.response(RegMsg::Error(err)), + }, + RegMsg::RemoveSender(sender_id) => { + self.routes.remove_sender_id(sender_id); + reg.response(RegMsg::Ok) + } + _ => reg.response(RegMsg::Ok), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 7cb0ffb..fba49e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,10 +4,10 @@ mod mtterror; mod name; mod router; -use data_director::{Include, Path}; +use data_director::{Include, Path, RegMsg, Register}; use message::{ Action, Addition, CalcValue, Calculation, Clock, CreateDoc, Field, FieldType, Message, Operand, - RegMsg, Register, Session, + Session, }; pub use message::{MsgAction, Query}; use mtterror::MTTError; diff --git a/src/message.rs b/src/message.rs index c04bbe4..3c42337 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,6 +1,6 @@ use super::MTTError; use crate::{ - data_director::{Include, Path}, + data_director::{Include, Path, RegMsg, Register, Route, RouteID}, name::{Name, NameType, Names}, router::Queue, }; @@ -216,6 +216,7 @@ pub struct Message { msg_id: Uuid, document_id: NameType, action: MsgAction, + route: Route, // session: Option } @@ -229,6 +230,7 @@ impl Message { msg_id: Uuid::new_v4(), document_id: doc_id.into(), action: action.into(), + route: Route::default(), } } @@ -236,15 +238,11 @@ impl Message { &self.msg_id } - fn get_document_id(&self) -> &NameType { - &self.document_id - } - pub fn get_action(&self) -> &MsgAction { &self.action } - fn get_path(&self) -> Path { + pub fn get_path(&self) -> Path { Path::new( Include::Just(self.msg_id.clone()), Include::Just(self.document_id.clone()), @@ -252,11 +250,12 @@ impl Message { ) } - fn reset_name_id(&mut self, name: NT) - where - NT: Into, - { - self.document_id = name.into(); + pub fn get_route(&self) -> Route { + self.route.clone() + } + + pub fn set_route(&mut self, route: Route) { + self.route = route; } pub fn response(&self, action: A) -> Self @@ -267,6 +266,7 @@ impl Message { msg_id: self.msg_id.clone(), document_id: self.document_id.clone(), action: action.into(), + route: Route::default(), } } @@ -279,6 +279,7 @@ impl Message { msg_id: self.msg_id.clone(), document_id: doc_id.into(), action: action.into(), + route: Route::default(), } } } @@ -288,14 +289,14 @@ mod messages { use super::*; #[test] - fn can_the_document_be_a_stringi_reference() { + fn can_the_document_be_a_named_reference() { let dts = [Name::english("one"), Name::english("two")]; for document in dts.into_iter() { let msg = Message::new( document.clone(), MsgAction::Create(DocDef::new(document.clone())), ); - match msg.get_document_id() { + match &msg.document_id { NameType::Name(data) => assert_eq!(data, &document), _ => unreachable!("should have been a string id"), } @@ -310,8 +311,8 @@ mod messages { fn can_the_document_be_an_id() { let document = Uuid::new_v4(); let msg = Message::new(document.clone(), Query::new()); - match msg.get_document_id() { - NameType::ID(data) => assert_eq!(data, &document), + match msg.document_id { + NameType::ID(data) => assert_eq!(data, document), _ => unreachable!("should have been an id"), } match msg.get_action() { @@ -320,6 +321,44 @@ mod messages { } } + #[test] + fn do_messages_contain_routes() { + let mut msg = Message::new(Name::english("whatever"), Query::new()); + let default_route = msg.get_route(); + match default_route.msg_id { + Include::Just(_) => unreachable!("should defalt to all"), + Include::All => {} + } + match default_route.doc_id { + Include::Just(_) => unreachable!("should defalt to all"), + Include::All => {} + } + match default_route.action { + Include::Just(_) => unreachable!("should defalt to all"), + Include::All => {} + } + let doc_id = Uuid::new_v4(); + let route = Route::new( + Include::Just(msg.get_message_id().clone()), + Include::Just(doc_id.clone()), + Include::Just(msg.get_action().into()), + ); + msg.set_route(route); + let result = msg.get_route(); + match result.msg_id { + Include::Just(data) => assert_eq!(&data, msg.get_message_id()), + Include::All => unreachable!("should have message id"), + } + match result.doc_id { + Include::Just(data) => assert_eq!(data, doc_id), + Include::All => unreachable!("should have document id"), + } + match result.action { + Include::Just(data) => assert_eq!(data, msg.get_action().into()), + Include::All => unreachable!("should have action"), + } + } + #[test] fn is_the_message_id_random() { let mut ids: Vec = Vec::new(); @@ -338,7 +377,7 @@ mod messages { let responce = Reply::new(); let reply = msg.response(responce); assert_eq!(reply.get_message_id(), msg.get_message_id()); - match reply.get_document_id() { + match &reply.document_id { NameType::Name(data) => assert_eq!(data, &name), _ => unreachable!("should have been a name"), } @@ -355,7 +394,7 @@ mod messages { let err_msg = Uuid::new_v4().to_string(); let result = msg.response(MTTError::DocumentNotFound(err_msg.clone())); assert_eq!(result.get_message_id(), msg.get_message_id()); - match result.get_document_id() { + match &result.document_id { NameType::Name(data) => assert_eq!(data, &name), _ => unreachable!("should have been a name"), } @@ -377,8 +416,8 @@ mod messages { let result2 = msg.response(Reply::new()); assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result2.get_message_id(), msg.get_message_id()); - assert_eq!(result1.get_document_id(), msg.get_document_id()); - assert_eq!(result2.get_document_id(), msg.get_document_id()); + assert_eq!(result1.document_id, msg.document_id); + assert_eq!(result2.document_id, msg.document_id); let action1 = result1.get_action(); match action1 { MsgAction::Error(err) => match err { @@ -393,1224 +432,8 @@ mod messages { _ => unreachable!("got {:?}: should have received a reply", action2), } } - - #[test] - fn can_reset_document_id() { - let mut msg = Message::new(Name::english("something"), Query::new()); - let id = Uuid::new_v4(); - msg.reset_name_id(&id); - let name_id = msg.get_document_id(); - match name_id { - NameType::ID(data) => assert_eq!(data, &id), - _ => unreachable!("got {:?}, should have been an id", name_id), - } - } } -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct RouteID { - action: Option, - doc_type: Option, - msg_id: Option, -} - -impl From for RouteID { - fn from(value: Route) -> Self { - Self { - action: match value.action { - Include::All => None, - Include::Just(action) => Some(action.clone()), - }, - doc_type: match value.doc_type { - Include::All => None, - Include::Just(doc) => Some(doc.clone()), - }, - msg_id: match value.msg_id { - Include::All => None, - Include::Just(id) => Some(id.clone()), - }, - } - } -} - -impl TryFrom for RouteID { - type Error = MTTError; - - fn try_from(value: Message) -> Result { - let doc_id = match value.get_document_id() { - NameType::ID(data) => data, - _ => return Err(MTTError::CannotConvertMessageToRouteID), - }; - Ok(RouteID { - action: Some(value.get_action().into()), - doc_type: Some(doc_id.clone()), - msg_id: Some(value.get_message_id().clone()), - }) - } -} - -#[cfg(test)] -mod route_ids { - use super::*; - - #[test] - fn can_get_message_route_id() { - let actions: Vec = vec![Query::new().into(), MsgAction::Show]; - let doc_id = Uuid::new_v4(); - for action in actions.iter() { - let msg = Message::new(doc_id.clone(), action.clone()); - let route_id: RouteID = msg.clone().try_into().unwrap(); - match route_id.msg_id { - Some(data) => assert_eq!(&data, msg.get_message_id()), - None => unreachable!("should have had a message id"), - } - match route_id.doc_type { - Some(data) => assert_eq!(data, doc_id), - None => unreachable!("should have had doc type"), - } - match route_id.action { - Some(data) => assert_eq!(data, action.into()), - None => unreachable!("should have had doc type"), - } - } - } - - #[test] - fn errors_when_doc_id_is_not_id() { - let msg = Message::new(Name::english("nope"), Query::new()); - match TryInto::::try_into(msg) { - Ok(_) => unreachable!("should be an error"), - Err(err) => match err { - MTTError::CannotConvertMessageToRouteID => {} - _ => unreachable!("got {:?}, should have been covert error", err), - }, - } - } -} - -#[derive(Clone, Debug)] -pub enum RegMsg { - AddRoute(Path), - AddDocName(Vec), - DocumentNameID(Uuid), - Error(MTTError), - GetNameID(Name), - Ok, - RemoveSender(Uuid), - RouteID(RouteID), -} - -#[derive(Clone, Debug)] -pub struct Register { - msg: RegMsg, - sender_id: Uuid, -} - -impl Register { - pub fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self { - Self { - msg: reg_msg, - sender_id: sender_id, - } - } - - pub fn get_msg(&self) -> &RegMsg { - &self.msg - } - - fn get_sender_id(&self) -> &Uuid { - &self.sender_id - } - - fn response(&self, reg_msg: RegMsg) -> Self { - Self { - msg: reg_msg, - sender_id: self.sender_id.clone(), - } - } -} - -#[derive(Clone, Debug, PartialEq)] -pub struct Route { - action: Include, - doc_type: Include, - msg_id: Include, -} - -impl Route { - pub fn new(msg_id: Include, doc: Include, action: Include) -> Self { - Self { - action: action, - doc_type: doc, - msg_id: msg_id, - } - } -} - -impl From for Route { - fn from(value: RouteID) -> Self { - Self { - action: match value.action { - Some(data) => Include::Just(data.clone()), - None => Include::All, - }, - doc_type: match value.doc_type { - Some(doc) => Include::Just(doc.clone()), - None => Include::All, - }, - msg_id: match value.msg_id { - Some(msg) => Include::Just(msg.clone()), - None => Include::All, - }, - } - } -} - -impl From<&RouteID> for Route { - fn from(value: &RouteID) -> Self { - Self { - action: match &value.action { - Some(data) => Include::Just(data.clone()), - None => Include::All, - }, - doc_type: match &value.doc_type { - Some(doc) => Include::Just(doc.clone()), - None => Include::All, - }, - msg_id: match &value.msg_id { - Some(msg) => Include::Just(msg.clone()), - None => Include::All, - }, - } - } -} - -impl TryFrom for Route { - type Error = MTTError; - - fn try_from(value: Path) -> Result { - let doc = match value.doc { - Include::Just(data) => match data { - NameType::ID(id) => Include::Just(id.clone()), - _ => return Err(MTTError::RouteRequiresDocumentID), - }, - Include::All => Include::All, - }; - Ok(Self { - action: value.action, - doc_type: doc, - msg_id: value.msg_id, - }) - } -} - -#[cfg(test)] -mod routes { - use super::*; - - #[test] - fn can_a_route_set_action() { - let actions = [Action::Query, Action::Reply]; - for action in actions.into_iter() { - let route = Route::new(Include::All, Include::All, Include::Just(action.clone())); - match route.msg_id { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - match route.doc_type { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - match route.action { - Include::All => unreachable!("should be a specific value"), - Include::Just(result) => assert_eq!(result, action), - } - } - } - - #[test] - fn can_route_set_document_by_name() { - let doc_id = Uuid::new_v4(); - let route = Route::new(Include::All, Include::Just(doc_id.clone()), Include::All); - match route.msg_id { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - match route.doc_type { - Include::All => unreachable!("should be a specific value"), - Include::Just(result) => assert_eq!(result, doc_id), - } - match route.action { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - } - - #[test] - fn can_route_set_document_by_id() { - let id = Uuid::new_v4(); - let route = Route::new(Include::All, Include::Just(id.clone()), Include::All); - match route.msg_id { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - match route.doc_type { - Include::All => unreachable!("should be a specific value"), - Include::Just(result) => assert_eq!(result, id), - } - match route.action { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - } - - #[test] - fn can_route_be_set_by_message_id() { - let id = Uuid::new_v4(); - let route = Route::new(Include::Just(id.clone()), Include::All, Include::All); - match route.msg_id { - Include::All => unreachable!("should be a specific value"), - Include::Just(result) => assert_eq!(result, id), - } - match route.doc_type { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - match route.action { - Include::All => {} - Include::Just(_) => unreachable!("should have been all"), - } - } -} - -struct RouteStorage { - data: HashMap>, -} - -impl RouteStorage { - fn new() -> Self { - Self { - data: HashMap::new(), - } - } - - fn add(&mut self, route: Route, sender_id: Uuid) -> RouteID { - let route_id: RouteID = route.into(); - let set = match self.data.get_mut(&route_id) { - Some(result) => result, - None => { - let holder = HashSet::new(); - self.data.insert(route_id.clone(), holder); - self.data.get_mut(&route_id).unwrap() - } - }; - set.insert(sender_id); - route_id - } - - fn remove_sender_id(&mut self, sender_id: &Uuid) { - let mut removal: Vec = Vec::new(); - for (route_id, set) in self.data.iter_mut() { - set.remove(sender_id); - if set.is_empty() { - removal.push(route_id.clone()); - } - } - for route_id in removal.iter() { - self.data.remove(route_id); - } - } - - fn get(&self, route: Route) -> HashSet { - let mut output = HashSet::new(); - for (route_id, set) in self.data.iter() { - if route == route_id.into() { - output = output.union(set).cloned().collect(); - } - } - output - } -} - -#[cfg(test)] -mod route_storeage { - use super::*; - - #[test] - fn can_add_routes() { - let mut routes = RouteStorage::new(); - let id1 = Uuid::new_v4(); - let id2 = Uuid::new_v4(); - let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - let route_id1 = routes.add(route1.clone(), id1.clone()); - let route_id2 = routes.add(route2.clone(), id2.clone()); - let result1 = routes.get(route1.clone()); - assert_eq!(result1.len(), 1); - assert!( - result1.contains(&id1), - "{:?} not found in {:?}", - id1, - result1 - ); - assert_eq!(route_id1, route1.into()); - let result2 = routes.get(route2.clone()); - assert_eq!(result2.len(), 1); - assert!( - result2.contains(&id2), - "{:?} not found in {:?}", - id2, - result2 - ); - assert_eq!(route_id2, route2.into()); - } - - #[test] - fn returns_empty_set_when_nothing_is_available() { - let routes = RouteStorage::new(); - let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - let result = routes.get(route); - assert_eq!(result.len(), 0); - } - - #[test] - fn returns_all_entries_using_the_same_route() { - let count = 5; - let mut routes = RouteStorage::new(); - let mut ids: HashSet = HashSet::new(); - while ids.len() < count { - ids.insert(Uuid::new_v4()); - } - let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - for id in ids.iter() { - routes.add(route.clone(), id.clone()); - } - let result = routes.get(route); - assert_eq!(result, ids); - } - - #[test] - fn routes_are_not_duplicated() { - let count = 5; - let mut routes = RouteStorage::new(); - let id = Uuid::new_v4(); - let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - for _ in 0..count { - routes.add(route.clone(), id.clone()); - } - let result = routes.get(route); - assert_eq!(result.len(), 1); - assert!(result.contains(&id), "{:?} not found in {:?}", id, result); - } - - #[test] - fn overlapping_routes_are_combined() { - let mut routes = RouteStorage::new(); - let id1 = Uuid::new_v4(); - let id2 = Uuid::new_v4(); - let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - routes.add(route1.clone(), id1.clone()); - routes.add(route2.clone(), id2.clone()); - let retrieve = Route::new(Include::All, Include::All, Include::All); - let result = routes.get(retrieve); - assert_eq!(result.len(), 2); - assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result); - assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result); - } - - #[test] - fn can_remove_sender_id() { - let mut routes = RouteStorage::new(); - let count = 5; - let mut ids: HashSet = HashSet::new(); - while ids.len() < count { - ids.insert(Uuid::new_v4()); - } - let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - for id in ids.iter() { - routes.add(route.clone(), id.clone()); - } - let removed = ids.iter().last().unwrap().clone(); - ids.remove(&removed); - routes.remove_sender_id(&removed); - let result = routes.get(route); - assert_eq!(result, ids); - } - - #[test] - fn empty_routes_are_release_memory() { - let mut routes = RouteStorage::new(); - let id = Uuid::new_v4(); - let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); - routes.add(route.clone(), id.clone()); - routes.remove_sender_id(&id); - assert_eq!(routes.data.len(), 0); - } -} - -pub struct DocRegistry { - doc_names: Names, - queue: Queue, - receiver: Receiver, - routes: RouteStorage, -} - -impl DocRegistry { - fn new(queue: Queue, rx: Receiver) -> Self { - Self { - doc_names: Names::new(), - queue: queue, - receiver: rx, - routes: RouteStorage::new(), - } - } - - pub fn start(queue: Queue, rx: Receiver) { - let mut doc_names = DocRegistry::new(queue, rx); - spawn(move || { - doc_names.listen(); - }); - } - - fn listen(&mut self) { - loop { - let mut msg = self.receiver.recv().unwrap(); - match msg.get_action() { - MsgAction::Register(data) => { - let id = data.get_sender_id(); - let reply = msg.response(self.register_action(data)); - self.queue.forward(id, reply); - } - _ => match self.doc_names.get_id(msg.get_document_id()) { - Ok(doc_id) => { - msg.reset_name_id(doc_id); - let route: Route = msg.get_path().try_into().unwrap(); - for sender_id in self.routes.get(route).iter() { - self.queue.forward(sender_id, msg.clone()); - } - } - Err(err) => self - .queue - .send(msg.response(MsgAction::Error(err))) - .unwrap(), - }, - } - } - } - - fn register_action(&mut self, reg: &Register) -> Register { - match reg.get_msg() { - RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) { - Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), - Err(err) => reg.response(RegMsg::Error(err)), - }, - RegMsg::AddRoute(path) => { - let route = self.doc_names.path_to_route(path).unwrap(); - reg.response(RegMsg::RouteID( - self.routes.add(route, reg.get_sender_id().clone()), - )) - } - RegMsg::GetNameID(name) => match self.doc_names.get_id(name) { - Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), - Err(err) => reg.response(RegMsg::Error(err)), - }, - RegMsg::RemoveSender(sender_id) => { - self.routes.remove_sender_id(sender_id); - reg.response(RegMsg::Ok) - } - _ => reg.response(RegMsg::Ok), - } - } -} - -/* -struct Router { - doc_registry: Sender, - senders: HashMap>, -} - -impl Router { - fn new(tx: Sender) -> Self { - Self { - doc_registry: tx, - senders: HashMap::new(), - } - } - - fn add_sender(&mut self, sender: Sender) -> Uuid { - let mut id = Uuid::new_v4(); - while self.senders.contains_key(&id) { - id = Uuid::new_v4(); - } - self.senders.insert(id.clone(), sender); - id - } - - fn remove_sender(&mut self, id: &Uuid) { - let action = Register::new(Uuid::nil(), RegMsg::RemoveSender(id.clone())); - self.doc_registry - .send(Message::new(NameType::None, action)) - .unwrap(); - self.senders.remove(id); - } - - fn forward(&self, id: &Uuid, msg: Message) { - if id == &Uuid::nil() { - return; - } - self.senders.get(id).unwrap().send(msg).unwrap(); - } - - fn send(&self, msg: Message) { - self.doc_registry.send(msg).unwrap(); - } -} - -#[cfg(test)] -mod routers { - use super::*; - use crate::support_tests::TIMEOUT; - - #[test] - fn can_pass_message() { - let (tx, rx) = channel(); - let router = Router::new(tx); - let msg = Message::new(Name::english("task"), Query::new()); - router.send(msg.clone()); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - } - - #[test] - fn can_forward_message() { - let (tx, _) = channel(); - let mut router = Router::new(tx); - let (sender, receiver) = channel(); - let id = router.add_sender(sender); - let msg = Message::new(Name::english("wiki"), Query::new()); - router.forward(&id, msg.clone()); - let result = receiver.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - } - - #[test] - fn sender_ids_are_unique() { - let (tx, _) = channel(); - let mut router = Router::new(tx); - let count = 10; - let mut holder: HashSet = HashSet::new(); - for _ in 0..count { - let (tx, _) = channel(); - holder.insert(router.add_sender(tx)); - } - assert_eq!(holder.len(), count, "had duplicate keys"); - } - - #[test] - fn can_remove_sender() { - let (tx, rx) = channel(); - let mut router = Router::new(tx); - let (data, _) = channel(); - let id = router.add_sender(data); - assert_eq!(router.senders.len(), 1, "should have only one sender"); - router.remove_sender(&id); - assert_eq!(router.senders.len(), 0, "should have no senders."); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Register(reg_msg) => { - let reg_action = reg_msg.get_msg(); - match reg_action { - RegMsg::RemoveSender(result) => assert_eq!(result, &id), - _ => unreachable!("got {:?}, should have been remove sender", reg_action), - } - } - _ => unreachable!("got {:?}, should have been registry message", action), - } - } - - #[test] - fn ignores_bad_id_removals() { - let (tx, rx) = channel(); - let mut router = Router::new(tx); - router.remove_sender(&Uuid::new_v4()); - assert_eq!(router.senders.len(), 0, "should have no senders."); - rx.recv_timeout(TIMEOUT).unwrap(); - } -} - -#[derive(Clone)] -pub struct Queue { - router: Arc>, -} - -impl Queue { - pub fn new() -> Self { - let (tx, rx) = channel(); - let output = Self { - router: Arc::new(RwLock::new(Router::new(tx))), - }; - DocRegistry::start(output.clone(), rx); - output - } - - pub fn add_sender(&mut self, sender: Sender) -> Uuid { - let mut router = self.router.write().unwrap(); - router.add_sender(sender) - } - - pub fn remove_sender(&mut self, id: &Uuid) { - let mut router = self.router.write().unwrap(); - router.remove_sender(id); - } - - fn forward(&self, id: &Uuid, msg: Message) { - let router = self.router.read().unwrap(); - router.forward(id, msg); - } - - pub fn send(&self, msg: Message) -> Result<(), MTTError> { - let router = self.router.read().unwrap(); - router.send(msg.clone()); - Ok(()) - } -} - -#[cfg(test)] -mod queues { - use super::*; - use crate::support_tests::TIMEOUT; - use std::sync::mpsc::RecvTimeoutError; - - struct TestQueue { - sender_id: Uuid, - queue: Queue, - receiver: Receiver, - doc_names: HashMap, - doc_tx_id: HashMap, - doc_rx: HashMap>, - } - - impl TestQueue { - fn new() -> Self { - let mut queue = Queue::new(); - let (tx, rx) = channel(); - let id = queue.add_sender(tx); - Self { - sender_id: id, - queue: queue, - receiver: rx, - doc_names: HashMap::new(), - doc_tx_id: HashMap::new(), - doc_rx: HashMap::new(), - } - } - - fn add_document(&mut self, name: Name) { - let (tx, rx) = channel(); - let id = self.queue.add_sender(tx); - let reg_msg = Register::new(id.clone(), RegMsg::AddDocName([name.clone()].to_vec())); - let msg = Message::new(NameType::None, reg_msg); - self.queue.send(msg.clone()).unwrap(); - match rx.recv_timeout(TIMEOUT).unwrap().get_action() { - MsgAction::Register(doc_data) => match doc_data.get_msg() { - RegMsg::DocumentNameID(data) => { - self.doc_names.insert(name.clone(), data.clone()) - } - _ => panic!("should not get here"), - }, - _ => panic!("should not get here"), - }; - self.doc_tx_id.insert(name.clone(), id); - self.doc_rx.insert(name.clone(), rx); - } - - fn get_preset_id(&self) -> &Uuid { - &self.sender_id - } - - fn get_preset_rx(&self) -> &Receiver { - &self.receiver - } - - fn get_doc_id(&self, name: &Name) -> &Uuid { - self.doc_names.get(name).unwrap() - } - - fn get_doc_rx_id(&self, name: &Name) -> &Uuid { - self.doc_tx_id.get(name).unwrap() - } - - fn get_doc_rx(&self, name: &Name) -> &Receiver { - self.doc_rx.get(name).unwrap() - } - - fn get_queue(&self) -> Queue { - self.queue.clone() - } - } - - #[test] - fn can_forward_message() { - let tester = TestQueue::new(); - let queue = tester.get_queue(); - let msg = Message::new(Name::english("wiki"), Query::new()); - queue.forward(tester.get_preset_id(), msg.clone()); - let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - } - - #[test] - fn sender_ids_are_unique() { - let mut queue = Queue::new(); - let count = 10; - let mut holder: HashSet = HashSet::new(); - for _ in 0..count { - let (tx, _) = channel(); - holder.insert(queue.add_sender(tx)); - } - assert_eq!(holder.len(), count, "had duplicate keys"); - let router = queue.router.read().unwrap(); - assert_eq!( - router.senders.len(), - count, - "should contain all of the senders" - ); - } - - #[test] - fn senders_can_be_removed() { - let mut queue = Queue::new(); - let (tx, _) = channel(); - let id = queue.add_sender(tx); - queue.remove_sender(&id); - let router = queue.router.read().unwrap(); - assert_eq!(router.senders.len(), 0, "should contain no senders"); - } - - #[test] - fn document_names_have_unique_id() { - let tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [ - Name::english("one"), - Name::english("two"), - Name::english("three"), - Name::english("four"), - Name::english("five"), - ]; - let mut ids: Vec = Vec::new(); - for name in names.iter() { - let reg_msg = Register::new( - tester.get_preset_id().clone(), - RegMsg::AddDocName([name.clone()].to_vec()), - ); - let msg = Message::new(NameType::None, reg_msg); - queue.send(msg.clone()).unwrap(); - let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - let action = result.get_action(); - match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(data) => { - assert!(!ids.contains(data), "{} already in {:?}", data, ids); - ids.push(data.clone()); - } - _ => unreachable!("got {:?}, should have been register ok", action), - }, - _ => unreachable!("got {:?}, should have been register ok", action), - } - } - } - - #[test] - fn does_name_id_get_updated() { - let tester = TestQueue::new(); - let queue = tester.get_queue(); - let doc_name = Name::english("test"); - let reg_msg = Register::new( - tester.get_preset_id().clone(), - RegMsg::AddDocName([doc_name.clone()].to_vec()), - ); - let msg = Message::new(NameType::None, reg_msg); - queue.send(msg.clone()).unwrap(); - let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - let id = match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(data) => data.clone(), - _ => unreachable!("got {:?}, should have been register ok", action), - }, - _ => unreachable!("got {:?}, should have been register ok", action), - }; - let reg_msg = Register::new( - tester.get_preset_id().clone(), - RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), - ); - let msg = Message::new(NameType::None, reg_msg); - queue.send(msg.clone()).unwrap(); - tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - let msg = Message::new(doc_name.clone(), Query::new()); - queue.send(msg.clone()).unwrap(); - let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - let name_id: NameType = id.into(); - assert_eq!(result.get_document_id(), &name_id); - } - - #[test] - fn can_register_multiple_names_at_once() { - let tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [Name::english("one"), Name::japanese("δΈ€")].to_vec(); - let reg_msg = Register::new( - tester.get_preset_id().clone(), - RegMsg::AddDocName(names.clone()), - ); - let msg = Message::new(NameType::None, reg_msg); - queue.send(msg.clone()).unwrap(); - let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - let id = match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(data) => data, - _ => unreachable!("got {:?}, should have returned id", data), - }, - _ => unreachable!("got {:?}, should have returned id", action), - }; - for name in names.iter() { - let reg_msg = Register::new( - tester.get_preset_id().clone(), - RegMsg::GetNameID(name.clone()), - ); - let msg = Message::new(NameType::None, reg_msg); - queue.send(msg.clone()).unwrap(); - let result = tester.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - let result = match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(data) => data, - _ => unreachable!("got {:?}, should have returned id", data), - }, - _ => unreachable!("got {:?}, should have returned id", action), - }; - assert_eq!(result, id); - } - } - - #[test] - fn errors_on_duplicate_names() { - let tester = TestQueue::new(); - let queue = tester.get_queue(); - let receiver = tester.get_preset_rx(); - let doc_name = Name::english(Uuid::new_v4().to_string().as_str()); - let reg_msg = Register::new( - tester.get_preset_id().clone(), - RegMsg::AddDocName([doc_name.clone()].to_vec()), - ); - let msg = Message::new(NameType::None, reg_msg.clone()); - queue.send(msg.clone()).unwrap(); - receiver.recv_timeout(TIMEOUT).unwrap(); - let msg2 = Message::new(NameType::None, reg_msg.clone()); - queue.send(msg2.clone()).unwrap(); - let result = receiver.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg2.get_message_id()); - let action = result.get_action(); - match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::Error(err) => match err { - MTTError::NameDuplicate(name) => { - assert_eq!(name.to_string(), doc_name.to_string()) - } - _ => unreachable!("got {:?}, should have been duplicate error", err), - }, - _ => unreachable!("got {:?}, should have been error", data), - }, - _ => unreachable!("got {:?}, should have been register ok", action), - } - } - - #[test] - fn can_register_routes() { - let mut tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [Name::english("task"), Name::english("recipe")]; - for name in names.iter() { - tester.add_document(name.clone()); - } - let route_req = Path::new(Include::All, Include::All, Include::All); - let reg_msg = RegMsg::AddRoute(route_req); - let reg = Register::new(tester.get_doc_rx_id(&names[0]).clone(), reg_msg); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - tester.get_doc_rx(&names[0]).recv_timeout(TIMEOUT).unwrap(); - let msg = Message::new(NameType::None, Query::new()); - queue.send(msg.clone()).unwrap(); - let result = tester.get_doc_rx(&names[0]).recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match tester.get_doc_rx(&names[1]).recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!("should not receive: {:?}", msg), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should have timed out"), - }, - } - } - - #[test] - fn can_multiple_register_for_the_same_route() { - let mut tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [Name::english("task"), Name::english("recipe")]; - let route_req = Path::new(Include::All, Include::All, Include::All); - let reg_msg = RegMsg::AddRoute(route_req); - for name in names.iter() { - tester.add_document(name.clone()); - let reg = Register::new(tester.get_doc_rx_id(name).clone(), reg_msg.clone()); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - tester.get_doc_rx(name).recv_timeout(TIMEOUT).unwrap(); - } - let msg = Message::new(NameType::None, Query::new()); - queue.send(msg.clone()).unwrap(); - for name in names.iter() { - let result = tester.get_doc_rx(name).recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - } - } - - #[test] - fn does_receiver_only_receives_the_message_once() { - let mut tester = TestQueue::new(); - let queue = tester.get_queue(); - let name = Name::english("something"); - tester.add_document(name.clone()); - let paths = [ - Path::new(Include::All, Include::All, Include::All), - Path::new( - Include::All, - Include::Just(name.clone().into()), - Include::All, - ), - ]; - for path in paths.iter() { - let reg_msg = RegMsg::AddRoute(path.clone()); - let reg = Register::new(tester.get_doc_rx_id(&name).clone(), reg_msg); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - tester.get_doc_rx(&name).recv_timeout(TIMEOUT).unwrap(); - } - let msg = Message::new(name.clone(), Query::new()); - queue.send(msg.clone()).unwrap(); - let result = tester.get_doc_rx(&name).recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match tester.get_doc_rx(&name).recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!("should not receive: {:?}", msg), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should have timed out"), - }, - } - } - - #[test] - fn can_routing_be_based_on_message_id() { - let mut tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [Name::english("one"), Name::english("two")]; - let mut inputs: HashMap = HashMap::new(); - for name in names.iter() { - tester.add_document(name.clone()); - let input = Message::new(name.clone(), Query::new()); - let path = Path::new( - Include::Just(input.get_message_id().clone()), - Include::All, - Include::All, - ); - let reg_msg = RegMsg::AddRoute(path); - let reg = Register::new(tester.get_doc_rx_id(&name).clone(), reg_msg); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - let result = tester.get_doc_rx(&name).recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::RouteID(output) => { - let expected = RouteID { - action: None, - doc_type: None, - msg_id: Some(input.get_message_id().clone()), - }; - assert_eq!(output, &expected); - } - _ => unreachable!("got {:?}, should have been route id", data), - }, - _ => unreachable!("got {:?}, should have been route id", action), - } - inputs.insert(name.clone(), input); - } - for msg in inputs.values() { - queue.send(msg.clone()).unwrap(); - } - for (name, msg) in inputs.iter() { - let rx = tester.get_doc_rx(&name); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match rx.recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!("should not receive: {:?}", msg), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should have timed out"), - }, - } - } - } - - #[test] - fn can_routing_be_based_on_document_name() { - let mut tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [Name::english("one"), Name::english("two")]; - let mut inputs: HashMap = HashMap::new(); - for name in names.iter() { - tester.add_document(name.clone()); - let input = Message::new(name.clone(), Query::new()); - let path = Path::new( - Include::All, - Include::Just(name.clone().into()), - Include::All, - ); - let reg_msg = RegMsg::AddRoute(path); - let reg = Register::new(tester.get_doc_rx_id(&name).clone(), reg_msg); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - let result = tester.get_doc_rx(&name).recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::RouteID(output) => { - let expected = RouteID { - action: None, - doc_type: Some(tester.get_doc_id(name).clone()), - msg_id: None, - }; - assert_eq!(output, &expected); - } - _ => unreachable!("got {:?}, should have been route id", data), - }, - _ => unreachable!("got {:?}, should have been route id", action), - } - inputs.insert(name.clone(), input); - } - for msg in inputs.values() { - queue.send(msg.clone()).unwrap(); - } - for (name, msg) in inputs.iter() { - let rx = tester.get_doc_rx(&name); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match rx.recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!("should not receive: {:?}", msg), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should have timed out"), - }, - } - } - } - - #[test] - fn can_routing_be_based_on_action() { - let mut tester = TestQueue::new(); - let queue = tester.get_queue(); - let names = [Name::english("one"), Name::english("two")]; - let paths = [ - Path::new(Include::All, Include::All, Include::Just(Action::Reply)), - Path::new(Include::All, Include::All, Include::Just(Action::Error)), - ]; - let actions = [ - MsgAction::Reply(Reply::new()), - MsgAction::Error(MTTError::NameDuplicate(names[0].clone())), - ]; - let mut inputs: HashMap = HashMap::new(); - let mut count = 0; - for name in names.iter() { - tester.add_document(name.clone()); - let input = Message::new(NameType::None, actions[count].clone()); - let path = paths[count].clone(); - let reg_msg = RegMsg::AddRoute(path); - let reg = Register::new(tester.get_doc_rx_id(&name).clone(), reg_msg); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - let result = tester.get_doc_rx(&name).recv_timeout(TIMEOUT).unwrap(); - let action = result.get_action(); - match action { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::RouteID(output) => { - let expected = RouteID { - action: Some(actions[count].clone().into()), - doc_type: None, - msg_id: None, - }; - assert_eq!(output, &expected); - } - _ => unreachable!("got {:?}, should have been route id", data), - }, - _ => unreachable!("got {:?}, should have been route id", action), - } - inputs.insert(name.clone(), input); - count += 1; - } - for msg in inputs.values() { - queue.send(msg.clone()).unwrap(); - } - for (name, msg) in inputs.iter() { - let rx = tester.get_doc_rx(&name); - let result = rx.recv_timeout(TIMEOUT).unwrap(); - assert_eq!(result.get_message_id(), msg.get_message_id()); - match rx.recv_timeout(TIMEOUT) { - Ok(msg) => unreachable!("should not receive: {:?}", msg), - Err(err) => match err { - RecvTimeoutError::Timeout => {} - _ => unreachable!("should have timed out"), - }, - } - } - } - - #[test] - fn does_removing_sender_id_remove_from_document_registry() { - let mut tester = TestQueue::new(); - let mut queue = tester.get_queue(); - let (tx, rx) = channel(); - let sender_id = queue.add_sender(tx); - let name = Name::english("testing"); - tester.add_document(name.clone()); - let path = Path::new( - Include::All, - Include::Just(name.clone().into()), - Include::All, - ); - let reg_msg = RegMsg::AddRoute(path); - let reg = Register::new(sender_id.clone(), reg_msg); - let msg = Message::new(NameType::None, reg); - queue.send(msg).unwrap(); - rx.recv_timeout(TIMEOUT).unwrap(); - queue.remove_sender(&sender_id); - let msg = Message::new(name, Query::new()); - queue.send(msg).unwrap(); - match rx.recv_timeout(TIMEOUT) { - Err(err) => match err { - RecvTimeoutError::Disconnected => {} - _ => unreachable!("got {:?}, should have been disconnected", err), - }, - Ok(data) => unreachable!("got {:?}, should have been an error", data), - } - } -} -*/ - pub struct CreateDoc { queue: Queue, rx: Receiver, @@ -3592,12 +2415,6 @@ impl InternalRecord { self.data.get(id) } - /* - fn keys(&self) -> impl Iterator { - self.data.keys() - } - */ - fn is_empty(&self) -> bool { self.data.is_empty() } @@ -3643,12 +2460,6 @@ impl InternalRecords { self.data.contains_key(oid) } - /* - fn is_empty(&self) -> bool { - self.data.is_empty() - } - */ - fn len(&self) -> usize { self.data.len() } @@ -3788,36 +2599,8 @@ impl Document { fn iter(&self) -> impl Iterator { self.data.iter() } - - /* - fn is_empty(&self) -> bool { - self.data.is_empty() - } - */ } -/* -struct DocIter { - storage: Vec<(NameType, Field)>, -} - -impl DocIter { - fn new(doc: &Document) -> Self { - Self { - storage: doc.get_all(), - } - } -} - -impl Iterator for DocIter { - type Item = (NameType, Field); - - fn next(&mut self) -> Option { - self.storage.pop() - } -} -*/ - #[cfg(test)] mod documents { use super::*; @@ -4060,22 +2843,6 @@ impl Indexes { self.data.keys().collect::>() } - /* - fn get_index(&self, field_id: &Uuid) -> &Index { - self.data.get(field_id).unwrap() - } - - fn get_index_mut(&mut self, field_id: &Uuid) -> &mut Index { - self.data.get_mut(field_id).unwrap() - } - - fn append(&mut self, indexes: &Self) { - for (field_id, index) in indexes.iter() { - self.data.get_mut(field_id).unwrap().append(index); - } - } - */ - fn pull(&self, field_id: &Uuid, calc: &Calculation) -> Result, MTTError> { self.data.get(field_id).unwrap().pull(calc) } @@ -4108,12 +2875,6 @@ impl Indexes { Ok(()) } - /* - fn iter(&self) -> impl Iterator { - self.data.iter() - } - */ - fn iter_mut(&mut self) -> impl Iterator { self.data.iter_mut() } @@ -4333,7 +3094,7 @@ impl DocumentFile { fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); - let route: Route = msg.get_path().try_into().unwrap(); + let route = msg.get_route(); for (route_id, doc_func) in self.routes.clone().iter() { if route == route_id.into() { match doc_func { @@ -4350,12 +3111,6 @@ impl DocumentFile { } } - /* - fn get_docdef(&self) -> &DocDef { - &self.docdef - } - */ - fn validate(&self, field_name: NT, value: &Field) -> Result where NT: Into, @@ -4375,31 +3130,6 @@ impl DocumentFile { Ok(output) } - /* - fn add_field_to_error(key: String, err: MTTError) -> MTTError { - match err { - MTTError::DocumentFieldMissing(_) => MTTError::DocumentFieldMissing(key), - _ => err.clone(), - } - } - - fn add_to_index(&mut self, field_name: NT, field: Field, oid: Oid) - where - NT: Into, - { - let field_id = self.docdef.get_field_id(field_name).unwrap(); - self.indexes.add_to_index(&field_id, field, oid).unwrap(); - } - - fn remove_from_index(&mut self, field_name: NT, field: &Field, oid: &Oid) - where - NT: Into, - { - let field_id = self.docdef.get_field_id(field_name).unwrap(); - self.indexes.remove_from_index(&field_id, field, oid); - } - */ - fn add_document(&mut self, msg: &Message) { let addition = match msg.get_action() { MsgAction::Addition(data) => data, @@ -4702,21 +3432,6 @@ mod document_files { &self.rx } - fn get_name_id(&self) -> Uuid { - let reg_request = RegMsg::GetNameID(self.docdef.get_document_names()[0].clone()); - let reg_msg = Register::new(self.get_sender_id(), reg_request); - let msg = Message::new(NameType::None, reg_msg); - self.queue.send(msg).unwrap(); - let result = self.rx.recv().unwrap(); - match result.get_action() { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(output) => output.clone(), - _ => unreachable!("should return a name id"), - }, - _ => unreachable!("should return a name id"), - } - } - fn get_sender_id(&self) -> Uuid { self.sender_id.clone() } @@ -4901,7 +3616,6 @@ mod document_files { )] .to_vec(); test_doc.start(routes); - let name_id: NameType = test_doc.get_name_id().into(); for item in data.iter() { test_doc.populate([item.clone()].to_vec()); } @@ -4913,11 +3627,6 @@ mod document_files { msg.get_message_id(), "message ids should match" ); - assert_eq!( - result.get_document_id(), - &name_id, - "document name ids should match" - ); match result.get_action() { MsgAction::OnQuery(output) => { assert_eq!( @@ -4947,7 +3656,6 @@ mod document_files { Include::Just(Action::OnAddition), )]; test_doc.start(routes); - let name_id: NameType = test_doc.get_name_id().into(); let mut add = Addition::new(); add.add_field(field_name.clone(), data.clone()); let msg = Message::new(doc_name.clone(), add); @@ -4958,11 +3666,6 @@ mod document_files { msg.get_message_id(), "message ids should match" ); - assert_eq!( - result.get_document_id(), - &name_id, - "document name ids should match" - ); match result.get_action() { MsgAction::OnAddition(output) => { assert_eq!(output.len(), 1, "wrong number of entries: got {:?}", output); @@ -4992,7 +3695,6 @@ mod document_files { )] .to_vec(); test_doc.start(routes); - let name_id: NameType = test_doc.get_name_id().into(); for item in data.iter() { test_doc.populate([item.clone()].to_vec()); } @@ -5004,11 +3706,6 @@ mod document_files { msg.get_message_id(), "message ids should match" ); - assert_eq!( - result.get_document_id(), - &name_id, - "document name ids should match" - ); match result.get_action() { MsgAction::OnDelete(output) => { assert_eq!( @@ -5044,7 +3741,6 @@ mod document_files { )] .to_vec(); test_doc.start(routes); - let name_id: NameType = test_doc.get_name_id().into(); for item in data.iter() { test_doc.populate([item.clone()].to_vec()); } @@ -5060,11 +3756,6 @@ mod document_files { msg.get_message_id(), "message ids should match" ); - assert_eq!( - result.get_document_id(), - &name_id, - "document name ids should match" - ); match result.get_action() { MsgAction::OnUpdate(output) => { assert_eq!( @@ -6354,21 +5045,6 @@ mod createdocs { &self.rx } - fn get_document_id(&self, name: &Name) -> Uuid { - let reg_request = Register::new(self.rx_id.clone(), RegMsg::GetNameID(name.clone())); - self.queue - .send(Message::new(NameType::None, reg_request)) - .unwrap(); - let info = self.rx.recv_timeout(TIMEOUT).unwrap(); - match info.get_action() { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(ident) => ident.clone(), - _ => unreachable!("should not get here"), - }, - _ => unreachable!("should not get here"), - } - } - fn register_paths(&self, paths: Vec) { for path in paths.iter() { let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone())); @@ -6405,12 +5081,10 @@ mod createdocs { MsgAction::Reply(_) => {} _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), } - let doc_id: NameType = doc_creator.get_document_id(&name).into(); let msg2 = Message::new(name, Query::new()); queue.send(msg2.clone()).unwrap(); let result2 = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result2.get_message_id(), msg2.get_message_id()); - assert_eq!(result2.get_document_id(), &doc_id); match result2.get_action() { MsgAction::Records(data) => assert_eq!(data.len(), 0), _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()), @@ -6444,15 +5118,6 @@ mod createdocs { }, _ => unreachable!("got {:?}: should have been a reply.", result.get_action()), } - /* - let router = queue.router.read().unwrap(); - assert_eq!( - router.senders.len(), - 3, - "there should only be 3 registered senders: createdoc, testing rx, and {:?}", - name - ); - */ } } @@ -6522,20 +5187,8 @@ mod clocks { queue .send(Message::new(NameType::None, reg_request)) .unwrap(); - let info = rx.recv_timeout(TIMEOUT).unwrap(); - let doc_id = match info.get_action() { - MsgAction::Register(data) => match data.get_msg() { - RegMsg::DocumentNameID(ident) => ident.clone(), - _ => unreachable!("should not get here"), - }, - _ => unreachable!("should not get here"), - }; + rx.recv_timeout(TIMEOUT).unwrap(); for msg in holder.iter() { - let name_id = msg.get_document_id(); - match name_id { - NameType::ID(data) => assert_eq!(data, &doc_id), - _ => unreachable!("got {:?}, should have been clock", name_id), - } let action = msg.get_action(); match action { MsgAction::OnUpdate(result) => assert_eq!(result.len(), 0), diff --git a/src/name.rs b/src/name.rs index 3774499..2e52c72 100644 --- a/src/name.rs +++ b/src/name.rs @@ -1,8 +1,4 @@ -use crate::{ - data_director::{Include, Path}, - message::Route, - mtterror::MTTError, -}; +use crate::mtterror::MTTError; use isolang::Language; use std::collections::HashMap; use uuid::Uuid; @@ -142,30 +138,6 @@ impl Names { NameType::None => Ok(Uuid::nil()), } } - - pub fn path_to_route(&self, path: &Path) -> Result { - let doc_id = match &path.doc { - Include::Just(id_info) => match id_info { - NameType::ID(id) => { - if self.ids.contains_key(&id) { - Include::Just(id.clone()) - } else { - return Err(MTTError::NameInvalidID(id.clone())); - } - } - NameType::Name(name) => { - let id = match self.get_id(name) { - Ok(data) => data, - Err(err) => return Err(err), - }; - Include::Just(id.clone()) - } - NameType::None => Include::Just(Uuid::nil()), - }, - Include::All => Include::All, - }; - Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone())) - } } #[cfg(test)] diff --git a/src/router.rs b/src/router.rs index 06df870..9eda15d 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,5 +1,6 @@ use crate::{ - message::{DocRegistry, Message, RegMsg, Register}, + data_director::{DocRegistry, RegMsg, Register}, + message::Message, mtterror::MTTError, name::NameType, };